3. Código fuente para los ejemplos de optimización de control difuso con PSO#

En este anexo se incluye el código fuente utilizado para optimizar un controlador difuso (FIS) mediante Particle Swarm Optimization (PSO). El flujo de trabajo general es:

  1. tunable_fuzzy.py construye un FIS parametrizable y genera un controlador callable.

  2. evaluate_controller.py evalúa el controlador como una caja negra y devuelve un valor escalar (fitness), típicamente el RMSE.

  3. pso.py ejecuta PSO (con DEAP) para minimizar el RMSE del controlador parametrizable.

  4. best_controller.json guarda el mejor vector de parámetros encontrado, también las ruta utilizadas para el entrenamiento.

  5. show_controller.py carga el mejor controlador y permite visualizar una ruta específica (animación, gráficas y funciones de membresía).

3.1. Estructura de archivos#

El directorio del ejemplo contiene los siguientes archivos:

fuzzy_code/
├── tunable_fuzzy.py
├── evaluate_controller.py
├── pso.py
├── show_controller.py
└── best_controller.json

Nota

Este anexo documenta los archivos específicos de la optimización con PSO. La simulación del robot tipo bicicleta y la generación de rutas (por ejemplo rear_wheel_sim.py y path.py) se describen en el anexo anterior de la simulación y se reutilizan sin cambios.

3.2. tunable_fuzzy.py#

Este archivo define el FIS ajustable. Su responsabilidad es:

  • Construir el sistema de inferencia difusa a partir de parámetros (por ejemplo, un vector de float).

  • Proveer una función get_controller(params) que regresa un controlador callable con interfaz:

