From a05837b985decb10340bc8d4376461b2ecc48865 Mon Sep 17 00:00:00 2001 From: cogwheel0 <172976095+cogwheel0@users.noreply.github.com> Date: Sun, 2 Nov 2025 22:14:45 +0530 Subject: [PATCH] refactor(streaming): Optimize image collection and debounce mechanism --- lib/core/services/streaming_helper.dart | 142 +++++++++++++----- .../chat/providers/chat_providers.dart | 7 - .../widgets/assistant_message_widget.dart | 119 ++++++++++++--- .../profile/views/app_customization_page.dart | 12 +- 4 files changed, 211 insertions(+), 69 deletions(-) diff --git a/lib/core/services/streaming_helper.dart b/lib/core/services/streaming_helper.dart index 682c11e..c546181 100644 --- a/lib/core/services/streaming_helper.dart +++ b/lib/core/services/streaming_helper.dart @@ -277,6 +277,13 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ )..start(); } + Timer? imageCollectionDebounce; + String? pendingImageContent; + String? pendingImageMessageId; + String? pendingImageSignature; + String? lastProcessedImageSignature; + int imageCollectionRequestId = 0; + void disposeSocketSubscriptions() { if (socketSubscriptions.isEmpty) { return; @@ -287,56 +294,119 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ } catch (_) {} } socketSubscriptions.clear(); + imageCollectionDebounce?.cancel(); + imageCollectionDebounce = null; + pendingImageContent = null; + pendingImageMessageId = null; + pendingImageSignature = null; + lastProcessedImageSignature = null; + imageCollectionRequestId = 0; socketWatchdog?.stop(); } bool isSearching = false; + void runPendingImageCollection() { + imageCollectionDebounce?.cancel(); + imageCollectionDebounce = null; + + final content = pendingImageContent; + final targetMessageId = pendingImageMessageId; + final signature = pendingImageSignature; + if (content == null || targetMessageId == null || signature == null) { + return; + } + + pendingImageContent = null; + pendingImageMessageId = null; + pendingImageSignature = null; + + final requestId = ++imageCollectionRequestId; + unawaited( + workerManager + .schedule>>( + _collectImageReferencesWorker, + content, + debugLabel: 'stream_collect_images', + ) + .then((collected) { + if (requestId != imageCollectionRequestId) { + return; + } + + final currentMessages = getMessages(); + if (currentMessages.isEmpty) { + return; + } + final last = currentMessages.last; + if (last.id != targetMessageId || last.role != 'assistant') { + return; + } + + lastProcessedImageSignature = signature; + + if (collected.isEmpty) { + return; + } + + final existing = last.files ?? >[]; + final seen = { + for (final f in existing) + if (f['url'] is String) (f['url'] as String) else '', + }..removeWhere((e) => e.isEmpty); + + final merged = >[...existing]; + for (final f in collected) { + final url = f['url'] as String?; + if (url != null && url.isNotEmpty && !seen.contains(url)) { + merged.add({'type': 'image', 'url': url}); + seen.add(url); + } + } + + if (merged.length != existing.length) { + updateLastMessageWith((m) => m.copyWith(files: merged)); + } + }) + .catchError((_) {}), + ); + } + void updateImagesFromCurrentContent() { try { final msgs = getMessages(); if (msgs.isEmpty || msgs.last.role != 'assistant') return; - final content = msgs.last.content; + final last = msgs.last; + final content = last.content; if (content.isEmpty) return; - final targetMessageId = msgs.last.id; - unawaited( - workerManager - .schedule>>( - _collectImageReferencesWorker, - content, - debugLabel: 'stream_collect_images', - ) - .then((collected) { - if (collected.isEmpty) return; - final currentMessages = getMessages(); - if (currentMessages.isEmpty) return; - final last = currentMessages.last; - if (last.id != targetMessageId || last.role != 'assistant') { - return; - } + final targetMessageId = last.id; + final signature = + '$targetMessageId:${content.hashCode}:${content.length}'; - final existing = last.files ?? >[]; - final seen = { - for (final f in existing) - if (f['url'] is String) (f['url'] as String) else '', - }..removeWhere((e) => e.isEmpty); + if (signature == lastProcessedImageSignature && + pendingImageSignature == null) { + return; + } + if (signature == pendingImageSignature) { + return; + } - final merged = >[...existing]; - for (final f in collected) { - final url = f['url'] as String?; - if (url != null && url.isNotEmpty && !seen.contains(url)) { - merged.add({'type': 'image', 'url': url}); - seen.add(url); - } - } + pendingImageMessageId = targetMessageId; + pendingImageContent = content; + pendingImageSignature = signature; - if (merged.length != existing.length) { - updateLastMessageWith((m) => m.copyWith(files: merged)); - } - }) - .catchError((_) {}), - ); + final shouldDelay = last.isStreaming; + + imageCollectionDebounce?.cancel(); + if (shouldDelay) { + imageCollectionDebounce = Timer( + const Duration(milliseconds: 200), + runPendingImageCollection, + ); + } else { + runPendingImageCollection(); + } } catch (_) {} } diff --git a/lib/features/chat/providers/chat_providers.dart b/lib/features/chat/providers/chat_providers.dart index cea62f7..c9f8307 100644 --- a/lib/features/chat/providers/chat_providers.dart +++ b/lib/features/chat/providers/chat_providers.dart @@ -647,13 +647,6 @@ class ChatMessagesNotifier extends Notifier> { return; } - // Log content replacement for debugging - DebugLogger.log( - 'Replacing message content: messageId=${lastMessage.id}, ' - 'oldLength=${lastMessage.content.length}, newLength=${content.length}', - scope: 'chat/providers', - ); - _ensureFormatterForMessage(lastMessage); // Defensive check: ensure the formatter is for the correct message diff --git a/lib/features/chat/widgets/assistant_message_widget.dart b/lib/features/chat/widgets/assistant_message_widget.dart index bfd348c..570ee3f 100644 --- a/lib/features/chat/widgets/assistant_message_widget.dart +++ b/lib/features/chat/widgets/assistant_message_widget.dart @@ -71,6 +71,11 @@ class _AssistantMessageWidgetState extends ConsumerState bool _allowTypingIndicator = false; Timer? _typingGateTimer; String _ttsPlainText = ''; + Timer? _ttsPlainTextDebounce; + Map? _pendingTtsPlainTextPayload; + String? _pendingTtsPlainTextSource; + String? _lastAppliedTtsPlainTextSource; + int _ttsPlainTextRequestId = 0; // Active version index (-1 means current/live content) int _activeVersionIndex = -1; // press state handled by shared ChatActionButton @@ -162,13 +167,11 @@ class _AssistantMessageWidgetState extends ConsumerState final rSegs = ReasoningParser.segments(raw); final out = []; - final textBuf = StringBuffer(); final textSegments = []; if (rSegs == null || rSegs.isEmpty) { final tSegs = ToolCallsParser.segments(raw); if (tSegs == null || tSegs.isEmpty) { out.add(MessageSegment.text(raw)); - textBuf.write(raw); textSegments.add(raw); } else { for (final s in tSegs) { @@ -176,7 +179,6 @@ class _AssistantMessageWidgetState extends ConsumerState out.add(MessageSegment.tool(s.entry!)); } else if ((s.text ?? '').isNotEmpty) { out.add(MessageSegment.text(s.text!)); - textBuf.write(s.text); textSegments.add(s.text!); } } @@ -190,7 +192,6 @@ class _AssistantMessageWidgetState extends ConsumerState final tSegs = ToolCallsParser.segments(t); if (tSegs == null || tSegs.isEmpty) { out.add(MessageSegment.text(t)); - textBuf.write(t); textSegments.add(t); } else { for (final s in tSegs) { @@ -198,7 +199,6 @@ class _AssistantMessageWidgetState extends ConsumerState out.add(MessageSegment.tool(s.entry!)); } else if ((s.text ?? '').isNotEmpty) { out.add(MessageSegment.text(s.text!)); - textBuf.write(s.text); textSegments.add(s.text!); } } @@ -208,23 +208,15 @@ class _AssistantMessageWidgetState extends ConsumerState } final segments = out.isEmpty ? [MessageSegment.text(raw)] : out; - String speechText; - try { - final worker = ref.read(workerManagerProvider); - speechText = await worker.schedule, String>( - _buildTtsPlainTextWorker, - {'segments': textSegments, 'fallback': raw}, - debugLabel: 'tts_plain_text', - ); - } catch (_) { - speechText = _buildTtsPlainTextFallback(textSegments, raw); - } if (!mounted) return; setState(() { _segments = segments; - _ttsPlainText = speechText; }); + _scheduleTtsPlainTextBuild( + List.from(textSegments, growable: false), + raw, + ); _updateTypingIndicatorGate(); } @@ -290,6 +282,96 @@ class _AssistantMessageWidgetState extends ConsumerState return result; } + void _scheduleTtsPlainTextBuild(List segments, String raw) { + final hasContent = + segments.any((segment) => segment.trim().isNotEmpty) || + raw.trim().isNotEmpty; + if (!hasContent) { + _pendingTtsPlainTextPayload = null; + _pendingTtsPlainTextSource = null; + _lastAppliedTtsPlainTextSource = ''; + if (_ttsPlainText.isNotEmpty && mounted) { + setState(() { + _ttsPlainText = ''; + }); + } + return; + } + + if (_pendingTtsPlainTextPayload == null && + raw == _lastAppliedTtsPlainTextSource) { + return; + } + if (raw == _pendingTtsPlainTextSource && + _pendingTtsPlainTextPayload != null) { + return; + } + + final pendingSegments = List.from(segments, growable: false); + _pendingTtsPlainTextPayload = { + 'segments': pendingSegments, + 'fallback': raw, + }; + _pendingTtsPlainTextSource = raw; + + final delay = widget.isStreaming + ? const Duration(milliseconds: 250) + : Duration.zero; + + _ttsPlainTextDebounce?.cancel(); + if (delay == Duration.zero) { + _runPendingTtsPlainTextBuild(); + } else { + _ttsPlainTextDebounce = Timer(delay, _runPendingTtsPlainTextBuild); + } + } + + void _runPendingTtsPlainTextBuild() { + _ttsPlainTextDebounce?.cancel(); + _ttsPlainTextDebounce = null; + + final payload = _pendingTtsPlainTextPayload; + final source = _pendingTtsPlainTextSource; + if (payload == null || source == null) { + return; + } + + _pendingTtsPlainTextPayload = null; + _pendingTtsPlainTextSource = null; + final requestId = ++_ttsPlainTextRequestId; + unawaited(_executeTtsPlainTextBuild(payload, source, requestId)); + } + + Future _executeTtsPlainTextBuild( + Map payload, + String raw, + int requestId, + ) async { + final segments = (payload['segments'] as List).cast(); + String speechText; + try { + final worker = ref.read(workerManagerProvider); + speechText = await worker.schedule, String>( + _buildTtsPlainTextWorker, + payload, + debugLabel: 'tts_plain_text', + ); + } catch (_) { + speechText = _buildTtsPlainTextFallback(segments, raw); + } + + if (!mounted || requestId != _ttsPlainTextRequestId) { + return; + } + + _lastAppliedTtsPlainTextSource = raw; + if (_ttsPlainText != speechText) { + setState(() { + _ttsPlainText = speechText; + }); + } + } + // No streaming-specific markdown fixes needed here; handled by Markdown widget Widget _buildToolCallTile(ToolCallEntry tc) { @@ -622,6 +704,9 @@ class _AssistantMessageWidgetState extends ConsumerState @override void dispose() { _typingGateTimer?.cancel(); + _ttsPlainTextDebounce?.cancel(); + _pendingTtsPlainTextPayload = null; + _pendingTtsPlainTextSource = null; _fadeController.dispose(); _slideController.dispose(); super.dispose(); diff --git a/lib/features/profile/views/app_customization_page.dart b/lib/features/profile/views/app_customization_page.dart index 490ea43..2236d44 100644 --- a/lib/features/profile/views/app_customization_page.dart +++ b/lib/features/profile/views/app_customization_page.dart @@ -871,19 +871,13 @@ class AppCustomizationPage extends ConsumerWidget { duration: const Duration(milliseconds: 200), child: Text( ttsDescription, - key: ValueKey( - 'tts-desc-${settings.ttsEngine.name}', - ), + key: ValueKey('tts-desc-${settings.ttsEngine.name}'), style: theme.bodyMedium?.copyWith( - color: theme.sidebarForeground.withValues( - alpha: 0.9, - ), + color: theme.sidebarForeground.withValues(alpha: 0.9), ) ?? TextStyle( - color: theme.sidebarForeground.withValues( - alpha: 0.9, - ), + color: theme.sidebarForeground.withValues(alpha: 0.9), fontSize: 14, ), ),