Browse Source

fix up datapipeline node

pull/1/head
Stephanie Gredell 7 months ago
parent
commit
17811d57b2
  1. 49
      internal/simulation/datapipelinenode.go

49
internal/simulation/datapipelinenode.go

@ -13,19 +13,24 @@ type DataPipelineNode struct {
Queue []*Request Queue []*Request
Alive bool Alive bool
Targets []string Targets []string
output []*Request Output []*Request
LastFlushTimeMS int
} }
func (n *DataPipelineNode) GetID() string { return n.ID } func (n *DataPipelineNode) GetID() string { return n.ID }
func (n *DataPipelineNode) Tick(tick int) { func (n *DataPipelineNode) Tick(tick int, currentTimeMs int) {
if len(n.Queue) == 0 { if len(n.Queue) == 0 {
return return
} }
if len(n.Queue) < n.BatchSize { if len(n.Queue) < n.BatchSize {
if tick%50 == 0 { if n.LastFlushTimeMS == 0 {
n.processBatch(len(n.Queue)) n.LastFlushTimeMS = currentTimeMs
}
if currentTimeMs-n.LastFlushTimeMS >= 5000 {
n.processBatch(len(n.Queue), currentTimeMs)
n.LastFlushTimeMS = currentTimeMs
} }
return return
} }
@ -35,11 +40,12 @@ func (n *DataPipelineNode) Tick(tick int) {
actualBatches := min(batchesToProcess, maxBatchesPerTick) actualBatches := min(batchesToProcess, maxBatchesPerTick)
for i := 0; i < actualBatches; i++ { for i := 0; i < actualBatches; i++ {
n.processBatch(n.BatchSize) n.processBatch(n.BatchSize, currentTimeMs)
n.LastFlushTimeMS = currentTimeMs
} }
} }
func (n *DataPipelineNode) processBatch(size int) { func (n *DataPipelineNode) processBatch(size int, currentTimeMs int) {
if size == 0 { if size == 0 {
return return
} }
@ -53,18 +59,29 @@ func (n *DataPipelineNode) processBatch(size int) {
req.Path = append(req.Path, n.ID+"(batch)") req.Path = append(req.Path, n.ID+"(batch)")
switch n.Transformation { switch n.Transformation {
case "aggregation": case "aggregate":
req.Type = "AGGREGATED" req.Type = "AGGREGATED"
case "filtering": n.Output = append(n.Output, req)
case "filter":
if rand.Float64() < 0.9 { if rand.Float64() < 0.9 {
n.output = append(n.output, req) n.Output = append(n.Output, req)
} }
continue continue
case "enrichment": case "enrich":
req.Type = "ENRICHED" req.Type = "ENRICHED"
n.Output = append(n.Output, req)
case "normalize":
req.Type = "NORMALIZED"
n.Output = append(n.Output, req)
case "dedupe":
if rand.Float64() < 0.95 {
req.Type = "DEDUPED"
n.Output = append(n.Output, req)
}
continue
default:
n.Output = append(n.Output, req)
} }
n.output = append(n.output, req)
} }
} }
@ -74,11 +91,15 @@ func (n *DataPipelineNode) Receive(req *Request) {
} }
req.LatencyMS += 1 req.LatencyMS += 1
n.Queue = append(n.Queue, req) n.Queue = append(n.Queue, req)
if n.LastFlushTimeMS == 0 {
n.LastFlushTimeMS = req.Timestamp
}
} }
func (n *DataPipelineNode) Emit() []*Request { func (n *DataPipelineNode) Emit() []*Request {
out := append([]*Request(nil), n.output...) out := append([]*Request(nil), n.Output...)
n.output = n.output[:0] n.Output = n.Output[:0]
return out return out
} }

Loading…
Cancel
Save