refactor(streaming): Optimize image collection and debounce mechanism

This commit is contained in:
cogwheel0
2025-11-02 22:14:45 +05:30
parent cfadeffd24
commit a05837b985
4 changed files with 211 additions and 69 deletions

View File

@@ -277,6 +277,13 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
)..start(); )..start();
} }
Timer? imageCollectionDebounce;
String? pendingImageContent;
String? pendingImageMessageId;
String? pendingImageSignature;
String? lastProcessedImageSignature;
int imageCollectionRequestId = 0;
void disposeSocketSubscriptions() { void disposeSocketSubscriptions() {
if (socketSubscriptions.isEmpty) { if (socketSubscriptions.isEmpty) {
return; return;
@@ -287,19 +294,34 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
} catch (_) {} } catch (_) {}
} }
socketSubscriptions.clear(); socketSubscriptions.clear();
imageCollectionDebounce?.cancel();
imageCollectionDebounce = null;
pendingImageContent = null;
pendingImageMessageId = null;
pendingImageSignature = null;
lastProcessedImageSignature = null;
imageCollectionRequestId = 0;
socketWatchdog?.stop(); socketWatchdog?.stop();
} }
bool isSearching = false; bool isSearching = false;
void updateImagesFromCurrentContent() { void runPendingImageCollection() {
try { imageCollectionDebounce?.cancel();
final msgs = getMessages(); imageCollectionDebounce = null;
if (msgs.isEmpty || msgs.last.role != 'assistant') return;
final content = msgs.last.content;
if (content.isEmpty) return;
final targetMessageId = msgs.last.id; final content = pendingImageContent;
final targetMessageId = pendingImageMessageId;
final signature = pendingImageSignature;
if (content == null || targetMessageId == null || signature == null) {
return;
}
pendingImageContent = null;
pendingImageMessageId = null;
pendingImageSignature = null;
final requestId = ++imageCollectionRequestId;
unawaited( unawaited(
workerManager workerManager
.schedule<String, List<Map<String, dynamic>>>( .schedule<String, List<Map<String, dynamic>>>(
@@ -308,14 +330,25 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
debugLabel: 'stream_collect_images', debugLabel: 'stream_collect_images',
) )
.then((collected) { .then((collected) {
if (collected.isEmpty) return; if (requestId != imageCollectionRequestId) {
return;
}
final currentMessages = getMessages(); final currentMessages = getMessages();
if (currentMessages.isEmpty) return; if (currentMessages.isEmpty) {
return;
}
final last = currentMessages.last; final last = currentMessages.last;
if (last.id != targetMessageId || last.role != 'assistant') { if (last.id != targetMessageId || last.role != 'assistant') {
return; return;
} }
lastProcessedImageSignature = signature;
if (collected.isEmpty) {
return;
}
final existing = last.files ?? <Map<String, dynamic>>[]; final existing = last.files ?? <Map<String, dynamic>>[];
final seen = <String>{ final seen = <String>{
for (final f in existing) for (final f in existing)
@@ -337,6 +370,43 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
}) })
.catchError((_) {}), .catchError((_) {}),
); );
}
void updateImagesFromCurrentContent() {
try {
final msgs = getMessages();
if (msgs.isEmpty || msgs.last.role != 'assistant') return;
final last = msgs.last;
final content = last.content;
if (content.isEmpty) return;
final targetMessageId = last.id;
final signature =
'$targetMessageId:${content.hashCode}:${content.length}';
if (signature == lastProcessedImageSignature &&
pendingImageSignature == null) {
return;
}
if (signature == pendingImageSignature) {
return;
}
pendingImageMessageId = targetMessageId;
pendingImageContent = content;
pendingImageSignature = signature;
final shouldDelay = last.isStreaming;
imageCollectionDebounce?.cancel();
if (shouldDelay) {
imageCollectionDebounce = Timer(
const Duration(milliseconds: 200),
runPendingImageCollection,
);
} else {
runPendingImageCollection();
}
} catch (_) {} } catch (_) {}
} }

View File

@@ -647,13 +647,6 @@ class ChatMessagesNotifier extends Notifier<List<ChatMessage>> {
return; return;
} }
// Log content replacement for debugging
DebugLogger.log(
'Replacing message content: messageId=${lastMessage.id}, '
'oldLength=${lastMessage.content.length}, newLength=${content.length}',
scope: 'chat/providers',
);
_ensureFormatterForMessage(lastMessage); _ensureFormatterForMessage(lastMessage);
// Defensive check: ensure the formatter is for the correct message // Defensive check: ensure the formatter is for the correct message

View File

@@ -71,6 +71,11 @@ class _AssistantMessageWidgetState extends ConsumerState<AssistantMessageWidget>
bool _allowTypingIndicator = false; bool _allowTypingIndicator = false;
Timer? _typingGateTimer; Timer? _typingGateTimer;
String _ttsPlainText = ''; String _ttsPlainText = '';
Timer? _ttsPlainTextDebounce;
Map<String, dynamic>? _pendingTtsPlainTextPayload;
String? _pendingTtsPlainTextSource;
String? _lastAppliedTtsPlainTextSource;
int _ttsPlainTextRequestId = 0;
// Active version index (-1 means current/live content) // Active version index (-1 means current/live content)
int _activeVersionIndex = -1; int _activeVersionIndex = -1;
// press state handled by shared ChatActionButton // press state handled by shared ChatActionButton
@@ -162,13 +167,11 @@ class _AssistantMessageWidgetState extends ConsumerState<AssistantMessageWidget>
final rSegs = ReasoningParser.segments(raw); final rSegs = ReasoningParser.segments(raw);
final out = <MessageSegment>[]; final out = <MessageSegment>[];
final textBuf = StringBuffer();
final textSegments = <String>[]; final textSegments = <String>[];
if (rSegs == null || rSegs.isEmpty) { if (rSegs == null || rSegs.isEmpty) {
final tSegs = ToolCallsParser.segments(raw); final tSegs = ToolCallsParser.segments(raw);
if (tSegs == null || tSegs.isEmpty) { if (tSegs == null || tSegs.isEmpty) {
out.add(MessageSegment.text(raw)); out.add(MessageSegment.text(raw));
textBuf.write(raw);
textSegments.add(raw); textSegments.add(raw);
} else { } else {
for (final s in tSegs) { for (final s in tSegs) {
@@ -176,7 +179,6 @@ class _AssistantMessageWidgetState extends ConsumerState<AssistantMessageWidget>
out.add(MessageSegment.tool(s.entry!)); out.add(MessageSegment.tool(s.entry!));
} else if ((s.text ?? '').isNotEmpty) { } else if ((s.text ?? '').isNotEmpty) {
out.add(MessageSegment.text(s.text!)); out.add(MessageSegment.text(s.text!));
textBuf.write(s.text);
textSegments.add(s.text!); textSegments.add(s.text!);
} }
} }
@@ -190,7 +192,6 @@ class _AssistantMessageWidgetState extends ConsumerState<AssistantMessageWidget>
final tSegs = ToolCallsParser.segments(t); final tSegs = ToolCallsParser.segments(t);
if (tSegs == null || tSegs.isEmpty) { if (tSegs == null || tSegs.isEmpty) {
out.add(MessageSegment.text(t)); out.add(MessageSegment.text(t));
textBuf.write(t);
textSegments.add(t); textSegments.add(t);
} else { } else {
for (final s in tSegs) { for (final s in tSegs) {
@@ -198,7 +199,6 @@ class _AssistantMessageWidgetState extends ConsumerState<AssistantMessageWidget>
out.add(MessageSegment.tool(s.entry!)); out.add(MessageSegment.tool(s.entry!));
} else if ((s.text ?? '').isNotEmpty) { } else if ((s.text ?? '').isNotEmpty) {
out.add(MessageSegment.text(s.text!)); out.add(MessageSegment.text(s.text!));
textBuf.write(s.text);
textSegments.add(s.text!); textSegments.add(s.text!);
} }
} }
@@ -208,23 +208,15 @@ class _AssistantMessageWidgetState extends ConsumerState<AssistantMessageWidget>
} }
final segments = out.isEmpty ? [MessageSegment.text(raw)] : out; final segments = out.isEmpty ? [MessageSegment.text(raw)] : out;
String speechText;
try {
final worker = ref.read(workerManagerProvider);
speechText = await worker.schedule<Map<String, dynamic>, String>(
_buildTtsPlainTextWorker,
{'segments': textSegments, 'fallback': raw},
debugLabel: 'tts_plain_text',
);
} catch (_) {
speechText = _buildTtsPlainTextFallback(textSegments, raw);
}
if (!mounted) return; if (!mounted) return;
setState(() { setState(() {
_segments = segments; _segments = segments;
_ttsPlainText = speechText;
}); });
_scheduleTtsPlainTextBuild(
List<String>.from(textSegments, growable: false),
raw,
);
_updateTypingIndicatorGate(); _updateTypingIndicatorGate();
} }
@@ -290,6 +282,96 @@ class _AssistantMessageWidgetState extends ConsumerState<AssistantMessageWidget>
return result; return result;
} }
void _scheduleTtsPlainTextBuild(List<String> segments, String raw) {
final hasContent =
segments.any((segment) => segment.trim().isNotEmpty) ||
raw.trim().isNotEmpty;
if (!hasContent) {
_pendingTtsPlainTextPayload = null;
_pendingTtsPlainTextSource = null;
_lastAppliedTtsPlainTextSource = '';
if (_ttsPlainText.isNotEmpty && mounted) {
setState(() {
_ttsPlainText = '';
});
}
return;
}
if (_pendingTtsPlainTextPayload == null &&
raw == _lastAppliedTtsPlainTextSource) {
return;
}
if (raw == _pendingTtsPlainTextSource &&
_pendingTtsPlainTextPayload != null) {
return;
}
final pendingSegments = List<String>.from(segments, growable: false);
_pendingTtsPlainTextPayload = {
'segments': pendingSegments,
'fallback': raw,
};
_pendingTtsPlainTextSource = raw;
final delay = widget.isStreaming
? const Duration(milliseconds: 250)
: Duration.zero;
_ttsPlainTextDebounce?.cancel();
if (delay == Duration.zero) {
_runPendingTtsPlainTextBuild();
} else {
_ttsPlainTextDebounce = Timer(delay, _runPendingTtsPlainTextBuild);
}
}
void _runPendingTtsPlainTextBuild() {
_ttsPlainTextDebounce?.cancel();
_ttsPlainTextDebounce = null;
final payload = _pendingTtsPlainTextPayload;
final source = _pendingTtsPlainTextSource;
if (payload == null || source == null) {
return;
}
_pendingTtsPlainTextPayload = null;
_pendingTtsPlainTextSource = null;
final requestId = ++_ttsPlainTextRequestId;
unawaited(_executeTtsPlainTextBuild(payload, source, requestId));
}
Future<void> _executeTtsPlainTextBuild(
Map<String, dynamic> payload,
String raw,
int requestId,
) async {
final segments = (payload['segments'] as List).cast<String>();
String speechText;
try {
final worker = ref.read(workerManagerProvider);
speechText = await worker.schedule<Map<String, dynamic>, String>(
_buildTtsPlainTextWorker,
payload,
debugLabel: 'tts_plain_text',
);
} catch (_) {
speechText = _buildTtsPlainTextFallback(segments, raw);
}
if (!mounted || requestId != _ttsPlainTextRequestId) {
return;
}
_lastAppliedTtsPlainTextSource = raw;
if (_ttsPlainText != speechText) {
setState(() {
_ttsPlainText = speechText;
});
}
}
// No streaming-specific markdown fixes needed here; handled by Markdown widget // No streaming-specific markdown fixes needed here; handled by Markdown widget
Widget _buildToolCallTile(ToolCallEntry tc) { Widget _buildToolCallTile(ToolCallEntry tc) {
@@ -622,6 +704,9 @@ class _AssistantMessageWidgetState extends ConsumerState<AssistantMessageWidget>
@override @override
void dispose() { void dispose() {
_typingGateTimer?.cancel(); _typingGateTimer?.cancel();
_ttsPlainTextDebounce?.cancel();
_pendingTtsPlainTextPayload = null;
_pendingTtsPlainTextSource = null;
_fadeController.dispose(); _fadeController.dispose();
_slideController.dispose(); _slideController.dispose();
super.dispose(); super.dispose();

View File

@@ -871,19 +871,13 @@ class AppCustomizationPage extends ConsumerWidget {
duration: const Duration(milliseconds: 200), duration: const Duration(milliseconds: 200),
child: Text( child: Text(
ttsDescription, ttsDescription,
key: ValueKey<String>( key: ValueKey<String>('tts-desc-${settings.ttsEngine.name}'),
'tts-desc-${settings.ttsEngine.name}',
),
style: style:
theme.bodyMedium?.copyWith( theme.bodyMedium?.copyWith(
color: theme.sidebarForeground.withValues( color: theme.sidebarForeground.withValues(alpha: 0.9),
alpha: 0.9,
),
) ?? ) ??
TextStyle( TextStyle(
color: theme.sidebarForeground.withValues( color: theme.sidebarForeground.withValues(alpha: 0.9),
alpha: 0.9,
),
fontSize: 14, fontSize: 14,
), ),
), ),