From a14856330b5fd8cc5ec5d513b2de86a060dcf3b6 Mon Sep 17 00:00:00 2001 From: Massiles Ghernaout <749-gm213204@users.noreply.www-apps.univ-lehavre.fr> Date: Fri, 6 Feb 2026 19:45:26 +0100 Subject: [PATCH] added an automatic scaler for our workers, it is basically a polling loop that will check every now and then how many jobs are there in the redis queue; if # > maxthreshold then scale up, if # < minthreshold then scale down { console.log(`Backend API listening on port ${port}`); + if (process.env.SCALER_ENABLED === "true") { + startScaler(); + } }); diff --git a/backend/src/routes/cluster.js b/backend/src/routes/cluster.js index 750fd4c..e1a012e 100644 --- a/backend/src/routes/cluster.js +++ b/backend/src/routes/cluster.js @@ -1,6 +1,7 @@ import express from "express"; import Docker from "dockerode"; import Redis from "ioredis"; +import { scaleToReplicas } from "../services/clusterService.js"; const router = express.Router(); @@ -10,6 +11,8 @@ const docker = new Docker({ socketPath: dockerSocketPath }); const redisUrl = process.env.REDIS_URL || "redis://redis:6379"; const redis = new Redis(redisUrl); +const workerServiceName = process.env.WORKER_SERVICE_NAME || "md5_hash_worker"; + // GET /cluster/state - état simple du cluster router.get("/state", async (req, res) => { try { @@ -18,7 +21,6 @@ router.get("/state", async (req, res) => { redis.llen("jobs:pending"), ]); - const workerServiceName = process.env.WORKER_SERVICE_NAME || "hash_worker"; const workerService = services.find( (s) => s.Spec && s.Spec.Name === workerServiceName ); @@ -50,33 +52,19 @@ router.post("/scale", async (req, res) => { return res.status(400).json({ error: "invalid replicas count" }); } - try { - const workerServiceName = process.env.WORKER_SERVICE_NAME || "hash_worker"; - const services = await docker.listServices(); - const serviceInfo = services.find( - (s) => s.Spec && s.Spec.Name === workerServiceName - ); - - if (!serviceInfo) { - return res.status(404).json({ error: "worker service not found" }); - } - - const
service = docker.getService(serviceInfo.ID); - const spec = serviceInfo.Spec; - spec.Mode = spec.Mode || {}; - spec.Mode.Replicated = spec.Mode.Replicated || {}; - spec.Mode.Replicated.Replicas = desired; - - await service.update({ - version: serviceInfo.Version.Index, - ...spec, - }); + const result = await scaleToReplicas(docker, workerServiceName, desired); - res.json({ workerReplicas: desired }); - } catch (err) { - console.error("Error scaling workers", err); - res.status(500).json({ error: "failed to scale workers" }); + if (result.ok) { + return res.json({ workerReplicas: result.workerReplicas }); + } + if (result.reason === "service_not_found") { + return res.status(404).json({ error: "worker service not found" }); + } + if (result.reason === "not_swarm") { + return res.status(503).json({ error: "Docker is not in Swarm mode" }); } + console.error("Error scaling workers", result.error); + res.status(500).json({ error: "failed to scale workers" }); }); export default router; diff --git a/backend/src/services/clusterService.js b/backend/src/services/clusterService.js new file mode 100644 index 0000000..1c454b1 --- /dev/null +++ b/backend/src/services/clusterService.js @@ -0,0 +1,39 @@ +/** + * Shared logic for scaling the worker service in Docker Swarm. + * Used by POST /cluster/scale and by the automatic scaler. + * @param {import("dockerode")} docker - Dockerode instance + * @param {string} workerServiceName - Name of the worker service (e.g. 
md5_hash_worker) + * @param {number} desiredReplicas - Desired replica count + * @returns {Promise<{ ok: boolean, workerReplicas?: number, reason?: string, error?: string }>} + */ +export async function scaleToReplicas(docker, workerServiceName, desiredReplicas) { + try { + const services = await docker.listServices(); + const serviceInfo = services.find( + (s) => s.Spec && s.Spec.Name === workerServiceName + ); + + if (!serviceInfo) { + return { ok: false, reason: "service_not_found" }; + } + + const service = docker.getService(serviceInfo.ID); + const spec = { ...serviceInfo.Spec }; + spec.Mode = spec.Mode || {}; + spec.Mode.Replicated = spec.Mode.Replicated || {}; + spec.Mode.Replicated.Replicas = desiredReplicas; + + await service.update({ + version: serviceInfo.Version.Index, + ...spec, + }); + + return { ok: true, workerReplicas: desiredReplicas }; + } catch (err) { + const msg = err?.message || String(err); + if (msg.includes("not a swarm manager") || msg.includes("This node is not a swarm manager")) { + return { ok: false, reason: "not_swarm" }; + } + return { ok: false, reason: "error", error: msg }; + } +} diff --git a/backend/src/services/scaler.js b/backend/src/services/scaler.js new file mode 100644 index 0000000..5d1ca16 --- /dev/null +++ b/backend/src/services/scaler.js @@ -0,0 +1,75 @@ +import Docker from "dockerode"; +import Redis from "ioredis"; +import { scaleToReplicas } from "./clusterService.js"; + +const dockerSocketPath = process.env.DOCKER_SOCKET || "/var/run/docker.sock"; +const redisUrl = process.env.REDIS_URL || "redis://redis:6379"; +const workerServiceName = process.env.WORKER_SERVICE_NAME || "md5_hash_worker"; + +const SCALER_INTERVAL_MS = Number(process.env.SCALER_INTERVAL_MS) || 10000; +const SCALER_MIN_REPLICAS = Number(process.env.SCALER_MIN_REPLICAS) || 1; +const SCALER_MAX_REPLICAS = Number(process.env.SCALER_MAX_REPLICAS) || 10; +const SCALER_SCALE_UP_WHEN_JOBS_ABOVE = Number(process.env.SCALER_SCALE_UP_WHEN_JOBS_ABOVE) || 5; 
+const SCALER_SCALE_DOWN_WHEN_JOBS_BELOW = Number(process.env.SCALER_SCALE_DOWN_WHEN_JOBS_BELOW) || 3; + +let intervalId = null; + +async function evaluate(docker, redis) { + try { + const pendingCount = await redis.llen("jobs:pending"); + + let services; + try { + services = await docker.listServices(); + } catch (err) { + const msg = err?.message || String(err); + if (msg.includes("not a swarm manager") || msg.includes("This node is not a swarm manager")) { + return; + } + throw err; + } + + const workerService = services.find( + (s) => s.Spec && s.Spec.Name === workerServiceName + ); + + let current = 0; + if (workerService?.Spec?.Mode?.Replicated != null) { + current = workerService.Spec.Mode.Replicated.Replicas ?? 0; + } + + let desired = -1; + + if (pendingCount >= SCALER_SCALE_UP_WHEN_JOBS_ABOVE) { + desired = Math.min(SCALER_MAX_REPLICAS, current + 1); + } else if (pendingCount <= SCALER_SCALE_DOWN_WHEN_JOBS_BELOW) { + desired = Math.max(SCALER_MIN_REPLICAS, current - 1); + } + + if(desired === -1) return; // value did not change, so no todo + + const result = await scaleToReplicas(docker, workerServiceName, desired); + + if (result.ok) { + console.log(`Scaler: scaled workers from ${current} to ${desired} (pending jobs: ${pendingCount})`); + } else { + console.warn("Scaler: scale failed", result.reason, result.error || ""); + } + } catch (err) { + console.error("Scaler: evaluation error", err?.message || err); + } +} + +export function startScaler() { + if (intervalId != null) { + console.log("Scaler: already started"); + return; + } + + const redis = new Redis(redisUrl); + const docker = new Docker({ socketPath: dockerSocketPath }); + + intervalId = setInterval(() => evaluate(docker, redis), SCALER_INTERVAL_MS); + + console.log(`Scaler: started (interval ${SCALER_INTERVAL_MS}ms, min=${SCALER_MIN_REPLICAS}, max=${SCALER_MAX_REPLICAS})`); +} diff --git a/infra/stack.yml b/infra/stack.yml index cecd8a8..6b80792 100644 --- a/infra/stack.yml +++ 
b/infra/stack.yml @@ -10,7 +10,14 @@ services: environment: - REDIS_URL=redis://redis:6379 - DOCKER_SOCKET=/var/run/docker.sock - - WORKER_SERVICE_NAME=hash_worker + - WORKER_SERVICE_NAME=md5_hash_worker + # Scaler (auto-scale workers from queue size) + - SCALER_ENABLED=true + - SCALER_INTERVAL_MS=10000 + - SCALER_MIN_REPLICAS=1 + - SCALER_MAX_REPLICAS=10 + - SCALER_SCALE_UP_WHEN_JOBS_ABOVE=5 + - SCALER_SCALE_DOWN_WHEN_JOBS_BELOW=3 volumes: - /var/run/docker.sock:/var/run/docker.sock networks: @@ -39,6 +46,8 @@ services: context: ../frontend ports: - "5173:80" + environment: + - API_URL=http://localhost:8080 networks: - md5_net -- GitLab