"""
API endpoints for simulation control.
"""
import uuid
from typing import Dict, Optional

from fastapi import APIRouter, HTTPException
from pydantic import BaseModel

from ..models.config import SimulationConfigModel
from ..models.results import (
    SimulationResultsModel,
    QueueStatisticsModel,
    TimeSeriesDataModel,
    HistogramDataModel
)
from ..core.simulation import Simulator
from ..analytics.jackson import JacksonAnalyzer
from ..analytics.comparison import compare_results


# Router grouping every simulation-control endpoint under /api/simulation.
router = APIRouter(
    prefix="/api/simulation",
    tags=["simulation"],
)

# Process-local store of simulation sessions, keyed by session ID.
# In production, use Redis or a database instead.
simulation_sessions: Dict[str, dict] = {}


class SimulationSessionResponse(BaseModel):
    """Payload returned by the endpoint that starts a simulation.

    Attributes:
        session_id: Identifier under which the session is stored.
        status: Status string of the session.
        message: Message describing the outcome.
    """

    session_id: str
    status: str
    message: str


@router.post("/start", response_model=SimulationSessionResponse)
async def start_simulation(config: SimulationConfigModel):
    """
    Start a new simulation with the given configuration.

    The simulation is executed synchronously inside the request. Its results
    are stored in ``simulation_sessions`` under a freshly generated UUID.

    Args:
        config: Simulation configuration

    Returns:
        Session information (session ID, status and message)

    Raises:
        HTTPException: 400 carrying the error text when any step fails.
    """
    try:
        # Convert Pydantic model to internal config
        internal_config = config.to_simulation_config()

        # Create simulator
        simulator = Simulator(internal_config)

        # Run simulation (synchronous for now - could be async/background task)
        results = simulator.run()

        # Generate session ID
        session_id = str(uuid.uuid4())

        # Store results
        simulation_sessions[session_id] = {
            "config": config.model_dump(),
            "results": results,
            "status": "completed"
        }

        return SimulationSessionResponse(
            session_id=session_id,
            status="completed",
            message="Simulation completed successfully"
        )
    except Exception as e:
        # Chain the original error so its traceback is kept for debugging.
        raise HTTPException(status_code=400, detail=str(e)) from e


def _to_queue_statistics(queue_id: str, stats: dict) -> QueueStatisticsModel:
    """Build the response model for one queue from its raw statistics mapping."""
    return QueueStatisticsModel(
        queue_id=queue_id,
        service_rate=stats["service_rate"],
        total_arrivals=stats["total_arrivals"],
        total_departures=stats["total_departures"],
        average_wait_time=stats["average_wait_time"],
        average_service_time=stats["average_service_time"],
        average_system_time=stats["average_system_time"],
        utilization=stats["utilization"]
    )


@router.get("/results/{session_id}")
async def get_simulation_results(session_id: str):
    """
    Get results for a completed simulation.

    Args:
        session_id: Session identifier

    Returns:
        Simulation results, including per-queue statistics, optional time
        series and histogram data, and a stability flag.

    Raises:
        HTTPException: 404 if no session is stored under ``session_id``.
    """
    if session_id not in simulation_sessions:
        raise HTTPException(status_code=404, detail="Session not found")

    session = simulation_sessions[session_id]
    results = session["results"]

    # Convert to response model
    coordinator_stats = _to_queue_statistics("coordinator", results.coordinator_stats)
    server_stats = {
        server_id: _to_queue_statistics(server_id, stats)
        for server_id, stats in results.server_stats.items()
    }

    # Check stability (simple heuristic: max utilization).
    # default=0 covers the case where there are no servers.
    max_utilization = max(
        results.coordinator_stats["utilization"],
        max((s["utilization"] for s in results.server_stats.values()), default=0)
    )
    is_stable = max_utilization < 0.95

    # Convert time series data if available
    time_series = None
    if results.time_series_data:
        series = results.time_series_data
        time_series = TimeSeriesDataModel(
            timestamps=series["timestamps"],
            customers_in_system=series["customers_in_system"],
            customers_per_queue=series["customers_per_queue"],
            cumulative_arrivals_per_queue=series["cumulative_arrivals_per_queue"],
            cumulative_departures_per_queue=series["cumulative_departures_per_queue"]
        )

    # Convert histogram data if available
    histogram = None
    if results.histogram_data:
        hist = results.histogram_data
        histogram = HistogramDataModel(
            bins=hist["bins"],
            frequencies=hist["frequencies"],
            min_value=hist["min_value"],
            max_value=hist["max_value"],
            mean=hist["mean"],
            std_dev=hist["std_dev"]
        )

    return SimulationResultsModel(
        config=session["config"],
        total_requests_arrived=results.total_requests_arrived,
        total_requests_completed=results.total_requests_completed,
        average_system_time=results.average_system_time,
        average_customers_in_system=results.average_customers_in_system,
        coordinator_stats=coordinator_stats,
        server_stats=server_stats,
        time_series=time_series,
        processing_time_histogram=histogram,
        is_stable=is_stable,
        stability_notes="High utilization detected" if not is_stable else None
    )


@router.delete("/results/{session_id}")
async def delete_simulation_results(session_id: str):
    """
    Remove a stored simulation session.

    Args:
        session_id: Session identifier

    Returns:
        Confirmation message

    Raises:
        HTTPException: 404 if no session is stored under ``session_id``.
    """
    try:
        simulation_sessions.pop(session_id)
    except KeyError:
        raise HTTPException(status_code=404, detail="Session not found") from None
    return {"message": "Session deleted successfully"}


@router.get("/sessions")
async def list_simulation_sessions():
    """
    Enumerate the stored simulation sessions.

    Returns:
        Dict with a ``sessions`` list (session ID, status and completed
        request count for each session) and the total ``count``.
    """
    summaries = [
        {
            "session_id": stored_id,
            "status": entry["status"],
            "total_requests": entry["results"].total_requests_completed
        }
        for stored_id, entry in simulation_sessions.items()
    ]
    return {"sessions": summaries, "count": len(summaries)}