You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

203 lines
5.9 KiB

package simulation

import "strconv"
// DataPipelineLogic simulates a batch-oriented data-processing pipeline stage.
// It is stateless itself; per-pipeline state is persisted in the props map
// under the "_pipelineState" key (see Tick).
type DataPipelineLogic struct{}

// DataBatch is a group of incoming requests that is processed as one unit.
type DataBatch struct {
	ID           string // batch identifier (assigned in createBatches)
	RecordCount  int    // number of records grouped into this batch
	Timestamp    int    // simulation time (ms) at which the batch was created
	ProcessingMS int    // time (ms) the batch needs before it completes
}

// PipelineState is the persistent state carried between ticks in props
// under the "_pipelineState" key.
type PipelineState struct {
	ProcessingQueue  []DataBatch // batches still waiting to finish processing
	CompletedBatches int         // running count of finished batches
	TotalRecords     int         // running count of records emitted
	BacklogSize      int         // records currently queued; drives the health check
}
// Tick advances the pipeline by one simulation step.
//
// Incoming requests in queue are grouped into batches of at most
// props["batchSize"] records (default 500) and enqueued. Batches whose
// processing window has elapsed at the current simulated time are expanded
// into one PROCESSED output request per record. State persists across ticks
// via props["_pipelineState"].
//
// Returns the completed-record requests and a health flag that is false
// once the backlog exceeds 20 batches' worth of records.
func (d DataPipelineLogic) Tick(props map[string]any, queue []*Request, tick int) ([]*Request, bool) {
	batchSize := int(AsFloat64(props["batchSize"]))
	if batchSize <= 0 {
		// Default batch size; "<= 0" also guards against a negative prop,
		// which would make createBatches loop forever.
		batchSize = 500
	}
	transformation := AsString(props["transformation"])
	if transformation == "" {
		transformation = "map" // default transformation
	}

	// Restore persistent pipeline state; start empty on the first tick.
	state, ok := props["_pipelineState"].(PipelineState)
	if !ok {
		state = PipelineState{ProcessingQueue: []DataBatch{}}
	}

	currentTime := tick * 100 // convert tick to milliseconds

	// Group incoming requests into batches and enqueue them.
	if len(queue) > 0 {
		batches := d.createBatches(queue, batchSize, currentTime, transformation)
		state.ProcessingQueue = append(state.ProcessingQueue, batches...)
		state.BacklogSize += len(queue)
	}

	// Emit records for every batch whose processing time has elapsed.
	output := []*Request{}
	remainingBatches := []DataBatch{}
	backlogRecords := 0
	for _, batch := range state.ProcessingQueue {
		if currentTime >= batch.Timestamp+batch.ProcessingMS {
			// Batch complete: one output request per record.
			for i := 0; i < batch.RecordCount; i++ {
				output = append(output, &Request{
					// strconv.Itoa handles record indexes >= 10 correctly;
					// the previous rune('0'+i) emitted non-digit characters
					// (':', ';', ...) past index 9.
					ID:        batch.ID + "-record-" + strconv.Itoa(i),
					Timestamp: batch.Timestamp,
					LatencyMS: batch.ProcessingMS,
					Origin:    "data-pipeline",
					Type:      "PROCESSED",
					Path:      []string{"pipeline-" + transformation},
				})
			}
			state.CompletedBatches++
			state.TotalRecords += batch.RecordCount
		} else {
			// Still processing: keep it queued and count its actual record
			// load — the last batch of a tick may hold fewer than batchSize
			// records, so len(remaining)*batchSize would overstate the backlog.
			remainingBatches = append(remainingBatches, batch)
			backlogRecords += batch.RecordCount
		}
	}
	state.ProcessingQueue = remainingBatches
	state.BacklogSize = backlogRecords

	// Persist state for the next tick.
	props["_pipelineState"] = state

	// Healthy while the backlog stays under 20 full batches of records.
	maxBacklogSize := batchSize * 20
	healthy := state.BacklogSize < maxBacklogSize
	return output, healthy
}
// createBatches groups requests into batches and calculates processing time
func (d DataPipelineLogic) createBatches(requests []*Request, batchSize int, timestamp int, transformation string) []DataBatch {
batches := []DataBatch{}
for i := 0; i < len(requests); i += batchSize {
end := i + batchSize
if end > len(requests) {
end = len(requests)
}
recordCount := end - i
processingTime := d.calculateProcessingTime(recordCount, transformation)
batch := DataBatch{
ID: "batch-" + string(rune('A'+len(batches))),
RecordCount: recordCount,
Timestamp: timestamp,
ProcessingMS: processingTime,
}
batches = append(batches, batch)
}
return batches
}
// calculateProcessingTime determines how long a batch takes to process based on transformation type
func (d DataPipelineLogic) calculateProcessingTime(recordCount int, transformation string) int {
// Base processing time per record
baseTimePerRecord := d.getTransformationComplexity(transformation)
// Total time scales with record count but with some economies of scale
totalTime := float64(recordCount) * baseTimePerRecord
// Add batch overhead (setup, teardown, I/O)
batchOverhead := d.getBatchOverhead(transformation)
totalTime += batchOverhead
// Apply economies of scale for larger batches (slightly more efficient)
if recordCount > 100 {
scaleFactor := 0.9 // 10% efficiency gain for large batches
totalTime *= scaleFactor
}
return int(totalTime)
}
// getTransformationComplexity returns base processing time per record in milliseconds
func (d DataPipelineLogic) getTransformationComplexity(transformation string) float64 {
switch transformation {
case "map":
return 1.0 // Simple field mapping
case "filter":
return 0.5 // Just evaluate conditions
case "sort":
return 3.0 // Sorting requires more compute
case "aggregate":
return 2.0 // Grouping and calculating aggregates
case "join":
return 5.0 // Most expensive - joining with other datasets
case "deduplicate":
return 2.5 // Hash-based deduplication
case "validate":
return 1.5 // Data validation and cleaning
case "enrich":
return 4.0 // Enriching with external data
case "compress":
return 1.2 // Compression processing
case "encrypt":
return 2.0 // Encryption overhead
default:
return 1.0 // Default to simple transformation
}
}
// getBatchOverhead returns fixed overhead time per batch in milliseconds
func (d DataPipelineLogic) getBatchOverhead(transformation string) float64 {
switch transformation {
case "map", "filter", "validate":
return 50.0 // Low overhead for simple operations
case "sort", "aggregate", "deduplicate":
return 200.0 // Medium overhead for complex operations
case "join", "enrich":
return 500.0 // High overhead for operations requiring external data
case "compress", "encrypt":
return 100.0 // Medium overhead for I/O operations
default:
return 100.0 // Default overhead
}
}
// Helper function to get pipeline statistics
func (d DataPipelineLogic) GetPipelineStats(props map[string]any) map[string]interface{} {
state, ok := props["_pipelineState"].(PipelineState)
if !ok {
return map[string]interface{}{
"completedBatches": 0,
"totalRecords": 0,
"backlogSize": 0,
"queuedBatches": 0,
}
}
return map[string]interface{}{
"completedBatches": state.CompletedBatches,
"totalRecords": state.TotalRecords,
"backlogSize": state.BacklogSize,
"queuedBatches": len(state.ProcessingQueue),
}
}