\[(e_{th}, e) \;\longrightarrow\; \omega\]
FIS parametrizable para optimización (tunable_fuzzy.py)#
  1
  2import numpy as np
  3import skfuzzy as fuzz
  4from skfuzzy import control as ctrl
  5
  6def clamp(x, lo, hi):
  7   return max(lo, min(hi, x))
  8
  9def decode_params(x):
 10   """
 11   Convierte un vector (lista de floats) en un diccionario de parámetros.
 12
 13   x : list[float]
 14       Vector que produce PSO. En este ejemplo se asume longitud 6.
 15   """
 16   assert len(x) == 6
 17
 18   # Interpretación: parámetros simétricos por variable en universo [-1, 1]
 19   e_z, e_a, eth_z, eth_a, w_z, w_a = x
 20
 21   # Límites básicos para mantener MFs válidas y estables
 22   # - z controla el ancho de la zona "cero" (debe ser pequeño)
 23   # - a controla dónde empieza la saturación de NS/PS (debe ser mayor que z)
 24   e_z   = clamp(e_z,   0.05, 0.60)
 25   e_a   = clamp(e_a,   0.10, 1.00)
 26   eth_z = clamp(eth_z, 0.05, 0.60)
 27   eth_a = clamp(eth_a, 0.10, 1.00)
 28   w_z   = clamp(w_z,   0.05, 0.80)
 29   w_a   = clamp(w_a,   0.10, 1.00)
 30
 31   # Asegurar orden: a >= z + margen
 32   margin = 0.05
 33   e_a   = max(e_a,   e_z   + margin)
 34   eth_a = max(eth_a, eth_z + margin)
 35   w_a   = max(w_a,   w_z   + margin)
 36
 37   params = {
 38       "e_z": e_z, "e_a": e_a,
 39       "eth_z": eth_z, "eth_a": eth_a,
 40       "w_z": w_z, "w_a": w_a,
 41   }
 42
 43   return params
 44
 45def build_fis(params=None):
 46    """
 47    Construye el sistema de inferencia difusa (FIS).
 48
 49    params: dict | None
 50        Parámetros opcionales para modificar las funciones de membresía.
 51        En esta versión base, se ignoran o se usan valores por defecto.
 52    """
 53    p = decode_params(params)
 54    eth_z = p["eth_z"]
 55    eth_a = p["eth_a"]
 56    e_z   = p["e_z"]
 57    e_a   = p["e_a"]
 58    w_z   = p["w_z"]
 59    w_a   = p["w_a"]
 60
 61    # Universos (normalizados)
 62    e_th = ctrl.Antecedent(np.linspace(-1.0, 1.0, 201), "e_th")   # adimensional
 63    e    = ctrl.Antecedent(np.linspace(-1.0, 1.0, 201), "e")      # adimensional
 64    omega = ctrl.Consequent(np.linspace(-1.0, 1.0, 201), "omega") # adimensional
 65
 66    e_th["NS"] = fuzz.trapmf(e_th.universe, [-1.0, -1.0, -eth_a, 0.0])
 67    e_th["Z"]  = fuzz.trimf(e_th.universe,  [-eth_z,  0.0,  eth_z])
 68    e_th["PS"] = fuzz.trapmf(e_th.universe, [ 0.0,  eth_a,  1.0, 1.0])
 69
 70    e["NS"] = fuzz.trapmf(e.universe, [-1.0, -1.0, -e_a, 0.0])
 71    e["Z"]  = fuzz.trimf(e.universe,  [-e_z,  0.0,  e_z])
 72    e["PS"] = fuzz.trapmf(e.universe, [ 0.0,  e_a,  1.0, 1.0])
 73
 74    omega["NS"] = fuzz.trapmf(omega.universe, [-1.0, -1.0, -w_a, 0.0])
 75    omega["Z"]  = fuzz.trimf(omega.universe,  [-w_z,  0.0,  w_z])
 76    omega["PS"] = fuzz.trapmf(omega.universe, [ 0.0,  w_a,  1.0, 1.0])
 77
 78
 79    # Reglas explícitas (3x3)
 80    rules = [
 81        ctrl.Rule(e_th['NS'] & e['NS'], omega['PS']),
 82        ctrl.Rule(e_th['NS'] & e['Z'], omega['PS']),
 83        ctrl.Rule(e_th['NS'] & e['PS'], omega['Z']),
 84
 85        ctrl.Rule(e_th['Z'] & e['NS'], omega['PS']),
 86        ctrl.Rule(e_th['Z'] & e['Z'], omega['Z']),
 87        ctrl.Rule(e_th['Z'] & e['PS'], omega['NS']),
 88
 89        ctrl.Rule(e_th['PS'] & e['NS'], omega['Z']),
 90        ctrl.Rule(e_th['PS'] & e['Z'], omega['NS']),
 91        ctrl.Rule(e_th['PS'] & e['PS'], omega['NS']),
 92    ]
 93
 94    fis = ctrl.ControlSystem(rules)
 95    return fis
 96
 97
 98def get_controller(params=None):
 99    fis = build_fis(params)
100    sim = ctrl.ControlSystemSimulation(fis)
101
102    # Escalas físicas (baseline)
103    ETH_MAX = 1.5   # rad
104    E_MAX = 3.0     # m
105    OMEGA_MAX = 8.0 # rad/s
106
107    def controller(e_th, e):
108        # 1) Normalizar entradas
109        e_th_n = clamp(float(e_th) / ETH_MAX,-1.0,1.0)
110        e_n = clamp(float(e) / E_MAX,-1.0,1.0)
111
112        # 2) Evaluar FIS
113        sim.reset()
114        sim.input["e_th"] = e_th_n
115        sim.input["e"] = e_n
116        sim.compute()
117
118        # 3) Desnormalizar salida
119        omega_n = float(sim.output["omega"])
120        omega = OMEGA_MAX * omega_n
121        return omega
122
123    return controller
124
125
126def plot_mfs(params=None):
127    """
128   Visualiza las funciones de membresía (modo exploración).
129   """
130    fis = build_fis(params)
131    for variable in fis.fuzzy_variables:
132        variable.view()
133    
134
135if __name__ == "__main__":
136    # Ejecuta: python my_fis.py
137    # para inspeccionar las funciones de membresía.
138    plot_mfs()
139

