You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

115 lines
3.3 KiB

package simulation
// MessageQueueLogic implements the per-tick behaviour of a simulated message
// queue. The type itself has no fields: Tick keeps the queue contents in the
// props map it is given, under the "_messageQueue" key.
type MessageQueueLogic struct{}
// QueuedMessage is one entry held in the message queue between ticks.
type QueuedMessage struct {
// RequestID is the ID of the request that was enqueued.
RequestID string
// Timestamp is the simulated time at which the message was enqueued; Tick
// sets it to tick*100 and compares it against the retention window.
Timestamp int
// MessageData is the message payload; Tick stores a fixed placeholder string.
MessageData string
// RetryCount is set to 0 on enqueue; nothing in the visible code reads it.
RetryCount int
}
// Tick advances the simulated message queue by one tick.
//
// Queue contents are kept between calls in props["_messageQueue"] as a
// []QueuedMessage. Each call performs three steps in order:
//
//  1. Stored messages older than the retention window are discarded.
//  2. Up to processingRate of the remaining messages are dequeued in FIFO
//     order, and one new "PROCESS" request is returned for each of them.
//  3. Incoming requests are appended to the queue; a request that arrives
//     while the queue already holds queueCapacity messages is dropped.
//
// Because step 3 runs after step 2, a request enqueued by this call is first
// eligible for output on the next call.
//
// Settings are read from props through AsFloat64; when the converted value is
// 0 the default is used:
//
//   - "queueCapacity": maximum number of stored messages (default 1000).
//   - "retentionSeconds": maximum message age in seconds (default 86400).
//   - "processingRate": messages dequeued per tick (default 100).
//
// Simulated time is tick*100, i.e. one tick is treated as 100ms.
//
// props may be nil; in that case the defaults apply and the queue contents
// cannot be carried over to the next call. Nil entries in queue are ignored.
//
// The returned bool reports queue health. It is false only when the queue is
// at capacity and processingRate is not positive.
func (mq MessageQueueLogic) Tick(props map[string]any, queue []*Request, tick int) ([]*Request, bool) {
	queueCapacity := int(AsFloat64(props["queueCapacity"]))
	if queueCapacity == 0 {
		queueCapacity = 1000 // default capacity
	}

	retentionSeconds := int(AsFloat64(props["retentionSeconds"]))
	if retentionSeconds == 0 {
		retentionSeconds = 86400 // default 24 hours in seconds
	}

	processingRate := int(AsFloat64(props["processingRate"]))
	if processingRate == 0 {
		processingRate = 100 // default 100 messages per tick
	}

	// Current timestamp for this tick, assuming 100ms per tick.
	currentTime := tick * 100

	// A missing or wrongly typed entry yields a nil slice, which behaves as an
	// empty queue below.
	messageQueue, _ := props["_messageQueue"].([]QueuedMessage)

	// Apply the retention policy first. cleanExpiredMessages returns a fresh
	// slice, so the appends below never touch the slice stored in props.
	messageQueue = mq.cleanExpiredMessages(messageQueue, currentTime, retentionSeconds*1000)

	// Work out how many messages leave the queue this tick.
	messagesToProcess := len(messageQueue)
	if messagesToProcess > processingRate {
		messagesToProcess = processingRate
	}
	if messagesToProcess < 0 {
		// A negative processingRate processes nothing.
		messagesToProcess = 0
	}

	// Dequeue from the front (FIFO) and emit one request per message.
	output := make([]*Request, 0, messagesToProcess)
	for _, message := range messageQueue[:messagesToProcess] {
		output = append(output, &Request{
			ID:        message.RequestID,
			Timestamp: message.Timestamp,
			LatencyMS: 2, // small latency for queue processing
			Origin:    "message-queue",
			Type:      "PROCESS",
			Path:      []string{"queued-message"},
		})
	}
	messageQueue = messageQueue[messagesToProcess:]

	// Enqueue incoming requests for a later tick.
	for _, req := range queue {
		if req == nil {
			// Nothing to enqueue; skipping avoids a nil dereference on req.ID.
			continue
		}
		if len(messageQueue) >= queueCapacity {
			// Queue full: the message is dropped and never reaches the output.
			// The request is deliberately left untouched; appending to a copy's
			// Path could write into the caller's backing array.
			continue
		}
		messageQueue = append(messageQueue, QueuedMessage{
			RequestID:   req.ID,
			Timestamp:   currentTime,
			MessageData: "message-payload", // In real system, this would be the actual message
			RetryCount:  0,
		})
	}

	// Persist the queue for the next tick. Assigning into a nil map panics, so
	// a nil props simply cannot carry state forward.
	if props != nil {
		props["_messageQueue"] = messageQueue
	}

	// Healthy unless the queue is completely full AND nothing can be processed.
	healthy := len(messageQueue) < queueCapacity || processingRate > 0
	return output, healthy
}
// cleanExpiredMessages returns a new slice holding, in their original order,
// only the messages whose age (currentTime - Timestamp) does not exceed
// retentionMs. The input slice is left unmodified and the result is never nil.
func (mq MessageQueueLogic) cleanExpiredMessages(messageQueue []QueuedMessage, currentTime, retentionMs int) []QueuedMessage {
	kept := make([]QueuedMessage, 0)
	for i := range messageQueue {
		age := currentTime - messageQueue[i].Timestamp
		if age > retentionMs {
			// Past the retention window: leave it out of the result.
			continue
		}
		kept = append(kept, messageQueue[i])
	}
	return kept
}