3. Código fuente para los ejemplos de optimización de control difuso con PSO#
En este anexo se incluye el código fuente utilizado para optimizar un controlador difuso (FIS) mediante Particle Swarm Optimization (PSO). El flujo de trabajo general es:
tunable_fuzzy.pyconstruye un FIS parametrizable y genera un controlador callable.evaluate_controller.pyevalúa el controlador como una caja negra y devuelve un valor escalar (fitness), típicamente el RMSE.pso.pyejecuta PSO (con DEAP) para minimizar el RMSE del controlador parametrizable.best_controller.jsonguarda el mejor vector de parámetros encontrado, también las ruta utilizadas para el entrenamiento.show_controller.pycarga el mejor controlador y permite visualizar una ruta específica (animación, gráficas y funciones de membresía).
3.1. Estructura de archivos#
El directorio del ejemplo contiene los siguientes archivos:
fuzzy_code/
├── tunable_fuzzy.py
├── evaluate_controller.py
├── pso.py
├── show_controller.py
└── best_controller.json
Nota
Este anexo documenta los archivos específicos de la optimización con PSO.
La simulación del robot tipo bicicleta y la generación de rutas (por ejemplo
rear_wheel_sim.py y path.py) se describen en el anexo anterior de la
simulación y se reutilizan sin cambios.
3.2. tunable_fuzzy.py#
Este archivo define el FIS ajustable. Su responsabilidad es:
Construir el sistema de inferencia difusa a partir de parámetros (por ejemplo, un vector de
float).Proveer una función
get_controller(params)que regresa un controlador callable con interfaz:
tunable_fuzzy.py)# 1
2import numpy as np
3import skfuzzy as fuzz
4from skfuzzy import control as ctrl
5
6def clamp(x, lo, hi):
7 return max(lo, min(hi, x))
8
9def decode_params(x):
10 """
11 Convierte un vector (lista de floats) en un diccionario de parámetros.
12
13 x : list[float]
14 Vector que produce PSO. En este ejemplo se asume longitud 6.
15 """
16 assert len(x) == 6
17
18 # Interpretación: parámetros simétricos por variable en universo [-1, 1]
19 e_z, e_a, eth_z, eth_a, w_z, w_a = x
20
21 # Límites básicos para mantener MFs válidas y estables
22 # - z controla el ancho de la zona "cero" (debe ser pequeño)
23 # - a controla dónde empieza la saturación de NS/PS (debe ser mayor que z)
24 e_z = clamp(e_z, 0.05, 0.60)
25 e_a = clamp(e_a, 0.10, 1.00)
26 eth_z = clamp(eth_z, 0.05, 0.60)
27 eth_a = clamp(eth_a, 0.10, 1.00)
28 w_z = clamp(w_z, 0.05, 0.80)
29 w_a = clamp(w_a, 0.10, 1.00)
30
31 # Asegurar orden: a >= z + margen
32 margin = 0.05
33 e_a = max(e_a, e_z + margin)
34 eth_a = max(eth_a, eth_z + margin)
35 w_a = max(w_a, w_z + margin)
36
37 params = {
38 "e_z": e_z, "e_a": e_a,
39 "eth_z": eth_z, "eth_a": eth_a,
40 "w_z": w_z, "w_a": w_a,
41 }
42
43 return params
44
45def build_fis(params=None):
46 """
47 Construye el sistema de inferencia difusa (FIS).
48
49 params: dict | None
50 Parámetros opcionales para modificar las funciones de membresía.
51 En esta versión base, se ignoran o se usan valores por defecto.
52 """
53 p = decode_params(params)
54 eth_z = p["eth_z"]
55 eth_a = p["eth_a"]
56 e_z = p["e_z"]
57 e_a = p["e_a"]
58 w_z = p["w_z"]
59 w_a = p["w_a"]
60
61 # Universos (normalizados)
62 e_th = ctrl.Antecedent(np.linspace(-1.0, 1.0, 201), "e_th") # adimensional
63 e = ctrl.Antecedent(np.linspace(-1.0, 1.0, 201), "e") # adimensional
64 omega = ctrl.Consequent(np.linspace(-1.0, 1.0, 201), "omega") # adimensional
65
66 e_th["NS"] = fuzz.trapmf(e_th.universe, [-1.0, -1.0, -eth_a, 0.0])
67 e_th["Z"] = fuzz.trimf(e_th.universe, [-eth_z, 0.0, eth_z])
68 e_th["PS"] = fuzz.trapmf(e_th.universe, [ 0.0, eth_a, 1.0, 1.0])
69
70 e["NS"] = fuzz.trapmf(e.universe, [-1.0, -1.0, -e_a, 0.0])
71 e["Z"] = fuzz.trimf(e.universe, [-e_z, 0.0, e_z])
72 e["PS"] = fuzz.trapmf(e.universe, [ 0.0, e_a, 1.0, 1.0])
73
74 omega["NS"] = fuzz.trapmf(omega.universe, [-1.0, -1.0, -w_a, 0.0])
75 omega["Z"] = fuzz.trimf(omega.universe, [-w_z, 0.0, w_z])
76 omega["PS"] = fuzz.trapmf(omega.universe, [ 0.0, w_a, 1.0, 1.0])
77
78
79 # Reglas explícitas (3x3)
80 rules = [
81 ctrl.Rule(e_th['NS'] & e['NS'], omega['PS']),
82 ctrl.Rule(e_th['NS'] & e['Z'], omega['PS']),
83 ctrl.Rule(e_th['NS'] & e['PS'], omega['Z']),
84
85 ctrl.Rule(e_th['Z'] & e['NS'], omega['PS']),
86 ctrl.Rule(e_th['Z'] & e['Z'], omega['Z']),
87 ctrl.Rule(e_th['Z'] & e['PS'], omega['NS']),
88
89 ctrl.Rule(e_th['PS'] & e['NS'], omega['Z']),
90 ctrl.Rule(e_th['PS'] & e['Z'], omega['NS']),
91 ctrl.Rule(e_th['PS'] & e['PS'], omega['NS']),
92 ]
93
94 fis = ctrl.ControlSystem(rules)
95 return fis
96
97
98def get_controller(params=None):
99 fis = build_fis(params)
100 sim = ctrl.ControlSystemSimulation(fis)
101
102 # Escalas físicas (baseline)
103 ETH_MAX = 1.5 # rad
104 E_MAX = 3.0 # m
105 OMEGA_MAX = 8.0 # rad/s
106
107 def controller(e_th, e):
108 # 1) Normalizar entradas
109 e_th_n = clamp(float(e_th) / ETH_MAX,-1.0,1.0)
110 e_n = clamp(float(e) / E_MAX,-1.0,1.0)
111
112 # 2) Evaluar FIS
113 sim.reset()
114 sim.input["e_th"] = e_th_n
115 sim.input["e"] = e_n
116 sim.compute()
117
118 # 3) Desnormalizar salida
119 omega_n = float(sim.output["omega"])
120 omega = OMEGA_MAX * omega_n
121 return omega
122
123 return controller
124
125
126def plot_mfs(params=None):
127 """
128 Visualiza las funciones de membresía (modo exploración).
129 """
130 fis = build_fis(params)
131 for variable in fis.fuzzy_variables:
132 variable.view()
133
134
135if __name__ == "__main__":
136 # Ejecuta: python my_fis.py
137 # para inspeccionar las funciones de membresía.
138 plot_mfs()
139
3.3. evaluate_controller.py#
Este script implementa la función objetivo (fitness). En la práctica:
Recibe un vector de parámetros (o
Nonepara usar el controlador base).Construye el controlador difuso mediante
tunable_fuzzy.get_controller.Ejecuta la simulación sobre un conjunto fijo de rutas.
Calcula el desempeño (por ejemplo, RMSE del error lateral).
Penaliza escenarios donde el robot no llega a la meta o pierde la referencia.
evaluate_controller.py)# 1import tunable_fuzzy as fc
2import rear_wheel_sim as rw_sim
3import path
4import numpy as np
5
6def compute_rmse(traces):
7 errors = np.array([tr.error for tr in traces])
8 return np.sqrt(np.mean(errors**2))
9
10def evaluate_controller(params):
11
12 paths = [
13 ([0.0, 6.0, 12.5, 5.0, 7.5, 3.0, -1.0], [0.0, 0.0, 5.0, 6.5, 3.0, 5.0, -2.0]),
14 ([0.0, 1.0, 2.5, 5.0, 7.5, 3.0, -1.0], [0.0, -4.0, 6.0, 6.5, 3.0, 5.0, -2.0]),
15 ([0.0, 2.0, 2.5, 5.0, 7.5, -3.0, -1.0], [0.0, 3.0, 6.0, 6.5, 5.0, 5.0, -2.0]),
16 ]
17
18 controller = None
19
20 if params:
21 controller = fc.get_controller(params)
22
23 rmses = []
24 for ax, ay in paths:
25 goal = [ax[-1], ay[-1]]
26 reference_path = path.CubicSplinePath(ax, ay)
27
28 result = rw_sim.simulacion(
29 reference_path,
30 goal,
31 controller=controller,
32 )
33 if not result['goal_flag']:
34 return float(10.0), # Return Tuple for DEAP
35 if result['error_flag']:
36 return float(10.0), # Return Tuple for DEAP
37
38 traces = result["traces"]
39 rmse = compute_rmse(traces)
40 rmses.append(rmse)
41 #print(result['goal_flag'], result['error_flag'], rmse)
42 # Visualización (opcional): se ejecuta después de la simulación
43 #rw_sim.animate(result, pause=0.001)
44 #rw_sim.plot(result)
45 #fc.plot_mfs(params=params)
46 #print(np.mean(rmses))
47 return float(np.mean(rmses)), # Return Tuple for DEAP
48
3.4. pso.py#
Implementación del algoritmo PSO utilizando DEAP. Su función es:
Definir la representación de la partícula (posición/velocidad/mejor histórico).
Definir el espacio de búsqueda (rangos mínimos y máximos por parámetro).
Ejecutar el ciclo de optimización llamando a
evaluate_controller.py.Imprime el mejor vector de parámetros.
pso.py)# 1import operator
2import random
3
4import numpy
5import math
6
7from deap import base
8from deap import creator
9from deap import tools
10
11from evaluate_controller import evaluate_controller
12creator.create("FitnessMin", base.Fitness, weights=(-1.0,))
13creator.create("Particle", list, fitness=creator.FitnessMin, speed=list,
14 smin=None, smax=None, best=None)
15
16def generate(size, pmin, pmax, smin, smax):
17 part = creator.Particle(random.uniform(pmin, pmax) for _ in range(size))
18 part.speed = [random.uniform(smin, smax) for _ in range(size)]
19 part.smin = smin
20 part.smax = smax
21 return part
22
23def updateParticle(part, best, phi1, phi2):
24 u1 = (random.uniform(0, phi1) for _ in range(len(part)))
25 u2 = (random.uniform(0, phi2) for _ in range(len(part)))
26 v_u1 = map(operator.mul, u1, map(operator.sub, part.best, part))
27 v_u2 = map(operator.mul, u2, map(operator.sub, best, part))
28 part.speed = list(map(operator.add, part.speed, map(operator.add, v_u1, v_u2)))
29 for i, speed in enumerate(part.speed):
30 if abs(speed) < part.smin:
31 part.speed[i] = math.copysign(part.smin, speed)
32 elif abs(speed) > part.smax:
33 part.speed[i] = math.copysign(part.smax, speed)
34 part[:] = list(map(operator.add, part, part.speed))
35
36toolbox = base.Toolbox()
37toolbox.register("particle", generate, size=6, pmin=-0.05, pmax=1.5, smin=-0.1, smax=0.1)
38toolbox.register("population", tools.initRepeat, list, toolbox.particle)
39toolbox.register("update", updateParticle, phi1=2.0, phi2=2.0)toolbox.register("evaluate", evaluate_controller)
40
41def main():
42 pop = toolbox.population(n=50)
43 stats = tools.Statistics(lambda ind: ind.fitness.values)
44 stats.register("avg", numpy.mean)
45 stats.register("std", numpy.std)
46 stats.register("min", numpy.min)
47 stats.register("max", numpy.max)
48
49 logbook = tools.Logbook()
50 logbook.header = ["gen", "evals"] + stats.fields
51
52 GEN = 20
53 best = None
54
55 for g in range(GEN):
56 for part in pop:
57 part.fitness.values = toolbox.evaluate(part)
58 if not part.best or part.best.fitness < part.fitness:
59 part.best = creator.Particle(part)
60 part.best.fitness.values = part.fitness.values
61 if not best or best.fitness < part.fitness:
62 best = creator.Particle(part)
63 best.fitness.values = part.fitness.values
64
65 for part in pop:
66 toolbox.update(part, best)
67 # Gather all the fitnesses in one list and print the stats
68 logbook.record(gen=g, evals=len(pop), **stats.compile(pop))
69 print(logbook.stream)
70 print(best.fitness, best.fitness.values, part.best.fitness, part.fitness.values)
71 return pop, logbook, best
72
73if __name__ == "__main__":
74 main()
3.5. best_controller.json#
Archivo de configuración que guarda el mejor controlador encontrado. Contiene principalmente:
params: vector de parámetros (lista defloat) que define el FIS.(Opcional) rutas utilizadas, métricas y metadatos para reproducibilidad.
best_controller.json)#1{
2 "params": [0.02906318670445844, 0.5689901881492568, 0.959185644460144, 0.11181303651379565, 0.09595559801638323, 0.3875820435302194],
3 "paths": [
4 {"ax": [0.0, 6.0, 12.5, 5.0, 7.5, 3.0, -1.0], "ay": [0.0, 0.0, 5.0, 6.5, 3.0, 5.0, -2.0]},
5 {"ax": [0.0, 1.0, 2.5, 5.0, 7.5, 3.0, -1.0], "ay": [0.0, -4.0, 6.0, 6.5, 3.0, 5.0, -2.0]},
6 {"ax": [0.0, 2.0, 2.5, 5.0, 7.5, -3.0, -1.0], "ay": [0.0, 3.0, 6.0, 6.5, 5.0, 5.0, -2.0]}
7 ],
8 "notes": "Best found by PSO"
9}
3.6. show_controller.py#
Script de visualización y diagnóstico. Este archivo:
Carga
best_controller.json.Construye el controlador difuso con esos parámetros.
Ejecuta la simulación sobre una ruta específica (por índice).
Muestra animación y gráficas del seguimiento.
Despliega las funciones de membresía resultantes.
show_controller.py)# 1import json
2import tunable_fuzzy as fc
3from evaluate_controller import compute_rmse
4import rear_wheel_sim as rw_sim
5import path
6import argparse
7
8def load_config(path_cfg):
9 with open(path_cfg, "r") as f:
10 cfg = json.load(f)
11 params = cfg["params"]
12 paths = [(p["ax"], p["ay"]) for p in cfg["paths"]]
13 return params, paths, cfg
14
15def show_controller(params, paths, path_id):
16
17 controller = fc.get_controller(params)
18
19 ax, ay = paths[path_id]
20 goal = [ax[-1], ay[-1]]
21 reference_path = path.CubicSplinePath(ax, ay)
22 result = rw_sim.simulacion(
23 reference_path,
24 goal,
25 controller=controller,
26 )
27 if not result['goal_flag']:
28 print("⚠️ No se alcanzó la meta")
29 if result['error_flag']:
30 print("⚠️ Error en la simulación")
31
32 traces = result["traces"]
33 rmse = compute_rmse(traces)
34 print('rmse: {rmse}')
35
36 rw_sim.animate(result, pause=0.001)
37 rw_sim.plot(result)
38 fc.plot_mfs(params=params)
39
40if __name__ == "__main__":
41 parser = argparse.ArgumentParser(
42 description="Visualiza el controlador difuso para una ruta específica"
43 )
44 parser.add_argument(
45 "path_id",
46 type=int,
47 nargs="?",
48 default=0,
49 help="Índice de la ruta a mostrar (default: 0)",
50 )
51 parser.add_argument(
52 "--config",
53 type=str,
54 default="best_controller.json",
55 help="Archivo de configuración del controlador",
56 )
57
58 args = parser.parse_args()
59
60 params, paths, cfg = load_config(args.config)
61
62 if args.path_id < 0 or args.path_id >= len(paths):
63 raise ValueError(f"path_id debe estar entre 0 y {len(paths)-1}")
64
65 show_controller(params, paths, path_id=args.path_id)
66
3.7. Ejemplo de uso#
Para mostrar el mejor controlador sobre una ruta específica (por ejemplo, la ruta con índice 2) y la ubicación del archivo con la configuración del FIS:
python show_controller.py 2 --configure ./best_controller.json
Para ejecutar la optimización con PSO (puede tomar tiempo dependiendo del número de partículas y generaciones):
python pso.py
4. Código fuente para los ejemplos de Escalado Computacional#
El siguiente archivo muestra la versión del algoritmo PSO donde la evaluación de la aptitud de cada partícula se realiza en paralelo utilizando tareas remotas de Ray. El resto del algoritmo se ejecuta de manera secuencial en un solo proceso coordinador.
pso_ray.py)# 1import operator
2import random
3
4import numpy
5import math
6
7from deap import base
8from deap import creator
9from deap import tools
10
11import ray
12from evaluate_controller import evaluate_controller
13
14@ray.remote
15def remote_ev_controller(particle):
16 return evaluate_controller(particle)
17
18
19creator.create("FitnessMin", base.Fitness, weights=(-1.0,))
20creator.create("Particle", list, fitness=creator.FitnessMin, speed=list,
21 smin=None, smax=None, best=None)
22
23
24
25def generate(size, pmin, pmax, smin, smax):
26 part = creator.Particle(random.uniform(pmin, pmax) for _ in range(size))
27 part.speed = [random.uniform(smin, smax) for _ in range(size)]
28 part.smin = smin
29 part.smax = smax
30 return part
31
32def updateParticle(part, best, phi1, phi2):
33 u1 = (random.uniform(0, phi1) for _ in range(len(part)))
34 u2 = (random.uniform(0, phi2) for _ in range(len(part)))
35 v_u1 = map(operator.mul, u1, map(operator.sub, part.best, part))
36 v_u2 = map(operator.mul, u2, map(operator.sub, best, part))
37 part.speed = list(map(operator.add, part.speed, map(operator.add, v_u1, v_u2)))
38 for i, speed in enumerate(part.speed):
39 if abs(speed) < part.smin:
40 part.speed[i] = math.copysign(part.smin, speed)
41 elif abs(speed) > part.smax:
42 part.speed[i] = math.copysign(part.smax, speed)
43 part[:] = list(map(operator.add, part, part.speed))
44
45toolbox = base.Toolbox()
46toolbox.register("particle", generate, size=6, pmin=-0.05, pmax=1.5, smin=-0.1, smax=0.1)
47toolbox.register("population", tools.initRepeat, list, toolbox.particle)
48toolbox.register("update", updateParticle, phi1=2.0, phi2=2.0)
49#toolbox.register("evaluate", ev_controller)
50
51
52def evaluate_population_ray(pop):
53 futures_fitness_values = [remote_ev_controller.remote(list(particle)) for particle in pop]
54 results = ray.get(futures_fitness_values)
55 assert len(pop) == len(results)
56
57 for i, part in enumerate(pop):
58 part.fitness.values = results[i]
59 return pop
60
61def main():
62 pop = toolbox.population(n=50)
63 stats = tools.Statistics(lambda ind: ind.fitness.values)
64 stats.register("avg", numpy.mean)
65 stats.register("std", numpy.std)
66 stats.register("min", numpy.min)
67 stats.register("max", numpy.max)
68
69 logbook = tools.Logbook()
70 logbook.header = ["gen", "evals"] + stats.fields
71
72 GEN = 20
73 best = None
74
75
76 for g in range(GEN):
77 evaluate_population_ray(pop)
78 for part in pop:
79 # No calculamos el fitnes utilizando el toolbox
80 # lo hacemos con la función evaluate_population_ray(pop)
81
82 # part.fitness.values = toolbox.evaluate(part)
83
84 # Se compara el fitness no el valor del RMSE
85 # en este caso los controladores con menor RMSE
86 # tienen un mayor fitness. DEAP internamente
87 # hace la multiplicación por -1 ya que estamos minimizando.
88 if not part.best or part.best.fitness < part.fitness:
89 part.best = creator.Particle(part)
90 part.best.fitness.values = part.fitness.values
91 if not best or best.fitness < part.fitness:
92 best = creator.Particle(part)
93 best.fitness.values = part.fitness.values
94
95 for part in pop:
96 toolbox.update(part, best)
97 # Gather all the fitnesses in one list and print the stats
98 logbook.record(gen=g, evals=len(pop), **stats.compile(pop))
99 print(logbook.stream)
100
101 print(best.fitness.values, best)
102 return pop, logbook, best
103
104if __name__ == "__main__":
105 ray.init()
106 main()
107 ray.shutdown()
108
109
110
111
112
A continuación se muestra el código completo del ejemplo de PSO
multi-enjambre, donde cada población se implementa como un actor con estado
(SwarmAgent). En este caso, Ray se utiliza tanto para ejecutar evaluaciones
stateless como para mantener poblaciones persistentes que evolucionan y
migran soluciones de manera coordinada.
pso_agent.py)# 1import random
2
3import ray
4from dataclasses import dataclass, field
5from typing import List, Tuple, Dict
6from evaluate_controller import evaluate_controller
7
8
9@ray.remote
10def remote_ev_controller(x: List[float]) -> float:
11 fitness = evaluate_controller(x)
12 # Normaliza a float aunque venga como (val,)
13 if isinstance(fitness, tuple):
14 return float(fitness[0])
15 return float(fitness)
16
17def clip(value: float, vmin: float, vmax: float) -> float:
18 return max(vmin, min(vmax, value))
19
20@dataclass
21class Particle:
22 """
23 Partícula para PSO.
24
25 Attributes
26 ----------
27 x : list[float]
28 Posición (solución candidata).
29 v : list[float]
30 Velocidad.
31 best_x : list[float]
32 Mejor posición personal encontrada hasta el momento.
33 best_f : float
34 Mejor valor de fitness (a minimizar) asociado a best_x.
35 """
36 x: List[float] = field(default_factory=list)
37 speed: List[float] = field(default_factory=list)
38 fitness: float = float('inf')
39 best_x: List[float] = field(default_factory=list)
40 best_f: float = float("inf")
41
42 def __repr__(self) -> str:
43 return f"Particle(f={self.fitness:.4f}, x={[round(v, 3) for v in self.x]})"
44
45 def __str__(self) -> str:
46 return (
47 "Particle(\n"
48 f" fitness = {self.fitness:.6f}\n"
49 f" x = {[round(v, 4) for v in self.x]}\n"
50 f" speed = {[round(v, 4) for v in self.speed]}\n"
51 f" best_f = {self.best_f:.6f}\n"
52 f" best_x = {[round(v, 4) for v in self.best_x]}\n"
53 ")"
54 )
55
56@ray.remote
57class SwarmAgent:
58 def __init__(self, config: Dict) -> None:
59 self.config = config
60 self.swarm_size = int(config["swarm_size"])
61 self.size = int(config['dim'])
62
63 self.smin = float(config['smin'])
64 self.smax = float(config['smax'])
65 self.pmax = float(config['pmax'])
66 self.pmin = float(config['pmin'])
67 self.weight = float(config.get("w", 0.7))
68 self.phi1 = float(config['phi1'])
69 self.phi2 = float(config['phi2'])
70 self.pop: List[Particle] = [self.generate() for _ in range(self.swarm_size)]
71 self.gbest = Particle(
72 x=self.pop[0].x.copy(),
73 speed=[0.0] * self.size,
74 fitness=float("inf"),
75 best_x=self.pop[0].x.copy(),
76 best_f=float("inf"),
77 )
78
79 def generate(self) -> Particle:
80 x = [random.uniform(self.pmin, self.pmax) for _ in range(self.size)]
81 speed = [random.uniform(self.smin, self.smax) for _ in range(self.size)]
82 return Particle(x=x, speed=speed, fitness=float("inf"), best_x=x.copy(), best_f=float("inf"))
83
84 def _update_particle(self, p: Particle) -> None:
85 for d in range(self.size):
86 r1 = random.random()
87 r2 = random.random()
88
89 cognitive = self.phi1 * r1 * (p.best_x[d] - p.x[d])
90 social = self.phi2 * r2 * (self.gbest.x[d] - p.x[d])
91
92 p.speed[d] = self.weight * p.speed[d] + cognitive + social
93 p.speed[d] = clip(p.speed[d], self.smin, self.smax)
94
95 p.x[d] = p.x[d] + p.speed[d]
96 p.x[d] = clip(p.x[d], self.pmin, self.pmax)
97
98
99 def evaluate_population_ray(self):
100 futures_fitness_values = [remote_ev_controller.remote(list(particle.x)) for particle in self.pop]
101 results = ray.get(futures_fitness_values)
102 assert len(self.pop) == len(results)
103
104 for p, f in zip(self.pop, results):
105 p.fitness = f
106
107 if f < p.best_f:
108 p.best_f = f
109 p.best_x = p.x.copy()
110
111 if f < self.gbest.fitness:
112 self.gbest.fitness = f
113 self.gbest.x = p.x.copy()
114 self.gbest.best_f = f
115 self.gbest.best_x = p.x.copy()
116
117
118 def migrate(self, candidates: List[Tuple[float, List[float]]]) -> None:
119 """
120 candidates: lista de (fitness, x) (tipos planos).
121 Reemplaza las peores partículas por estos candidatos.
122 """
123 # Asegurar fitness actualizado para identificar peores
124 self.evaluate_population_ray()
125
126 # Peores primero
127 worst_idx = sorted(range(self.swarm_size), key=lambda i: self.pop[i].fitness, reverse=True)
128 k = min(len(candidates), self.swarm_size)
129
130 for j in range(k):
131 f_cand, x_cand = candidates[j]
132 i = worst_idx[j]
133 # Insertar candidato con velocidad nueva (para no heredar dinámica ajena)
134 self.pop[i] = Particle(
135 x=list(x_cand),
136 speed=[random.uniform(self.smin, self.smax) for _ in range(self.size)],
137 fitness=float(f_cand),
138 best_x=list(x_cand),
139 best_f=float(f_cand),
140 )
141
142 if f_cand < self.gbest.fitness:
143 self.gbest.fitness = float(f_cand)
144 self.gbest.x = list(x_cand)
145
146
147 def step(self, n_iters: int) -> Particle:
148 for _ in range(n_iters):
149 self.evaluate_population_ray()
150 for part in self.pop:
151 self._update_particle(part)
152 # Gather all the fitnesses in one list and print the stats
153 # print(logbook.stream)
154 return self.gbest
155
156
157config = {
158 "smin": -0.2,
159 "smax": 0.20,
160 "pmin": 0.0,
161 "pmax": 1.0,
162 "phi1": 2.0,
163 "phi2": 2.0,
164
165 "dim": 6,
166 "swarm_size": 10,
167
168 "ngen": 4,
169 "migrate_interval":2,
170 "migrate_k":2,
171 "num_swarms": 6,
172 "num_rounds": 8,
173}
174
175
176import os
177os.environ["RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO"] = "0"
178os.environ["RAY_DISABLE_DASHBOARD"] = "1"
179os.environ["RAY_USAGE_STATS_ENABLED"] = "0"
180
181ray.init(ignore_reinit_error=True, include_dashboard=False)
182
183try:
184 agents = [SwarmAgent.remote(config) for i in range(config['num_swarms'])
185 ]
186
187 best_global = Particle()
188
189 for r in range(config["num_rounds"]):
190 # 1) Cada agente ejecuta varias iteraciones locales en paralelo
191 futures = [a.step.remote(config['ngen']) for a in agents]
192 bests = [best for best in ray.get(futures)] # [(f, x), ...]
193 bests.sort(key=lambda best: best.fitness)
194
195 # 2) Actualizar mejor global
196 best = bests[0]
197 if best_global.fitness > best.fitness:
198 best_global.x = best.x[:]
199 best_global.fitness = best.fitness
200 print(f"Ronda {r:02d} | Mejor: {best_global.fitness:.6f}")
201
202 # 3) Migración: intercambio de candidatos entre agentes
203 if (r + 1) % int(config["migrate_interval"]) == 0:
204 print("Migrando")
205 migration_futures = []
206 migration_particles = [(particle.fitness, particle.x) for particle in bests[:config['migrate_k']]]
207 for a in agents:
208 # Candidatos: enviamos los k mejores a los otros agentes
209 migration_futures.append(a.migrate.remote(migration_particles))
210 ray.get(migration_futures)
211 print(f"Mejor global: {best_global.fitness:.6f} | {best_global.x}")
212finally:
213 ray.shutdown()
214