jackson.py 11,3 ko
Newer Older
"""
Jackson's theorem implementation for open queueing networks.

Jackson's Theorem (1957):
For an open network of M/M/1 queues with:
- External Poisson arrivals
- Exponential service times
- Probabilistic routing
- FIFO discipline

The network can be analyzed as independent M/M/1 queues with effective arrival rates.

Key formulas:
- Effective arrival rate: λᵢ = λ₀ + Σⱼ λⱼ * Pⱼᵢ
- Utilization: ρᵢ = λᵢ / μᵢ
- Stability: ρᵢ < 1 for all queues
- Average customers: Lᵢ = ρᵢ / (1 - ρᵢ)
- Average time: Wᵢ = Lᵢ / λᵢ (Little's Law)
"""
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
import math


@dataclass
class QueueAnalytics:
    """Analytical results for a single queue."""
    queue_id: str
    service_rate: float  # μ
    arrival_rate: float  # λ (effective)
    utilization: float  # ρ = λ/μ
    is_stable: bool  # ρ < 1
    average_customers: Optional[float] = None  # L = ρ/(1-ρ)
    average_time: Optional[float] = None  # W = L/λ
    average_wait_time: Optional[float] = None  # Wq = W - 1/μ
    average_queue_length: Optional[float] = None  # Lq = λ * Wq


@dataclass
class NetworkAnalytics:
    """Analytical results for the entire network."""
    is_stable: bool
    coordinator: QueueAnalytics
    servers: Dict[str, QueueAnalytics]
    total_average_customers: float  # L = Σ Lᵢ
    total_average_time: float  # W = L / λ₀ (Little's Law)
    instability_reason: Optional[str] = None


