From 17811d57b23edab31c8c0d905a1ee033bc3b7740 Mon Sep 17 00:00:00 2001 From: Stephanie Gredell Date: Wed, 18 Jun 2025 10:51:02 -0700 Subject: [PATCH] fix up datapipeline node --- internal/simulation/datapipelinenode.go | 65 ++++++++++++++++--------- 1 file changed, 43 insertions(+), 22 deletions(-) diff --git a/internal/simulation/datapipelinenode.go b/internal/simulation/datapipelinenode.go index 9061bdb..0a21088 100644 --- a/internal/simulation/datapipelinenode.go +++ b/internal/simulation/datapipelinenode.go @@ -5,27 +5,32 @@ import ( ) type DataPipelineNode struct { - ID string - Label string - BatchSize int - Transformation string - CurrentLoad int - Queue []*Request - Alive bool - Targets []string - output []*Request + ID string + Label string + BatchSize int + Transformation string + CurrentLoad int + Queue []*Request + Alive bool + Targets []string + Output []*Request + LastFlushTimeMS int } func (n *DataPipelineNode) GetID() string { return n.ID } -func (n *DataPipelineNode) Tick(tick int) { +func (n *DataPipelineNode) Tick(tick int, currentTimeMs int) { if len(n.Queue) == 0 { return } if len(n.Queue) < n.BatchSize { - if tick%50 == 0 { - n.processBatch(len(n.Queue)) + if n.LastFlushTimeMS == 0 { + n.LastFlushTimeMS = currentTimeMs + } + if currentTimeMs-n.LastFlushTimeMS >= 5000 { + n.processBatch(len(n.Queue), currentTimeMs) + n.LastFlushTimeMS = currentTimeMs } return } @@ -35,11 +40,12 @@ func (n *DataPipelineNode) Tick(tick int) { actualBatches := min(batchesToProcess, maxBatchesPerTick) for i := 0; i < actualBatches; i++ { - n.processBatch(n.BatchSize) + n.processBatch(n.BatchSize, currentTimeMs) + n.LastFlushTimeMS = currentTimeMs } } -func (n *DataPipelineNode) processBatch(size int) { +func (n *DataPipelineNode) processBatch(size int, currentTimeMs int) { if size == 0 { return } @@ -53,18 +59,29 @@ func (n *DataPipelineNode) processBatch(size int) { req.Path = append(req.Path, n.ID+"(batch)") switch n.Transformation { - case "aggregation": + case "aggregate": req.Type = "AGGREGATED" - case "filtering": + n.Output = append(n.Output, req) + case "filter": if rand.Float64() < 0.9 { - n.output = append(n.output, req) + n.Output = append(n.Output, req) } continue - case "enrichment": + case "enrich": req.Type = "ENRICHED" + n.Output = append(n.Output, req) + case "normalize": + req.Type = "NORMALIZED" + n.Output = append(n.Output, req) + case "dedupe": + if rand.Float64() < 0.95 { + req.Type = "DEDUPED" + n.Output = append(n.Output, req) + } + continue + default: + n.Output = append(n.Output, req) } - - n.output = append(n.output, req) } } @@ -74,11 +91,15 @@ func (n *DataPipelineNode) Receive(req *Request) { } req.LatencyMS += 1 n.Queue = append(n.Queue, req) + + if n.LastFlushTimeMS == 0 { + n.LastFlushTimeMS = req.Timestamp + } } func (n *DataPipelineNode) Emit() []*Request { - out := append([]*Request(nil), n.output...) - n.output = n.output[:0] + out := append([]*Request(nil), n.Output...) + n.Output = n.Output[:0] return out }