From ada6d40e5e50c096d4a794a27b06e76715f37725 Mon Sep 17 00:00:00 2001 From: cogwheel0 <172976095+cogwheel0@users.noreply.github.com> Date: Sat, 1 Nov 2025 00:57:40 +0530 Subject: [PATCH] feat(chat): Add worker manager to streaming helper for image processing --- lib/core/services/streaming_helper.dart | 182 +++++++++------- lib/core/services/worker_manager.dart | 200 ++++++++++++++++++ .../chat/providers/chat_providers.dart | 3 + .../widgets/assistant_message_widget.dart | 65 ++++-- .../chat/widgets/enhanced_attachment.dart | 24 ++- .../widgets/enhanced_image_attachment.dart | 17 +- 6 files changed, 387 insertions(+), 104 deletions(-) create mode 100644 lib/core/services/worker_manager.dart diff --git a/lib/core/services/streaming_helper.dart b/lib/core/services/streaming_helper.dart index b023918..0602839 100644 --- a/lib/core/services/streaming_helper.dart +++ b/lib/core/services/streaming_helper.dart @@ -17,6 +17,7 @@ import '../utils/debug_logger.dart'; import '../utils/openwebui_source_parser.dart'; import 'streaming_response_controller.dart'; import 'api_service.dart'; +import 'worker_manager.dart'; // Keep local verbosity toggle for socket logs const bool kSocketVerboseLogging = false; @@ -43,6 +44,74 @@ final _imageFilePattern = RegExp( caseSensitive: false, ); +List> _collectImageReferencesWorker(String content) { + final collected = >[]; + if (content.isEmpty) { + return collected; + } + + if (content.contains('')) { + final parsed = ToolCallsParser.parse(content); + if (parsed != null) { + for (final entry in parsed.toolCalls) { + if (entry.files != null && entry.files!.isNotEmpty) { + collected.addAll(_extractFilesFromResult(entry.files)); + } + if (entry.result != null) { + collected.addAll(_extractFilesFromResult(entry.result)); + } + } + } + } + + if (collected.isNotEmpty) { + return collected; + } + + final base64Matches = _base64ImagePattern.allMatches(content); + for (final match in base64Matches) { + final url = match.group(0); + if (url != null && url.isNotEmpty) { + collected.add({'type': 'image', 'url': url}); + } + } + + final urlMatches = _urlImagePattern.allMatches(content); + for (final match in urlMatches) { + final url = match.group(0); + if (url != null && url.isNotEmpty) { + collected.add({'type': 'image', 'url': url}); + } + } + + final jsonMatches = _jsonImagePattern.allMatches(content); + for (final match in jsonMatches) { + final url = _jsonUrlExtractPattern + .firstMatch(match.group(0) ?? '') + ?.group(1); + if (url != null && url.isNotEmpty) { + collected.add({'type': 'image', 'url': url}); + } + } + + final partialMatches = _partialResultsPattern.allMatches(content); + for (final match in partialMatches) { + final attrValue = match.group(2); + if (attrValue == null) continue; + try { + final decoded = json.decode(attrValue); + collected.addAll(_extractFilesFromResult(decoded)); + } catch (_) { + if (attrValue.startsWith('data:image/') || + _imageFilePattern.hasMatch(attrValue)) { + collected.add({'type': 'image', 'url': attrValue}); + } + } + } + + return collected; +} + class ActiveSocketStream { ActiveSocketStream({ required this.controller, @@ -70,6 +139,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ required String? activeConversationId, required ApiService api, required SocketService? socketService, + required WorkerManager workerManager, RegisterConversationDeltaListener? registerDeltaListener, // Message update callbacks required void Function(String) appendToLastMessage, @@ -228,88 +298,44 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ final content = msgs.last.content; if (content.isEmpty) return; - final collected = >[]; - - // Quick check: only parse tool calls if complete details blocks exist - if (content.contains('')) { - final parsed = ToolCallsParser.parse(content); - if (parsed != null) { - for (final entry in parsed.toolCalls) { - if (entry.files != null && entry.files!.isNotEmpty) { - collected.addAll(_extractFilesFromResult(entry.files)); - } - if (entry.result != null) { - collected.addAll(_extractFilesFromResult(entry.result)); - } - } - } - } - - if (collected.isEmpty) { - // Use pre-compiled patterns for better performance - final base64Matches = _base64ImagePattern.allMatches(content); - for (final match in base64Matches) { - final url = match.group(0); - if (url != null && url.isNotEmpty) { - collected.add({'type': 'image', 'url': url}); - } - } - - final urlMatches = _urlImagePattern.allMatches(content); - for (final match in urlMatches) { - final url = match.group(0); - if (url != null && url.isNotEmpty) { - collected.add({'type': 'image', 'url': url}); - } - } - - final jsonMatches = _jsonImagePattern.allMatches(content); - for (final match in jsonMatches) { - final url = _jsonUrlExtractPattern - .firstMatch(match.group(0) ?? '') - ?.group(1); - if (url != null && url.isNotEmpty) { - collected.add({'type': 'image', 'url': url}); - } - } - - final partialMatches = _partialResultsPattern.allMatches(content); - for (final match in partialMatches) { - final attrValue = match.group(2); - if (attrValue != null) { - try { - final decoded = json.decode(attrValue); - collected.addAll(_extractFilesFromResult(decoded)); - } catch (_) { - if (attrValue.startsWith('data:image/') || - _imageFilePattern.hasMatch(attrValue)) { - collected.add({'type': 'image', 'url': attrValue}); + final targetMessageId = msgs.last.id; + unawaited( + workerManager + .schedule>>( + _collectImageReferencesWorker, + content, + debugLabel: 'stream_collect_images', + ) + .then((collected) { + if (collected.isEmpty) return; + final currentMessages = getMessages(); + if (currentMessages.isEmpty) return; + final last = currentMessages.last; + if (last.id != targetMessageId || last.role != 'assistant') { + return; } - } - } - } - } - if (collected.isEmpty) return; + final existing = last.files ?? >[]; + final seen = { + for (final f in existing) + if (f['url'] is String) (f['url'] as String) else '', + }..removeWhere((e) => e.isEmpty); - final existing = msgs.last.files ?? >[]; - final seen = { - for (final f in existing) - if (f['url'] is String) (f['url'] as String) else '', - }..removeWhere((e) => e.isEmpty); + final merged = >[...existing]; + for (final f in collected) { + final url = f['url'] as String?; + if (url != null && url.isNotEmpty && !seen.contains(url)) { + merged.add({'type': 'image', 'url': url}); + seen.add(url); + } + } - final merged = >[...existing]; - for (final f in collected) { - final url = f['url'] as String?; - if (url != null && url.isNotEmpty && !seen.contains(url)) { - merged.add({'type': 'image', 'url': url}); - seen.add(url); - } - } - - if (merged.length != existing.length) { - updateLastMessageWith((m) => m.copyWith(files: merged)); - } + if (merged.length != existing.length) { + updateLastMessageWith((m) => m.copyWith(files: merged)); + } + }) + .catchError((_) {}), + ); } catch (_) {} } diff --git a/lib/core/services/worker_manager.dart b/lib/core/services/worker_manager.dart new file mode 100644 index 0000000..a972524 --- /dev/null +++ b/lib/core/services/worker_manager.dart @@ -0,0 +1,200 @@ +import 'dart:async'; +import 'dart:collection'; +import 'dart:math' as math; + +import 'package:flutter/foundation.dart'; +import 'package:riverpod_annotation/riverpod_annotation.dart'; + +import '../utils/debug_logger.dart'; + +part 'worker_manager.g.dart'; + +/// Signature of a task that can be executed by [WorkerManager]. +typedef WorkerTask = ComputeCallback; + +/// Coordinates CPU intensive work off the UI isolate with lightweight pooling. +/// +/// The manager throttles concurrent isolate usage to avoid overwhelming the +/// platform while still enabling parallel work. On web the callback executes +/// synchronously because secondary isolates are not supported. +class WorkerManager { + WorkerManager({int maxConcurrentTasks = _defaultMaxConcurrentTasks}) + : _maxConcurrentTasks = math.max(1, maxConcurrentTasks) { + DebugLogger.log( + 'initialized', + scope: 'worker', + data: {'max': _maxConcurrentTasks}, + ); + } + + static const int _defaultMaxConcurrentTasks = 2; + + final int _maxConcurrentTasks; + final Queue<_EnqueuedJob> _pendingJobs = Queue<_EnqueuedJob>(); + bool _disposed = false; + int _activeJobs = 0; + int _jobCounter = 0; + + /// Schedule [callback] with [message] to run on a worker isolate. + /// + /// The [callback] must be a top-level or static function, mirroring the + /// constraints of `compute`. Errors from the task are propagated to the + /// returned [Future]. + Future schedule( + WorkerTask callback, + Q message, { + String? debugLabel, + }) { + if (_disposed) { + return Future.error(StateError('WorkerManager has been disposed')); + } + + final jobId = ++_jobCounter; + final completer = Completer(); + final job = _EnqueuedJob( + id: jobId, + debugLabel: debugLabel, + run: () { + if (kIsWeb) { + return Future.sync(() => callback(message)); + } + return compute(callback, message); + }, + onComplete: (value) { + if (!completer.isCompleted) { + completer.complete(value as R); + } + }, + onError: (error, stackTrace) { + if (!completer.isCompleted) { + completer.completeError(error, stackTrace); + } + }, + ); + + _pendingJobs.add(job); + + DebugLogger.log( + 'queued', + scope: 'worker', + data: { + 'id': jobId, + if (debugLabel != null) 'label': debugLabel, + 'pending': _pendingJobs.length, + 'active': _activeJobs, + }, + ); + + _processQueue(); + + return completer.future; + } + + /// Dispose the manager and reject all pending work. + void dispose() { + if (_disposed) { + return; + } + _disposed = true; + + while (_pendingJobs.isNotEmpty) { + final job = _pendingJobs.removeFirst(); + job.cancel( + StateError('WorkerManager disposed before job ${job.id} started'), + ); + } + + DebugLogger.log('disposed', scope: 'worker', data: {'active': _activeJobs}); + } + + void _processQueue() { + if (_disposed) { + return; + } + + while (_activeJobs < _maxConcurrentTasks && _pendingJobs.isNotEmpty) { + final job = _pendingJobs.removeFirst(); + _startJob(job); + } + } + + void _startJob(_EnqueuedJob job) { + _activeJobs++; + + DebugLogger.log( + 'started', + scope: 'worker', + data: { + 'id': job.id, + if (job.debugLabel != null) 'label': job.debugLabel, + 'active': _activeJobs, + }, + ); + + unawaited(_runJob(job)); + } + + Future _runJob(_EnqueuedJob job) async { + try { + final result = await job.run(); + job.onComplete(result); + + DebugLogger.log( + 'completed', + scope: 'worker', + data: { + 'id': job.id, + if (job.debugLabel != null) 'label': job.debugLabel, + 'pending': _pendingJobs.length, + }, + ); + } catch (error, stackTrace) { + job.onError(error, stackTrace); + + DebugLogger.error( + 'failed', + scope: 'worker', + error: error, + stackTrace: stackTrace, + data: { + 'id': job.id, + if (job.debugLabel != null) 'label': job.debugLabel, + }, + ); + } finally { + _activeJobs = math.max(0, _activeJobs - 1); + _processQueue(); + } + } +} + +/// Keep a single [WorkerManager] alive across the app. +@Riverpod(keepAlive: true) +// ignore: functional_ref +WorkerManager workerManager(Ref ref) { + final concurrency = kIsWeb ? 1 : WorkerManager._defaultMaxConcurrentTasks; + final manager = WorkerManager(maxConcurrentTasks: concurrency); + ref.onDispose(manager.dispose); + return manager; +} + +class _EnqueuedJob { + _EnqueuedJob({ + required this.id, + required this.run, + required this.onComplete, + required this.onError, + this.debugLabel, + }); + + final int id; + final FutureOr Function() run; + final void Function(dynamic value) onComplete; + final void Function(Object error, StackTrace stackTrace) onError; + final String? debugLabel; + final DateTime queuedAt = DateTime.now(); + + void cancel(Object error) { + onError(error, StackTrace.current); + } +} diff --git a/lib/features/chat/providers/chat_providers.dart b/lib/features/chat/providers/chat_providers.dart index 35e4bbd..2d1372c 100644 --- a/lib/features/chat/providers/chat_providers.dart +++ b/lib/features/chat/providers/chat_providers.dart @@ -14,6 +14,7 @@ import '../../../core/providers/app_providers.dart'; import '../../../core/services/conversation_delta_listener.dart'; import '../../../core/services/streaming_helper.dart'; import '../../../core/services/streaming_response_controller.dart'; +import '../../../core/services/worker_manager.dart'; import '../../../core/utils/debug_logger.dart'; import '../../../core/utils/markdown_stream_formatter.dart'; import '../../../core/utils/tool_calls_parser.dart'; @@ -1449,6 +1450,7 @@ Future regenerateMessage( activeConversationId: activeConversation.id, api: api!, socketService: socketService, + workerManager: ref.read(workerManagerProvider), registerDeltaListener: registerDeltaListener, appendToLastMessage: (c) => ref.read(chatMessagesProvider.notifier).appendToLastMessage(c), @@ -1997,6 +1999,7 @@ Future _sendMessageInternal( activeConversationId: activeConversation?.id, api: api!, socketService: socketService, + workerManager: ref.read(workerManagerProvider), registerDeltaListener: registerDeltaListener, appendToLastMessage: (c) => ref.read(chatMessagesProvider.notifier).appendToLastMessage(c), diff --git a/lib/features/chat/widgets/assistant_message_widget.dart b/lib/features/chat/widgets/assistant_message_widget.dart index 0359fba..00a12ce 100644 --- a/lib/features/chat/widgets/assistant_message_widget.dart +++ b/lib/features/chat/widgets/assistant_message_widget.dart @@ -24,6 +24,7 @@ import '../providers/chat_providers.dart' show sendMessageWithContainer; import '../../../core/utils/debug_logger.dart'; import 'sources/openwebui_sources.dart'; import '../providers/assistant_response_builder_provider.dart'; +import '../../../core/services/worker_manager.dart'; // Pre-compiled regex patterns for image processing (performance optimization) final _base64ImagePattern = RegExp(r'data:image/[^;]+;base64,[A-Za-z0-9+/]+=*'); @@ -104,7 +105,7 @@ class _AssistantMessageWidgetState extends ConsumerState ); // Parse reasoning and tool-calls sections - _reparseSections(); + unawaited(_reparseSections()); _updateTypingIndicatorGate(); } @@ -121,7 +122,7 @@ class _AssistantMessageWidgetState extends ConsumerState // Re-parse sections when message content changes if (oldWidget.message.content != widget.message.content) { - _reparseSections(); + unawaited(_reparseSections()); _updateTypingIndicatorGate(); } @@ -141,7 +142,7 @@ class _AssistantMessageWidgetState extends ConsumerState } } - void _reparseSections() { + Future _reparseSections() async { final raw0 = _activeVersionIndex >= 0 ? (widget.message.versions[_activeVersionIndex].content as String?) ?? '' @@ -162,11 +163,13 @@ class _AssistantMessageWidgetState extends ConsumerState final out = []; final textBuf = StringBuffer(); + final textSegments = []; if (rSegs == null || rSegs.isEmpty) { final tSegs = ToolCallsParser.segments(raw); if (tSegs == null || tSegs.isEmpty) { out.add(MessageSegment.text(raw)); textBuf.write(raw); + textSegments.add(raw); } else { for (final s in tSegs) { if (s.isToolCall && s.entry != null) { @@ -174,6 +177,7 @@ class _AssistantMessageWidgetState extends ConsumerState } else if ((s.text ?? '').isNotEmpty) { out.add(MessageSegment.text(s.text!)); textBuf.write(s.text); + textSegments.add(s.text!); } } } @@ -187,6 +191,7 @@ class _AssistantMessageWidgetState extends ConsumerState if (tSegs == null || tSegs.isEmpty) { out.add(MessageSegment.text(t)); textBuf.write(t); + textSegments.add(t); } else { for (final s in tSegs) { if (s.isToolCall && s.entry != null) { @@ -194,6 +199,7 @@ class _AssistantMessageWidgetState extends ConsumerState } else if ((s.text ?? '').isNotEmpty) { out.add(MessageSegment.text(s.text!)); textBuf.write(s.text); + textSegments.add(s.text!); } } } @@ -202,8 +208,19 @@ class _AssistantMessageWidgetState extends ConsumerState } final segments = out.isEmpty ? [MessageSegment.text(raw)] : out; - final speechText = _buildTtsPlainText(segments, raw); + String speechText; + try { + final worker = ref.read(workerManagerProvider); + speechText = await worker.schedule, String>( + _buildTtsPlainTextWorker, + {'segments': textSegments, 'fallback': raw}, + debugLabel: 'tts_plain_text', + ); + } catch (_) { + speechText = _buildTtsPlainTextFallback(textSegments, raw); + } + if (!mounted) return; setState(() { _segments = segments; _ttsPlainText = speechText; @@ -248,18 +265,14 @@ class _AssistantMessageWidgetState extends ConsumerState } } - String _buildTtsPlainText(List segments, String fallback) { + String _buildTtsPlainTextFallback(List segments, String fallback) { if (segments.isEmpty) { return MarkdownToText.convert(fallback); } final buffer = StringBuffer(); for (final segment in segments) { - if (!segment.isText) { - continue; - } - final text = segment.text ?? ''; - final sanitized = MarkdownToText.convert(text); + final sanitized = MarkdownToText.convert(segment); if (sanitized.isEmpty) { continue; } @@ -1157,7 +1170,7 @@ class _AssistantMessageWidgetState extends ConsumerState } else if (_activeVersionIndex > 0) { _activeVersionIndex -= 1; } - _reparseSections(); + unawaited(_reparseSections()); }); }, ), @@ -1177,7 +1190,7 @@ class _AssistantMessageWidgetState extends ConsumerState } else { _activeVersionIndex = -1; // move to live } - _reparseSections(); + unawaited(_reparseSections()); }); }, ), @@ -1329,6 +1342,34 @@ class _AssistantMessageWidgetState extends ConsumerState } } +String _buildTtsPlainTextWorker(Map payload) { + final rawSegments = payload['segments']; + final fallback = payload['fallback'] as String? ?? ''; + final segments = rawSegments is List ? rawSegments.cast() : const []; + + if (segments.isEmpty) { + return MarkdownToText.convert(fallback); + } + + final buffer = StringBuffer(); + for (final segment in segments) { + if (segment is! String || segment.isEmpty) continue; + final sanitized = MarkdownToText.convert(segment); + if (sanitized.isEmpty) continue; + if (buffer.isNotEmpty) { + buffer.writeln(); + buffer.writeln(); + } + buffer.write(sanitized); + } + + final result = buffer.toString().trim(); + if (result.isEmpty) { + return MarkdownToText.convert(fallback); + } + return result; +} + class StatusHistoryTimeline extends StatefulWidget { const StatusHistoryTimeline({ super.key, diff --git a/lib/features/chat/widgets/enhanced_attachment.dart b/lib/features/chat/widgets/enhanced_attachment.dart index 86b7dff..fd9f199 100644 --- a/lib/features/chat/widgets/enhanced_attachment.dart +++ b/lib/features/chat/widgets/enhanced_attachment.dart @@ -9,6 +9,7 @@ import 'package:share_plus/share_plus.dart'; import 'package:path_provider/path_provider.dart'; import 'dart:io'; import 'dart:convert'; +import '../../../core/services/worker_manager.dart'; class EnhancedAttachment extends ConsumerStatefulWidget { final String attachmentId; @@ -102,12 +103,14 @@ class _EnhancedAttachmentState extends ConsumerState { final dir = await getTemporaryDirectory(); final filePath = '${dir.path}/$filename'; + final worker = ref.read(workerManagerProvider); try { - if (content.length > 128 && - RegExp( - r'^[A-Za-z0-9+/=\r\n]+$', - ).hasMatch(content.replaceAll('\n', ''))) { - final bytes = base64Decode(content.replaceAll('\n', '')); + if (_looksLikeBase64(content)) { + final bytes = await worker.schedule( + _decodeAttachmentBase64, + content, + debugLabel: 'attachment_decode_bytes', + ); await File(filePath).writeAsBytes(bytes, flush: true); } else { await File(filePath).writeAsString(content, flush: true); @@ -291,3 +294,14 @@ class _EnhancedAttachmentState extends ConsumerState { return '${(bytes / (1024 * 1024 * 1024)).toStringAsFixed(1)} GB'; } } + +bool _looksLikeBase64(String content) { + if (content.length <= 128) return false; + final sanitized = content.replaceAll('\n', ''); + return RegExp(r'^[A-Za-z0-9+/=]+$').hasMatch(sanitized); +} + +Uint8List _decodeAttachmentBase64(String raw) { + final sanitized = raw.replaceAll('\n', ''); + return base64Decode(sanitized); +} diff --git a/lib/features/chat/widgets/enhanced_image_attachment.dart b/lib/features/chat/widgets/enhanced_image_attachment.dart index c60cb72..70b46f6 100644 --- a/lib/features/chat/widgets/enhanced_image_attachment.dart +++ b/lib/features/chat/widgets/enhanced_image_attachment.dart @@ -1,6 +1,6 @@ import 'dart:convert'; import 'dart:io'; -import 'package:flutter/foundation.dart'; +import 'dart:typed_data'; import 'package:flutter/material.dart'; import 'package:flutter_riverpod/flutter_riverpod.dart'; import 'package:cached_network_image/cached_network_image.dart'; @@ -15,6 +15,7 @@ import '../../auth/providers/unified_auth_providers.dart'; import '../../../core/utils/debug_logger.dart'; import '../../../core/network/self_signed_image_cache_manager.dart'; import '../../../core/network/image_header_utils.dart'; +import '../../../core/services/worker_manager.dart'; // Simple global cache to prevent reloading final _globalImageCache = {}; @@ -23,13 +24,6 @@ final _globalErrorStates = {}; final _globalImageBytesCache = {}; final _base64WhitespacePattern = RegExp(r'\s'); -Future _decodeImageDataAsync(String data) async { - if (kIsWeb) { - return _decodeImageData(data); - } - return compute(_decodeImageData, data); -} - Uint8List _decodeImageData(String data) { var payload = data; if (payload.startsWith('data:')) { @@ -233,7 +227,12 @@ class _EnhancedImageAttachmentState if (_isDecoding) return; _isDecoding = true; try { - final bytes = await _decodeImageDataAsync(data); + final worker = ref.read(workerManagerProvider); + final bytes = await worker.schedule( + _decodeImageData, + data, + debugLabel: 'decode_image', + ); _globalImageBytesCache[widget.attachmentId] = bytes; if (!mounted) return; setState(() {