diff --git a/lib/core/providers/app_providers.dart b/lib/core/providers/app_providers.dart index 147dc1b..033b45c 100644 --- a/lib/core/providers/app_providers.dart +++ b/lib/core/providers/app_providers.dart @@ -404,18 +404,47 @@ class SocketConnectionStream extends _$SocketConnectionStream { } @Riverpod(keepAlive: true) -Stream conversationDeltaStream( - Ref ref, - ConversationDeltaRequest request, -) { - final controller = StreamController.broadcast(sync: true); +class ConversationDeltaStream extends _$ConversationDeltaStream { + StreamController? _controller; + ProviderSubscription>? _serviceSubscription; + SocketEventSubscription? _socketSubscription; - ProviderSubscription>? serviceSubscription; - SocketEventSubscription? socketSubscription; + @override + Stream build(ConversationDeltaRequest request) { + final controller = StreamController.broadcast( + sync: true, + onCancel: _maybeTearDownSocket, + ); + _controller = controller; - void bindSocket(SocketService? service) { - socketSubscription?.dispose(); - socketSubscription = null; + final initialService = ref + .watch(socketServiceManagerProvider) + .maybeWhen(data: (service) => service, orElse: () => null); + _bindSocket(initialService, request); + + _serviceSubscription = ref.listen>( + socketServiceManagerProvider, + (_, next) => _bindSocket( + next.maybeWhen(data: (service) => service, orElse: () => null), + request, + ), + ); + + ref.onDispose(() { + _serviceSubscription?.close(); + _serviceSubscription = null; + _socketSubscription?.dispose(); + _socketSubscription = null; + _controller?.close(); + _controller = null; + }); + + return controller.stream; + } + + void _bindSocket(SocketService? service, ConversationDeltaRequest request) { + _socketSubscription?.dispose(); + _socketSubscription = null; if (service == null) { return; @@ -423,65 +452,68 @@ Stream conversationDeltaStream( switch (request.source) { case ConversationDeltaSource.chat: - socketSubscription = service.addChatEventHandler( + _socketSubscription = service.addChatEventHandler( conversationId: request.conversationId, sessionId: request.sessionId, requireFocus: request.requireFocus, handler: (event, ack) { - if (!controller.isClosed) { - controller.add( - ConversationDelta.fromSocketEvent( - ConversationDeltaSource.chat, - event, - ack, - ), - ); - } + _controller?.add( + ConversationDelta.fromSocketEvent( + ConversationDeltaSource.chat, + event, + ack, + ), + ); }, ); break; case ConversationDeltaSource.channel: - socketSubscription = service.addChannelEventHandler( + _socketSubscription = service.addChannelEventHandler( conversationId: request.conversationId, sessionId: request.sessionId, requireFocus: request.requireFocus, handler: (event, ack) { - if (!controller.isClosed) { - controller.add( - ConversationDelta.fromSocketEvent( - ConversationDeltaSource.channel, - event, - ack, - ), - ); - } + _controller?.add( + ConversationDelta.fromSocketEvent( + ConversationDeltaSource.channel, + event, + ack, + ), + ); }, ); break; } } - final initialService = ref - .watch(socketServiceManagerProvider) - .maybeWhen(data: (service) => service, orElse: () => null); - bindSocket(initialService); + void _maybeTearDownSocket() { + if (_controller?.hasListener == true) { + return; + } + _socketSubscription?.dispose(); + _socketSubscription = null; + } - serviceSubscription = ref.listen>( - socketServiceManagerProvider, - (_, next) => bindSocket( - next.maybeWhen(data: (service) => service, orElse: () => null), - ), - ); - - ref.onDispose(() { - serviceSubscription?.close(); - socketSubscription?.dispose(); - controller.close(); - }); - - return controller.stream; + /// Provides direct access to the underlying stream. + /// Note: This getter is necessary for compatibility with StreamProvider. + /// While Riverpod 3 discourages public getters on Notifiers, this is a + /// pragmatic exception for stream delegation patterns. + // ignore: avoid_public_notifier_properties + Stream get stream => + _controller?.stream ?? const Stream.empty(); } +final conversationDeltaEventsProvider = + StreamProvider.family(( + ref, + request, + ) { + final notifier = ref.watch( + conversationDeltaStreamProvider(request).notifier, + ); + return notifier.stream; + }); + // Attachment upload queue provider final attachmentUploadQueueProvider = Provider((ref) { final api = ref.watch(apiServiceProvider); diff --git a/lib/features/chat/providers/chat_providers.dart b/lib/features/chat/providers/chat_providers.dart index 0376145..07a99b6 100644 --- a/lib/features/chat/providers/chat_providers.dart +++ b/lib/features/chat/providers/chat_providers.dart @@ -1331,25 +1331,29 @@ Future regenerateMessage( }); } catch (_) {} - final chatEventsStream = ref.read( - conversationDeltaStreamProvider( - ConversationDeltaRequest.chat( - conversationId: activeConversation.id, - sessionId: effectiveSessionId, - requireFocus: false, - ), - ), - ); + final chatEventsStream = ref + .read( + conversationDeltaStreamProvider( + ConversationDeltaRequest.chat( + conversationId: activeConversation.id, + sessionId: effectiveSessionId, + requireFocus: false, + ), + ).notifier, + ) + .stream; - final channelEventsStream = ref.read( - conversationDeltaStreamProvider( - ConversationDeltaRequest.channel( - conversationId: activeConversation.id, - sessionId: effectiveSessionId, - requireFocus: false, - ), - ), - ); + final channelEventsStream = ref + .read( + conversationDeltaStreamProvider( + ConversationDeltaRequest.channel( + conversationId: activeConversation.id, + sessionId: effectiveSessionId, + requireFocus: false, + ), + ).notifier, + ) + .stream; final activeStream = attachUnifiedChunkedStreaming( stream: stream, @@ -1897,25 +1901,29 @@ Future _sendMessageInternal( }); } catch (_) {} - final chatEventsStream = ref.read( - conversationDeltaStreamProvider( - ConversationDeltaRequest.chat( - conversationId: activeConversation?.id, - sessionId: effectiveSessionId, - requireFocus: false, - ), - ), - ); + final chatEventsStream = ref + .read( + conversationDeltaStreamProvider( + ConversationDeltaRequest.chat( + conversationId: activeConversation?.id, + sessionId: effectiveSessionId, + requireFocus: false, + ), + ).notifier, + ) + .stream; - final channelEventsStream = ref.read( - conversationDeltaStreamProvider( - ConversationDeltaRequest.channel( - conversationId: activeConversation?.id, - sessionId: effectiveSessionId, - requireFocus: false, - ), - ), - ); + final channelEventsStream = ref + .read( + conversationDeltaStreamProvider( + ConversationDeltaRequest.channel( + conversationId: activeConversation?.id, + sessionId: effectiveSessionId, + requireFocus: false, + ), + ).notifier, + ) + .stream; final activeStream = attachUnifiedChunkedStreaming( stream: stream, diff --git a/tmp/flutter_ai_repo b/tmp/flutter_ai_repo deleted file mode 160000 index 79187cf..0000000 --- a/tmp/flutter_ai_repo +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 79187cf7e3c7143511ac8eeebdabe76d8da43495