diff --git a/lib/core/providers/app_providers.dart b/lib/core/providers/app_providers.dart index 033b45c..147dc1b 100644 --- a/lib/core/providers/app_providers.dart +++ b/lib/core/providers/app_providers.dart @@ -404,47 +404,18 @@ class SocketConnectionStream extends _$SocketConnectionStream { } @Riverpod(keepAlive: true) -class ConversationDeltaStream extends _$ConversationDeltaStream { - StreamController? _controller; - ProviderSubscription>? _serviceSubscription; - SocketEventSubscription? _socketSubscription; +Stream conversationDeltaStream( + Ref ref, + ConversationDeltaRequest request, +) { + final controller = StreamController.broadcast(sync: true); - @override - Stream build(ConversationDeltaRequest request) { - final controller = StreamController.broadcast( - sync: true, - onCancel: _maybeTearDownSocket, - ); - _controller = controller; + ProviderSubscription>? serviceSubscription; + SocketEventSubscription? socketSubscription; - final initialService = ref - .watch(socketServiceManagerProvider) - .maybeWhen(data: (service) => service, orElse: () => null); - _bindSocket(initialService, request); - - _serviceSubscription = ref.listen>( - socketServiceManagerProvider, - (_, next) => _bindSocket( - next.maybeWhen(data: (service) => service, orElse: () => null), - request, - ), - ); - - ref.onDispose(() { - _serviceSubscription?.close(); - _serviceSubscription = null; - _socketSubscription?.dispose(); - _socketSubscription = null; - _controller?.close(); - _controller = null; - }); - - return controller.stream; - } - - void _bindSocket(SocketService? service, ConversationDeltaRequest request) { - _socketSubscription?.dispose(); - _socketSubscription = null; + void bindSocket(SocketService? service) { + socketSubscription?.dispose(); + socketSubscription = null; if (service == null) { return; @@ -452,68 +423,65 @@ class ConversationDeltaStream extends _$ConversationDeltaStream { switch (request.source) { case ConversationDeltaSource.chat: - _socketSubscription = service.addChatEventHandler( + socketSubscription = service.addChatEventHandler( conversationId: request.conversationId, sessionId: request.sessionId, requireFocus: request.requireFocus, handler: (event, ack) { - _controller?.add( - ConversationDelta.fromSocketEvent( - ConversationDeltaSource.chat, - event, - ack, - ), - ); + if (!controller.isClosed) { + controller.add( + ConversationDelta.fromSocketEvent( + ConversationDeltaSource.chat, + event, + ack, + ), + ); + } }, ); break; case ConversationDeltaSource.channel: - _socketSubscription = service.addChannelEventHandler( + socketSubscription = service.addChannelEventHandler( conversationId: request.conversationId, sessionId: request.sessionId, requireFocus: request.requireFocus, handler: (event, ack) { - _controller?.add( - ConversationDelta.fromSocketEvent( - ConversationDeltaSource.channel, - event, - ack, - ), - ); + if (!controller.isClosed) { + controller.add( + ConversationDelta.fromSocketEvent( + ConversationDeltaSource.channel, + event, + ack, + ), + ); + } }, ); break; } } - void _maybeTearDownSocket() { - if (_controller?.hasListener == true) { - return; - } - _socketSubscription?.dispose(); - _socketSubscription = null; - } + final initialService = ref + .watch(socketServiceManagerProvider) + .maybeWhen(data: (service) => service, orElse: () => null); + bindSocket(initialService); - /// Provides direct access to the underlying stream. - /// Note: This getter is necessary for compatibility with StreamProvider. - /// While Riverpod 3 discourages public getters on Notifiers, this is a - /// pragmatic exception for stream delegation patterns. - // ignore: avoid_public_notifier_properties - Stream get stream => - _controller?.stream ?? const Stream.empty(); + serviceSubscription = ref.listen>( + socketServiceManagerProvider, + (_, next) => bindSocket( + next.maybeWhen(data: (service) => service, orElse: () => null), + ), + ); + + ref.onDispose(() { + serviceSubscription?.close(); + socketSubscription?.dispose(); + controller.close(); + }); + + return controller.stream; } -final conversationDeltaEventsProvider = - StreamProvider.family(( - ref, - request, - ) { - final notifier = ref.watch( - conversationDeltaStreamProvider(request).notifier, - ); - return notifier.stream; - }); - // Attachment upload queue provider final attachmentUploadQueueProvider = Provider((ref) { final api = ref.watch(apiServiceProvider); diff --git a/lib/features/chat/providers/chat_providers.dart b/lib/features/chat/providers/chat_providers.dart index 07a99b6..0376145 100644 --- a/lib/features/chat/providers/chat_providers.dart +++ b/lib/features/chat/providers/chat_providers.dart @@ -1331,29 +1331,25 @@ Future regenerateMessage( }); } catch (_) {} - final chatEventsStream = ref - .read( - conversationDeltaStreamProvider( - ConversationDeltaRequest.chat( - conversationId: activeConversation.id, - sessionId: effectiveSessionId, - requireFocus: false, - ), - ).notifier, - ) - .stream; + final chatEventsStream = ref.read( + conversationDeltaStreamProvider( + ConversationDeltaRequest.chat( + conversationId: activeConversation.id, + sessionId: effectiveSessionId, + requireFocus: false, + ), + ), + ); - final channelEventsStream = ref - .read( - conversationDeltaStreamProvider( - ConversationDeltaRequest.channel( - conversationId: activeConversation.id, - sessionId: effectiveSessionId, - requireFocus: false, - ), - ).notifier, - ) - .stream; + final channelEventsStream = ref.read( + conversationDeltaStreamProvider( + ConversationDeltaRequest.channel( + conversationId: activeConversation.id, + sessionId: effectiveSessionId, + requireFocus: false, + ), + ), + ); final activeStream = attachUnifiedChunkedStreaming( stream: stream, @@ -1901,29 +1897,25 @@ Future _sendMessageInternal( }); } catch (_) {} - final chatEventsStream = ref - .read( - conversationDeltaStreamProvider( - ConversationDeltaRequest.chat( - conversationId: activeConversation?.id, - sessionId: effectiveSessionId, - requireFocus: false, - ), - ).notifier, - ) - .stream; + final chatEventsStream = ref.read( + conversationDeltaStreamProvider( + ConversationDeltaRequest.chat( + conversationId: activeConversation?.id, + sessionId: effectiveSessionId, + requireFocus: false, + ), + ), + ); - final channelEventsStream = ref - .read( - conversationDeltaStreamProvider( - ConversationDeltaRequest.channel( - conversationId: activeConversation?.id, - sessionId: effectiveSessionId, - requireFocus: false, - ), - ).notifier, - ) - .stream; + final channelEventsStream = ref.read( + conversationDeltaStreamProvider( + ConversationDeltaRequest.channel( + conversationId: activeConversation?.id, + sessionId: effectiveSessionId, + requireFocus: false, + ), + ), + ); final activeStream = attachUnifiedChunkedStreaming( stream: stream, diff --git a/tmp/flutter_ai_repo b/tmp/flutter_ai_repo new file mode 160000 index 0000000..79187cf --- /dev/null +++ b/tmp/flutter_ai_repo @@ -0,0 +1 @@ +Subproject commit 79187cf7e3c7143511ac8eeebdabe76d8da43495