Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
"""
Jackson's theorem implementation for open queueing networks.
Jackson's Theorem (1957):
For an open network of M/M/1 queues with:
- External Poisson arrivals
- Exponential service times
- Probabilistic routing
- FIFO discipline
The network can be analyzed as independent M/M/1 queues with effective arrival rates.
Key formulas:
- Effective arrival rate: λᵢ = λ₀ + Σⱼ λⱼ * Pⱼᵢ
- Utilization: ρᵢ = λᵢ / μᵢ
- Stability: ρᵢ < 1 for all queues
- Average customers: Lᵢ = ρᵢ / (1 - ρᵢ)
- Average time: Wᵢ = Lᵢ / λᵢ (Little's Law)
"""
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
import math
@dataclass
class QueueAnalytics:
"""Analytical results for a single queue."""
queue_id: str
service_rate: float # μ
arrival_rate: float # λ (effective)
utilization: float # ρ = λ/μ
is_stable: bool # ρ < 1
average_customers: Optional[float] = None # L = ρ/(1-ρ)
average_time: Optional[float] = None # W = L/λ
average_wait_time: Optional[float] = None # Wq = W - 1/μ
average_queue_length: Optional[float] = None # Lq = λ * Wq
@dataclass
class NetworkAnalytics:
"""Analytical results for the entire network."""
is_stable: bool
coordinator: QueueAnalytics
servers: Dict[str, QueueAnalytics]
total_average_customers: float # L = Σ Lᵢ
total_average_time: float # W = L / λ₀ (Little's Law)
external_arrival_rate: float # λ₀ - needed for Little's Law
instability_reason: Optional[str] = None
class JacksonAnalyzer:
"""
Analyzer using Jackson's theorem for open queueing networks.
Network topology (with feedback loop):
External (λ) → Coordinator → (p) Exit
→ (q₁) Server 1 → Coordinator
→ (q₂) Server 2 → Coordinator
→ (qₙ) Server n → Coordinator
Where: p + Σqᵢ = 1 (all probabilities sum to 1)
Traffic equations:
γ₀ = λ + Σᵢ γᵢ (coordinator receives external + all returns)
γᵢ = qᵢ × γ₀ (each server receives its fraction)
Solution:
γ₀ = λ / p (coordinator sees amplified traffic due to feedback)
γᵢ = (λ / p) × qᵢ (each server gets its fraction)
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
"""
def __init__(
self,
external_arrival_rate: float,
coordinator_service_rate: float,
coordinator_exit_prob: float,
server_service_rates: List[float],
server_routing_probs: List[float]
):
"""
Initialize the analyzer.
Args:
external_arrival_rate: λ₀ - external arrivals per time unit
coordinator_service_rate: μc - coordinator service rate
coordinator_exit_prob: p - probability of exiting after coordinator
server_service_rates: [μ₁, μ₂, ..., μₙ] - server service rates
server_routing_probs: [q₁, q₂, ..., qₙ] - routing probabilities to servers
"""
self.lambda_0 = external_arrival_rate
self.mu_c = coordinator_service_rate
self.p = coordinator_exit_prob
self.mu_servers = server_service_rates
self.q_servers = server_routing_probs
self.n_servers = len(server_service_rates)
# Validate inputs
if self.lambda_0 <= 0:
raise ValueError(f"External arrival rate must be positive, got {self.lambda_0}")
if self.mu_c <= 0:
raise ValueError(f"Coordinator service rate must be positive, got {self.mu_c}")
if not (0 <= self.p <= 1):
raise ValueError(f"Exit probability must be in [0,1], got {self.p}")
total_prob = self.p + sum(self.q_servers)
if not (0.99 <= total_prob <= 1.01):
raise ValueError(
f"Exit probability + routing probabilities must sum to 1.0, got {total_prob}"
)
def calculate_effective_arrival_rates(self) -> Dict[str, float]:
"""
Calculate effective arrival rates for all queues using Jackson's theorem.
Traffic equations (servers return to coordinator with probability 1):
γ₀ = λ + Σᵢ γᵢ (coordinator receives external + all server returns)
γᵢ = qᵢ × γ₀ (each server receives its fraction)
Solving the system:
Substitute γᵢ into first equation:
γ₀ = λ + Σᵢ (qᵢ × γ₀)
γ₀ = λ + γ₀ × Σqᵢ
γ₀ = λ + γ₀ × (1 - p) (since p + Σqᵢ = 1)
γ₀ - γ₀(1 - p) = λ
γ₀ × p = λ
γ₀ = λ / p
Then: γᵢ = qᵢ × γ₀ = qᵢ × (λ / p)
Dict mapping queue_id to effective arrival rate (γ₀ for coordinator, γᵢ for servers)
# Calculate coordinator effective arrival rate: γ₀ = λ / p
if self.p == 0:
raise ValueError("Exit probability p cannot be 0 (would create infinite feedback loop)")
gamma_0 = self.lambda_0 / self.p
"coordinator": gamma_0
# Calculate server effective arrival rates: γᵢ = qᵢ × γ₀
for i, q_i in enumerate(self.q_servers):
server_id = f"server_{i+1}"
arrival_rates[server_id] = gamma_0 * q_i
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
return arrival_rates
def calculate_utilizations(self, arrival_rates: Dict[str, float]) -> Dict[str, float]:
"""
Calculate utilization ρ = λ/μ for all queues.
Args:
arrival_rates: Effective arrival rates per queue
Returns:
Dict mapping queue_id to utilization
"""
utilizations = {
"coordinator": arrival_rates["coordinator"] / self.mu_c
}
for i, mu_i in enumerate(self.mu_servers):
server_id = f"server_{i+1}"
utilizations[server_id] = arrival_rates[server_id] / mu_i
return utilizations
def check_stability(self, utilizations: Dict[str, float]) -> Tuple[bool, Optional[str]]:
"""
Check if the network is stable (all ρᵢ < 1).
Args:
utilizations: Utilization per queue
Returns:
Tuple of (is_stable, reason_if_unstable)
"""
unstable_queues = [
queue_id for queue_id, rho in utilizations.items()
if rho >= 1.0
]
if unstable_queues:
reason = f"Unstable queues (ρ ≥ 1): {', '.join(unstable_queues)}"
for queue_id in unstable_queues:
reason += f"\n {queue_id}: ρ = {utilizations[queue_id]:.4f}"
return False, reason
return True, None
def calculate_queue_analytics(
self,
queue_id: str,
service_rate: float,
arrival_rate: float,
utilization: float
) -> QueueAnalytics:
"""
Calculate analytical metrics for a single M/M/1 queue.
Args:
queue_id: Queue identifier
service_rate: μ
arrival_rate: λ (effective)
utilization: ρ = λ/μ
Returns:
QueueAnalytics with all metrics
"""
is_stable = utilization < 1.0
if is_stable:
# Average number of customers in system: L = ρ/(1-ρ)
L = utilization / (1 - utilization)
# Average time in system: W = L/λ (Little's Law)
W = L / arrival_rate if arrival_rate > 0 else 0
# Average waiting time in queue: Wq = W - 1/μ
Wq = W - (1 / service_rate)
# Average queue length: Lq = λ * Wq (Little's Law for queue)
Lq = arrival_rate * Wq
return QueueAnalytics(
queue_id=queue_id,
service_rate=service_rate,
arrival_rate=arrival_rate,
utilization=utilization,
is_stable=True,
average_customers=L,
average_time=W,
average_wait_time=Wq,
average_queue_length=Lq
)
else:
# Unstable - metrics are undefined (infinite)
return QueueAnalytics(
queue_id=queue_id,
service_rate=service_rate,
arrival_rate=arrival_rate,
utilization=utilization,
is_stable=False,
average_customers=math.inf,
average_time=math.inf,
average_wait_time=math.inf,
average_queue_length=math.inf
)
def analyze(self) -> NetworkAnalytics:
"""
Perform complete analytical analysis using Jackson's theorem.
Returns:
NetworkAnalytics with all results
"""
# Step 1: Calculate effective arrival rates
arrival_rates = self.calculate_effective_arrival_rates()
# Step 2: Calculate utilizations
utilizations = self.calculate_utilizations(arrival_rates)
# Step 3: Check stability
is_stable, instability_reason = self.check_stability(utilizations)
# Step 4: Calculate per-queue analytics
coordinator_analytics = self.calculate_queue_analytics(
"coordinator",
self.mu_c,
arrival_rates["coordinator"],
utilizations["coordinator"]
)
server_analytics = {}
for i, mu_i in enumerate(self.mu_servers):
server_id = f"server_{i+1}"
server_analytics[server_id] = self.calculate_queue_analytics(
server_id,
mu_i,
arrival_rates[server_id],
utilizations[server_id]
)
# Step 5: Calculate system-wide metrics (if stable)
if is_stable:
# Total average customers: L = Σ Lᵢ
total_L = coordinator_analytics.average_customers
for analytics in server_analytics.values():
total_L += analytics.average_customers
# Total average time: W = L / λ₀ (Little's Law for entire network)
total_W = total_L / self.lambda_0
else:
total_L = math.inf
total_W = math.inf
return NetworkAnalytics(
is_stable=is_stable,
coordinator=coordinator_analytics,
servers=server_analytics,
total_average_customers=total_L,
total_average_time=total_W,
external_arrival_rate=self.lambda_0,
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
instability_reason=instability_reason
)
def print_analysis(self) -> None:
"""Print a formatted analysis report."""
results = self.analyze()
print("=" * 80)
print("JACKSON'S THEOREM ANALYTICAL ANALYSIS")
print("=" * 80)
print(f"\nNetwork Configuration:")
print(f" External arrival rate (λ₀): {self.lambda_0:.6f}")
print(f" Coordinator service rate (μc): {self.mu_c:.6f}")
print(f" Exit probability (p): {self.p:.3f}")
print(f" Number of servers: {self.n_servers}")
print(f"\nStability: {'✅ STABLE' if results.is_stable else '❌ UNSTABLE'}")
if results.instability_reason:
print(f" {results.instability_reason}")
print(f"\n📊 Coordinator Queue:")
coord = results.coordinator
print(f" λc = {coord.arrival_rate:.6f}")
print(f" μc = {coord.service_rate:.6f}")
print(f" ρc = {coord.utilization:.4f}")
if coord.is_stable:
print(f" L = {coord.average_customers:.4f} customers")
print(f" W = {coord.average_time:.4f} time units")
print(f" Wq = {coord.average_wait_time:.4f} time units")
for server_id, analytics in results.servers.items():
print(f"\n📊 {server_id}:")
print(f" λ = {analytics.arrival_rate:.6f}")
print(f" μ = {analytics.service_rate:.6f}")
print(f" ρ = {analytics.utilization:.4f}")
if analytics.is_stable:
print(f" L = {analytics.average_customers:.4f} customers")
print(f" W = {analytics.average_time:.4f} time units")
print(f" Wq = {analytics.average_wait_time:.4f} time units")
else:
print(f" ⚠️ UNSTABLE - Infinite queue")
if results.is_stable:
print(f"\n🌐 System-Wide Metrics:")
print(f" Total L = {results.total_average_customers:.4f} customers")
print(f" Total W = {results.total_average_time:.4f} time units")
print(f" Verification (Little's Law): L = λ₀ * W = {self.lambda_0 * results.total_average_time:.4f}")
print("=" * 80)