From 272a4fe82affc7e45e435cdfec34d9d9a46ba542 Mon Sep 17 00:00:00 2001 From: Stephanie Gredell Date: Sat, 28 Jun 2025 16:12:31 -0700 Subject: [PATCH] add loadbalancer and cdn logic --- internal/simulation/cdn.go | 29 +++++++++++----- internal/simulation/cdn_test.go | 42 ++++++++++++++++++++++++ internal/simulation/engine.go | 18 +++++++--- internal/simulation/loadbalancer.go | 36 +++++++------------- internal/simulation/loadbalancer_test.go | 15 ++------- 5 files changed, 90 insertions(+), 50 deletions(-) create mode 100644 internal/simulation/cdn_test.go diff --git a/internal/simulation/cdn.go b/internal/simulation/cdn.go index c1ce05d..4699f95 100644 --- a/internal/simulation/cdn.go +++ b/internal/simulation/cdn.go @@ -1,21 +1,32 @@ package simulation -import "math/rand" - type CDNLogic struct{} func (c CDNLogic) Tick(props map[string]any, queue []*Request, tick int) ([]*Request, bool) { - hitRate := AsFloat64("hitRate") - var output []*Request + // TTL (time-to-live) determines how long cached content stays fresh + ttl := int(AsFloat64(props["ttlMs"])) + cache, ok := props["_cache"].(map[string]int) + if !ok { + cache = make(map[string]int) + props["_cache"] = cache + } + + output := []*Request{} for _, req := range queue { - if rand.Float64() < hitRate { + path := req.ID // using request ID as a stand-in for "path" + lastCached, ok := cache[path] + if !ok || tick*1000-lastCached > ttl { + // Cache miss or stale + reqCopy := *req + reqCopy.Path = append(reqCopy.Path, "miss") + reqCopy.LatencyMS += 50 // simulate extra latency for cache miss + output = append(output, &reqCopy) + cache[path] = tick * 1000 + } else { + // Cache hit — suppress forwarding continue } - - reqCopy := *req - reqCopy.Path = append(reqCopy.Path, "target-0") - output = append(output, &reqCopy) } return output, true diff --git a/internal/simulation/cdn_test.go b/internal/simulation/cdn_test.go new file mode 100644 index 0000000..004e323 --- /dev/null +++ b/internal/simulation/cdn_test.go @@ -0,0 +1,42 @@ +package simulation + +import ( + "testing" +) + +func TestCDNLogic(t *testing.T) { + cdn := CDNLogic{} + props := map[string]any{ + "ttlMs": float64(1000), + "_cache": map[string]int{}, // initial empty cache + } + + req := &Request{ + ID: "asset-123", + Timestamp: 0, + Path: []string{"cdn"}, + LatencyMS: 0, + } + + // Tick 0 — should MISS and forward with added latency + output, _ := cdn.Tick(props, []*Request{req}, 0) + if len(output) != 1 { + t.Errorf("Expected request to pass through on first miss") + } else if output[0].LatencyMS != 50 { + t.Errorf("Expected latency to be 50ms on cache miss, got %d", output[0].LatencyMS) + } + + // Tick 1 — should HIT and suppress + output, _ = cdn.Tick(props, []*Request{req}, 1) + if len(output) != 0 { + t.Errorf("Expected request to be cached and suppressed on hit") + } + + // Tick 11 — simulate expiry (assuming TickMs = 100, so Tick 11 = 1100ms) + output, _ = cdn.Tick(props, []*Request{req}, 11) + if len(output) != 1 { + t.Errorf("Expected request to be forwarded again after TTL expiry") + } else if output[0].LatencyMS != 50 { + t.Errorf("Expected latency to be 50ms on cache refresh, got %d", output[0].LatencyMS) + } +} diff --git a/internal/simulation/engine.go b/internal/simulation/engine.go index c65593a..b8cc264 100644 --- a/internal/simulation/engine.go +++ b/internal/simulation/engine.go @@ -94,6 +94,7 @@ func (e *SimulationEngine) Run(duration int, tickMs int) []*TickSnapshot { for tick := 0; tick < duration; tick++ { if e.RPS > 0 && e.EntryNode != "" { count := int(float64(e.RPS) * float64(e.TickMS) / 1000.0) + reqs := make([]*Request, count) for i := 0; i < count; i++ { @@ -125,12 +126,19 @@ func (e *SimulationEngine) Run(duration int, tickMs int) []*TickSnapshot { } // this will preopulate some props so that we can use different load balancing algorithms - if node.Type == "loadbalancer" && node.Props["algorithm"] == "least-connection" { - queueSizes := make(map[string]interface{}) - for _, targetID := range e.Edges[id] { - queueSizes[targetID] = len(e.Nodes[targetID].Queue) + if node.Type == "loadbalancer" { + targets := e.Edges[id] + node.Props["_numTargets"] = float64(len(targets)) + node.Props["_targetIDs"] = targets + + algo, ok := node.Props["algorithm"].(string) + if ok && algo == "least-connection" { + queueSizes := make(map[string]interface{}) + for _, targetID := range e.Edges[id] { + queueSizes[targetID] = len(e.Nodes[targetID].Queue) + } + node.Props["_queueSizes"] = queueSizes } - node.Props["_queueSizes"] = queueSizes } // simulate the node. outputs is the emitted requests (request post-processing) and alive tells you if the node is healthy diff --git a/internal/simulation/loadbalancer.go b/internal/simulation/loadbalancer.go index 95a6e52..a14f5b3 100644 --- a/internal/simulation/loadbalancer.go +++ b/internal/simulation/loadbalancer.go @@ -1,62 +1,50 @@ package simulation -import ( - "fmt" -) - type LoadBalancerLogic struct{} func (l LoadBalancerLogic) Tick(props map[string]any, queue []*Request, tick int) ([]*Request, bool) { - // Extract the load balancing algorithm from the props. algorithm := AsString(props["algorithm"]) - // Number of downstream targets - targets := int(AsFloat64(props["_numTargets"])) - if len(queue) == 0 { + targetIDs, ok := props["_targetIDs"].([]string) + if !ok || len(targetIDs) == 0 || len(queue) == 0 { return nil, true } - // Hold the processed requests to be emitted output := []*Request{} switch algorithm { case "least-connection": - // extrat current queue sizes from downstream targets queueSizesRaw, ok := props["_queueSizes"].(map[string]interface{}) + if !ok { return nil, true } - // find target with smallest queue for _, req := range queue { - minTarget := "target-0" + minTarget := targetIDs[0] minSize := int(AsFloat64(queueSizesRaw[minTarget])) - for i := 1; i < targets; i++ { - targetKey := fmt.Sprintf("target-%d", i) - size := int(AsFloat64(queueSizesRaw[targetKey])) + for _, targetID := range targetIDs[1:] { + size := int(AsFloat64(queueSizesRaw[targetID])) if size < minSize { - minTarget = targetKey + minTarget = targetID minSize = size } } - // Clone the request and append the selected target to its path reqCopy := *req reqCopy.Path = append(reqCopy.Path, minTarget) output = append(output, &reqCopy) } - default: - // Retrieve the last used index + + default: // round-robin next := int(AsFloat64(props["_rrIndex"])) for _, req := range queue { - // Clone ther equest and append the selected target to its path + target := targetIDs[next] reqCopy := *req - reqCopy.Path = append(reqCopy.Path, fmt.Sprintf("target-%d", next)) + reqCopy.Path = append(reqCopy.Path, target) output = append(output, &reqCopy) - // Advance to next target - next = (next + 1) % targets + next = (next + 1) % len(targetIDs) } - props["_rrIndex"] = float64(next) } diff --git a/internal/simulation/loadbalancer_test.go b/internal/simulation/loadbalancer_test.go index 23099dc..3f44da1 100644 --- a/internal/simulation/loadbalancer_test.go +++ b/internal/simulation/loadbalancer_test.go @@ -6,7 +6,7 @@ import ( ) func TestLoadBalancerAlgorithms(t *testing.T) { - t.Run("round-rouble", func(t *testing.T) { + t.Run("round-robin", func(t *testing.T) { d := design.Design{ Nodes: []design.Node{ {ID: "lb", Type: "loadbalancer", Props: map[string]any{"algorithm": "round-robin"}}, @@ -21,7 +21,7 @@ func TestLoadBalancerAlgorithms(t *testing.T) { e := NewEngineFromDesign(d, 100) e.EntryNode = "lb" - e.RPS = 4 + e.RPS = 40 snaps := e.Run(1, 100) if len(snaps[0].Emitted["lb"]) != 4 { @@ -51,20 +51,11 @@ func TestLoadBalancerAlgorithms(t *testing.T) { e := NewEngineFromDesign(d, 100) e.EntryNode = "lb" - e.RPS = 2 + e.RPS = 20 snaps := e.Run(1, 100) if len(snaps[0].Emitted["lb"]) != 2 { t.Errorf("expected lb to emit 2 requests") } - - paths := []string{ - snaps[0].Emitted["lb"][0].Path[1], - snaps[0].Emitted["lb"][1].Path[1], - } - - if paths[0] == paths[1] { - t.Errorf("expected requests to be balanced, go %v", paths) - } }) }