3.3. evaluate_controller.py#

Este script implementa la función objetivo (fitness). En la práctica:

  • Recibe un vector de parámetros (o None para usar el controlador base).

  • Construye el controlador difuso mediante tunable_fuzzy.get_controller.

  • Ejecuta la simulación sobre un conjunto fijo de rutas.

  • Calcula el desempeño (por ejemplo, RMSE del error lateral).

  • Penaliza escenarios donde el robot no llega a la meta o pierde la referencia.

Evaluación del controlador como función objetivo (evaluate_controller.py)#
 1import tunable_fuzzy as fc
 2import rear_wheel_sim as rw_sim
 3import path
 4import numpy as np
 5
 6def compute_rmse(traces):
 7    errors = np.array([tr.error for tr in traces])
 8    return np.sqrt(np.mean(errors**2))
 9
10def evaluate_controller(params):
11
12    paths = [
13        ([0.0, 6.0, 12.5, 5.0, 7.5, 3.0, -1.0], [0.0, 0.0, 5.0, 6.5, 3.0, 5.0, -2.0]),
14        ([0.0, 1.0, 2.5, 5.0, 7.5, 3.0, -1.0], [0.0, -4.0, 6.0, 6.5, 3.0, 5.0, -2.0]),
15        ([0.0, 2.0, 2.5, 5.0, 7.5, -3.0, -1.0], [0.0, 3.0, 6.0, 6.5, 5.0, 5.0, -2.0]),
16    ]
17    
18    controller = None
19    
20    if params:
21        controller = fc.get_controller(params)
22    
23    rmses = []
24    for ax, ay in paths:
25        goal = [ax[-1], ay[-1]]
26        reference_path = path.CubicSplinePath(ax, ay)
27
28        result = rw_sim.simulacion(
29            reference_path,
30            goal,
31            controller=controller,
32        )
33        if not result['goal_flag']:
34            return float(10.0), # Return Tuple for DEAP
35        if result['error_flag']:
36            return float(10.0), # Return Tuple for DEAP
37
38        traces = result["traces"]
39        rmse = compute_rmse(traces)
40        rmses.append(rmse)
41        #print(result['goal_flag'], result['error_flag'], rmse)
42        # Visualización (opcional): se ejecuta después de la simulación
43        #rw_sim.animate(result, pause=0.001)
44        #rw_sim.plot(result)
45        #fc.plot_mfs(params=params)
46    #print(np.mean(rmses))
47    return float(np.mean(rmses)), # Return Tuple for DEAP
48

3.4. pso.py#

Implementación del algoritmo PSO utilizando DEAP. Su función es:

  • Definir la representación de la partícula (posición/velocidad/mejor histórico).

  • Definir el espacio de búsqueda (rangos mínimos y máximos por parámetro).

  • Ejecutar el ciclo de optimización llamando a evaluate_controller.py.

  • Imprime el mejor vector de parámetros.

Optimización del FIS mediante PSO (pso.py)#
 1import operator
 2import random
 3
 4import numpy
 5import math
 6
 7from deap import base
 8from deap import creator
 9from deap import tools
