from typing import NamedTuple, Dict, List, Tuple
from enum import Enum


class LoadBalancerSpec(NamedTuple):
    capacity: float  # e.g. float('inf')
    baseLatency: int  # ms
    cost: int


class WebServerSmall(NamedTuple):
    capacity: int  # RPS
    baseLatency: int  # ms
    penaltyPerRPS: float  # ms added per RPS over capacity
    cost: int


class WebServerMedium(NamedTuple):
    capacity: int  # RPS
    baseLatency: int  # ms
    penaltyPerRPS: float  # ms added per RPS over capacity
    cost: int


class CacheStandard(NamedTuple):
    capacity: int  # RPS
    baseLatency: int  # ms
    penaltyPer10RPS: float  # ms added per 10 RPS over capacity
    hitRates: Dict[str, float]  # keyed by cache TTL setting
    cost: int


class CacheLarge(NamedTuple):
    capacity: int  # RPS
    baseLatency: int  # ms
    penaltyPer10RPS: float  # ms added per 10 RPS over capacity
    hitRates: Dict[str, float]  # keyed by cache TTL setting
    cost: int


class DbReadReplica(NamedTuple):
    readCapacity: int  # RPS
    baseReadLatency: int  # ms
    penaltyPer10RPS: float  # ms added per 10 RPS over capacity
    cost: int


class ComponentSpec(NamedTuple):
    loadBalancer: LoadBalancerSpec
    webServerSmall: WebServerSmall
    webServerMedium: WebServerMedium
    cacheStandard: CacheStandard
    cacheLarge: CacheLarge
    dbReadReplica: DbReadReplica


class Design(NamedTuple):
    numWebServerSmall: int
    numWebServerMedium: int
    cacheType: str  # either "cacheStandard" or "cacheLarge"
    cacheTTL: str  # key into the chosen cache's hitRates dict
    numDbReplicas: int
    promotionDelaySeconds: int


class Level(NamedTuple):
    id: int
    description: str
    targetRPS: int
    maxP95Latency: int  # ms
    maxMonthlyCost: int
    requiredAvailability: int  # percent
    failureEvents: List[dict]  # e.g. [{"type": "DB_MASTER_CRASH", "time": 1800}]
    componentSpec: ComponentSpec
    simulatedDurationSeconds: int


class CacheType(Enum):
    STANDARD = "cacheStandard"
    LARGE = "cacheLarge"
class LevelSimulator:
    def __init__(self, level: Level, design: Design):
        self.level = level
        self.design = design
        self.specs = self.level.componentSpec

    def compute_cost(self) -> int:
        s = self.specs
        d = self.design
        cost_lb = s.loadBalancer.cost
        cost_ws_small = d.numWebServerSmall * s.webServerSmall.cost
        cost_ws_medium = d.numWebServerMedium * s.webServerMedium.cost
        if d.cacheType == CacheType.STANDARD.value:
            cost_cache = s.cacheStandard.cost
        else:
            cost_cache = s.cacheLarge.cost
        # "1" here stands for the master; add d.numDbReplicas for replicas
        cost_db = s.dbReadReplica.cost * (1 + d.numDbReplicas)
        return cost_lb + cost_ws_small + cost_ws_medium + cost_cache + cost_db

    def compute_rps(self) -> Tuple[float, float]:
        """
        Returns (hits_rps, misses_rps) for a read workload of size level.targetRPS.
        """
        s = self.specs
        d = self.design
        total_rps = self.level.targetRPS
        if d.cacheType == CacheType.STANDARD.value:
            hit_rate = s.cacheStandard.hitRates[d.cacheTTL]
        else:
            hit_rate = s.cacheLarge.hitRates[d.cacheTTL]
        hits_rps = total_rps * hit_rate
        misses_rps = total_rps * (1 - hit_rate)
        return hits_rps, misses_rps
    def compute_latencies(self) -> Dict[str, float]:
        """
        Computes:
        - L95_ws (worst P95 among small/medium, given misses_rps)
        - L95_cache (baseLatency)
        - L95_db_read (based on misses_rps and replicas)
        - L95_total_read = miss_path (since misses are slower)
        """
        s = self.specs
        d = self.design
        # 1) First compute hits/misses
        _, misses_rps = self.compute_rps()

        # 2) Web server P95
        cap_small = s.webServerSmall.capacity
        cap_medium = s.webServerMedium.capacity
        weighted_count = d.numWebServerSmall + (2 * d.numWebServerMedium)
        if weighted_count == 0:
            L95_ws = float("inf")
        else:
            load_per_weighted = misses_rps / weighted_count
            L95_ws_small = 0.0
            if d.numWebServerSmall > 0:
                if load_per_weighted <= cap_small:
                    L95_ws_small = s.webServerSmall.baseLatency
                else:
                    L95_ws_small = (
                        s.webServerSmall.baseLatency
                        + s.webServerSmall.penaltyPerRPS
                        * (load_per_weighted - cap_small)
                    )
            L95_ws_medium = 0.0
            if d.numWebServerMedium > 0:
                if load_per_weighted <= cap_medium:
                    L95_ws_medium = s.webServerMedium.baseLatency
                else:
                    L95_ws_medium = (
                        s.webServerMedium.baseLatency
                        + s.webServerMedium.penaltyPerRPS
                        * (load_per_weighted - cap_medium)
                    )
            L95_ws = max(L95_ws_small, L95_ws_medium)

        # 3) Cache P95
        if d.cacheType == CacheType.STANDARD.value:
            L95_cache = s.cacheStandard.baseLatency
        else:
            L95_cache = s.cacheLarge.baseLatency

        # 4) DB read P95
        read_cap = s.dbReadReplica.readCapacity
        base_read_lat = s.dbReadReplica.baseReadLatency
        pen_per10 = s.dbReadReplica.penaltyPer10RPS
        num_reps = d.numDbReplicas
        if num_reps == 0:
            if misses_rps <= read_cap:
                L95_db_read = base_read_lat
            else:
                excess = misses_rps - read_cap
                L95_db_read = base_read_lat + pen_per10 * (excess / 10.0)
        else:
            load_per_rep = misses_rps / num_reps
            if load_per_rep <= read_cap:
                L95_db_read = base_read_lat
            else:
                excess = load_per_rep - read_cap
                L95_db_read = base_read_lat + pen_per10 * (excess / 10.0)

        # 5) End-to-end P95 read = miss_path
        L_lb = s.loadBalancer.baseLatency
        miss_path = L_lb + L95_ws + L95_db_read
        L95_total_read = miss_path
        return {
            "L95_ws": L95_ws,
            "L95_cache": L95_cache,
            "L95_db_read": L95_db_read,
            "L95_total_read": L95_total_read,
        }
    def compute_availability(self) -> float:
        """
        If failureEvents == [], just return 100.0.
        Otherwise:
        - For each failure (e.g. DB master crash at t_crash):
            if numDbReplicas == 0 → downtime = sim_duration - t_crash
            else → downtime = promotionDelaySeconds (a replica is promoted)
        - availability = (sim_duration - total_downtime) / sim_duration * 100
        """
        sim_duration = self.level.simulatedDurationSeconds
        total_downtime = 0
        for event in self.level.failureEvents:
            t_crash = event["time"]
            if event["type"] == "DB_MASTER_CRASH":
                if self.design.numDbReplicas == 0:
                    total_downtime += (sim_duration - t_crash)
                else:
                    # with at least one replica, the outage lasts only as long
                    # as the configured promotion delay
                    delay = self.design.promotionDelaySeconds
                    total_downtime += delay
            # (handle other event types if needed)
        return (sim_duration - total_downtime) / sim_duration * 100
    def validate(self) -> dict:
        """
        1) Cost check
        2) Throughput checks (cache, DB, WS)
        3) Latency check
        4) Availability check (if there are failureEvents)
        Return { "pass": True, "metrics": {...} } or { "pass": False, "reason": "..." }.
        """
        total_cost = self.compute_cost()
        if total_cost > self.level.maxMonthlyCost:
            return {"pass": False, "reason": f"Budget ${total_cost} > ${self.level.maxMonthlyCost}"}

        hits_rps, misses_rps = self.compute_rps()

        # Cache capacity
        cache_cap = (
            self.specs.cacheStandard.capacity
            if self.design.cacheType == CacheType.STANDARD.value
            else self.specs.cacheLarge.capacity
        )
        if hits_rps > cache_cap:
            return {"pass": False, "reason": f"Cache overloaded ({hits_rps:.1f} RPS > {cache_cap})"}

        # DB capacity
        db_cap = self.specs.dbReadReplica.readCapacity
        if self.design.numDbReplicas == 0:
            if misses_rps > db_cap:
                return {"pass": False, "reason": f"DB overloaded ({misses_rps:.1f} RPS > {db_cap})"}
        else:
            per_rep = misses_rps / self.design.numDbReplicas
            if per_rep > db_cap:
                return {
                    "pass": False,
                    "reason": f"DB replicas overloaded ({per_rep:.1f} RPS/replica > {db_cap})"
                }

        # WS capacity
        total_ws_cap = (
            self.design.numWebServerSmall * self.specs.webServerSmall.capacity
            + self.design.numWebServerMedium * self.specs.webServerMedium.capacity
        )
        if misses_rps > total_ws_cap:
            return {
                "pass": False,
                "reason": f"Web servers overloaded ({misses_rps:.1f} RPS > {total_ws_cap})"
            }

        # Latency
        lat = self.compute_latencies()
        if lat["L95_total_read"] > self.level.maxP95Latency:
            return {
                "pass": False,
                "reason": f"P95 too high ({lat['L95_total_read']:.1f} ms > {self.level.maxP95Latency} ms)"
            }

        # Availability (only checked if failureEvents is nonempty; otherwise 100%)
        availability = 100.0
        if self.level.failureEvents:
            availability = self.compute_availability()
            if availability < self.level.requiredAvailability:
                return {
                    "pass": False,
                    "reason": f"Availability too low ({availability:.1f}% < "
                              f"{self.level.requiredAvailability}%)"
                }

        # If we reach here, all checks passed
        return {
            "pass": True,
            "metrics": {
                "cost": total_cost,
                "p95": lat["L95_total_read"],
                "achievedRPS": self.level.targetRPS,
                "availability": availability,
            }
        }
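

# ---------------------------------------------------------------------------
# Minimal usage sketch. All of the spec numbers, the level, the failure event,
# and the design below are illustrative assumptions, not values from any real
# level definition; they only show how the pieces fit together.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    specs = ComponentSpec(
        loadBalancer=LoadBalancerSpec(capacity=float("inf"), baseLatency=2, cost=20),
        webServerSmall=WebServerSmall(capacity=100, baseLatency=20, penaltyPerRPS=0.5, cost=50),
        webServerMedium=WebServerMedium(capacity=250, baseLatency=18, penaltyPerRPS=0.3, cost=110),
        cacheStandard=CacheStandard(
            capacity=1000, baseLatency=2, penaltyPer10RPS=0.5,
            hitRates={"60s": 0.7, "300s": 0.85}, cost=40,
        ),
        cacheLarge=CacheLarge(
            capacity=3000, baseLatency=2, penaltyPer10RPS=0.3,
            hitRates={"60s": 0.8, "300s": 0.9}, cost=90,
        ),
        dbReadReplica=DbReadReplica(
            readCapacity=200, baseReadLatency=30, penaltyPer10RPS=2.0, cost=120,
        ),
    )
    level = Level(
        id=1,
        description="Survive a DB master crash at 500 RPS",
        targetRPS=500,
        maxP95Latency=120,
        maxMonthlyCost=1000,
        requiredAvailability=99,
        failureEvents=[{"type": "DB_MASTER_CRASH", "time": 1800}],
        componentSpec=specs,
        simulatedDurationSeconds=3600,
    )
    design = Design(
        numWebServerSmall=0,
        numWebServerMedium=2,
        cacheType=CacheType.LARGE.value,
        cacheTTL="300s",
        numDbReplicas=1,
        promotionDelaySeconds=5,
    )
    # Prints a dict like {"pass": True, "metrics": {...}} or a failure reason.
    print(LevelSimulator(level, design).validate())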