From d899ca5f70fc6cf114ba7e5444c2480ba37e63c5 Mon Sep 17 00:00:00 2001 From: cogwheel0 <172976095+cogwheel0@users.noreply.github.com> Date: Wed, 1 Oct 2025 19:46:21 +0530 Subject: [PATCH] refactor: update streaming helper to use conversation delta listener - Replaced direct stream handling for chat and channel events in `attachUnifiedChunkedStreaming` with a new `RegisterConversationDeltaListener` parameter for improved encapsulation and flexibility. - Removed the previous chat and channel event stream parameters, streamlining the function signature and enhancing code clarity. - Updated the `regenerateMessage` and `_sendMessageInternal` methods to utilize the new listener registration, improving the management of conversation delta events. - Enhanced error handling for delta listeners, ensuring robust logging of errors during event processing. --- .../services/conversation_delta_listener.dart | 136 ++++++++++++++++++ lib/core/services/streaming_helper.dart | 65 ++++++--- .../chat/providers/chat_providers.dart | 100 ++----------- 3 files changed, 194 insertions(+), 107 deletions(-) create mode 100644 lib/core/services/conversation_delta_listener.dart diff --git a/lib/core/services/conversation_delta_listener.dart b/lib/core/services/conversation_delta_listener.dart new file mode 100644 index 0000000..caeca69 --- /dev/null +++ b/lib/core/services/conversation_delta_listener.dart @@ -0,0 +1,136 @@ +import 'package:flutter/foundation.dart'; +import 'package:flutter_riverpod/flutter_riverpod.dart'; + +import '../models/socket_event.dart'; +import '../providers/app_providers.dart'; + +/// Signature for callbacks that receive conversation delta updates. +typedef ConversationDeltaDataCallback = void Function(ConversationDelta delta); + +/// Signature for callbacks that handle errors emitted by the delta stream. +typedef ConversationDeltaErrorCallback = + void Function(Object error, StackTrace stackTrace); + +/// Registers a listener for [ConversationDelta] updates behind Riverpod's +/// listening API and exposes explicit lifecycle control. +class ConversationDeltaListener { + ConversationDeltaListener({ + required dynamic ref, + required ConversationDeltaRequest request, + required ConversationDeltaDataCallback onDelta, + required ConversationDeltaErrorCallback onError, + }) : _ref = ref, + _request = request, + _onDelta = onDelta, + _onError = onError; + + final dynamic _ref; + final ConversationDeltaRequest _request; + final ConversationDeltaDataCallback _onDelta; + final ConversationDeltaErrorCallback _onError; + + ProviderSubscription>? _subscription; + bool _disposed = false; + + /// Returns `true` when a Riverpod subscription is currently active. + bool get isActive => _subscription != null; + + /// Starts listening for [ConversationDelta] updates. Subsequent calls are + /// no-ops while the listener is already active. + void start() { + if (_disposed || isActive) { + return; + } + + void handleNext( + AsyncValue? previous, + AsyncValue next, + ) { + if (!_isMounted) { + stop(); + return; + } + + switch (next) { + case AsyncData(value: final delta): + _onDelta(delta); + case AsyncError(:final error, :final stackTrace): + _onError(error, stackTrace); + default: + } + } + + final ref = _ref; + if (ref is Ref) { + _subscription = ref.listen( + conversationDeltaStreamProvider(_request), + handleNext, + fireImmediately: false, + ); + return; + } + if (ref is ProviderContainer) { + _subscription = ref.listen( + conversationDeltaStreamProvider(_request), + handleNext, + fireImmediately: false, + ); + return; + } + + throw ArgumentError('Unsupported ref type: ${ref.runtimeType}'); + } + + /// Stops listening for deltas and releases resources. Safe to call multiple + /// times. + void stop() { + _subscription?.close(); + _subscription = null; + } + + /// Disposes the listener permanently and ensures the subscription is closed. + void dispose() { + if (_disposed) { + return; + } + _disposed = true; + stop(); + } + + bool get _isMounted { + final ref = _ref; + if (ref is Ref) { + return ref.mounted; + } + return !_disposed; + } +} + +/// Type signature for registering delta listeners within helper utilities. +typedef RegisterConversationDeltaListener = + VoidCallback Function({ + required ConversationDeltaRequest request, + required ConversationDeltaDataCallback onDelta, + required ConversationDeltaErrorCallback onError, + }); + +/// Convenience factory that wires up [ConversationDeltaListener] and returns +/// the disposer callback expected by streaming helpers. +RegisterConversationDeltaListener createConversationDeltaRegistrar( + dynamic ref, +) { + return ({ + required ConversationDeltaRequest request, + required ConversationDeltaDataCallback onDelta, + required ConversationDeltaErrorCallback onError, + }) { + final listener = ConversationDeltaListener( + ref: ref, + request: request, + onDelta: onDelta, + onError: onError, + )..start(); + + return listener.dispose; + }; +} diff --git a/lib/core/services/streaming_helper.dart b/lib/core/services/streaming_helper.dart index 405ee07..585dffe 100644 --- a/lib/core/services/streaming_helper.dart +++ b/lib/core/services/streaming_helper.dart @@ -10,6 +10,7 @@ import '../../core/services/socket_service.dart'; import '../../core/utils/inactivity_watchdog.dart'; import '../../core/utils/tool_calls_parser.dart'; import 'navigation_service.dart'; +import 'conversation_delta_listener.dart'; import '../../shared/widgets/themed_dialogs.dart'; import '../../shared/theme/theme_extensions.dart'; import '../utils/debug_logger.dart'; @@ -46,8 +47,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ required String? activeConversationId, required dynamic api, required SocketService? socketService, - Stream? chatEvents, - Stream? channelEvents, + RegisterConversationDeltaListener? registerDeltaListener, // Message update callbacks required void Function(String) appendToLastMessage, required void Function(String) replaceLastMessageContent, @@ -97,7 +97,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ InactivityWatchdog? socketWatchdog; final socketSubscriptions = []; final hasSocketSignals = - socketService != null || chatEvents != null || channelEvents != null; + socketService != null || registerDeltaListener != null; if (hasSocketSignals) { // Increase timeout to match OpenWebUI's more generous timeouts for long responses socketWatchdog = InactivityWatchdog( @@ -991,14 +991,27 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ } catch (_) {} } - if (chatEvents != null) { - final subscription = chatEvents.listen((event) { - socketWatchdog?.ping(); - chatHandler(event.raw, event.ack); - }); - socketSubscriptions.add(() { - unawaited(subscription.cancel()); - }); + if (registerDeltaListener != null) { + final chatDisposer = registerDeltaListener( + request: ConversationDeltaRequest.chat( + conversationId: activeConversationId, + sessionId: sessionId, + requireFocus: false, + ), + onDelta: (event) { + socketWatchdog?.ping(); + chatHandler(event.raw, event.ack); + }, + onError: (error, stackTrace) { + DebugLogger.error( + 'Chat delta listener error', + scope: 'streaming/helper', + error: error, + stackTrace: stackTrace, + ); + }, + ); + socketSubscriptions.add(chatDisposer); } else if (socketService != null) { final chatSub = socketService.addChatEventHandler( conversationId: activeConversationId, @@ -1008,15 +1021,27 @@ ActiveSocketStream attachUnifiedChunkedStreaming({ ); socketSubscriptions.add(chatSub.dispose); } - - if (channelEvents != null) { - final subscription = channelEvents.listen((event) { - socketWatchdog?.ping(); - channelEventsHandler(event.raw, event.ack); - }); - socketSubscriptions.add(() { - unawaited(subscription.cancel()); - }); + if (registerDeltaListener != null) { + final channelDisposer = registerDeltaListener( + request: ConversationDeltaRequest.channel( + conversationId: activeConversationId, + sessionId: sessionId, + requireFocus: false, + ), + onDelta: (event) { + socketWatchdog?.ping(); + channelEventsHandler(event.raw, event.ack); + }, + onError: (error, stackTrace) { + DebugLogger.error( + 'Channel delta listener error', + scope: 'streaming/helper', + error: error, + stackTrace: stackTrace, + ); + }, + ); + socketSubscriptions.add(channelDisposer); } else if (socketService != null) { final channelSub = socketService.addChannelEventHandler( conversationId: activeConversationId, diff --git a/lib/features/chat/providers/chat_providers.dart b/lib/features/chat/providers/chat_providers.dart index 9071977..a3fad2e 100644 --- a/lib/features/chat/providers/chat_providers.dart +++ b/lib/features/chat/providers/chat_providers.dart @@ -10,8 +10,8 @@ import 'package:yaml/yaml.dart' as yaml; import '../../../core/auth/auth_state_manager.dart'; import '../../../core/models/chat_message.dart'; import '../../../core/models/conversation.dart'; -import '../../../core/models/socket_event.dart'; import '../../../core/providers/app_providers.dart'; +import '../../../core/services/conversation_delta_listener.dart'; import '../../../core/services/streaming_helper.dart'; import '../../../core/services/streaming_response_controller.dart'; import '../../../core/utils/debug_logger.dart'; @@ -1331,23 +1331,7 @@ Future regenerateMessage( }); } catch (_) {} - final chatEventsHandle = _conversationDeltaStream( - ref, - ConversationDeltaRequest.chat( - conversationId: activeConversation.id, - sessionId: effectiveSessionId, - requireFocus: false, - ), - ); - - final channelEventsHandle = _conversationDeltaStream( - ref, - ConversationDeltaRequest.channel( - conversationId: activeConversation.id, - sessionId: effectiveSessionId, - requireFocus: false, - ), - ); + final registerDeltaListener = createConversationDeltaRegistrar(ref); final activeStream = attachUnifiedChunkedStreaming( stream: stream, @@ -1359,8 +1343,7 @@ Future regenerateMessage( activeConversationId: activeConversation.id, api: api, socketService: socketService, - chatEvents: chatEventsHandle.stream, - channelEvents: channelEventsHandle.stream, + registerDeltaListener: registerDeltaListener, appendToLastMessage: (c) => ref.read(chatMessagesProvider.notifier).appendToLastMessage(c), replaceLastMessageContent: (c) => @@ -1411,11 +1394,10 @@ Future regenerateMessage( ); ref.read(chatMessagesProvider.notifier) ..setMessageStream(activeStream.controller) - ..setSocketSubscriptions([ - ...activeStream.socketSubscriptions, - chatEventsHandle.dispose, - channelEventsHandle.dispose, - ], onDispose: activeStream.disposeWatchdog); + ..setSocketSubscriptions( + activeStream.socketSubscriptions, + onDispose: activeStream.disposeWatchdog, + ); return; } catch (e) { rethrow; @@ -1451,44 +1433,6 @@ Future sendMessageWithContainer( await _sendMessageInternal(container, message, attachments, toolIds); } -({Stream stream, void Function() dispose}) -_conversationDeltaStream(dynamic ref, ConversationDeltaRequest request) { - final controller = StreamController.broadcast(); - var isDisposed = false; - - void close() { - if (isDisposed) return; - isDisposed = true; - controller.close(); - } - - final subscription = ref.listen>( - conversationDeltaStreamProvider(request), - (previous, next) { - if (next is AsyncData) { - if (!controller.isClosed) controller.add(next.value); - return; - } - if (next is AsyncError) { - if (!controller.isClosed) { - controller.addError(next.error, next.stackTrace); - } - } - }, - fireImmediately: false, - ); - - void dispose() { - if (isDisposed) return; - subscription.close(); - close(); - } - - controller.onCancel = dispose; - - return (stream: controller.stream, dispose: dispose); -} - // Internal send message implementation Future _sendMessageInternal( dynamic ref, @@ -1934,23 +1878,7 @@ Future _sendMessageInternal( }); } catch (_) {} - final chatEventsHandle = _conversationDeltaStream( - ref, - ConversationDeltaRequest.chat( - conversationId: activeConversation?.id, - sessionId: effectiveSessionId, - requireFocus: false, - ), - ); - - final channelEventsHandle = _conversationDeltaStream( - ref, - ConversationDeltaRequest.channel( - conversationId: activeConversation?.id, - sessionId: effectiveSessionId, - requireFocus: false, - ), - ); + final registerDeltaListener = createConversationDeltaRegistrar(ref); final activeStream = attachUnifiedChunkedStreaming( stream: stream, @@ -1962,8 +1890,7 @@ Future _sendMessageInternal( activeConversationId: activeConversation?.id, api: api, socketService: socketService, - chatEvents: chatEventsHandle.stream, - channelEvents: channelEventsHandle.stream, + registerDeltaListener: registerDeltaListener, appendToLastMessage: (c) => ref.read(chatMessagesProvider.notifier).appendToLastMessage(c), replaceLastMessageContent: (c) => @@ -2015,11 +1942,10 @@ Future _sendMessageInternal( ref.read(chatMessagesProvider.notifier) ..setMessageStream(activeStream.controller) - ..setSocketSubscriptions([ - ...activeStream.socketSubscriptions, - chatEventsHandle.dispose, - channelEventsHandle.dispose, - ], onDispose: activeStream.disposeWatchdog); + ..setSocketSubscriptions( + activeStream.socketSubscriptions, + onDispose: activeStream.disposeWatchdog, + ); return; } catch (e) { // Handle error - remove the assistant message placeholder