fix(streaming): prevent late events from corrupting message stream
This commit is contained in:
@@ -1214,18 +1214,40 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
|
|||||||
} else if ((type == 'chat:message:delta' || type == 'message') &&
|
} else if ((type == 'chat:message:delta' || type == 'message') &&
|
||||||
payload != null) {
|
payload != null) {
|
||||||
// Incremental message content over socket
|
// Incremental message content over socket
|
||||||
|
// Validate message ID to prevent late events from previous turns
|
||||||
|
// from corrupting the current assistant message
|
||||||
|
if (messageId != null &&
|
||||||
|
messageId.isNotEmpty &&
|
||||||
|
messageId != assistantMessageId) {
|
||||||
|
DebugLogger.log(
|
||||||
|
'Ignoring delta for wrong message: $messageId (expected $assistantMessageId)',
|
||||||
|
scope: 'streaming/helper',
|
||||||
|
);
|
||||||
|
} else {
|
||||||
final content = payload['content']?.toString() ?? '';
|
final content = payload['content']?.toString() ?? '';
|
||||||
if (content.isNotEmpty) {
|
if (content.isNotEmpty) {
|
||||||
appendToLastMessage(content);
|
appendToLastMessage(content);
|
||||||
updateImagesFromCurrentContent();
|
updateImagesFromCurrentContent();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
} else if ((type == 'chat:message' || type == 'replace') &&
|
} else if ((type == 'chat:message' || type == 'replace') &&
|
||||||
payload != null) {
|
payload != null) {
|
||||||
// Full message replacement over socket
|
// Full message replacement over socket
|
||||||
|
// Validate message ID to prevent late events from previous turns
|
||||||
|
// from corrupting the current assistant message
|
||||||
|
if (messageId != null &&
|
||||||
|
messageId.isNotEmpty &&
|
||||||
|
messageId != assistantMessageId) {
|
||||||
|
DebugLogger.log(
|
||||||
|
'Ignoring replace for wrong message: $messageId (expected $assistantMessageId)',
|
||||||
|
scope: 'streaming/helper',
|
||||||
|
);
|
||||||
|
} else {
|
||||||
final content = payload['content']?.toString() ?? '';
|
final content = payload['content']?.toString() ?? '';
|
||||||
if (content.isNotEmpty) {
|
if (content.isNotEmpty) {
|
||||||
replaceLastMessageContent(content);
|
replaceLastMessageContent(content);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
} else if ((type == 'chat:message:files') && payload != null) {
|
} else if ((type == 'chat:message:files') && payload != null) {
|
||||||
// Alias for files event used by web client
|
// Alias for files event used by web client
|
||||||
try {
|
try {
|
||||||
@@ -1347,11 +1369,21 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if (type == 'event:message:delta' && payload != null) {
|
} else if (type == 'event:message:delta' && payload != null) {
|
||||||
|
// Validate message ID to prevent late events from previous turns
|
||||||
|
if (messageId != null &&
|
||||||
|
messageId.isNotEmpty &&
|
||||||
|
messageId != assistantMessageId) {
|
||||||
|
DebugLogger.log(
|
||||||
|
'Ignoring event delta for wrong message: $messageId (expected $assistantMessageId)',
|
||||||
|
scope: 'streaming/helper',
|
||||||
|
);
|
||||||
|
} else {
|
||||||
final content = payload['content']?.toString() ?? '';
|
final content = payload['content']?.toString() ?? '';
|
||||||
if (content.isNotEmpty) {
|
if (content.isNotEmpty) {
|
||||||
appendToLastMessage(content);
|
appendToLastMessage(content);
|
||||||
updateImagesFromCurrentContent();
|
updateImagesFromCurrentContent();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// Log unknown event types to catch any follow-up events we might be missing
|
// Log unknown event types to catch any follow-up events we might be missing
|
||||||
if (type != null && type.toString().contains('follow')) {
|
if (type != null && type.toString().contains('follow')) {
|
||||||
|
|||||||
Reference in New Issue
Block a user