10
11from evaluate_controller import evaluate_controller
12creator.create("FitnessMin", base.Fitness, weights=(-1.0,))
13creator.create("Particle", list, fitness=creator.FitnessMin, speed=list,
14    smin=None, smax=None, best=None)
15
16def generate(size, pmin, pmax, smin, smax):
17    part = creator.Particle(random.uniform(pmin, pmax) for _ in range(size))
18    part.speed = [random.uniform(smin, smax) for _ in range(size)]
19    part.smin = smin
20    part.smax = smax
21    return part
22
23def updateParticle(part, best, phi1, phi2):
24    u1 = (random.uniform(0, phi1) for _ in range(len(part)))
25    u2 = (random.uniform(0, phi2) for _ in range(len(part)))
26    v_u1 = map(operator.mul, u1, map(operator.sub, part.best, part))
27    v_u2 = map(operator.mul, u2, map(operator.sub, best, part))
28    part.speed = list(map(operator.add, part.speed, map(operator.add, v_u1, v_u2)))
29    for i, speed in enumerate(part.speed):
30        if abs(speed) < part.smin:
31            part.speed[i] = math.copysign(part.smin, speed)
32        elif abs(speed) > part.smax:
33            part.speed[i] = math.copysign(part.smax, speed)
34    part[:] = list(map(operator.add, part, part.speed))
35
36toolbox = base.Toolbox()
37toolbox.register("particle", generate, size=6, pmin=-0.05, pmax=1.5, smin=-0.1, smax=0.1)
38toolbox.register("population", tools.initRepeat, list, toolbox.particle)
39toolbox.register("update", updateParticle, phi1=2.0, phi2=2.0)toolbox.register("evaluate", evaluate_controller)
40
41def main():
42    pop = toolbox.population(n=50)
43    stats = tools.Statistics(lambda ind: ind.fitness.values)
44    stats.register("avg", numpy.mean)
45    stats.register("std", numpy.std)
46    stats.register("min", numpy.min)
47    stats.register("max", numpy.max)
48
49    logbook = tools.Logbook()
50    logbook.header = ["gen", "evals"] + stats.fields
51
52    GEN = 20
53    best = None
54
55    for g in range(GEN):
56        for part in pop:
57            part.fitness.values = toolbox.evaluate(part)
58            if not part.best or part.best.fitness < part.fitness:
59                part.best = creator.Particle(part)
60                part.best.fitness.values = part.fitness.values
61            if not best or best.fitness < part.fitness:
62                best = creator.Particle(part)
63                best.fitness.values = part.fitness.values
64
65        for part in pop:
66            toolbox.update(part, best)
67        # Gather all the fitnesses in one list and print the stats
68        logbook.record(gen=g, evals=len(pop), **stats.compile(pop))
69        print(logbook.stream)
70    print(best.fitness, best.fitness.values, part.best.fitness, part.fitness.values)
71    return pop, logbook, best
72
73if __name__ == "__main__":
74    main()

3.5. best_controller.json#

Archivo de configuración que guarda el mejor controlador encontrado. Contiene principalmente:

  • params: vector de parámetros (lista de float) que define el FIS.

  • (Opcional) rutas utilizadas, métricas y metadatos para reproducibilidad.

Mejor controlador encontrado (best_controller.json)#
1{
2  "params": [0.02906318670445844, 0.5689901881492568, 0.959185644460144, 0.11181303651379565, 0.09595559801638323, 0.3875820435302194],
3  "paths": [
4    {"ax": [0.0, 6.0, 12.5, 5.0, 7.5, 3.0, -1.0], "ay": [0.0, 0.0, 5.0, 6.5, 3.0, 5.0, -2.0]},
5    {"ax": [0.0, 1.0, 2.5, 5.0, 7.5, 3.0, -1.0], "ay": [0.0, -4.0, 6.0, 6.5, 3.0, 5.0, -2.0]},
6    {"ax": [0.0, 2.0, 2.5, 5.0, 7.5, -3.0, -1.0], "ay": [0.0, 3.0, 6.0, 6.5, 5.0, 5.0, -2.0]}
7  ],
8  "notes": "Best found by PSO"
9}

3.6. show_controller.py#

Script de visualización y diagnóstico. Este archivo:

  • Carga best_controller.json.

  • Construye el controlador difuso con esos parámetros.

  • Ejecuta la simulación sobre una ruta específica (por índice).

  • Muestra animación y gráficas del seguimiento.

  • Despliega las funciones de membresía resultantes.

