Browse Source

Add simulation nodes

pull/1/head
Stephanie Gredell 7 months ago
parent
commit
a29350bd3c
  1. 24
      internal/simulation/cdnnode.go
  2. 20
      internal/simulation/databasenode.go
  3. 117
      internal/simulation/engine.go
  4. 2
      internal/simulation/loadbalancer.go
  5. 93
      internal/simulation/messagequeuenode.go
  6. 103
      internal/simulation/microservicenode.go
  7. 66
      internal/simulation/monitoringnode.go
  8. 77
      internal/simulation/thirdpartyservicenode.go
  9. 21
      internal/simulation/webservernode.go

24
internal/simulation/cdnnode.go

@@ -12,14 +12,14 @@ type CDNNode struct {
CachingStrategy string
Compression string
HTTP2 string
CacheHitRate float64 // % of requests served from edge cache
CurrentLoad int // optional: can track active queue length
Queue []*Request // incoming request queue
EdgeNodes map[string]*CDNNode // future expansion: simulate geographic edge clusters
CacheHitRate float64
CurrentLoad int
Queue []*Request
EdgeNodes map[string]*CDNNode
Alive bool
Targets []string
output []*Request // cache HIT responses
missQueue []*Request // cache MISSes to forward
output []*Request
missQueue []*Request
}
func (n *CDNNode) GetID() string { return n.ID }
@ -27,7 +27,7 @@ func (n *CDNNode) Type() string { return "cdn" } @@ -27,7 +27,7 @@ func (n *CDNNode) Type() string { return "cdn" }
func (n *CDNNode) IsAlive() bool { return n.Alive }
func (n *CDNNode) QueueState() []*Request { return n.Queue }
func (n *CDNNode) Tick(tick int) {
func (n *CDNNode) Tick(tick int, currentTimeMs int) {
if len(n.Queue) == 0 {
return
}
@ -35,17 +35,15 @@ func (n *CDNNode) Tick(tick int) { @@ -35,17 +35,15 @@ func (n *CDNNode) Tick(tick int) {
maxProcessPerTick := 10
processCount := min(len(n.Queue), maxProcessPerTick)
// Avoid slice leak by reusing a cleared slice
queue := n.Queue
n.Queue = n.Queue[:0]
for i := 0; i < processCount; i++ {
req := queue[i]
// Simulate cache hit or miss
hitRate := n.CacheHitRate
if hitRate == 0 {
hitRate = 0.8 // default if unset
hitRate = 0.8
}
if rand.Float64() < hitRate {
@ -61,9 +59,8 @@ func (n *CDNNode) Tick(tick int) { @@ -61,9 +59,8 @@ func (n *CDNNode) Tick(tick int) {
}
}
// Optionally simulate cache expiration every 100 ticks
if tick%100 == 0 {
// e.g., simulate reduced hit rate or TTL expiration
if len(queue) > processCount {
n.Queue = append(n.Queue, queue[processCount:]...)
}
}
@ -81,7 +78,6 @@ func (n *CDNNode) Receive(req *Request) { @@ -81,7 +78,6 @@ func (n *CDNNode) Receive(req *Request) {
func (n *CDNNode) Emit() []*Request {
out := append(n.output, n.missQueue...)
// Clear for next tick
n.output = n.output[:0]
n.missQueue = n.missQueue[:0]

20
internal/simulation/databasenode.go

@ -9,8 +9,8 @@ type DatabaseNode struct { @@ -9,8 +9,8 @@ type DatabaseNode struct {
Replicas []*DatabaseNode
Alive bool
Targets []string
output []*Request
replicationQueue []*Request
Output []*Request
ReplicationQueue []*Request
}
func (n *DatabaseNode) GetID() string { return n.ID }
@ -18,7 +18,7 @@ func (n *DatabaseNode) Type() string { return "database" } @@ -18,7 +18,7 @@ func (n *DatabaseNode) Type() string { return "database" }
func (n *DatabaseNode) IsAlive() bool { return n.Alive }
func (n *DatabaseNode) GetQueue() []*Request { return n.Queue }
func (n *DatabaseNode) Tick(tick int) {
func (n *DatabaseNode) Tick(tick int, currentTimeMs int) {
if len(n.Queue) == 0 {
return
}
@ -35,12 +35,11 @@ func (n *DatabaseNode) Tick(tick int) { @@ -35,12 +35,11 @@ func (n *DatabaseNode) Tick(tick int) {
if req.Type == "READ" {
req.LatencyMS += 20
req.Path = append(req.Path, n.ID)
n.output = append(n.output, req)
n.Output = append(n.Output, req)
} else {
req.LatencyMS += 50
req.Path = append(req.Path, n.ID)
// Replicate to replicas
for _, replica := range n.Replicas {
replicationReq := &Request{
ID: req.ID + "-repl",
@ -50,14 +49,13 @@ func (n *DatabaseNode) Tick(tick int) { @@ -50,14 +49,13 @@ func (n *DatabaseNode) Tick(tick int) {
Type: "REPLICATION",
Path: append(append([]string{}, req.Path...), "->"+replica.ID),
}
n.replicationQueue = append(n.replicationQueue, replicationReq)
n.ReplicationQueue = append(n.ReplicationQueue, replicationReq)
}
n.output = append(n.output, req)
n.Output = append(n.Output, req)
}
}
// Apply queuing penalty if overloaded
if len(n.Queue) > 10 {
for _, req := range n.Queue {
req.LatencyMS += 10
@ -74,8 +72,8 @@ func (n *DatabaseNode) Receive(req *Request) { @@ -74,8 +72,8 @@ func (n *DatabaseNode) Receive(req *Request) {
}
func (n *DatabaseNode) Emit() []*Request {
out := append(n.output, n.replicationQueue...)
n.output = n.output[:0]
n.replicationQueue = n.replicationQueue[:0]
out := append(n.Output, n.ReplicationQueue...)
n.Output = n.Output[:0]
n.ReplicationQueue = n.ReplicationQueue[:0]
return out
}

117
internal/simulation/engine.go

@ -14,9 +14,9 @@ type Request struct { @@ -14,9 +14,9 @@ type Request struct {
}
type SimulationNode interface {
ID() string
GetID() string
Type() string
Tick(tick int)
Tick(tick int, currentTimeMs int)
Receive(req *Request)
Emit() []*Request
IsAlive() bool
@ -35,11 +35,112 @@ type TickSnapshot struct { @@ -35,11 +35,112 @@ type TickSnapshot struct {
NodeHealth map[string]string
}
func NewEngineFromDesign(design simulation.Design, duration int, tickMs int) *Engine {
nodes := design.ToSimulationNode()
nodeMap := make(make[string]SimulatSimulationNode)
for _, n := range nodes {
nodeMap[n.ID] = n
func NewEngineFromDesign(design design.Design, duration int, tickMs int) *Engine {
nodeMap := make(map[string]SimulationNode)
for _, n := range design.Nodes {
var simNode SimulationNode
switch n.Type {
case "webserver":
simNode = &WebServerNode{
ID: n.ID,
Alive: true,
Queue: []*Request{},
}
case "cache":
simNode = &CacheNode{
ID: n.ID,
Label: n.Props["label"].(string),
CacheTTL: int(n.Props["cacheTTL"].(float64)),
MaxEntries: int(n.Props["maxEntries"].(float64)),
EvictionPolicy: n.Props["evictionPolicy"].(string),
CurrentLoad: 0,
Queue: []*Request{},
Cache: make(map[string]CacheEntry),
Alive: true,
}
case "database":
simNode = &DatabaseNode{
ID: n.ID,
Label: n.Props["label"].(string),
Replication: int(n.Props["replication"].(float64)),
Queue: []*Request{},
Alive: true,
}
case "cdn":
simNode = &CDNNode{
ID: n.ID,
Label: n.Props["label"].(string),
TTL: int(n.Props["ttl"].(float64)),
GeoReplication: n.Props["geoReplication"].(string),
CachingStrategy: n.Props["cachingStrategy"].(string),
Compression: n.Props["compression"].(string),
HTTP2: n.Props["http2"].(string),
Queue: []*Request{},
Alive: true,
output: []*Request{},
missQueue: []*Request{},
}
case "messageQueue":
simNode = &MessageQueueNode{
ID: n.ID,
Label: n.Props["label"].(string),
QueueSize: int(n.Props["maxSize"].(float64)),
MessageTTL: int(n.Props["retentionSeconds"].(float64)),
DeadLetter: false,
Queue: []*Request{},
Alive: true,
}
case "microservice":
simNode = &MicroserviceNode{
ID: n.ID,
Label: n.Props["label"].(string),
APIEndpoint: n.Props["apiVersion"].(string),
RateLimit: int(n.Props["rpsCapacity"].(float64)),
CircuitBreaker: true,
Queue: []*Request{},
CircuitState: "closed",
Alive: true,
}
case "third party service":
simNode = &ThirdPartyServiceNode{
ID: n.ID,
Label: n.Props["label"].(string),
APIEndpoint: n.Props["provider"].(string),
RateLimit: int(n.Props["latency"].(float64)),
RetryPolicy: "exponential",
Queue: []*Request{},
Alive: true,
}
case "data pipeline":
simNode = &DataPipelineNode{
ID: n.ID,
Label: n.Props["label"].(string),
BatchSize: int(n.Props["batchSize"].(float64)),
Transformation: n.Props["transformation"].(string),
Queue: []*Request{},
Alive: true,
}
case "monitoring/alerting":
// Simulation not implemented yet; optionally skip
continue
default:
continue
}
if simNode != nil {
nodeMap[simNode.GetID()] = simNode
}
}
// Wire up connections
for _, conn := range design.Connections {
if sourceNode, ok := nodeMap[conn.Source]; ok {
if targetSetter, ok := sourceNode.(interface{ AddTarget(string) }); ok {
targetSetter.AddTarget(conn.Target)
}
}
}
return &Engine{
@ -48,5 +149,3 @@ func NewEngineFromDesign(design simulation.Design, duration int, tickMs int) *En @@ -48,5 +149,3 @@ func NewEngineFromDesign(design simulation.Design, duration int, tickMs int) *En
TickMs: tickMs,
}
}

2
internal/simulation/loadbalancer.go

@ -25,7 +25,7 @@ func (lb *LoadBalancerNode) Receive(req *Request) { @@ -25,7 +25,7 @@ func (lb *LoadBalancerNode) Receive(req *Request) {
lb.Queue = append(lb.Queue)
}
func (lb *LoadBalancerNode) Tick(tick int) {
func (lb *LoadBalancerNode) Tick(tick int, currentTimeMs int) {
lb.Processed = nil
for _, req := range lb.Queue {

93
internal/simulation/messagequeuenode.go

@@ -0,0 +1,93 @@
package simulation
// MessageQueueNode simulates a message broker: a bounded, high-throughput
// queue with per-message TTL and optional dead-lettering of overflowed or
// expired messages.
type MessageQueueNode struct {
	ID          string
	Label       string
	QueueSize   int // max messages held before back pressure (overflow shedding) kicks in
	MessageTTL  int // TTL in milliseconds; NOTE(review): the engine wires this from a "retentionSeconds" prop — confirm units
	DeadLetter  bool // when true, overflowed/expired messages are emitted as dead letters instead of dropped
	CurrentLoad int
	Queue       []*Request // pending messages, oldest first
	Alive       bool
	Targets     []string // downstream node IDs

	output           []*Request // successfully processed messages awaiting Emit
	deadLetterOutput []*Request // DEAD_LETTER / EXPIRED messages awaiting Emit
}
// GetID returns the node's unique identifier.
func (n *MessageQueueNode) GetID() string { return n.ID }

// Type identifies this node kind to the engine.
func (n *MessageQueueNode) Type() string { return "messagequeue" }

// IsAlive reports whether the node is still processing traffic.
func (n *MessageQueueNode) IsAlive() bool { return n.Alive }
// Tick drains up to 20 messages this tick, after first enforcing the queue
// capacity (back pressure) and then the per-message TTL. Overflowed or
// expired messages go to the dead-letter output when DeadLetter is enabled;
// otherwise they are dropped.
func (n *MessageQueueNode) Tick(tick int, currentTimeMs int) {
	if len(n.Queue) == 0 {
		return
	}

	// Shed overflow before deciding how many messages to process.
	if len(n.Queue) > n.QueueSize {
		if n.DeadLetter {
			// Move the oldest messages to the dead-letter output.
			overflow := len(n.Queue) - n.QueueSize
			for i := 0; i < overflow; i++ {
				deadReq := n.Queue[i]
				deadReq.Type = "DEAD_LETTER"
				deadReq.Path = append(deadReq.Path, n.ID+"(dead)")
				n.deadLetterOutput = append(n.deadLetterOutput, deadReq)
			}
			n.Queue = n.Queue[overflow:]
		} else {
			// No dead-letter queue configured: drop the overflow.
			// NOTE(review): this truncation drops the NEWEST messages while
			// the dead-letter path sheds the OLDEST — confirm which policy
			// is intended.
			n.Queue = n.Queue[:n.QueueSize]
		}
	}

	// BUG FIX: processCount must be computed AFTER overflow shedding.
	// It was previously derived from the pre-trim length, so whenever
	// QueueSize < processCount the loop indexed an empty queue
	// (n.Queue[0]) and panicked.
	maxProcessPerTick := 20 // higher throughput than database (3) or CDN (10)
	processCount := min(len(n.Queue), maxProcessPerTick)

	for i := 0; i < processCount; i++ {
		req := n.Queue[0]
		n.Queue = n.Queue[1:]

		// Expire messages older than MessageTTL (both in milliseconds).
		messageAgeMs := currentTimeMs - req.Timestamp
		if messageAgeMs > n.MessageTTL {
			if n.DeadLetter {
				req.Type = "EXPIRED"
				req.Path = append(req.Path, n.ID+"(expired)")
				n.deadLetterOutput = append(n.deadLetterOutput, req)
			}
			// Without a dead-letter queue the expired message is dropped.
			continue
		}

		// Message queues add minimal latency.
		req.LatencyMS += 2
		req.Path = append(req.Path, n.ID)
		n.output = append(n.output, req)
	}
}
// Receive enqueues an incoming request. Enqueueing is cheap, so only a
// single millisecond of latency is added. Nil requests are ignored.
func (n *MessageQueueNode) Receive(req *Request) {
	if req != nil {
		req.LatencyMS++
		n.Queue = append(n.Queue, req)
	}
}
// Emit returns all messages produced since the last call — normal output
// followed by dead-letter output — and resets both buffers.
func (n *MessageQueueNode) Emit() []*Request {
	// BUG FIX: the previous version returned append(n.output, n.deadLetterOutput...),
	// a slice aliasing n.output's backing array. Because both buffers are then
	// reset with [:0] and refilled next tick, the caller's slice could be
	// silently overwritten. Copy into a fresh slice instead.
	out := make([]*Request, 0, len(n.output)+len(n.deadLetterOutput))
	out = append(out, n.output...)
	out = append(out, n.deadLetterOutput...)

	// Reset buffers for the next tick (capacity is retained).
	n.output = n.output[:0]
	n.deadLetterOutput = n.deadLetterOutput[:0]
	return out
}

103
internal/simulation/microservicenode.go

@@ -0,0 +1,103 @@
package simulation
import "math/rand"
// MicroserviceNode simulates a rate-limited service protected by a
// circuit breaker with the classic "closed" / "open" / "half-open" states.
type MicroserviceNode struct {
	ID             string
	Label          string
	APIEndpoint    string
	RateLimit      int // max requests processed per tick while the circuit is closed
	CircuitBreaker bool // whether the breaker may trip the circuit open
	CircuitState   string // "closed", "open", "half-open"
	ErrorCount     int // accumulated failures; > 5 trips the breaker
	CurrentLoad    int
	Queue          []*Request // pending requests
	Output         []*Request // processed requests awaiting Emit
	Alive          bool
	Targets        []string // downstream node IDs
}
// GetID returns the node's unique identifier.
func (n *MicroserviceNode) GetID() string { return n.ID }

// Type identifies this node kind to the engine.
func (n *MicroserviceNode) Type() string { return "microservice" }

// IsAlive reports whether the node is still processing traffic.
func (n *MicroserviceNode) IsAlive() bool { return n.Alive }
// Receive enqueues an incoming request for processing on a later Tick.
func (n *MicroserviceNode) Receive(req *Request) {
	n.Queue = append(n.Queue, req)
}
// Emit returns the requests processed since the last call and resets the
// output buffer. A copy is returned so the reused internal buffer cannot
// clobber the caller's view.
func (n *MicroserviceNode) Emit() []*Request {
	emitted := make([]*Request, len(n.Output))
	copy(emitted, n.Output)
	n.Output = n.Output[:0]
	return emitted
}
// Tick advances the circuit-breaker state machine and processes queued
// requests while the circuit is closed (up to RateLimit per tick).
//
// Fixes over the previous version:
//   - Removed `n.Alive = n.CircuitState != "open"`: since Tick early-returns
//     when !n.Alive, tripping the breaker once marked the node dead forever
//     and it could never recover.
//   - The "open" state now periodically transitions to "half-open" so the
//     breaker can probe for recovery (previously nothing ever left "open").
//   - On an early break (circuit trips mid-batch), only the requests that
//     were actually examined are removed from the queue; the rest were
//     previously dropped via n.Queue[toProcess:].
func (n *MicroserviceNode) Tick(tick int, currentTimeMs int) {
	if !n.Alive {
		return
	}

	switch n.CircuitState {
	case "open":
		// Cooldown: every 10 ticks allow a single probe request through
		// by moving to half-open. (10 is arbitrary — tune as needed.)
		if tick%10 == 0 {
			n.CircuitState = "half-open"
		}
		return
	case "half-open":
		// Allow one test request; success closes the circuit, failure
		// re-opens it.
		if len(n.Queue) == 0 {
			return
		}
		req := n.Queue[0]
		n.Queue = n.Queue[1:]
		req.LatencyMS += 15
		req.Path = append(req.Path, n.ID+"(half-open)")
		if simulateSuccess() {
			n.CircuitState = "closed"
			n.ErrorCount = 0
			n.Output = append(n.Output, req)
		} else {
			n.ErrorCount++
			n.CircuitState = "open"
		}
		return
	}

	// "closed" circuit — normal processing, bounded by the rate limit.
	toProcess := min(len(n.Queue), n.RateLimit)
	processed := 0
	for processed < toProcess {
		req := n.Queue[processed]
		processed++
		req.LatencyMS += 10
		req.Path = append(req.Path, n.ID)
		if simulateFailure() {
			n.ErrorCount++
			if n.CircuitBreaker && n.ErrorCount > 5 {
				n.CircuitState = "open"
				break // stop the batch; unexamined requests stay queued
			}
			continue // failed request is dropped (no retry here)
		}
		n.Output = append(n.Output, req)
	}
	n.Queue = n.Queue[processed:]
}
// simulateFailure reports whether a simulated call failed; failures occur
// for rolls 1..10 out of 1..100, i.e. a 10% failure rate.
func simulateFailure() bool {
	const failurePercent = 10
	return randInt(1, 100) <= failurePercent
}

// simulateSuccess reports whether a simulated call succeeded; successes
// occur for rolls 1..90 out of 1..100, i.e. a 90% success rate.
func simulateSuccess() bool {
	const successPercent = 90
	return randInt(1, 100) <= successPercent
}

// randInt returns a uniformly distributed integer in [min, max], inclusive.
func randInt(min, max int) int {
	return rand.Intn(max-min+1) + min
}

66
internal/simulation/monitoringnode.go

@@ -0,0 +1,66 @@
package simulation
// MonitoringNode simulates a monitoring/alerting system: it counts incoming
// requests against a single metric and emits alert Requests whenever the
// configured threshold is exceeded.
type MonitoringNode struct {
	ID             string
	Label          string
	Tool           string
	AlertMetric    string // name of the metric each incoming request increments
	ThresholdValue int // counter value above which an alert fires
	ThresholdUnit  string
	Queue          []*Request // requests observed this tick
	Alive          bool
	Targets        []string // downstream node IDs
	Metrics        map[string]int // metric counters (lazily initialized in Tick)
	Alerts         []*Request // alert requests awaiting Emit
}
// GetID returns the node's unique identifier.
func (n *MonitoringNode) GetID() string { return n.ID }

// Type identifies this node kind to the engine.
func (n *MonitoringNode) Type() string { return "monitoring" }

// IsAlive reports whether the node is still processing traffic.
func (n *MonitoringNode) IsAlive() bool { return n.Alive }
// Receive records an incoming request to be counted as a metric on the
// next Tick.
func (n *MonitoringNode) Receive(req *Request) {
	n.Queue = append(n.Queue, req)
}
// Emit returns the alerts raised since the last call and clears the alert
// buffer. A copy is returned so the reused internal buffer cannot clobber
// the caller's view.
func (n *MonitoringNode) Emit() []*Request {
	raised := make([]*Request, len(n.Alerts))
	copy(raised, n.Alerts)
	n.Alerts = n.Alerts[:0]
	return raised
}
// Tick folds every queued request into the AlertMetric counter and raises
// an alert Request each time the counter crosses ThresholdValue, resetting
// the counter after firing. The observed requests themselves are consumed
// (monitoring is a sink; only alerts are emitted).
func (n *MonitoringNode) Tick(tick int, currentTimeMs int) {
	if !n.Alive {
		return
	}
	// Lazily initialize the metric map on first use.
	if n.Metrics == nil {
		n.Metrics = make(map[string]int)
	}

	for _, observed := range n.Queue {
		// Every request currently counts toward the single AlertMetric.
		n.Metrics[n.AlertMetric]++
		observed.LatencyMS++
		observed.Path = append(observed.Path, n.ID)

		if n.Metrics[n.AlertMetric] <= n.ThresholdValue {
			continue
		}
		// Threshold crossed: synthesize an alert request.
		n.Alerts = append(n.Alerts, &Request{
			ID:        "alert-" + observed.ID,
			Timestamp: currentTimeMs,
			Origin:    n.ID,
			Type:      "alert",
			LatencyMS: 0,
			Path:      []string{n.ID, "alert"},
		})
		// Reset the counter after firing (alternative: keep accumulating).
		n.Metrics[n.AlertMetric] = 0
	}
	n.Queue = nil
}

77
internal/simulation/thirdpartyservicenode.go

@@ -0,0 +1,77 @@
package simulation
// ThirdPartyServiceNode simulates an external dependency: rate-limited,
// slower than internal services, occasionally failing, with limited retry
// and degradation to a dead node when errors accumulate.
type ThirdPartyServiceNode struct {
	ID          string
	Label       string
	APIEndpoint string
	RateLimit   int // Max requests per tick
	RetryPolicy string // "exponential", "fixed", etc.
	CurrentLoad int
	Queue       []*Request // pending requests (retries are re-appended here)
	ErrorCount  int // total failed calls; > 10 marks the node dead
	RetryCount  int // NOTE(review): node-wide and never reset, so retries stop globally after 3 failures — confirm this is intended vs. per-request
	Alive       bool
	Targets     []string // downstream node IDs
	Output      []*Request // successful responses awaiting Emit
}
// --- Interface Methods ---
// GetID returns the node's unique identifier.
func (n *ThirdPartyServiceNode) GetID() string { return n.ID }

// Type identifies this node kind to the engine.
func (n *ThirdPartyServiceNode) Type() string { return "thirdpartyservice" }

// IsAlive reports whether the node is still processing traffic.
func (n *ThirdPartyServiceNode) IsAlive() bool { return n.Alive }
// Receive enqueues an incoming request for processing on a later Tick.
func (n *ThirdPartyServiceNode) Receive(req *Request) {
	n.Queue = append(n.Queue, req)
}
// Emit returns the responses produced since the last call and resets the
// output buffer. A copy is returned so the reused internal buffer cannot
// clobber the caller's view.
func (n *ThirdPartyServiceNode) Emit() []*Request {
	emitted := make([]*Request, len(n.Output))
	copy(emitted, n.Output)
	n.Output = n.Output[:0]
	return emitted
}
// GetQueue exposes the pending-request queue (interface compliance).
func (n *ThirdPartyServiceNode) GetQueue() []*Request {
	return n.Queue
}
// --- Simulation Logic ---
// Tick processes up to RateLimit requests from the head of the queue.
// Successful calls gain 100–150ms of simulated latency and are emitted;
// failed calls are counted and, under the exponential retry policy,
// re-queued (at most 3 node-wide retries — see RetryCount). Accumulating
// more than 10 errors marks the node dead.
//
// BUG FIX: the previous version took its batch from n.Queue[maxProcess:]
// (the REMAINDER of the queue, not its head) and then set n.Queue = nil.
// That paniced with index-out-of-range whenever len(Queue) < 2*maxProcess,
// and silently discarded every request that was not processed this tick.
// We now process the first maxProcess requests and keep the rest queued.
func (n *ThirdPartyServiceNode) Tick(tick int, currentTimeMs int) {
	if !n.Alive {
		return
	}

	maxProcess := min(n.RateLimit, len(n.Queue))
	processing := n.Queue[:maxProcess]
	n.Queue = n.Queue[maxProcess:] // unprocessed requests stay for next tick

	for _, req := range processing {
		if simulateThirdPartySuccess(req) {
			req.LatencyMS += 100 + randInt(0, 50) // simulate external response time
			req.Path = append(req.Path, n.ID)
			n.Output = append(n.Output, req)
		} else {
			n.ErrorCount++
			n.RetryCount++
			if n.RetryPolicy == "exponential" && n.RetryCount < 3 {
				n.Queue = append(n.Queue, req) // retry on a later tick
			}
		}
	}

	// Simulate degradation: too many accumulated errors kills the node.
	if n.ErrorCount > 10 {
		n.Alive = false
	}
}
// --- Helpers ---

// simulateThirdPartySuccess reports whether a simulated third-party call
// succeeded (~90% of the time: rolls 0–89 out of 0–100 count as success).
// The request argument is currently unused.
func simulateThirdPartySuccess(req *Request) bool {
	roll := randInt(0, 100)
	return roll < 90
}

21
internal/simulation/webservernode.go

@ -8,6 +8,7 @@ type WebServerNode struct { @@ -8,6 +8,7 @@ type WebServerNode struct {
PenaltyPerRPS float64
Processed []*Request
Alive bool
Targets []string
}
func (ws *WebServerNode) GetID() string {
@ -22,20 +23,20 @@ func (ws *WebServerNode) IsAlive() bool { @@ -22,20 +23,20 @@ func (ws *WebServerNode) IsAlive() bool {
return ws.Alive
}
func (ws *WebServerNode) Tick(tick int) {
func (ws *WebServerNode) Tick(tick int, currentTimeMs int) {
toProcess := min(ws.CapacityRPS, len(ws.Queue))
for i := 0; i < toProcess; i++ {
req := ws.Queue[0]
req := ws.Queue[i]
req.LatencyMS += ws.BaseLatencyMs
ws.Processed = append(ws.Processed, req)
ws.Queue[i] = nil
}
// Remove processed requests from the queue
ws.Queue = ws.Queue[toProcess:]
if len(ws.Queue) > ws.CapacityRPS {
overload := len(ws.Queue) - ws.CapacityRPS
// Apply penalty for overload
if len(ws.Queue) > 0 {
overload := len(ws.Queue)
for _, req := range ws.Queue {
req.LatencyMS += int(ws.PenaltyPerRPS * float64(overload))
}
@ -52,9 +53,17 @@ func (ws *WebServerNode) Emit() []*Request { @@ -52,9 +53,17 @@ func (ws *WebServerNode) Emit() []*Request {
return out
}
func (ws *WebServerNode) AddTarget(targetID string) {
ws.Targets = append(ws.Targets, targetID)
}
func min(a, b int) int {
if a < b {
return a
}
return b
}
func (ws *WebServerNode) GetQueue() []*Request {
return ws.Queue
}

Loading…
Cancel
Save