class JacksonAnalyzer:
    """
    Analyzer using Jackson's theorem for open queueing networks.

    Network topology:
        External → Coordinator → (p) Exit
                              → (q₁) Server 1 → Coordinator
                              → (q₂) Server 2 → Coordinator
                              → (qₙ) Server n → Coordinator
    """

    def __init__(
        self,
        external_arrival_rate: float,
        coordinator_service_rate: float,
        coordinator_exit_prob: float,
        server_service_rates: List[float],
        server_routing_probs: List[float]
    ):
        """
        Initialize the analyzer.

        Args:
            external_arrival_rate: λ₀ - external arrivals per time unit
            coordinator_service_rate: μc - coordinator service rate
            coordinator_exit_prob: p - probability of exiting after coordinator
            server_service_rates: [μ₁, μ₂, ..., μₙ] - server service rates
            server_routing_probs: [q₁, q₂, ..., qₙ] - routing probabilities to servers
        """
        self.lambda_0 = external_arrival_rate
        self.mu_c = coordinator_service_rate
        self.p = coordinator_exit_prob
        self.mu_servers = server_service_rates
        self.q_servers = server_routing_probs
        self.n_servers = len(server_service_rates)

        # Validate inputs
        if self.lambda_0 <= 0:
            raise ValueError(f"External arrival rate must be positive, got {self.lambda_0}")
        if self.mu_c <= 0:
            raise ValueError(f"Coordinator service rate must be positive, got {self.mu_c}")
        if not (0 <= self.p <= 1):
            raise ValueError(f"Exit probability must be in [0,1], got {self.p}")

        total_prob = self.p + sum(self.q_servers)
        if not (0.99 <= total_prob <= 1.01):
            raise ValueError(
                f"Exit probability + routing probabilities must sum to 1.0, got {total_prob}"
            )

    def calculate_effective_arrival_rates(self) -> Dict[str, float]:
        """
        Calculate effective arrival rates for all queues.

        For this specific topology:
        - Coordinator: λc = λ₀ (all external arrivals go through coordinator)
        - Server i: λᵢ = λ₀ * qᵢ (fraction routed to server i)

        Note: In a more complex network, we'd need to solve a system of equations.

        Returns:
            Dict mapping queue_id to effective arrival rate
        """
        arrival_rates = {
            "coordinator": self.lambda_0
        }

        for i, q_i in enumerate(self.q_servers):
            server_id = f"server_{i+1}"
            arrival_rates[server_id] = self.lambda_0 * q_i

        return arrival_rates

    def calculate_utilizations(self, arrival_rates: Dict[str, float]) -> Dict[str, float]:
        """
        Calculate utilization ρ = λ/μ for all queues.

        Args:
            arrival_rates: Effective arrival rates per queue

        Returns:
            Dict mapping queue_id to utilization
        """
        utilizations = {
            "coordinator": arrival_rates["coordinator"] / self.mu_c
        }

        for i, mu_i in enumerate(self.mu_servers):
            server_id = f"server_{i+1}"
            utilizations[server_id] = arrival_rates[server_id] / mu_i

        return utilizations

    def check_stability(self, utilizations: Dict[str, float]) -> Tuple[bool, Optional[str]]:
        """
        Check if the network is stable (all ρᵢ < 1).

        Args:
            utilizations: Utilization per queue

        Returns:
            Tuple of (is_stable, reason_if_unstable)
        """
        unstable_queues = [
            queue_id for queue_id, rho in utilizations.items()
            if rho >= 1.0
        ]

        if unstable_queues:
            reason = f"Unstable queues (ρ ≥ 1): {', '.join(unstable_queues)}"
            for queue_id in unstable_queues:
                reason += f"\n  {queue_id}: ρ = {utilizations[queue_id]:.4f}"
            return False, reason

        return True, None

    def calculate_queue_analytics(
        self,
        queue_id: str,
        service_rate: float,
        arrival_rate: float,
        utilization: float
    ) -> QueueAnalytics:
        """
        Calculate analytical metrics for a single M/M/1 queue.

        Args:
            queue_id: Queue identifier
            service_rate: μ
            arrival_rate: λ (effective)
            utilization: ρ = λ/μ

        Returns:
            QueueAnalytics with all metrics
        """
        is_stable = utilization < 1.0

        if is_stable:
            # Average number of customers in system: L = ρ/(1-ρ)
            L = utilization / (1 - utilization)

            # Average time in system: W = L/λ (Little's Law)
            W = L / arrival_rate if arrival_rate > 0 else 0

            # Average waiting time in queue: Wq = W - 1/μ
            Wq = W - (1 / service_rate)

            # Average queue length: Lq = λ * Wq (Little's Law for queue)
            Lq = arrival_rate * Wq

            return QueueAnalytics(
                queue_id=queue_id,
                service_rate=service_rate,
                arrival_rate=arrival_rate,
                utilization=utilization,
                is_stable=True,
                average_customers=L,
                average_time=W,
                average_wait_time=Wq,
                average_queue_length=Lq
            )
        else:
            # Unstable - metrics are undefined (infinite)
            return QueueAnalytics(
                queue_id=queue_id,
                service_rate=service_rate,
                arrival_rate=arrival_rate,
                utilization=utilization,
                is_stable=False,
                average_customers=math.inf,
                average_time=math.inf,
                average_wait_time=math.inf,
                average_queue_length=math.inf
            )

    def analyze(self) -> NetworkAnalytics:
        """
        Perform complete analytical analysis using Jackson's theorem.

        Returns:
            NetworkAnalytics with all results
        """
        # Step 1: Calculate effective arrival rates
        arrival_rates = self.calculate_effective_arrival_rates()

        # Step 2: Calculate utilizations
        utilizations = self.calculate_utilizations(arrival_rates)

        # Step 3: Check stability
        is_stable, instability_reason = self.check_stability(utilizations)

        # Step 4: Calculate per-queue analytics
        coordinator_analytics = self.calculate_queue_analytics(
            "coordinator",
            self.mu_c,
            arrival_rates["coordinator"],
            utilizations["coordinator"]
        )

        server_analytics = {}
        for i, mu_i in enumerate(self.mu_servers):
            server_id = f"server_{i+1}"
            server_analytics[server_id] = self.calculate_queue_analytics(
                server_id,
                mu_i,
                arrival_rates[server_id],
                utilizations[server_id]
            )

        # Step 5: Calculate system-wide metrics (if stable)
        if is_stable:
            # Total average customers: L = Σ Lᵢ
            total_L = coordinator_analytics.average_customers
            for analytics in server_analytics.values():
                total_L += analytics.average_customers

            # Total average time: W = L / λ₀ (Little's Law for entire network)
            total_W = total_L / self.lambda_0

        else:
            total_L = math.inf
            total_W = math.inf

        return NetworkAnalytics(
            is_stable=is_stable,
            coordinator=coordinator_analytics,
            servers=server_analytics,
            total_average_customers=total_L,
            total_average_time=total_W,
            instability_reason=instability_reason
        )

    def print_analysis(self) -> None:
        """Print a formatted analysis report."""
        results = self.analyze()

        print("=" * 80)
        print("JACKSON'S THEOREM ANALYTICAL ANALYSIS")
        print("=" * 80)

        print(f"\nNetwork Configuration:")
        print(f"  External arrival rate (λ₀): {self.lambda_0:.6f}")
        print(f"  Coordinator service rate (μc): {self.mu_c:.6f}")
        print(f"  Exit probability (p): {self.p:.3f}")
        print(f"  Number of servers: {self.n_servers}")

        print(f"\nStability: {'✅ STABLE' if results.is_stable else '❌ UNSTABLE'}")
        if results.instability_reason:
            print(f"  {results.instability_reason}")

        print(f"\n📊 Coordinator Queue:")
        coord = results.coordinator
        print(f"  λc = {coord.arrival_rate:.6f}")
        print(f"  μc = {coord.service_rate:.6f}")
        print(f"  ρc = {coord.utilization:.4f}")
        if coord.is_stable:
            print(f"  L = {coord.average_customers:.4f} customers")
            print(f"  W = {coord.average_time:.4f} time units")
            print(f"  Wq = {coord.average_wait_time:.4f} time units")

        for server_id, analytics in results.servers.items():
            print(f"\n📊 {server_id}:")
            print(f"  λ = {analytics.arrival_rate:.6f}")
            print(f"  μ = {analytics.service_rate:.6f}")
            print(f"  ρ = {analytics.utilization:.4f}")
            if analytics.is_stable:
                print(f"  L = {analytics.average_customers:.4f} customers")
                print(f"  W = {analytics.average_time:.4f} time units")
                print(f"  Wq = {analytics.average_wait_time:.4f} time units")
            else:
                print(f"  ⚠️  UNSTABLE - Infinite queue")

        if results.is_stable:
            print(f"\n🌐 System-Wide Metrics:")
            print(f"  Total L = {results.total_average_customers:.4f} customers")
            print(f"  Total W = {results.total_average_time:.4f} time units")
            print(f"  Verification (Little's Law): L = λ₀ * W = {self.lambda_0 * results.total_average_time:.4f}")

        print("=" * 80)