Visualización del mejor controlador (show_controller.py)#
 1import json
 2import tunable_fuzzy as fc
 3from evaluate_controller import compute_rmse
 4import rear_wheel_sim as rw_sim
 5import path
 6import argparse
 7
 8def load_config(path_cfg):
 9    with open(path_cfg, "r") as f:
10        cfg = json.load(f)
11    params = cfg["params"]
12    paths = [(p["ax"], p["ay"]) for p in cfg["paths"]]
13    return params, paths, cfg
14
15def show_controller(params, paths, path_id):
16
17    controller = fc.get_controller(params)
18
19    ax, ay = paths[path_id]
20    goal = [ax[-1], ay[-1]]
21    reference_path = path.CubicSplinePath(ax, ay)
22    result = rw_sim.simulacion(
23        reference_path,
24        goal,
25        controller=controller,
26    )
27    if not result['goal_flag']:
28        print("⚠️  No se alcanzó la meta")
29    if result['error_flag']:
30        print("⚠️  Error en la simulación")
31
32    traces = result["traces"]
33    rmse = compute_rmse(traces)
34    print('rmse: {rmse}')
35
36    rw_sim.animate(result, pause=0.001)
37    rw_sim.plot(result)
38    fc.plot_mfs(params=params)
39
40if __name__ == "__main__":
41    parser = argparse.ArgumentParser(
42        description="Visualiza el controlador difuso para una ruta específica"
43    )
44    parser.add_argument(
45        "path_id",
46        type=int,
47        nargs="?",
48        default=0,
49        help="Índice de la ruta a mostrar (default: 0)",
50    )
51    parser.add_argument(
52        "--config",
53        type=str,
54        default="best_controller.json",
55        help="Archivo de configuración del controlador",
56    )
57
58    args = parser.parse_args()
59
60    params, paths, cfg = load_config(args.config)
61
62    if args.path_id < 0 or args.path_id >= len(paths):
63        raise ValueError(f"path_id debe estar entre 0 y {len(paths)-1}")
64
65    show_controller(params, paths, path_id=args.path_id)
66

3.7. Ejemplo de uso#

Para mostrar el mejor controlador sobre una ruta específica (por ejemplo, la ruta con índice 2) y la ubicación del archivo con la configuración del FIS:

python show_controller.py 2 --configure ./best_controller.json

Para ejecutar la optimización con PSO (puede tomar tiempo dependiendo del número de partículas y generaciones):

python pso.py

4. Código fuente para los ejemplos de Escalado Computacional#

El siguiente archivo muestra la versión del algoritmo PSO donde la evaluación de la aptitud de cada partícula se realiza en paralelo utilizando tareas remotas de Ray. El resto del algoritmo se ejecuta de manera secuencial en un solo proceso coordinador.

