You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
396 lines
10 KiB
396 lines
10 KiB
package simulation |
|
|
|
import ( |
|
"testing" |
|
) |
|
|
|
func TestDataPipelineLogic_BasicProcessing(t *testing.T) { |
|
logic := DataPipelineLogic{} |
|
|
|
props := map[string]any{ |
|
"batchSize": 100.0, |
|
"transformation": "map", |
|
} |
|
|
|
// Create 50 requests (less than batch size) |
|
requests := make([]*Request, 50) |
|
for i := range requests { |
|
requests[i] = &Request{ID: string(rune('1' + i)), Type: "DATA", LatencyMS: 0} |
|
} |
|
|
|
// First tick - should create batch and start processing |
|
output, healthy := logic.Tick(props, requests, 1) |
|
|
|
if !healthy { |
|
t.Error("Expected data pipeline to be healthy") |
|
} |
|
|
|
// Should not have output yet (batch is still processing) |
|
if len(output) != 0 { |
|
t.Errorf("Expected no output during processing, got %d", len(output)) |
|
} |
|
|
|
// Check that batch was created |
|
state, ok := props["_pipelineState"].(PipelineState) |
|
if !ok { |
|
t.Error("Expected pipeline state to be created") |
|
} |
|
|
|
if len(state.ProcessingQueue) != 1 { |
|
t.Errorf("Expected 1 batch in processing queue, got %d", len(state.ProcessingQueue)) |
|
} |
|
|
|
if state.ProcessingQueue[0].RecordCount != 50 { |
|
t.Errorf("Expected batch with 50 records, got %d", state.ProcessingQueue[0].RecordCount) |
|
} |
|
} |
|
|
|
func TestDataPipelineLogic_BatchCompletion(t *testing.T) { |
|
logic := DataPipelineLogic{} |
|
|
|
props := map[string]any{ |
|
"batchSize": 10.0, |
|
"transformation": "filter", // Fast transformation |
|
} |
|
|
|
// Create 5 requests |
|
requests := make([]*Request, 5) |
|
for i := range requests { |
|
requests[i] = &Request{ID: string(rune('1' + i)), Type: "DATA", LatencyMS: 0} |
|
} |
|
|
|
// First tick - start processing |
|
logic.Tick(props, requests, 1) |
|
|
|
// Wait enough ticks for processing to complete |
|
// Filter transformation should complete quickly |
|
var output []*Request |
|
var healthy bool |
|
|
|
for tick := 2; tick <= 10; tick++ { |
|
output, healthy = logic.Tick(props, []*Request{}, tick) |
|
if len(output) > 0 { |
|
break |
|
} |
|
} |
|
|
|
if !healthy { |
|
t.Error("Expected data pipeline to be healthy") |
|
} |
|
|
|
// Should have output matching input count |
|
if len(output) != 5 { |
|
t.Errorf("Expected 5 output records, got %d", len(output)) |
|
} |
|
|
|
// Check output structure |
|
for _, req := range output { |
|
if req.Type != "PROCESSED" { |
|
t.Errorf("Expected PROCESSED type, got %s", req.Type) |
|
} |
|
if req.Origin != "data-pipeline" { |
|
t.Errorf("Expected data-pipeline origin, got %s", req.Origin) |
|
} |
|
if len(req.Path) == 0 || req.Path[0] != "pipeline-filter" { |
|
t.Error("Expected path to indicate filter transformation") |
|
} |
|
} |
|
} |
|
|
|
func TestDataPipelineLogic_MultipleBatches(t *testing.T) { |
|
logic := DataPipelineLogic{} |
|
|
|
props := map[string]any{ |
|
"batchSize": 10.0, |
|
"transformation": "map", |
|
} |
|
|
|
// Create 25 requests (should create 3 batches: 10, 10, 5) |
|
requests := make([]*Request, 25) |
|
for i := range requests { |
|
requests[i] = &Request{ID: string(rune('1' + i)), Type: "DATA", LatencyMS: 0} |
|
} |
|
|
|
// First tick - create batches |
|
output, healthy := logic.Tick(props, requests, 1) |
|
|
|
if !healthy { |
|
t.Error("Expected data pipeline to be healthy") |
|
} |
|
|
|
if len(output) != 0 { |
|
t.Error("Expected no immediate output") |
|
} |
|
|
|
// Check that 3 batches were created |
|
state, ok := props["_pipelineState"].(PipelineState) |
|
if !ok { |
|
t.Error("Expected pipeline state to be created") |
|
} |
|
|
|
if len(state.ProcessingQueue) != 3 { |
|
t.Errorf("Expected 3 batches in processing queue, got %d", len(state.ProcessingQueue)) |
|
} |
|
|
|
// Verify batch sizes |
|
expectedSizes := []int{10, 10, 5} |
|
for i, batch := range state.ProcessingQueue { |
|
if batch.RecordCount != expectedSizes[i] { |
|
t.Errorf("Expected batch %d to have %d records, got %d", |
|
i, expectedSizes[i], batch.RecordCount) |
|
} |
|
} |
|
} |
|
|
|
func TestDataPipelineLogic_TransformationComplexity(t *testing.T) { |
|
logic := DataPipelineLogic{} |
|
|
|
transformations := []string{"filter", "map", "sort", "aggregate", "join"} |
|
|
|
for _, transformation := range transformations { |
|
t.Run(transformation, func(t *testing.T) { |
|
complexity := logic.getTransformationComplexity(transformation) |
|
|
|
// Verify relative complexity ordering |
|
switch transformation { |
|
case "filter": |
|
if complexity >= logic.getTransformationComplexity("map") { |
|
t.Error("Filter should be simpler than map") |
|
} |
|
case "join": |
|
if complexity <= logic.getTransformationComplexity("aggregate") { |
|
t.Error("Join should be more complex than aggregate") |
|
} |
|
case "sort": |
|
if complexity <= logic.getTransformationComplexity("map") { |
|
t.Error("Sort should be more complex than map") |
|
} |
|
} |
|
|
|
if complexity <= 0 { |
|
t.Errorf("Expected positive complexity for %s", transformation) |
|
} |
|
}) |
|
} |
|
} |
|
|
|
func TestDataPipelineLogic_BatchOverhead(t *testing.T) { |
|
logic := DataPipelineLogic{} |
|
|
|
// Test different overhead levels |
|
testCases := []struct { |
|
transformation string |
|
expectedRange [2]float64 // [min, max] |
|
}{ |
|
{"map", [2]float64{0, 100}}, // Low overhead |
|
{"join", [2]float64{300, 600}}, // High overhead |
|
{"sort", [2]float64{150, 300}}, // Medium overhead |
|
} |
|
|
|
for _, tc := range testCases { |
|
overhead := logic.getBatchOverhead(tc.transformation) |
|
|
|
if overhead < tc.expectedRange[0] || overhead > tc.expectedRange[1] { |
|
t.Errorf("Expected %s overhead between %.0f-%.0f, got %.0f", |
|
tc.transformation, tc.expectedRange[0], tc.expectedRange[1], overhead) |
|
} |
|
} |
|
} |
|
|
|
func TestDataPipelineLogic_ProcessingTime(t *testing.T) { |
|
logic := DataPipelineLogic{} |
|
|
|
// Test that processing time scales with record count |
|
smallBatch := logic.calculateProcessingTime(10, "map") |
|
largeBatch := logic.calculateProcessingTime(100, "map") |
|
|
|
if largeBatch <= smallBatch { |
|
t.Error("Expected larger batch to take more time") |
|
} |
|
|
|
// Test that complex transformations take longer |
|
simpleTime := logic.calculateProcessingTime(50, "filter") |
|
complexTime := logic.calculateProcessingTime(50, "join") |
|
|
|
if complexTime <= simpleTime { |
|
t.Error("Expected complex transformation to take longer") |
|
} |
|
|
|
// Test economies of scale (large batches should be more efficient per record) |
|
smallPerRecord := float64(smallBatch) / 10.0 |
|
largePerRecord := float64(largeBatch) / 100.0 |
|
|
|
if largePerRecord >= smallPerRecord { |
|
t.Error("Expected economies of scale for larger batches") |
|
} |
|
} |
|
|
|
func TestDataPipelineLogic_HealthCheck(t *testing.T) { |
|
logic := DataPipelineLogic{} |
|
|
|
props := map[string]any{ |
|
"batchSize": 10.0, |
|
"transformation": "join", // Slow transformation |
|
} |
|
|
|
// Create a large number of requests to test backlog health |
|
requests := make([]*Request, 300) // 30 batches (above healthy threshold) |
|
for i := range requests { |
|
requests[i] = &Request{ID: string(rune('1' + (i % 26))), Type: "DATA", LatencyMS: 0} |
|
} |
|
|
|
// First tick - should create many batches |
|
output, healthy := logic.Tick(props, requests, 1) |
|
|
|
// Should be unhealthy due to large backlog |
|
if healthy { |
|
t.Error("Expected data pipeline to be unhealthy with large backlog") |
|
} |
|
|
|
if len(output) != 0 { |
|
t.Error("Expected no immediate output with slow transformation") |
|
} |
|
|
|
// Check backlog size |
|
state, ok := props["_pipelineState"].(PipelineState) |
|
if !ok { |
|
t.Error("Expected pipeline state to be created") |
|
} |
|
|
|
if state.BacklogSize < 200 { |
|
t.Errorf("Expected large backlog, got %d", state.BacklogSize) |
|
} |
|
} |
|
|
|
func TestDataPipelineLogic_DefaultValues(t *testing.T) { |
|
logic := DataPipelineLogic{} |
|
|
|
// Empty props should use defaults |
|
props := map[string]any{} |
|
|
|
requests := []*Request{{ID: "1", Type: "DATA", LatencyMS: 0}} |
|
|
|
output, healthy := logic.Tick(props, requests, 1) |
|
|
|
if !healthy { |
|
t.Error("Expected pipeline to be healthy with default values") |
|
} |
|
|
|
if len(output) != 0 { |
|
t.Error("Expected no immediate output") |
|
} |
|
|
|
// Should use default batch size and transformation |
|
state, ok := props["_pipelineState"].(PipelineState) |
|
if !ok { |
|
t.Error("Expected pipeline state to be created with defaults") |
|
} |
|
|
|
if len(state.ProcessingQueue) != 1 { |
|
t.Error("Expected one batch with default settings") |
|
} |
|
} |
|
|
|
func TestDataPipelineLogic_PipelineStats(t *testing.T) { |
|
logic := DataPipelineLogic{} |
|
|
|
props := map[string]any{ |
|
"batchSize": 5.0, |
|
"transformation": "filter", |
|
} |
|
|
|
// Initial stats should be empty |
|
stats := logic.GetPipelineStats(props) |
|
if stats["completedBatches"] != 0 { |
|
t.Error("Expected initial completed batches to be 0") |
|
} |
|
|
|
// Process some data |
|
requests := make([]*Request, 10) |
|
for i := range requests { |
|
requests[i] = &Request{ID: string(rune('1' + i)), Type: "DATA", LatencyMS: 0} |
|
} |
|
|
|
logic.Tick(props, requests, 1) |
|
|
|
// Check stats after processing |
|
stats = logic.GetPipelineStats(props) |
|
if stats["queuedBatches"] != 2 { |
|
t.Errorf("Expected 2 queued batches, got %v", stats["queuedBatches"]) |
|
} |
|
|
|
if stats["backlogSize"] != 10 { |
|
t.Errorf("Expected backlog size of 10, got %v", stats["backlogSize"]) |
|
} |
|
} |
|
|
|
func TestDataPipelineLogic_ContinuousProcessing(t *testing.T) { |
|
logic := DataPipelineLogic{} |
|
|
|
props := map[string]any{ |
|
"batchSize": 5.0, |
|
"transformation": "map", |
|
} |
|
|
|
// Process multiple waves of data |
|
totalOutput := 0 |
|
|
|
for wave := 0; wave < 3; wave++ { |
|
requests := make([]*Request, 5) |
|
for i := range requests { |
|
requests[i] = &Request{ID: string(rune('A' + wave*5 + i)), Type: "DATA", LatencyMS: 0} |
|
} |
|
|
|
// Process each wave |
|
for tick := wave*10 + 1; tick <= wave*10+5; tick++ { |
|
var output []*Request |
|
if tick == wave*10+1 { |
|
output, _ = logic.Tick(props, requests, tick) |
|
} else { |
|
output, _ = logic.Tick(props, []*Request{}, tick) |
|
} |
|
totalOutput += len(output) |
|
} |
|
} |
|
|
|
// Should have processed all data eventually |
|
if totalOutput != 15 { |
|
t.Errorf("Expected 15 total output records, got %d", totalOutput) |
|
} |
|
|
|
// Check final stats |
|
stats := logic.GetPipelineStats(props) |
|
if stats["totalRecords"] != 15 { |
|
t.Errorf("Expected 15 total records processed, got %v", stats["totalRecords"]) |
|
} |
|
} |
|
|
|
func TestDataPipelineLogic_EmptyQueue(t *testing.T) { |
|
logic := DataPipelineLogic{} |
|
|
|
props := map[string]any{ |
|
"batchSize": 10.0, |
|
"transformation": "map", |
|
} |
|
|
|
// Process empty queue |
|
output, healthy := logic.Tick(props, []*Request{}, 1) |
|
|
|
if !healthy { |
|
t.Error("Expected pipeline to be healthy with empty queue") |
|
} |
|
|
|
if len(output) != 0 { |
|
t.Error("Expected no output with empty queue") |
|
} |
|
|
|
// State should be initialized but empty |
|
state, ok := props["_pipelineState"].(PipelineState) |
|
if !ok { |
|
t.Error("Expected pipeline state to be initialized") |
|
} |
|
|
|
if len(state.ProcessingQueue) != 0 { |
|
t.Error("Expected empty processing queue") |
|
} |
|
}
|
|
|