From 65cd902d5cd1aedcc5dc1797dc6ee0d45e3311d3 Mon Sep 17 00:00:00 2001 From: cogwheel <172976095+cogwheel0@users.noreply.github.com> Date: Tue, 13 Jan 2026 21:33:00 +0530 Subject: [PATCH] fix(streaming): prevent late events from corrupting message stream --- lib/core/services/streaming_helper.dart | 54 ++++++++++++++++++++----- 1 file changed, 43 insertions(+), 11 deletions(-) diff --git a/lib/core/services/streaming_helper.dart b/lib/core/services/streaming_helper.dart index 509a3f5..4f2115d 100644 --- a/lib/core/services/streaming_helper.dart +++ b/lib/core/services/streaming_helper.dart @@ -1214,17 +1214,39 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ } else if ((type == 'chat:message:delta' || type == 'message') && payload != null) { // Incremental message content over socket - final content = payload['content']?.toString() ?? ''; - if (content.isNotEmpty) { - appendToLastMessage(content); - updateImagesFromCurrentContent(); + // Validate message ID to prevent late events from previous turns + // from corrupting the current assistant message + if (messageId != null && + messageId.isNotEmpty && + messageId != assistantMessageId) { + DebugLogger.log( + 'Ignoring delta for wrong message: $messageId (expected $assistantMessageId)', + scope: 'streaming/helper', + ); + } else { + final content = payload['content']?.toString() ?? ''; + if (content.isNotEmpty) { + appendToLastMessage(content); + updateImagesFromCurrentContent(); + } } } else if ((type == 'chat:message' || type == 'replace') && payload != null) { // Full message replacement over socket - final content = payload['content']?.toString() ?? ''; - if (content.isNotEmpty) { - replaceLastMessageContent(content); + // Validate message ID to prevent late events from previous turns + // from corrupting the current assistant message + if (messageId != null && + messageId.isNotEmpty && + messageId != assistantMessageId) { + DebugLogger.log( + 'Ignoring replace for wrong message: $messageId (expected $assistantMessageId)', + scope: 'streaming/helper', + ); + } else { + final content = payload['content']?.toString() ?? ''; + if (content.isNotEmpty) { + replaceLastMessageContent(content); + } } } else if ((type == 'chat:message:files') && payload != null) { // Alias for files event used by web client @@ -1347,10 +1369,20 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ } } } else if (type == 'event:message:delta' && payload != null) { - final content = payload['content']?.toString() ?? ''; - if (content.isNotEmpty) { - appendToLastMessage(content); - updateImagesFromCurrentContent(); + // Validate message ID to prevent late events from previous turns + if (messageId != null && + messageId.isNotEmpty && + messageId != assistantMessageId) { + DebugLogger.log( + 'Ignoring event delta for wrong message: $messageId (expected $assistantMessageId)', + scope: 'streaming/helper', + ); + } else { + final content = payload['content']?.toString() ?? ''; + if (content.isNotEmpty) { + appendToLastMessage(content); + updateImagesFromCurrentContent(); + } } } else { // Log unknown event types to catch any follow-up events we might be missing