From f15a8eda7955f692b87d97ebf3fbb41e79364c9a Mon Sep 17 00:00:00 2001 From: cogwheel0 <172976095+cogwheel0@users.noreply.github.com> Date: Wed, 1 Oct 2025 19:20:46 +0530 Subject: [PATCH] refactor: streamline conversation delta stream handling - Replaced direct stream access in the `regenerateMessage` and `_sendMessageInternal` methods with a new helper function `_conversationDeltaStream` for better encapsulation and reusability. - Improved the management of chat and channel event streams by utilizing the new helper function, enhancing code clarity and maintainability. - Removed the previous `conversationDeltaEventsProvider` as it is no longer necessary with the new implementation, simplifying the provider structure. --- lib/core/providers/app_providers.dart | 19 --- .../chat/providers/chat_providers.dart | 140 +++++++++++------- 2 files changed, 84 insertions(+), 75 deletions(-) diff --git a/lib/core/providers/app_providers.dart b/lib/core/providers/app_providers.dart index 6c687fe..696fce8 100644 --- a/lib/core/providers/app_providers.dart +++ b/lib/core/providers/app_providers.dart @@ -462,27 +462,8 @@ class ConversationDeltaStream extends _$ConversationDeltaStream { _socketSubscription?.dispose(); _socketSubscription = null; } - - /// Provides direct access to the underlying stream. - /// Note: This getter is necessary for compatibility with StreamProvider. - /// While Riverpod 3 discourages public getters on Notifiers, this is a - /// pragmatic exception for stream delegation patterns. - // ignore: avoid_public_notifier_properties - Stream get stream => - _controller?.stream ?? const Stream.empty(); } -final conversationDeltaEventsProvider = - StreamProvider.family(( - ref, - request, - ) { - final notifier = ref.watch( - conversationDeltaStreamProvider(request).notifier, - ); - return notifier.stream; - }); - // Attachment upload queue provider final attachmentUploadQueueProvider = Provider((ref) { final api = ref.watch(apiServiceProvider); diff --git a/lib/features/chat/providers/chat_providers.dart b/lib/features/chat/providers/chat_providers.dart index 6431a53..9071977 100644 --- a/lib/features/chat/providers/chat_providers.dart +++ b/lib/features/chat/providers/chat_providers.dart @@ -1331,29 +1331,23 @@ Future regenerateMessage( }); } catch (_) {} - final chatEventsStream = ref - .read( - conversationDeltaStreamProvider( - ConversationDeltaRequest.chat( - conversationId: activeConversation.id, - sessionId: effectiveSessionId, - requireFocus: false, - ), - ).notifier, - ) - .stream; + final chatEventsHandle = _conversationDeltaStream( + ref, + ConversationDeltaRequest.chat( + conversationId: activeConversation.id, + sessionId: effectiveSessionId, + requireFocus: false, + ), + ); - final channelEventsStream = ref - .read( - conversationDeltaStreamProvider( - ConversationDeltaRequest.channel( - conversationId: activeConversation.id, - sessionId: effectiveSessionId, - requireFocus: false, - ), - ).notifier, - ) - .stream; + final channelEventsHandle = _conversationDeltaStream( + ref, + ConversationDeltaRequest.channel( + conversationId: activeConversation.id, + sessionId: effectiveSessionId, + requireFocus: false, + ), + ); final activeStream = attachUnifiedChunkedStreaming( stream: stream, @@ -1365,8 +1359,8 @@ Future regenerateMessage( activeConversationId: activeConversation.id, api: api, socketService: socketService, - chatEvents: chatEventsStream, - channelEvents: channelEventsStream, + chatEvents: chatEventsHandle.stream, + channelEvents: channelEventsHandle.stream, appendToLastMessage: (c) => ref.read(chatMessagesProvider.notifier).appendToLastMessage(c), replaceLastMessageContent: (c) => @@ -1417,10 +1411,11 @@ Future regenerateMessage( ); ref.read(chatMessagesProvider.notifier) ..setMessageStream(activeStream.controller) - ..setSocketSubscriptions( - activeStream.socketSubscriptions, - onDispose: activeStream.disposeWatchdog, - ); + ..setSocketSubscriptions([ + ...activeStream.socketSubscriptions, + chatEventsHandle.dispose, + channelEventsHandle.dispose, + ], onDispose: activeStream.disposeWatchdog); return; } catch (e) { rethrow; @@ -1456,6 +1451,44 @@ Future sendMessageWithContainer( await _sendMessageInternal(container, message, attachments, toolIds); } +({Stream stream, void Function() dispose}) +_conversationDeltaStream(dynamic ref, ConversationDeltaRequest request) { + final controller = StreamController.broadcast(); + var isDisposed = false; + + void close() { + if (isDisposed) return; + isDisposed = true; + controller.close(); + } + + final subscription = ref.listen>( + conversationDeltaStreamProvider(request), + (previous, next) { + if (next is AsyncData) { + if (!controller.isClosed) controller.add(next.value); + return; + } + if (next is AsyncError) { + if (!controller.isClosed) { + controller.addError(next.error, next.stackTrace); + } + } + }, + fireImmediately: false, + ); + + void dispose() { + if (isDisposed) return; + subscription.close(); + close(); + } + + controller.onCancel = dispose; + + return (stream: controller.stream, dispose: dispose); +} + // Internal send message implementation Future _sendMessageInternal( dynamic ref, @@ -1901,29 +1934,23 @@ Future _sendMessageInternal( }); } catch (_) {} - final chatEventsStream = ref - .read( - conversationDeltaStreamProvider( - ConversationDeltaRequest.chat( - conversationId: activeConversation?.id, - sessionId: effectiveSessionId, - requireFocus: false, - ), - ).notifier, - ) - .stream; + final chatEventsHandle = _conversationDeltaStream( + ref, + ConversationDeltaRequest.chat( + conversationId: activeConversation?.id, + sessionId: effectiveSessionId, + requireFocus: false, + ), + ); - final channelEventsStream = ref - .read( - conversationDeltaStreamProvider( - ConversationDeltaRequest.channel( - conversationId: activeConversation?.id, - sessionId: effectiveSessionId, - requireFocus: false, - ), - ).notifier, - ) - .stream; + final channelEventsHandle = _conversationDeltaStream( + ref, + ConversationDeltaRequest.channel( + conversationId: activeConversation?.id, + sessionId: effectiveSessionId, + requireFocus: false, + ), + ); final activeStream = attachUnifiedChunkedStreaming( stream: stream, @@ -1935,8 +1962,8 @@ Future _sendMessageInternal( activeConversationId: activeConversation?.id, api: api, socketService: socketService, - chatEvents: chatEventsStream, - channelEvents: channelEventsStream, + chatEvents: chatEventsHandle.stream, + channelEvents: channelEventsHandle.stream, appendToLastMessage: (c) => ref.read(chatMessagesProvider.notifier).appendToLastMessage(c), replaceLastMessageContent: (c) => @@ -1988,10 +2015,11 @@ Future _sendMessageInternal( ref.read(chatMessagesProvider.notifier) ..setMessageStream(activeStream.controller) - ..setSocketSubscriptions( - activeStream.socketSubscriptions, - onDispose: activeStream.disposeWatchdog, - ); + ..setSocketSubscriptions([ + ...activeStream.socketSubscriptions, + chatEventsHandle.dispose, + channelEventsHandle.dispose, + ], onDispose: activeStream.disposeWatchdog); return; } catch (e) { // Handle error - remove the assistant message placeholder