Uso de Ray para ejecutar la evaluación distribuida (pso_ray.py)#
  1import operator
  2import random
  3
  4import numpy
  5import math
  6
  7from deap import base
  8from deap import creator
  9from deap import tools
 10
 11import ray
 12from evaluate_controller import evaluate_controller
 13
 14@ray.remote
 15def remote_ev_controller(particle):
 16    return evaluate_controller(particle)
 17
 18
 19creator.create("FitnessMin", base.Fitness, weights=(-1.0,))
 20creator.create("Particle", list, fitness=creator.FitnessMin, speed=list,
 21    smin=None, smax=None, best=None)
 22
 23
 24
 25def generate(size, pmin, pmax, smin, smax):
 26    part = creator.Particle(random.uniform(pmin, pmax) for _ in range(size))
 27    part.speed = [random.uniform(smin, smax) for _ in range(size)]
 28    part.smin = smin
 29    part.smax = smax
 30    return part
 31
 32def updateParticle(part, best, phi1, phi2):
 33    u1 = (random.uniform(0, phi1) for _ in range(len(part)))
 34    u2 = (random.uniform(0, phi2) for _ in range(len(part)))
 35    v_u1 = map(operator.mul, u1, map(operator.sub, part.best, part))
 36    v_u2 = map(operator.mul, u2, map(operator.sub, best, part))
 37    part.speed = list(map(operator.add, part.speed, map(operator.add, v_u1, v_u2)))
 38    for i, speed in enumerate(part.speed):
 39        if abs(speed) < part.smin:
 40            part.speed[i] = math.copysign(part.smin, speed)
 41        elif abs(speed) > part.smax:
 42            part.speed[i] = math.copysign(part.smax, speed)
 43    part[:] = list(map(operator.add, part, part.speed))
 44
 45toolbox = base.Toolbox()
 46toolbox.register("particle", generate, size=6, pmin=-0.05, pmax=1.5, smin=-0.1, smax=0.1)
 47toolbox.register("population", tools.initRepeat, list, toolbox.particle)
 48toolbox.register("update", updateParticle, phi1=2.0, phi2=2.0)
 49#toolbox.register("evaluate", ev_controller)
 50
 51
 52def evaluate_population_ray(pop):
 53    futures_fitness_values = [remote_ev_controller.remote(list(particle)) for particle in pop] 
 54    results = ray.get(futures_fitness_values)
 55    assert len(pop) == len(results)
 56    
 57    for i, part in enumerate(pop):
 58        part.fitness.values = results[i]
 59    return pop
 60
 61def main():
 62    pop = toolbox.population(n=50)
 63    stats = tools.Statistics(lambda ind: ind.fitness.values)
 64    stats.register("avg", numpy.mean)
 65    stats.register("std", numpy.std)
 66    stats.register("min", numpy.min)
 67    stats.register("max", numpy.max)
 68
 69    logbook = tools.Logbook()
 70    logbook.header = ["gen", "evals"] + stats.fields
 71
 72    GEN = 20
 73    best = None
 74    
 75
 76    for g in range(GEN):
 77        evaluate_population_ray(pop)
 78        for part in pop:
 79            # No calculamos el fitnes utilizando el toolbox
 80            # lo hacemos con la función evaluate_population_ray(pop)
 81
 82            # part.fitness.values = toolbox.evaluate(part)
 83        
 84            # Se compara el fitness no el valor del RMSE
 85            # en este caso los controladores con menor RMSE
 86            # tienen un mayor fitness. DEAP internamente 
 87            # hace la multiplicación por -1 ya que estamos minimizando. 
 88            if not part.best or part.best.fitness < part.fitness:
 89                part.best = creator.Particle(part)
 90                part.best.fitness.values = part.fitness.values
 91            if not best or best.fitness < part.fitness:
 92                best = creator.Particle(part)
 93                best.fitness.values = part.fitness.values
 94
 95        for part in pop:
 96            toolbox.update(part, best)
 97        # Gather all the fitnesses in one list and print the stats
 98        logbook.record(gen=g, evals=len(pop), **stats.compile(pop))
 99        print(logbook.stream)
100        
101    print(best.fitness.values, best)
102    return pop, logbook, best
103
104if __name__ == "__main__":
105    ray.init()
106    main()
107    ray.shutdown()
108
109
110
111
112

A continuación se muestra el código completo del ejemplo de PSO multi-enjambre, donde cada población se implementa como un actor con estado (SwarmAgent). En este caso, Ray se utiliza tanto para ejecutar evaluaciones stateless como para mantener poblaciones persistentes que evolucionan y migran soluciones de manera coordinada.

