package simulation

import (
	"testing"
)
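
// These tests exercise a DataPipelineLogic implementation defined elsewhere in
// the simulation package. From the calls below they assume, roughly:
//
//   - Tick(props map[string]any, requests []*Request, tick int) ([]*Request, bool)
//     batches incoming "DATA" requests, keeps its state under
//     props["_pipelineState"] as a PipelineState value, and emits "PROCESSED"
//     requests once a batch finishes.
//   - PipelineState exposes at least ProcessingQueue (batches with a
//     RecordCount) and BacklogSize.
//   - GetPipelineStats(props) reports completedBatches, queuedBatches,
//     backlogSize, and totalRecords.
//   - getTransformationComplexity, getBatchOverhead, and calculateProcessingTime
//     model per-transformation cost.
//
// These are assumptions inferred from the assertions below, not a specification
// of the implementation.
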
func TestDataPipelineLogic_BasicProcessing(t *testing.T) {
	logic := DataPipelineLogic{}
	props := map[string]any{
		"batchSize":      100.0,
		"transformation": "map",
	}

	// Create 50 requests (fewer than the batch size).
	requests := make([]*Request, 50)
	for i := range requests {
		requests[i] = &Request{ID: string(rune('1' + i)), Type: "DATA", LatencyMS: 0}
	}

	// First tick - should create a batch and start processing.
	output, healthy := logic.Tick(props, requests, 1)
	if !healthy {
		t.Error("Expected data pipeline to be healthy")
	}

	// Should not have output yet (batch is still processing).
	if len(output) != 0 {
		t.Errorf("Expected no output during processing, got %d", len(output))
	}

	// Check that the batch was created.
	state, ok := props["_pipelineState"].(PipelineState)
	if !ok {
		t.Fatal("Expected pipeline state to be created")
	}
	if len(state.ProcessingQueue) != 1 {
		t.Fatalf("Expected 1 batch in processing queue, got %d", len(state.ProcessingQueue))
	}
	if state.ProcessingQueue[0].RecordCount != 50 {
		t.Errorf("Expected batch with 50 records, got %d", state.ProcessingQueue[0].RecordCount)
	}
}

func TestDataPipelineLogic_BatchCompletion(t *testing.T) {
	logic := DataPipelineLogic{}
	props := map[string]any{
		"batchSize":      10.0,
		"transformation": "filter", // Fast transformation
	}

	// Create 5 requests.
	requests := make([]*Request, 5)
	for i := range requests {
		requests[i] = &Request{ID: string(rune('1' + i)), Type: "DATA", LatencyMS: 0}
	}

	// First tick - start processing.
	logic.Tick(props, requests, 1)

	// Wait enough ticks for processing to complete;
	// the filter transformation should complete quickly.
	var output []*Request
	var healthy bool
	for tick := 2; tick <= 10; tick++ {
		output, healthy = logic.Tick(props, []*Request{}, tick)
		if len(output) > 0 {
			break
		}
	}

	if !healthy {
		t.Error("Expected data pipeline to be healthy")
	}

	// Should have output matching the input count.
	if len(output) != 5 {
		t.Errorf("Expected 5 output records, got %d", len(output))
	}

	// Check the output structure.
	for _, req := range output {
		if req.Type != "PROCESSED" {
			t.Errorf("Expected PROCESSED type, got %s", req.Type)
		}
		if req.Origin != "data-pipeline" {
			t.Errorf("Expected data-pipeline origin, got %s", req.Origin)
		}
		if len(req.Path) == 0 || req.Path[0] != "pipeline-filter" {
			t.Error("Expected path to indicate filter transformation")
		}
	}
}

func TestDataPipelineLogic_MultipleBatches(t *testing.T) {
	logic := DataPipelineLogic{}
	props := map[string]any{
		"batchSize":      10.0,
		"transformation": "map",
	}

	// Create 25 requests (should create 3 batches: 10, 10, 5).
	requests := make([]*Request, 25)
	for i := range requests {
		requests[i] = &Request{ID: string(rune('1' + i)), Type: "DATA", LatencyMS: 0}
	}

	// First tick - create batches.
	output, healthy := logic.Tick(props, requests, 1)
	if !healthy {
		t.Error("Expected data pipeline to be healthy")
	}
	if len(output) != 0 {
		t.Error("Expected no immediate output")
	}

	// Check that 3 batches were created.
	state, ok := props["_pipelineState"].(PipelineState)
	if !ok {
		t.Fatal("Expected pipeline state to be created")
	}
	if len(state.ProcessingQueue) != 3 {
		t.Fatalf("Expected 3 batches in processing queue, got %d", len(state.ProcessingQueue))
	}

	// Verify batch sizes.
	expectedSizes := []int{10, 10, 5}
	for i, batch := range state.ProcessingQueue {
		if batch.RecordCount != expectedSizes[i] {
			t.Errorf("Expected batch %d to have %d records, got %d",
				i, expectedSizes[i], batch.RecordCount)
		}
	}
}

func TestDataPipelineLogic_TransformationComplexity(t *testing.T) {
	logic := DataPipelineLogic{}

	transformations := []string{"filter", "map", "sort", "aggregate", "join"}
	for _, transformation := range transformations {
		t.Run(transformation, func(t *testing.T) {
			complexity := logic.getTransformationComplexity(transformation)

			// Verify relative complexity ordering.
			switch transformation {
			case "filter":
				if complexity >= logic.getTransformationComplexity("map") {
					t.Error("Filter should be simpler than map")
				}
			case "join":
				if complexity <= logic.getTransformationComplexity("aggregate") {
					t.Error("Join should be more complex than aggregate")
				}
			case "sort":
				if complexity <= logic.getTransformationComplexity("map") {
					t.Error("Sort should be more complex than map")
				}
			}

			if complexity <= 0 {
				t.Errorf("Expected positive complexity for %s", transformation)
			}
		})
	}
}

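// Taken together, the subtests above only pin down a partial ordering of
// transformation complexity: filter < map < sort, aggregate < join, and every
// value strictly positive. Absolute complexity values are left to the
// implementation and are not asserted here.
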
func TestDataPipelineLogic_BatchOverhead(t *testing.T) {
	logic := DataPipelineLogic{}

	// Test different overhead levels.
	testCases := []struct {
		transformation string
		expectedRange  [2]float64 // [min, max]
	}{
		{"map", [2]float64{0, 100}},    // Low overhead
		{"join", [2]float64{300, 600}}, // High overhead
		{"sort", [2]float64{150, 300}}, // Medium overhead
	}

	for _, tc := range testCases {
		overhead := logic.getBatchOverhead(tc.transformation)
		if overhead < tc.expectedRange[0] || overhead > tc.expectedRange[1] {
			t.Errorf("Expected %s overhead between %.0f-%.0f, got %.0f",
				tc.transformation, tc.expectedRange[0], tc.expectedRange[1], overhead)
		}
	}
}

func TestDataPipelineLogic_ProcessingTime(t *testing.T) {
	logic := DataPipelineLogic{}

	// Test that processing time scales with record count.
	smallBatch := logic.calculateProcessingTime(10, "map")
	largeBatch := logic.calculateProcessingTime(100, "map")
	if largeBatch <= smallBatch {
		t.Error("Expected larger batch to take more time")
	}

	// Test that complex transformations take longer.
	simpleTime := logic.calculateProcessingTime(50, "filter")
	complexTime := logic.calculateProcessingTime(50, "join")
	if complexTime <= simpleTime {
		t.Error("Expected complex transformation to take longer")
	}

	// Test economies of scale (large batches should be more efficient per record).
	smallPerRecord := float64(smallBatch) / 10.0
	largePerRecord := float64(largeBatch) / 100.0
	if largePerRecord >= smallPerRecord {
		t.Error("Expected economies of scale for larger batches")
	}
}

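// The assertions in TestDataPipelineLogic_ProcessingTime only constrain the
// shape of the cost model: total time grows with record count and with
// transformation complexity, while per-record cost shrinks for larger batches
// (economies of scale). Something along the lines of
//
//	time(records, t) ≈ overhead(t) + records*perRecord(t)*scale(records)
//
// with a decreasing scale factor would satisfy them; the concrete formula is
// an implementation detail.
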
func TestDataPipelineLogic_HealthCheck(t *testing.T) {
	logic := DataPipelineLogic{}
	props := map[string]any{
		"batchSize":      10.0,
		"transformation": "join", // Slow transformation
	}

	// Create a large number of requests to test backlog health.
	requests := make([]*Request, 300) // 30 batches (above the healthy threshold)
	for i := range requests {
		requests[i] = &Request{ID: string(rune('1' + (i % 26))), Type: "DATA", LatencyMS: 0}
	}

	// First tick - should create many batches.
	output, healthy := logic.Tick(props, requests, 1)

	// Should be unhealthy due to the large backlog.
	if healthy {
		t.Error("Expected data pipeline to be unhealthy with large backlog")
	}
	if len(output) != 0 {
		t.Error("Expected no immediate output with slow transformation")
	}

	// Check the backlog size.
	state, ok := props["_pipelineState"].(PipelineState)
	if !ok {
		t.Fatal("Expected pipeline state to be created")
	}
	if state.BacklogSize < 200 {
		t.Errorf("Expected large backlog, got %d", state.BacklogSize)
	}
}

func TestDataPipelineLogic_DefaultValues(t *testing.T) {
	logic := DataPipelineLogic{}

	// Empty props should fall back to the defaults.
	props := map[string]any{}
	requests := []*Request{{ID: "1", Type: "DATA", LatencyMS: 0}}

	output, healthy := logic.Tick(props, requests, 1)
	if !healthy {
		t.Error("Expected pipeline to be healthy with default values")
	}
	if len(output) != 0 {
		t.Error("Expected no immediate output")
	}

	// Should use the default batch size and transformation.
	state, ok := props["_pipelineState"].(PipelineState)
	if !ok {
		t.Fatal("Expected pipeline state to be created with defaults")
	}
	if len(state.ProcessingQueue) != 1 {
		t.Error("Expected one batch with default settings")
	}
}

func TestDataPipelineLogic_PipelineStats(t *testing.T) {
	logic := DataPipelineLogic{}
	props := map[string]any{
		"batchSize":      5.0,
		"transformation": "filter",
	}

	// Initial stats should be empty.
	stats := logic.GetPipelineStats(props)
	if stats["completedBatches"] != 0 {
		t.Error("Expected initial completed batches to be 0")
	}

	// Process some data.
	requests := make([]*Request, 10)
	for i := range requests {
		requests[i] = &Request{ID: string(rune('1' + i)), Type: "DATA", LatencyMS: 0}
	}
	logic.Tick(props, requests, 1)

	// Check stats after processing.
	stats = logic.GetPipelineStats(props)
	if stats["queuedBatches"] != 2 {
		t.Errorf("Expected 2 queued batches, got %v", stats["queuedBatches"])
	}
	if stats["backlogSize"] != 10 {
		t.Errorf("Expected backlog size of 10, got %v", stats["backlogSize"])
	}
}

func TestDataPipelineLogic_ContinuousProcessing(t *testing.T) {
	logic := DataPipelineLogic{}
	props := map[string]any{
		"batchSize":      5.0,
		"transformation": "map",
	}

	// Process multiple waves of data.
	totalOutput := 0
	for wave := 0; wave < 3; wave++ {
		requests := make([]*Request, 5)
		for i := range requests {
			requests[i] = &Request{ID: string(rune('A' + wave*5 + i)), Type: "DATA", LatencyMS: 0}
		}

		// Process each wave, feeding its requests only on the first tick.
		for tick := wave*10 + 1; tick <= wave*10+5; tick++ {
			var output []*Request
			if tick == wave*10+1 {
				output, _ = logic.Tick(props, requests, tick)
			} else {
				output, _ = logic.Tick(props, []*Request{}, tick)
			}
			totalOutput += len(output)
		}
	}

	// Should have processed all data eventually.
	if totalOutput != 15 {
		t.Errorf("Expected 15 total output records, got %d", totalOutput)
	}

	// Check the final stats.
	stats := logic.GetPipelineStats(props)
	if stats["totalRecords"] != 15 {
		t.Errorf("Expected 15 total records processed, got %v", stats["totalRecords"])
	}
}

func TestDataPipelineLogic_EmptyQueue(t *testing.T) {
	logic := DataPipelineLogic{}
	props := map[string]any{
		"batchSize":      10.0,
		"transformation": "map",
	}

	// Process an empty queue.
	output, healthy := logic.Tick(props, []*Request{}, 1)
	if !healthy {
		t.Error("Expected pipeline to be healthy with empty queue")
	}
	if len(output) != 0 {
		t.Error("Expected no output with empty queue")
	}

	// State should be initialized but empty.
	state, ok := props["_pipelineState"].(PipelineState)
	if !ok {
		t.Fatal("Expected pipeline state to be initialized")
	}
	if len(state.ProcessingQueue) != 0 {
		t.Error("Expected empty processing queue")
	}
}