simulation.py 5,19 ko
Newer Older
"""
API endpoints for simulation control.
"""
from typing import Dict, Optional
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel

from ..models.config import SimulationConfigModel
from ..models.results import SimulationResultsModel, QueueStatisticsModel
from ..core.simulation import Simulator
from ..analytics.jackson import JacksonAnalyzer
from ..analytics.comparison import compare_results


router = APIRouter(prefix="/api/simulation", tags=["simulation"])


# In-memory storage for simulation sessions
# In production, use Redis or a database
simulation_sessions: Dict[str, dict] = {}


class SimulationSessionResponse(BaseModel):
    """Response when starting a simulation."""
    session_id: str
    status: str
    message: str


@router.post("/start", response_model=SimulationSessionResponse)
async def start_simulation(config: SimulationConfigModel):
    """
    Start a new simulation with the given configuration.

    Args:
        config: Simulation configuration

    Returns:
        Session information
    """
    try:
        # Convert Pydantic model to internal config
        internal_config = config.to_simulation_config()

        # Create simulator
        simulator = Simulator(internal_config)

        # Run simulation (synchronous for now - could be async/background task)
        results = simulator.run()

        # Generate session ID
        import uuid
        session_id = str(uuid.uuid4())

        # Store results
        simulation_sessions[session_id] = {
            "config": config.model_dump(),
            "results": results,
            "status": "completed"
        }

        return SimulationSessionResponse(
            session_id=session_id,
            status="completed",
            message="Simulation completed successfully"
        )

    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))


@router.get("/results/{session_id}")
async def get_simulation_results(session_id: str):
    """
    Get results for a completed simulation.

    Args:
        session_id: Session identifier

    Returns:
        Simulation results
    """
    if session_id not in simulation_sessions:
        raise HTTPException(status_code=404, detail="Session not found")

    session = simulation_sessions[session_id]
    results = session["results"]

    # Convert to response model
    coordinator_stats = QueueStatisticsModel(
        queue_id="coordinator",
        service_rate=results.coordinator_stats["service_rate"],
        total_arrivals=results.coordinator_stats["total_arrivals"],
        total_departures=results.coordinator_stats["total_departures"],
        average_wait_time=results.coordinator_stats["average_wait_time"],
        average_service_time=results.coordinator_stats["average_service_time"],
        average_system_time=results.coordinator_stats["average_system_time"],
        utilization=results.coordinator_stats["utilization"]
    )

    server_stats = {}
    for server_id, stats in results.server_stats.items():
        server_stats[server_id] = QueueStatisticsModel(
            queue_id=server_id,
            service_rate=stats["service_rate"],
            total_arrivals=stats["total_arrivals"],
            total_departures=stats["total_departures"],
            average_wait_time=stats["average_wait_time"],
            average_service_time=stats["average_service_time"],
            average_system_time=stats["average_system_time"],
            utilization=stats["utilization"]
        )

    # Check stability (simple heuristic: max utilization)
    max_utilization = max(
        results.coordinator_stats["utilization"],
        max(s["utilization"] for s in results.server_stats.values()) if results.server_stats else 0
    )
    is_stable = max_utilization < 0.95

    response = SimulationResultsModel(
        config=session["config"],
        total_requests_arrived=results.total_requests_arrived,
        total_requests_completed=results.total_requests_completed,
        average_system_time=results.average_system_time,
        average_customers_in_system=0.0,  # TODO: calculate from time series
        coordinator_stats=coordinator_stats,
        server_stats=server_stats,
        is_stable=is_stable,
        stability_notes="High utilization detected" if not is_stable else None
    )

    return response


@router.delete("/results/{session_id}")
async def delete_simulation_results(session_id: str):
    """
    Delete simulation results.

    Args:
        session_id: Session identifier

    Returns:
        Confirmation message
    """
    if session_id not in simulation_sessions:
        raise HTTPException(status_code=404, detail="Session not found")

    del simulation_sessions[session_id]

    return {"message": "Session deleted successfully"}


@router.get("/sessions")
async def list_simulation_sessions():
    """
    List all simulation sessions.

    Returns:
        List of session IDs and their status
    """
    sessions = []
    for session_id, session in simulation_sessions.items():
        sessions.append({
            "session_id": session_id,
            "status": session["status"],
            "total_requests": session["results"].total_requests_completed
        })

    return {"sessions": sessions, "count": len(sessions)}