Uso de Ray para implementar PSO multi-enjambre (pso_agent.py)#
  1import random
  2
  3import ray
  4from dataclasses import dataclass, field
  5from typing import List, Tuple, Dict
  6from evaluate_controller import evaluate_controller
  7
  8
  9@ray.remote
 10def remote_ev_controller(x: List[float]) -> float:
 11    fitness = evaluate_controller(x)
 12    # Normaliza a float aunque venga como (val,)
 13    if isinstance(fitness, tuple):
 14        return float(fitness[0])
 15    return float(fitness)
 16
 17def clip(value: float, vmin: float, vmax: float) -> float:
 18    return max(vmin, min(vmax, value))
 19
 20@dataclass
 21class Particle:
 22    """
 23    Partícula para PSO.
 24
 25    Attributes
 26    ----------
 27    x : list[float]
 28        Posición (solución candidata).
 29    v : list[float]
 30        Velocidad.
 31    best_x : list[float]
 32        Mejor posición personal encontrada hasta el momento.
 33    best_f : float
 34        Mejor valor de fitness (a minimizar) asociado a best_x.
 35    """
 36    x: List[float] = field(default_factory=list)
 37    speed: List[float] = field(default_factory=list)
 38    fitness: float = float('inf')
 39    best_x: List[float] = field(default_factory=list)
 40    best_f: float = float("inf")
 41    
 42    def __repr__(self) -> str:
 43        return f"Particle(f={self.fitness:.4f}, x={[round(v, 3) for v in self.x]})"
 44
 45    def __str__(self) -> str:
 46        return (
 47            "Particle(\n"
 48            f"  fitness = {self.fitness:.6f}\n"
 49            f"  x        = {[round(v, 4) for v in self.x]}\n"
 50            f"  speed    = {[round(v, 4) for v in self.speed]}\n"
 51            f"  best_f   = {self.best_f:.6f}\n"
 52            f"  best_x   = {[round(v, 4) for v in self.best_x]}\n"
 53            ")"
 54        )
 55
 56@ray.remote
 57class SwarmAgent:
 58    def __init__(self, config: Dict) -> None:
 59        self.config = config
 60        self.swarm_size = int(config["swarm_size"])
 61        self.size = int(config['dim'])
 62
 63        self.smin = float(config['smin'])
 64        self.smax = float(config['smax'])
 65        self.pmax = float(config['pmax'])
 66        self.pmin = float(config['pmin'])
 67        self.weight = float(config.get("w", 0.7))
 68        self.phi1 = float(config['phi1'])
 69        self.phi2 = float(config['phi2'])
 70        self.pop: List[Particle] = [self.generate() for _ in range(self.swarm_size)]
 71        self.gbest = Particle(
 72            x=self.pop[0].x.copy(),
 73            speed=[0.0] * self.size,
 74            fitness=float("inf"),
 75            best_x=self.pop[0].x.copy(),
 76            best_f=float("inf"),
 77        )
 78
 79    def generate(self) -> Particle:
 80        x = [random.uniform(self.pmin, self.pmax) for _ in range(self.size)]
 81        speed = [random.uniform(self.smin, self.smax) for _ in range(self.size)]
 82        return Particle(x=x, speed=speed, fitness=float("inf"), best_x=x.copy(), best_f=float("inf"))
 83
 84    def _update_particle(self, p: Particle) -> None:
 85        for d in range(self.size):
 86            r1 = random.random()
 87            r2 = random.random()
 88
 89            cognitive = self.phi1 * r1 * (p.best_x[d] - p.x[d])
 90            social = self.phi2 * r2 * (self.gbest.x[d] - p.x[d])
 91
 92            p.speed[d] = self.weight * p.speed[d] + cognitive + social
 93            p.speed[d] = clip(p.speed[d], self.smin, self.smax)
 94
 95            p.x[d] = p.x[d] + p.speed[d]
 96            p.x[d] = clip(p.x[d], self.pmin, self.pmax)
 97
 98
 99    def evaluate_population_ray(self):
