From a29350bd3c30afbc85b5803eacc4275053a90fa0 Mon Sep 17 00:00:00 2001
From: Stephanie Gredell
Date: Wed, 18 Jun 2025 17:20:25 -0700
Subject: [PATCH] Add simulation nodes

---
 internal/simulation/cdnnode.go               |  27 ++--
 internal/simulation/databasenode.go          |  20 ++-
 internal/simulation/engine.go                | 121 ++++++++++++++++--
 internal/simulation/loadbalancer.go          |   4 ++--
 internal/simulation/messagequeuenode.go      |  93 ++++++++++++++
 internal/simulation/microservicenode.go      | 106 +++++++++++++++
 internal/simulation/monitoringnode.go        |  66 ++++++++++
 internal/simulation/thirdpartyservicenode.go |  77 +++++++++++
 internal/simulation/webservernode.go         |  21 +++-
 9 files changed, 490 insertions(+), 45 deletions(-)
 create mode 100644 internal/simulation/messagequeuenode.go
 create mode 100644 internal/simulation/microservicenode.go
 create mode 100644 internal/simulation/monitoringnode.go
 create mode 100644 internal/simulation/thirdpartyservicenode.go

diff --git a/internal/simulation/cdnnode.go b/internal/simulation/cdnnode.go
index 7dc0b5b..4774772 100644
--- a/internal/simulation/cdnnode.go
+++ b/internal/simulation/cdnnode.go
@@ -12,14 +12,14 @@ type CDNNode struct {
 	CachingStrategy string
 	Compression     string
 	HTTP2           string
-	CacheHitRate float64             // % of requests served from edge cache
-	CurrentLoad  int                 // optional: can track active queue length
-	Queue        []*Request          // incoming request queue
-	EdgeNodes    map[string]*CDNNode // future expansion: simulate geographic edge clusters
+	CacheHitRate float64
+	CurrentLoad  int
+	Queue        []*Request
+	EdgeNodes    map[string]*CDNNode
 	Alive   bool
 	Targets []string
-	output    []*Request // cache HIT responses
-	missQueue []*Request // cache MISSes to forward
+	output    []*Request
+	missQueue []*Request
 }
 
 func (n *CDNNode) GetID() string { return n.ID }
@@ -27,7 +27,7 @@ func (n *CDNNode) Type() string { return "cdn" }
 func (n *CDNNode) IsAlive() bool { return n.Alive }
 func (n *CDNNode) QueueState() []*Request { return n.Queue }
 
-func (n *CDNNode) Tick(tick int) {
+func (n *CDNNode) Tick(tick int, currentTimeMs int) {
 	if len(n.Queue) == 0 {
 		return
 	}
@@ -35,17 +35,15 @@ func (n *CDNNode) Tick(tick int) {
 	maxProcessPerTick := 10
 	processCount := min(len(n.Queue), maxProcessPerTick)
 
-	// Avoid slice leak by reusing a cleared slice
 	queue := n.Queue
 	n.Queue = n.Queue[:0]
 
 	for i := 0; i < processCount; i++ {
 		req := queue[i]
 
-		// Simulate cache hit or miss
 		hitRate := n.CacheHitRate
 		if hitRate == 0 {
-			hitRate = 0.8 // default if unset
+			hitRate = 0.8
 		}
 
 		if rand.Float64() < hitRate {
@@ -61,9 +59,8 @@ func (n *CDNNode) Tick(tick int) {
 		}
 	}
 
-	// Optionally simulate cache expiration every 100 ticks
-	if tick%100 == 0 {
-		// e.g., simulate reduced hit rate or TTL expiration
+	if len(queue) > processCount {
+		n.Queue = append(n.Queue, queue[processCount:]...)
 	}
 }
 
@@ -81,7 +78,7 @@ func (n *CDNNode) Receive(req *Request) {
 
 func (n *CDNNode) Emit() []*Request {
-	out := append(n.output, n.missQueue...)
-	// Clear for next tick
+	// Copy before clearing so callers don't alias the reused output buffers
+	out := append(append([]*Request(nil), n.output...), n.missQueue...)
 	n.output = n.output[:0]
 	n.missQueue = n.missQueue[:0]
 	return out
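The copy in `Emit` matters: the previous `append(n.output, ...)` returned a slice aliasing the buffer that is immediately truncated and reused, so a later tick could overwrite requests the caller was still holding. A quick sketch of how this could be pinned down in a test — this file is hypothetical and not part of the patch, and it assumes `Receive` still enqueues into `Queue` and `maxProcessPerTick` stays at 10:

```go
package simulation

import "testing"

// Hypothetical sketch: one tick over 20 queued requests should emit the 10
// processed ones (hits plus misses) exactly once.
func TestCDNNodeEmitDrainsOnce(t *testing.T) {
	n := &CDNNode{ID: "cdn-1", Alive: true, CacheHitRate: 0.5}
	for i := 0; i < 20; i++ {
		n.Receive(&Request{ID: "r", Type: "READ"})
	}

	n.Tick(1, 0)

	if got := len(n.Emit()); got != 10 {
		t.Fatalf("want 10 emitted requests, got %d", got)
	}
	// A second Emit must not resurface the same requests.
	if got := len(n.Emit()); got != 0 {
		t.Fatalf("Emit should drain the output queues, got %d leftovers", got)
	}
}
```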
diff --git a/internal/simulation/databasenode.go b/internal/simulation/databasenode.go
index 0ed271b..99259f5 100644
--- a/internal/simulation/databasenode.go
+++ b/internal/simulation/databasenode.go
@@ -9,8 +9,8 @@ type DatabaseNode struct {
 	Replicas []*DatabaseNode
 	Alive    bool
 	Targets  []string
-	output           []*Request
-	replicationQueue []*Request
+	Output           []*Request
+	ReplicationQueue []*Request
 }
 
 func (n *DatabaseNode) GetID() string { return n.ID }
@@ -18,7 +18,7 @@ func (n *DatabaseNode) Type() string { return "database" }
 func (n *DatabaseNode) IsAlive() bool { return n.Alive }
 func (n *DatabaseNode) GetQueue() []*Request { return n.Queue }
 
-func (n *DatabaseNode) Tick(tick int) {
+func (n *DatabaseNode) Tick(tick int, currentTimeMs int) {
 	if len(n.Queue) == 0 {
 		return
 	}
@@ -35,12 +35,11 @@ func (n *DatabaseNode) Tick(tick int) {
 		if req.Type == "READ" {
 			req.LatencyMS += 20
 			req.Path = append(req.Path, n.ID)
-			n.output = append(n.output, req)
+			n.Output = append(n.Output, req)
 		} else {
 			req.LatencyMS += 50
 			req.Path = append(req.Path, n.ID)
 
-			// Replicate to replicas
 			for _, replica := range n.Replicas {
 				replicationReq := &Request{
 					ID:        req.ID + "-repl",
 					Origin:    req.Origin,
 					Timestamp: req.Timestamp,
 					Type:      "REPLICATION",
 					Path:      append(append([]string{}, req.Path...), "->"+replica.ID),
 				}
-				n.replicationQueue = append(n.replicationQueue, replicationReq)
+				n.ReplicationQueue = append(n.ReplicationQueue, replicationReq)
 			}
 
-			n.output = append(n.output, req)
+			n.Output = append(n.Output, req)
 		}
 	}
 
-	// Apply queuing penalty if overloaded
 	if len(n.Queue) > 10 {
 		for _, req := range n.Queue {
 			req.LatencyMS += 10
 		}
 	}
@@ -74,8 +72,8 @@ func (n *DatabaseNode) Receive(req *Request) {
 }
 
 func (n *DatabaseNode) Emit() []*Request {
-	out := append(n.output, n.replicationQueue...)
-	n.output = n.output[:0]
-	n.replicationQueue = n.replicationQueue[:0]
+	out := append(append([]*Request(nil), n.Output...), n.ReplicationQueue...)
+	n.Output = n.Output[:0]
+	n.ReplicationQueue = n.ReplicationQueue[:0]
 	return out
 }
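A WRITE fans out one `REPLICATION` request per attached replica, and `Emit` now drains `Output` ahead of `ReplicationQueue` via a fresh slice (same aliasing fix as the CDN node). A hedged sketch of that contract, assuming the per-tick processing budget is at least one request; this example file is not part of the patch:

```go
package simulation

import "fmt"

// Hypothetical example: a single WRITE through a primary with one replica
// emits the original request plus one replication request.
func ExampleDatabaseNode_replicationFanOut() {
	replica := &DatabaseNode{ID: "db-replica", Alive: true}
	primary := &DatabaseNode{ID: "db-primary", Alive: true, Replicas: []*DatabaseNode{replica}}

	primary.Receive(&Request{ID: "w1", Type: "WRITE"})
	primary.Tick(1, 0)

	for _, req := range primary.Emit() {
		fmt.Println(req.ID, req.Type)
	}
	// Output:
	// w1 WRITE
	// w1-repl REPLICATION
}
```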
diff --git a/internal/simulation/engine.go b/internal/simulation/engine.go
index 13d9d52..915c9db 100644
--- a/internal/simulation/engine.go
+++ b/internal/simulation/engine.go
@@ -14,9 +14,9 @@ type Request struct {
 }
 
 type SimulationNode interface {
-	ID() string
+	GetID() string
 	Type() string
-	Tick(tick int)
+	Tick(tick int, currentTimeMs int)
 	Receive(req *Request)
 	Emit() []*Request
 	IsAlive() bool
@@ -35,18 +35,117 @@ type TickSnapshot struct {
 	NodeHealth map[string]string
 }
 
-func NewEngineFromDesign(design simulation.Design, duration int, tickMs int) *Engine {
-	nodes := design.ToSimulationNode()
-	nodeMap := make(map[string]SimulationNode)
-	for _, n := range nodes {
-		nodeMap[n.ID] = n
+func NewEngineFromDesign(design design.Design, duration int, tickMs int) *Engine {
+	nodeMap := make(map[string]SimulationNode)
+
+	for _, n := range design.Nodes {
+		var simNode SimulationNode
+
+		switch n.Type {
+		case "webserver":
+			simNode = &WebServerNode{
+				ID:    n.ID,
+				Alive: true,
+				Queue: []*Request{},
+			}
+		case "cache":
+			simNode = &CacheNode{
+				ID:             n.ID,
+				Label:          n.Props["label"].(string),
+				CacheTTL:       int(n.Props["cacheTTL"].(float64)),
+				MaxEntries:     int(n.Props["maxEntries"].(float64)),
+				EvictionPolicy: n.Props["evictionPolicy"].(string),
+				CurrentLoad:    0,
+				Queue:          []*Request{},
+				Cache:          make(map[string]CacheEntry),
+				Alive:          true,
+			}
+		case "database":
+			simNode = &DatabaseNode{
+				ID:          n.ID,
+				Label:       n.Props["label"].(string),
+				Replication: int(n.Props["replication"].(float64)),
+				Queue:       []*Request{},
+				Alive:       true,
+			}
+		case "cdn":
+			simNode = &CDNNode{
+				ID:              n.ID,
+				Label:           n.Props["label"].(string),
+				TTL:             int(n.Props["ttl"].(float64)),
+				GeoReplication:  n.Props["geoReplication"].(string),
+				CachingStrategy: n.Props["cachingStrategy"].(string),
+				Compression:     n.Props["compression"].(string),
+				HTTP2:           n.Props["http2"].(string),
+				Queue:           []*Request{},
+				Alive:           true,
+				output:          []*Request{},
+				missQueue:       []*Request{},
+			}
+		case "messageQueue":
+			simNode = &MessageQueueNode{
+				ID:         n.ID,
+				Label:      n.Props["label"].(string),
+				QueueSize:  int(n.Props["maxSize"].(float64)),
+				MessageTTL: int(n.Props["retentionSeconds"].(float64) * 1000), // seconds -> ms
+				DeadLetter: false,
+				Queue:      []*Request{},
+				Alive:      true,
+			}
+		case "microservice":
+			simNode = &MicroserviceNode{
+				ID:             n.ID,
+				Label:          n.Props["label"].(string),
+				APIEndpoint:    n.Props["apiVersion"].(string),
+				RateLimit:      int(n.Props["rpsCapacity"].(float64)),
+				CircuitBreaker: true,
+				Queue:          []*Request{},
+				CircuitState:   "closed",
+				Alive:          true,
+			}
+		case "third party service":
+			simNode = &ThirdPartyServiceNode{
+				ID:          n.ID,
+				Label:       n.Props["label"].(string),
+				APIEndpoint: n.Props["provider"].(string),
+				RateLimit:   int(n.Props["latency"].(float64)), // NOTE: reuses the latency prop as a per-tick budget
+				RetryPolicy: "exponential",
+				Queue:       []*Request{},
+				Alive:       true,
+			}
+		case "data pipeline":
+			simNode = &DataPipelineNode{
+				ID:             n.ID,
+				Label:          n.Props["label"].(string),
+				BatchSize:      int(n.Props["batchSize"].(float64)),
+				Transformation: n.Props["transformation"].(string),
+				Queue:          []*Request{},
+				Alive:          true,
+			}
+		case "monitoring/alerting":
+			// MonitoringNode is added in this patch but not wired up yet; skip.
+			continue
+		default:
+			continue
+		}
+
+		if simNode != nil {
+			nodeMap[simNode.GetID()] = simNode
+		}
+	}
+
+	// Wire up connections (only nodes implementing AddTarget get targets)
+	for _, conn := range design.Connections {
+		if sourceNode, ok := nodeMap[conn.Source]; ok {
+			if targetSetter, ok := sourceNode.(interface{ AddTarget(string) }); ok {
+				targetSetter.AddTarget(conn.Target)
+			}
+		}
 	}
 
 	return &Engine{
-		Nodes: nodeMap,
+		Nodes:    nodeMap,
 		Duration: duration,
-		TickMs: tickMs,
+		TickMs:   tickMs,
 	}
 }
-
-
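`NewEngineFromDesign` reads every numeric prop through a `float64` assertion because JSON-decoded numbers arrive as `float64`. The `design` package itself is not part of this patch; the constructor assumes roughly the following shape. This is a hypothetical reconstruction for orientation only — the real definitions live elsewhere in the repo:

```go
// Hypothetical sketch of the design package consumed above; field and type
// names are assumptions inferred from how the engine uses them.
package design

// Node is one element of a user-drawn system design.
type Node struct {
	ID    string
	Type  string                 // "webserver", "cache", "database", ...
	Props map[string]interface{} // JSON-decoded, so numbers are float64
}

// Connection is a directed edge between two nodes.
type Connection struct {
	Source string
	Target string
}

// Design is the full diagram handed to the simulation engine.
type Design struct {
	Nodes       []Node
	Connections []Connection
}
```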
diff --git a/internal/simulation/loadbalancer.go b/internal/simulation/loadbalancer.go
index 0f91823..30de532 100644
--- a/internal/simulation/loadbalancer.go
+++ b/internal/simulation/loadbalancer.go
@@ -25,7 +25,7 @@ func (lb *LoadBalancerNode) Receive(req *Request) {
-	lb.Queue = append(lb.Queue)
+	lb.Queue = append(lb.Queue, req)
 }
 
-func (lb *LoadBalancerNode) Tick(tick int) {
+func (lb *LoadBalancerNode) Tick(tick int, currentTimeMs int) {
 	lb.Processed = nil
 
 	for _, req := range lb.Queue {
diff --git a/internal/simulation/messagequeuenode.go b/internal/simulation/messagequeuenode.go
new file mode 100644
index 0000000..be58d4c
--- /dev/null
+++ b/internal/simulation/messagequeuenode.go
@@ -0,0 +1,93 @@
+package simulation
+
+type MessageQueueNode struct {
+	ID               string
+	Label            string
+	QueueSize        int
+	MessageTTL       int // TTL in milliseconds
+	DeadLetter       bool
+	CurrentLoad      int
+	Queue            []*Request
+	Alive            bool
+	Targets          []string
+	output           []*Request
+	deadLetterOutput []*Request
+}
+
+func (n *MessageQueueNode) GetID() string { return n.ID }
+func (n *MessageQueueNode) Type() string  { return "messagequeue" }
+func (n *MessageQueueNode) IsAlive() bool { return n.Alive }
+
+func (n *MessageQueueNode) Tick(tick int, currentTimeMs int) {
+	if len(n.Queue) == 0 {
+		return
+	}
+
+	// Check for queue overflow (simulate back pressure)
+	if len(n.Queue) > n.QueueSize {
+		// Move oldest messages to dead letter queue if enabled
+		if n.DeadLetter {
+			overflow := len(n.Queue) - n.QueueSize
+			for i := 0; i < overflow; i++ {
+				deadReq := n.Queue[i]
+				deadReq.Type = "DEAD_LETTER"
+				deadReq.Path = append(deadReq.Path, n.ID+"(dead)")
+				n.deadLetterOutput = append(n.deadLetterOutput, deadReq)
+			}
+			n.Queue = n.Queue[overflow:]
+		} else {
+			// Drop messages if no dead letter queue
+			n.Queue = n.Queue[:n.QueueSize]
+		}
+	}
+
+	// Very high throughput; size the batch after any overflow trimming
+	maxProcessPerTick := 20 // Higher than database (3) or CDN (10)
+	processCount := min(len(n.Queue), maxProcessPerTick)
+
+	// Process messages with TTL check
+	for i := 0; i < processCount; i++ {
+		req := n.Queue[0]
+		n.Queue = n.Queue[1:]
+
+		// Check TTL (time to live) - use current time in milliseconds
+		messageAgeMs := currentTimeMs - req.Timestamp
+		if messageAgeMs > n.MessageTTL {
+			// Message expired
+			if n.DeadLetter {
+				req.Type = "EXPIRED"
+				req.Path = append(req.Path, n.ID+"(expired)")
+				n.deadLetterOutput = append(n.deadLetterOutput, req)
+			}
+			// Otherwise drop expired message
+			continue
+		}
+
+		// Message queue adds minimal latency (very fast)
+		req.LatencyMS += 2
+		req.Path = append(req.Path, n.ID)
+		n.output = append(n.output, req)
+	}
+}
+
+func (n *MessageQueueNode) Receive(req *Request) {
+	if req == nil {
+		return
+	}
+
+	// Message queues have very low receive overhead
+	req.LatencyMS += 1
+
+	n.Queue = append(n.Queue, req)
+}
+
+func (n *MessageQueueNode) Emit() []*Request {
+	// Copy before clearing; normal and dead-letter messages go out together
+	allRequests := append(append([]*Request(nil), n.output...), n.deadLetterOutput...)
+
+	// Clear queues
+	n.output = n.output[:0]
+	n.deadLetterOutput = n.deadLetterOutput[:0]
+
+	return allRequests
+}
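Expiry is driven entirely by `Timestamp` versus `currentTimeMs` (note `processCount` is now computed after the overflow trim, so the pop loop can't run past the shortened queue). A hypothetical test, not part of the patch, pinning down the dead-letter behavior under the millisecond TTL documented on `MessageTTL`:

```go
package simulation

import "testing"

// Hypothetical sketch: an expired message is diverted to the dead-letter
// output with Type "EXPIRED"; a fresh one passes through untouched.
func TestMessageQueueExpiresToDeadLetter(t *testing.T) {
	n := &MessageQueueNode{ID: "mq-1", Alive: true, QueueSize: 100, MessageTTL: 1000, DeadLetter: true}

	n.Receive(&Request{ID: "old", Timestamp: 0})    // created at t=0ms
	n.Receive(&Request{ID: "new", Timestamp: 4500}) // created at t=4500ms

	n.Tick(1, 5000) // at t=5000ms, "old" is 5000ms old, well past the 1000ms TTL

	for _, req := range n.Emit() {
		switch req.ID {
		case "old":
			if req.Type != "EXPIRED" {
				t.Errorf("want old message expired, got type %q", req.Type)
			}
		case "new":
			if req.Type == "EXPIRED" {
				t.Errorf("new message should not expire")
			}
		}
	}
}
```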
diff --git a/internal/simulation/microservicenode.go b/internal/simulation/microservicenode.go
new file mode 100644
index 0000000..db5638e
--- /dev/null
+++ b/internal/simulation/microservicenode.go
@@ -0,0 +1,106 @@
+package simulation
+
+import "math/rand"
+
+type MicroserviceNode struct {
+	ID             string
+	Label          string
+	APIEndpoint    string
+	RateLimit      int // max requests per tick
+	CircuitBreaker bool
+	CircuitState   string // "closed", "open", "half-open"
+	ErrorCount     int
+	CurrentLoad    int
+	Queue          []*Request
+	Output         []*Request
+	Alive          bool
+	Targets        []string
+}
+
+func (n *MicroserviceNode) GetID() string { return n.ID }
+
+func (n *MicroserviceNode) Type() string { return "microservice" }
+
+func (n *MicroserviceNode) IsAlive() bool { return n.Alive }
+
+func (n *MicroserviceNode) Receive(req *Request) {
+	n.Queue = append(n.Queue, req)
+}
+
+func (n *MicroserviceNode) Emit() []*Request {
+	out := append([]*Request(nil), n.Output...)
+	n.Output = n.Output[:0]
+	return out
+}
+
+func (n *MicroserviceNode) Tick(tick int, currentTimeMs int) {
+	if !n.Alive {
+		return
+	}
+
+	// Simulate circuit breaker state transitions
+	switch n.CircuitState {
+	case "open":
+		// Probe recovery periodically instead of staying open forever
+		if tick%20 == 0 {
+			n.CircuitState = "half-open"
+		}
+		return
+	case "half-open":
+		// Allow limited testing traffic
+		if len(n.Queue) == 0 {
+			return
+		}
+		req := n.Queue[0]
+		n.Queue = n.Queue[1:]
+		req.LatencyMS += 15
+		req.Path = append(req.Path, n.ID+"(half-open)")
+		success := simulateSuccess()
+		if success {
+			n.CircuitState = "closed"
+			n.ErrorCount = 0
+			n.Output = append(n.Output, req)
+		} else {
+			n.ErrorCount++
+			n.CircuitState = "open"
+		}
+		return
+	}
+
+	// "closed" circuit - normal processing
+	toProcess := min(len(n.Queue), n.RateLimit)
+	consumed := 0
+	for i := 0; i < toProcess; i++ {
+		req := n.Queue[i]
+		consumed++
+		req.LatencyMS += 10
+		req.Path = append(req.Path, n.ID)
+
+		if simulateFailure() {
+			n.ErrorCount++
+			if n.CircuitBreaker && n.ErrorCount > 5 {
+				n.CircuitState = "open"
+				break
+			}
+			continue
+		}
+
+		n.Output = append(n.Output, req)
+	}
+	// Drop only the consumed requests; anything untouched stays queued
+	n.Queue = n.Queue[consumed:]
+}
+
+func simulateFailure() bool {
+	// Simulate 10% failure rate
+	return randInt(1, 100) <= 10
+}
+
+func simulateSuccess() bool {
+	// Simulate 90% success rate
+	return randInt(1, 100) <= 90
+}
+
+func randInt(min, max int) int {
+	return min + rand.Intn(max-min+1)
+}
diff --git a/internal/simulation/monitoringnode.go b/internal/simulation/monitoringnode.go
new file mode 100644
index 0000000..fc37f6a
--- /dev/null
+++ b/internal/simulation/monitoringnode.go
@@ -0,0 +1,66 @@
+package simulation
+
+type MonitoringNode struct {
+	ID             string
+	Label          string
+	Tool           string
+	AlertMetric    string
+	ThresholdValue int
+	ThresholdUnit  string
+	Queue          []*Request
+	Alive          bool
+	Targets        []string
+	Metrics        map[string]int
+	Alerts         []*Request
+}
+
+func (n *MonitoringNode) GetID() string { return n.ID }
+
+func (n *MonitoringNode) Type() string { return "monitoring" }
+
+func (n *MonitoringNode) IsAlive() bool { return n.Alive }
+
+func (n *MonitoringNode) Receive(req *Request) {
+	n.Queue = append(n.Queue, req)
+}
+
+func (n *MonitoringNode) Emit() []*Request {
+	out := append([]*Request(nil), n.Alerts...)
+	n.Alerts = n.Alerts[:0]
+	return out
+}
+
+func (n *MonitoringNode) Tick(tick int, currentTimeMs int) {
+	if !n.Alive {
+		return
+	}
+
+	if n.Metrics == nil {
+		n.Metrics = make(map[string]int)
+	}
+
+	// Simulate processing requests as metrics
+	for _, req := range n.Queue {
+		// For now, pretend all requests are relevant to the AlertMetric
+		n.Metrics[n.AlertMetric] += 1
+		req.LatencyMS += 1
+		req.Path = append(req.Path, n.ID)
+
+		if n.Metrics[n.AlertMetric] > n.ThresholdValue {
+			alert := &Request{
+				ID:        "alert-" + req.ID,
+				Timestamp: currentTimeMs,
+				Origin:    n.ID,
+				Type:      "alert",
+				LatencyMS: 0,
+				Path:      []string{n.ID, "alert"},
+			}
+			n.Alerts = append(n.Alerts, alert)
+
+			// Reset after alert (or you could continue accumulating)
+			n.Metrics[n.AlertMetric] = 0
+		}
+	}
+
+	n.Queue = nil
+}
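Two behavior changes in `MicroserviceNode.Tick` are worth calling out: the queue is now trimmed by what was actually consumed (so an early `break` no longer discards untouched requests), and an open breaker probes recovery every 20 ticks rather than staying open forever — previously nothing ever left the "open" state. A hypothetical example, not part of the patch, documenting that probe; the 20-tick cooldown is the value chosen in the revised `open` case above:

```go
package simulation

import "fmt"

// Hypothetical sketch: a tripped breaker stays open on ordinary ticks and
// moves to half-open on a probe tick (tick divisible by 20).
func ExampleMicroserviceNode_circuitRecovery() {
	n := &MicroserviceNode{ID: "svc-1", Alive: true, CircuitBreaker: true, CircuitState: "open"}

	n.Tick(19, 0) // not a probe tick: stays open
	fmt.Println(n.CircuitState)

	n.Tick(20, 0) // probe tick: transitions to half-open
	fmt.Println(n.CircuitState)
	// Output:
	// open
	// half-open
}
```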
diff --git a/internal/simulation/thirdpartyservicenode.go b/internal/simulation/thirdpartyservicenode.go
new file mode 100644
index 0000000..54dd559
--- /dev/null
+++ b/internal/simulation/thirdpartyservicenode.go
@@ -0,0 +1,77 @@
+package simulation
+
+type ThirdPartyServiceNode struct {
+	ID          string
+	Label       string
+	APIEndpoint string
+	RateLimit   int    // Max requests per tick
+	RetryPolicy string // "exponential", "fixed", etc.
+	CurrentLoad int
+	Queue       []*Request
+	ErrorCount  int
+	RetryCount  int
+	Alive       bool
+	Targets     []string
+	Output      []*Request
+}
+
+// --- Interface Methods ---
+
+func (n *ThirdPartyServiceNode) GetID() string { return n.ID }
+func (n *ThirdPartyServiceNode) Type() string  { return "thirdpartyservice" }
+func (n *ThirdPartyServiceNode) IsAlive() bool { return n.Alive }
+func (n *ThirdPartyServiceNode) Receive(req *Request) {
+	n.Queue = append(n.Queue, req)
+}
+func (n *ThirdPartyServiceNode) Emit() []*Request {
+	out := append([]*Request(nil), n.Output...)
+	n.Output = n.Output[:0]
+	return out
+}
+
+// Expose the queue for inspection (GetQueue is not part of SimulationNode)
+func (n *ThirdPartyServiceNode) GetQueue() []*Request {
+	return n.Queue
+}
+
+// --- Simulation Logic ---
+
+func (n *ThirdPartyServiceNode) Tick(tick int, currentTimeMs int) {
+	if !n.Alive {
+		return
+	}
+
+	// Simulate third-party call behavior with success/failure
+	maxProcess := min(n.RateLimit, len(n.Queue))
+	processing := n.Queue[:maxProcess]
+	n.Queue = n.Queue[maxProcess:]
+
+	for i := 0; i < maxProcess; i++ {
+		req := processing[i]
+		success := simulateThirdPartySuccess(req)
+
+		if success {
+			req.LatencyMS += 100 + randInt(0, 50) // simulate response time
+			req.Path = append(req.Path, n.ID)
+			n.Output = append(n.Output, req)
+		} else {
+			n.ErrorCount++
+			n.RetryCount++
+			if n.RetryPolicy == "exponential" && n.RetryCount < 3 {
+				n.Queue = append(n.Queue, req) // retry again next tick
+			}
+		}
+	}
+
+	// Simulate degradation if too many errors
+	if n.ErrorCount > 10 {
+		n.Alive = false
+	}
+}
+
+// --- Helpers ---
+
+func simulateThirdPartySuccess(req *Request) bool {
+	// 90% success rate
+	return randInt(0, 100) < 90
+}
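The original draft sliced the batch from the wrong end (`newQueue := n.Queue[maxProcess:]` and then indexed `newQueue[i]` for the first `maxProcess` requests), which processed leftovers instead of the head of the queue and panicked whenever the remainder was shorter than the batch; the version above splits head and tail explicitly. A hypothetical test, not part of the patch, with counts left loose because success is random:

```go
package simulation

import "testing"

// Hypothetical sketch: every request is either delivered or requeued for
// retry, never duplicated, and the Tick itself never panics at any queue
// depth relative to RateLimit.
func TestThirdPartyRequestsNotDuplicated(t *testing.T) {
	n := &ThirdPartyServiceNode{ID: "ext-1", Alive: true, RateLimit: 5, RetryPolicy: "exponential"}

	for i := 0; i < 5; i++ {
		n.Receive(&Request{ID: "r"})
	}
	n.Tick(1, 0)

	delivered := len(n.Emit())
	retrying := len(n.GetQueue())
	if delivered+retrying > 5 {
		t.Fatalf("requests were duplicated: %d delivered, %d retrying", delivered, retrying)
	}
}
```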
diff --git a/internal/simulation/webservernode.go b/internal/simulation/webservernode.go
index 8a47a61..bf95534 100644
--- a/internal/simulation/webservernode.go
+++ b/internal/simulation/webservernode.go
@@ -8,6 +8,7 @@ type WebServerNode struct {
 	PenaltyPerRPS float64
 	Processed     []*Request
 	Alive         bool
+	Targets       []string
 }
 
 func (ws *WebServerNode) GetID() string {
@@ -22,20 +23,20 @@ func (ws *WebServerNode) IsAlive() bool {
 	return ws.Alive
 }
 
-func (ws *WebServerNode) Tick(tick int) {
+func (ws *WebServerNode) Tick(tick int, currentTimeMs int) {
 	toProcess := min(ws.CapacityRPS, len(ws.Queue))
 
 	for i := 0; i < toProcess; i++ {
-		req := ws.Queue[0]
+		req := ws.Queue[i]
 		req.LatencyMS += ws.BaseLatencyMs
 		ws.Processed = append(ws.Processed, req)
-
-		ws.Queue[i] = nil
 	}
 
+	// Remove processed requests from the queue
 	ws.Queue = ws.Queue[toProcess:]
 
-	if len(ws.Queue) > ws.CapacityRPS {
-		overload := len(ws.Queue) - ws.CapacityRPS
+	// Apply penalty for overload
+	if len(ws.Queue) > 0 {
+		overload := len(ws.Queue)
 		for _, req := range ws.Queue {
 			req.LatencyMS += int(ws.PenaltyPerRPS * float64(overload))
 		}
@@ -52,9 +53,17 @@ func (ws *WebServerNode) Emit() []*Request {
 	return out
 }
 
+func (ws *WebServerNode) AddTarget(targetID string) {
+	ws.Targets = append(ws.Targets, targetID)
+}
+
 func min(a, b int) int {
 	if a < b {
 		return a
 	}
 	return b
 }
+
+func (ws *WebServerNode) GetQueue() []*Request {
+	return ws.Queue
+}
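None of the above shows the engine's run loop, which this patch leaves unchanged. For orientation, here is a hypothetical sketch of the smallest tick/emit/route cycle these interfaces support; `runTicks` and its `route` callback are illustrative only, and routing is delegated to the caller because `SimulationNode` does not expose `Targets`:

```go
package simulation

// Hypothetical helper, not part of the patch: advance every live node one
// tick, then drain emitted requests and deliver each to its next hops.
func runTicks(nodes map[string]SimulationNode, ticks, tickMs int, route func(from string, req *Request) []string) {
	for tick := 0; tick < ticks; tick++ {
		currentTimeMs := tick * tickMs

		// Advance every live node by one simulated tick.
		for _, n := range nodes {
			if n.IsAlive() {
				n.Tick(tick, currentTimeMs)
			}
		}

		// Drain outputs and hand each request to its routed targets.
		for id, n := range nodes {
			for _, req := range n.Emit() {
				for _, targetID := range route(id, req) {
					if target, ok := nodes[targetID]; ok && target.IsAlive() {
						target.Receive(req)
					}
				}
			}
		}
	}
}
```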