""" API endpoints for simulation control. """ from typing import Dict, Optional from fastapi import APIRouter, HTTPException from pydantic import BaseModel from ..models.config import SimulationConfigModel from ..models.results import ( SimulationResultsModel, QueueStatisticsModel, TimeSeriesDataModel, HistogramDataModel ) from ..core.simulation import Simulator from ..analytics.jackson import JacksonAnalyzer from ..analytics.comparison import compare_results router = APIRouter(prefix="/api/simulation", tags=["simulation"]) # In-memory storage for simulation sessions # In production, use Redis or a database simulation_sessions: Dict[str, dict] = {} class SimulationSessionResponse(BaseModel): """Response when starting a simulation.""" session_id: str status: str message: str @router.post("/start", response_model=SimulationSessionResponse) async def start_simulation(config: SimulationConfigModel): """ Start a new simulation with the given configuration. Args: config: Simulation configuration Returns: Session information """ try: # Convert Pydantic model to internal config internal_config = config.to_simulation_config() # Create simulator simulator = Simulator(internal_config) # Run simulation (synchronous for now - could be async/background task) results = simulator.run() # Generate session ID import uuid session_id = str(uuid.uuid4()) # Store results simulation_sessions[session_id] = { "config": config.model_dump(), "results": results, "status": "completed" } return SimulationSessionResponse( session_id=session_id, status="completed", message="Simulation completed successfully" ) except Exception as e: raise HTTPException(status_code=400, detail=str(e)) @router.get("/results/{session_id}") async def get_simulation_results(session_id: str): """ Get results for a completed simulation. Args: session_id: Session identifier Returns: Simulation results """ if session_id not in simulation_sessions: raise HTTPException(status_code=404, detail="Session not found") session = simulation_sessions[session_id] results = session["results"] # Convert to response model coordinator_stats = QueueStatisticsModel( queue_id="coordinator", service_rate=results.coordinator_stats["service_rate"], total_arrivals=results.coordinator_stats["total_arrivals"], total_departures=results.coordinator_stats["total_departures"], average_wait_time=results.coordinator_stats["average_wait_time"], average_service_time=results.coordinator_stats["average_service_time"], average_system_time=results.coordinator_stats["average_system_time"], utilization=results.coordinator_stats["utilization"] ) server_stats = {} for server_id, stats in results.server_stats.items(): server_stats[server_id] = QueueStatisticsModel( queue_id=server_id, service_rate=stats["service_rate"], total_arrivals=stats["total_arrivals"], total_departures=stats["total_departures"], average_wait_time=stats["average_wait_time"], average_service_time=stats["average_service_time"], average_system_time=stats["average_system_time"], utilization=stats["utilization"] ) # Check stability (simple heuristic: max utilization) max_utilization = max( results.coordinator_stats["utilization"], max(s["utilization"] for s in results.server_stats.values()) if results.server_stats else 0 ) is_stable = max_utilization < 0.95 # Convert time series data if available time_series = None if results.time_series_data: time_series = TimeSeriesDataModel( timestamps=results.time_series_data["timestamps"], customers_in_system=results.time_series_data["customers_in_system"], customers_per_queue=results.time_series_data["customers_per_queue"], cumulative_arrivals_per_queue=results.time_series_data["cumulative_arrivals_per_queue"], cumulative_departures_per_queue=results.time_series_data["cumulative_departures_per_queue"] ) # Convert histogram data if available histogram = None if results.histogram_data: histogram = HistogramDataModel( bins=results.histogram_data["bins"], frequencies=results.histogram_data["frequencies"], min_value=results.histogram_data["min_value"], max_value=results.histogram_data["max_value"], mean=results.histogram_data["mean"], std_dev=results.histogram_data["std_dev"] ) response = SimulationResultsModel( config=session["config"], total_requests_arrived=results.total_requests_arrived, total_requests_completed=results.total_requests_completed, average_system_time=results.average_system_time, average_customers_in_system=results.average_customers_in_system, coordinator_stats=coordinator_stats, server_stats=server_stats, time_series=time_series, processing_time_histogram=histogram, is_stable=is_stable, stability_notes="High utilization detected" if not is_stable else None ) return response @router.delete("/results/{session_id}") async def delete_simulation_results(session_id: str): """ Delete simulation results. Args: session_id: Session identifier Returns: Confirmation message """ if session_id not in simulation_sessions: raise HTTPException(status_code=404, detail="Session not found") del simulation_sessions[session_id] return {"message": "Session deleted successfully"} @router.get("/sessions") async def list_simulation_sessions(): """ List all simulation sessions. Returns: List of session IDs and their status """ sessions = [] for session_id, session in simulation_sessions.items(): sessions.append({ "session_id": session_id, "status": session["status"], "total_requests": session["results"].total_requests_completed }) return {"sessions": sessions, "count": len(sessions)}