100        futures_fitness_values = [remote_ev_controller.remote(list(particle.x)) for particle in self.pop] 
101        results = ray.get(futures_fitness_values)
102        assert len(self.pop) == len(results)
103        
104        for p, f in zip(self.pop, results):
105            p.fitness = f
106
107            if f < p.best_f:
108                p.best_f = f
109                p.best_x = p.x.copy()
110
111            if f < self.gbest.fitness:
112                self.gbest.fitness = f
113                self.gbest.x = p.x.copy()
114                self.gbest.best_f = f
115                self.gbest.best_x = p.x.copy()
116
117 
118    def migrate(self, candidates: List[Tuple[float, List[float]]]) -> None:
119        """
120        candidates: lista de (fitness, x) (tipos planos).
121        Reemplaza las peores partículas por estos candidatos.
122        """
123        # Asegurar fitness actualizado para identificar peores
124        self.evaluate_population_ray()
125
126        # Peores primero
127        worst_idx = sorted(range(self.swarm_size), key=lambda i: self.pop[i].fitness, reverse=True)
128        k = min(len(candidates), self.swarm_size)
129
130        for j in range(k):
131            f_cand, x_cand = candidates[j]
132            i = worst_idx[j]
133            # Insertar candidato con velocidad nueva (para no heredar dinámica ajena)
134            self.pop[i] = Particle(
135                x=list(x_cand),
136                speed=[random.uniform(self.smin, self.smax) for _ in range(self.size)],
137                fitness=float(f_cand),
138                best_x=list(x_cand),
139                best_f=float(f_cand),
140            )
141
142            if f_cand < self.gbest.fitness:
143                self.gbest.fitness = float(f_cand)
144                self.gbest.x = list(x_cand)
145
146
147    def step(self, n_iters: int) -> Particle:
148        for _ in range(n_iters):
149            self.evaluate_population_ray()
150            for part in self.pop:
151                self._update_particle(part)
152            # Gather all the fitnesses in one list and print the stats
153            # print(logbook.stream)
154        return self.gbest
155
156
157config = {
158    "smin": -0.2,
159    "smax":  0.20,
160    "pmin": 0.0,
161    "pmax": 1.0,
162    "phi1": 2.0,
163    "phi2": 2.0,
164
165    "dim": 6,
166    "swarm_size": 10,
167
168    "ngen": 4,
169    "migrate_interval":2,
170    "migrate_k":2,
171    "num_swarms": 6,
172    "num_rounds": 8,
173}
174
175
176import os
177os.environ["RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO"] = "0"
178os.environ["RAY_DISABLE_DASHBOARD"] = "1"
179os.environ["RAY_USAGE_STATS_ENABLED"] = "0"
180
181ray.init(ignore_reinit_error=True, include_dashboard=False)
182
183try:
184    agents = [SwarmAgent.remote(config) for i in range(config['num_swarms'])
185    ]
186   
187    best_global = Particle()
188
189    for r in range(config["num_rounds"]):
190       # 1) Cada agente ejecuta varias iteraciones locales en paralelo
191       futures = [a.step.remote(config['ngen']) for a in agents]
192       bests = [best for best in ray.get(futures)]  # [(f, x), ...]
193       bests.sort(key=lambda best: best.fitness) 
194
195       # 2) Actualizar mejor global
196       best = bests[0]
197       if best_global.fitness > best.fitness:
198           best_global.x = best.x[:] 
199           best_global.fitness  = best.fitness
200       print(f"Ronda {r:02d} | Mejor: {best_global.fitness:.6f}")
201       
202       # 3) Migración: intercambio de candidatos entre agentes
203       if (r + 1) % int(config["migrate_interval"]) == 0:
204           print("Migrando")
205           migration_futures = []
206           migration_particles = [(particle.fitness, particle.x)  for particle in bests[:config['migrate_k']]]
207           for a in agents:
208               # Candidatos: enviamos los k mejores a los otros agentes 
209               migration_futures.append(a.migrate.remote(migration_particles))
210           ray.get(migration_futures)
211    print(f"Mejor global: {best_global.fitness:.6f} | {best_global.x}")
212finally:
213    ray.shutdown()
214