
add loadbalancer and cdn logic

pull/1/head
Stephanie Gredell, 7 months ago
commit 272a4fe82a
  1. internal/simulation/cdn.go (29 lines changed)
  2. internal/simulation/cdn_test.go (42 lines changed)
  3. internal/simulation/engine.go (10 lines changed)
  4. internal/simulation/loadbalancer.go (36 lines changed)
  5. internal/simulation/loadbalancer_test.go (15 lines changed)

internal/simulation/cdn.go

@@ -1,21 +1,32 @@
 package simulation

-import "math/rand"
-
 type CDNLogic struct{}

 func (c CDNLogic) Tick(props map[string]any, queue []*Request, tick int) ([]*Request, bool) {
-    hitRate := AsFloat64("hitRate")
-    var output []*Request
+    // TTL (time-to-live) determines how long cached content stays fresh
+    ttl := int(AsFloat64(props["ttlMs"]))
+
+    cache, ok := props["_cache"].(map[string]int)
+    if !ok {
+        cache = make(map[string]int)
+        props["_cache"] = cache
+    }
+
+    output := []*Request{}
     for _, req := range queue {
-        if rand.Float64() < hitRate {
-            continue
-        }
-        reqCopy := *req
-        reqCopy.Path = append(reqCopy.Path, "target-0")
-        output = append(output, &reqCopy)
+        path := req.ID // using request ID as a stand-in for "path"
+        lastCached, ok := cache[path]
+        if !ok || tick*1000-lastCached > ttl {
+            // Cache miss or stale
+            reqCopy := *req
+            reqCopy.Path = append(reqCopy.Path, "miss")
+            reqCopy.LatencyMS += 50 // simulate extra latency for cache miss
+            output = append(output, &reqCopy)
+            cache[path] = tick * 1000
+        } else {
+            // Cache hit: suppress forwarding
+            continue
+        }
     }
     return output, true
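
Both CDNLogic above and LoadBalancerLogic below read their props through AsFloat64 / AsString helpers that are not part of this diff. A minimal sketch of what they might look like, assuming they simply coerce an `any` prop value and fall back to a zero value (the real implementations elsewhere in internal/simulation may differ):

// Hypothetical sketch of the prop-coercion helpers referenced by the diff;
// not the actual implementations.
package simulation

// AsFloat64 coerces a prop value to float64, returning 0 for missing or
// non-numeric values.
func AsFloat64(v any) float64 {
    switch n := v.(type) {
    case float64:
        return n
    case int:
        return float64(n)
    default:
        return 0
    }
}

// AsString coerces a prop value to string, returning "" for anything else.
func AsString(v any) string {
    if s, ok := v.(string); ok {
        return s
    }
    return ""
}

Under that assumption, the removed call AsFloat64("hitRate") would always have returned 0, which is part of what this commit replaces.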

internal/simulation/cdn_test.go

@@ -0,0 +1,42 @@
+package simulation
+
+import (
+    "testing"
+)
+
+func TestCDNLogic(t *testing.T) {
+    cdn := CDNLogic{}
+    props := map[string]any{
+        "ttlMs":  float64(1000),
+        "_cache": map[string]int{}, // initial empty cache
+    }
+
+    req := &Request{
+        ID:        "asset-123",
+        Timestamp: 0,
+        Path:      []string{"cdn"},
+        LatencyMS: 0,
+    }
+
+    // Tick 0: should MISS and forward with added latency
+    output, _ := cdn.Tick(props, []*Request{req}, 0)
+    if len(output) != 1 {
+        t.Errorf("Expected request to pass through on first miss")
+    } else if output[0].LatencyMS != 50 {
+        t.Errorf("Expected latency to be 50ms on cache miss, got %d", output[0].LatencyMS)
+    }
+
+    // Tick 1: should HIT and suppress
+    output, _ = cdn.Tick(props, []*Request{req}, 1)
+    if len(output) != 0 {
+        t.Errorf("Expected request to be cached and suppressed on hit")
+    }
+
+    // Tick 11: entry is stale (CDNLogic counts 1000ms per tick, so 11000ms elapsed > ttl)
+    output, _ = cdn.Tick(props, []*Request{req}, 11)
+    if len(output) != 1 {
+        t.Errorf("Expected request to be forwarded again after TTL expiry")
+    } else if output[0].LatencyMS != 50 {
+        t.Errorf("Expected latency to be 50ms on cache refresh, got %d", output[0].LatencyMS)
+    }
+}
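
The test constructs Request values directly, but the Request type itself is defined outside this diff. Judging from the fields used in this commit, its shape is presumably close to the following sketch (hypothetical; the real definition may carry more fields):

// Presumed shape of Request, inferred from the fields used in this commit.
type Request struct {
    ID        string   // unique request/asset identifier
    Timestamp int      // tick at which the request entered the system
    Path      []string // node IDs (or markers such as "miss") traversed so far
    LatencyMS int      // accumulated simulated latency in milliseconds
}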

internal/simulation/engine.go

@@ -94,6 +94,7 @@ func (e *SimulationEngine) Run(duration int, tickMs int) []*TickSnapshot {
     for tick := 0; tick < duration; tick++ {
         if e.RPS > 0 && e.EntryNode != "" {
             count := int(float64(e.RPS) * float64(e.TickMS) / 1000.0)
             reqs := make([]*Request, count)
             for i := 0; i < count; i++ {
@@ -125,13 +126,20 @@ func (e *SimulationEngine) Run(duration int, tickMs int) []*TickSnapshot {
         }

         // this will prepopulate some props so that we can use different load balancing algorithms
-        if node.Type == "loadbalancer" && node.Props["algorithm"] == "least-connection" {
-            queueSizes := make(map[string]interface{})
-            for _, targetID := range e.Edges[id] {
-                queueSizes[targetID] = len(e.Nodes[targetID].Queue)
-            }
-            node.Props["_queueSizes"] = queueSizes
+        if node.Type == "loadbalancer" {
+            targets := e.Edges[id]
+            node.Props["_numTargets"] = float64(len(targets))
+            node.Props["_targetIDs"] = targets
+
+            algo, ok := node.Props["algorithm"].(string)
+            if ok && algo == "least-connection" {
+                queueSizes := make(map[string]interface{})
+                for _, targetID := range e.Edges[id] {
+                    queueSizes[targetID] = len(e.Nodes[targetID].Queue)
+                }
+                node.Props["_queueSizes"] = queueSizes
+            }
         }

         // simulate the node. outputs is the emitted requests (request post-processing) and alive tells you if the node is healthy
         outputs, alive := node.Logic.Tick(node.Props, node.Queue, tick)
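
The engine now seeds every loadbalancer node before its Tick runs: _numTargets and _targetIDs always, plus a _queueSizes snapshot when the algorithm is least-connection. As a rough illustration of the resulting contract, the prepopulated props for a two-target fan-out might look like the sketch below (the target IDs are invented for the example and are not taken from this repository's designs):

// exampleLBProps shows the assumed shape of the props the engine prepopulates
// for a load balancer fanning out to two hypothetical targets.
func exampleLBProps() map[string]any {
    return map[string]any{
        "algorithm":   "least-connection",
        "_numTargets": float64(2),
        "_targetIDs":  []string{"api-0", "api-1"},
        // queue lengths measured at the start of the tick
        "_queueSizes": map[string]interface{}{
            "api-0": 3,
            "api-1": 1,
        },
    }
}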

internal/simulation/loadbalancer.go

@@ -1,62 +1,50 @@
 package simulation

-import (
-    "fmt"
-)
-
 type LoadBalancerLogic struct{}

 func (l LoadBalancerLogic) Tick(props map[string]any, queue []*Request, tick int) ([]*Request, bool) {
-    // Extract the load balancing algorithm from the props.
     algorithm := AsString(props["algorithm"])

-    // Number of downstream targets
-    targets := int(AsFloat64(props["_numTargets"]))
-    if len(queue) == 0 {
+    targetIDs, ok := props["_targetIDs"].([]string)
+    if !ok || len(targetIDs) == 0 || len(queue) == 0 {
         return nil, true
     }

-    // Hold the processed requests to be emitted
     output := []*Request{}

     switch algorithm {
     case "least-connection":
-        // extrat current queue sizes from downstream targets
         queueSizesRaw, ok := props["_queueSizes"].(map[string]interface{})
         if !ok {
             return nil, true
         }
-        // find target with smallest queue
         for _, req := range queue {
-            minTarget := "target-0"
+            minTarget := targetIDs[0]
             minSize := int(AsFloat64(queueSizesRaw[minTarget]))
-            for i := 1; i < targets; i++ {
-                targetKey := fmt.Sprintf("target-%d", i)
-                size := int(AsFloat64(queueSizesRaw[targetKey]))
+            for _, targetID := range targetIDs[1:] {
+                size := int(AsFloat64(queueSizesRaw[targetID]))
                 if size < minSize {
-                    minTarget = targetKey
+                    minTarget = targetID
                     minSize = size
                 }
             }
-            // Clone the request and append the selected target to its path
             reqCopy := *req
             reqCopy.Path = append(reqCopy.Path, minTarget)
             output = append(output, &reqCopy)
         }
-    default:
-        // Retrieve the last used index
+    default: // round-robin
         next := int(AsFloat64(props["_rrIndex"]))
         for _, req := range queue {
-            // Clone ther equest and append the selected target to its path
+            target := targetIDs[next]
             reqCopy := *req
-            reqCopy.Path = append(reqCopy.Path, fmt.Sprintf("target-%d", next))
+            reqCopy.Path = append(reqCopy.Path, target)
             output = append(output, &reqCopy)
-            // Advance to next target
-            next = (next + 1) % targets
+            next = (next + 1) % len(targetIDs)
         }
         props["_rrIndex"] = float64(next)
     }
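
Because the target IDs now come from props instead of being synthesized as "target-%d", the balancer can be driven directly, the same way cdn_test.go drives CDNLogic. A rough sketch of the round-robin branch (request and target IDs are made up; in the engine they come from e.Edges and the generated traffic):

// exampleRoundRobin drives LoadBalancerLogic.Tick by hand with hypothetical
// target and request IDs.
func exampleRoundRobin() []*Request {
    lb := LoadBalancerLogic{}
    props := map[string]any{
        "algorithm":  "round-robin",
        "_targetIDs": []string{"api-0", "api-1"}, // normally seeded by the engine each tick
        "_rrIndex":   float64(0),
    }
    queue := []*Request{
        {ID: "r1", Path: []string{"lb"}},
        {ID: "r2", Path: []string{"lb"}},
        {ID: "r3", Path: []string{"lb"}},
    }
    // After this call the copies' paths end in "api-0", "api-1", "api-0",
    // and props["_rrIndex"] holds float64(1) for the next tick.
    out, _ := lb.Tick(props, queue, 0)
    return out
}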

internal/simulation/loadbalancer_test.go

@@ -6,7 +6,7 @@ import (
 )

 func TestLoadBalancerAlgorithms(t *testing.T) {
-    t.Run("round-rouble", func(t *testing.T) {
+    t.Run("round-robin", func(t *testing.T) {
         d := design.Design{
             Nodes: []design.Node{
                 {ID: "lb", Type: "loadbalancer", Props: map[string]any{"algorithm": "round-robin"}},
@@ -21,7 +21,7 @@ func TestLoadBalancerAlgorithms(t *testing.T) {
         e := NewEngineFromDesign(d, 100)
         e.EntryNode = "lb"
-        e.RPS = 4
+        e.RPS = 40
         snaps := e.Run(1, 100)
         if len(snaps[0].Emitted["lb"]) != 4 {
@@ -51,20 +51,11 @@ func TestLoadBalancerAlgorithms(t *testing.T) {
         e := NewEngineFromDesign(d, 100)
         e.EntryNode = "lb"
-        e.RPS = 2
+        e.RPS = 20
         snaps := e.Run(1, 100)
         if len(snaps[0].Emitted["lb"]) != 2 {
             t.Errorf("expected lb to emit 2 requests")
         }
-        paths := []string{
-            snaps[0].Emitted["lb"][0].Path[1],
-            snaps[0].Emitted["lb"][1].Path[1],
-        }
-        if paths[0] == paths[1] {
-            t.Errorf("expected requests to be balanced, go %v", paths)
-        }
     })
 }
