@@ -404,18 +404,47 @@ class SocketConnectionStream extends _$SocketConnectionStream {
|
||||
}
|
||||
|
||||
@Riverpod(keepAlive: true)
|
||||
Stream<ConversationDelta> conversationDeltaStream(
|
||||
Ref ref,
|
||||
ConversationDeltaRequest request,
|
||||
) {
|
||||
final controller = StreamController<ConversationDelta>.broadcast(sync: true);
|
||||
class ConversationDeltaStream extends _$ConversationDeltaStream {
|
||||
StreamController<ConversationDelta>? _controller;
|
||||
ProviderSubscription<AsyncValue<SocketService?>>? _serviceSubscription;
|
||||
SocketEventSubscription? _socketSubscription;
|
||||
|
||||
ProviderSubscription<AsyncValue<SocketService?>>? serviceSubscription;
|
||||
SocketEventSubscription? socketSubscription;
|
||||
@override
|
||||
Stream<ConversationDelta> build(ConversationDeltaRequest request) {
|
||||
final controller = StreamController<ConversationDelta>.broadcast(
|
||||
sync: true,
|
||||
onCancel: _maybeTearDownSocket,
|
||||
);
|
||||
_controller = controller;
|
||||
|
||||
void bindSocket(SocketService? service) {
|
||||
socketSubscription?.dispose();
|
||||
socketSubscription = null;
|
||||
final initialService = ref
|
||||
.watch(socketServiceManagerProvider)
|
||||
.maybeWhen(data: (service) => service, orElse: () => null);
|
||||
_bindSocket(initialService, request);
|
||||
|
||||
_serviceSubscription = ref.listen<AsyncValue<SocketService?>>(
|
||||
socketServiceManagerProvider,
|
||||
(_, next) => _bindSocket(
|
||||
next.maybeWhen(data: (service) => service, orElse: () => null),
|
||||
request,
|
||||
),
|
||||
);
|
||||
|
||||
ref.onDispose(() {
|
||||
_serviceSubscription?.close();
|
||||
_serviceSubscription = null;
|
||||
_socketSubscription?.dispose();
|
||||
_socketSubscription = null;
|
||||
_controller?.close();
|
||||
_controller = null;
|
||||
});
|
||||
|
||||
return controller.stream;
|
||||
}
|
||||
|
||||
void _bindSocket(SocketService? service, ConversationDeltaRequest request) {
|
||||
_socketSubscription?.dispose();
|
||||
_socketSubscription = null;
|
||||
|
||||
if (service == null) {
|
||||
return;
|
||||
@@ -423,65 +452,68 @@ Stream<ConversationDelta> conversationDeltaStream(
|
||||
|
||||
switch (request.source) {
|
||||
case ConversationDeltaSource.chat:
|
||||
socketSubscription = service.addChatEventHandler(
|
||||
_socketSubscription = service.addChatEventHandler(
|
||||
conversationId: request.conversationId,
|
||||
sessionId: request.sessionId,
|
||||
requireFocus: request.requireFocus,
|
||||
handler: (event, ack) {
|
||||
if (!controller.isClosed) {
|
||||
controller.add(
|
||||
_controller?.add(
|
||||
ConversationDelta.fromSocketEvent(
|
||||
ConversationDeltaSource.chat,
|
||||
event,
|
||||
ack,
|
||||
),
|
||||
);
|
||||
}
|
||||
},
|
||||
);
|
||||
break;
|
||||
case ConversationDeltaSource.channel:
|
||||
socketSubscription = service.addChannelEventHandler(
|
||||
_socketSubscription = service.addChannelEventHandler(
|
||||
conversationId: request.conversationId,
|
||||
sessionId: request.sessionId,
|
||||
requireFocus: request.requireFocus,
|
||||
handler: (event, ack) {
|
||||
if (!controller.isClosed) {
|
||||
controller.add(
|
||||
_controller?.add(
|
||||
ConversationDelta.fromSocketEvent(
|
||||
ConversationDeltaSource.channel,
|
||||
event,
|
||||
ack,
|
||||
),
|
||||
);
|
||||
}
|
||||
},
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
final initialService = ref
|
||||
.watch(socketServiceManagerProvider)
|
||||
.maybeWhen(data: (service) => service, orElse: () => null);
|
||||
bindSocket(initialService);
|
||||
void _maybeTearDownSocket() {
|
||||
if (_controller?.hasListener == true) {
|
||||
return;
|
||||
}
|
||||
_socketSubscription?.dispose();
|
||||
_socketSubscription = null;
|
||||
}
|
||||
|
||||
serviceSubscription = ref.listen<AsyncValue<SocketService?>>(
|
||||
socketServiceManagerProvider,
|
||||
(_, next) => bindSocket(
|
||||
next.maybeWhen(data: (service) => service, orElse: () => null),
|
||||
),
|
||||
);
|
||||
|
||||
ref.onDispose(() {
|
||||
serviceSubscription?.close();
|
||||
socketSubscription?.dispose();
|
||||
controller.close();
|
||||
});
|
||||
|
||||
return controller.stream;
|
||||
/// Provides direct access to the underlying stream.
|
||||
/// Note: This getter is necessary for compatibility with StreamProvider.
|
||||
/// While Riverpod 3 discourages public getters on Notifiers, this is a
|
||||
/// pragmatic exception for stream delegation patterns.
|
||||
// ignore: avoid_public_notifier_properties
|
||||
Stream<ConversationDelta> get stream =>
|
||||
_controller?.stream ?? const Stream<ConversationDelta>.empty();
|
||||
}
|
||||
|
||||
final conversationDeltaEventsProvider =
|
||||
StreamProvider.family<ConversationDelta, ConversationDeltaRequest>((
|
||||
ref,
|
||||
request,
|
||||
) {
|
||||
final notifier = ref.watch(
|
||||
conversationDeltaStreamProvider(request).notifier,
|
||||
);
|
||||
return notifier.stream;
|
||||
});
|
||||
|
||||
// Attachment upload queue provider
|
||||
final attachmentUploadQueueProvider = Provider<AttachmentUploadQueue?>((ref) {
|
||||
final api = ref.watch(apiServiceProvider);
|
||||
|
||||
@@ -1331,25 +1331,29 @@ Future<void> regenerateMessage(
|
||||
});
|
||||
} catch (_) {}
|
||||
|
||||
final chatEventsStream = ref.read(
|
||||
final chatEventsStream = ref
|
||||
.read(
|
||||
conversationDeltaStreamProvider(
|
||||
ConversationDeltaRequest.chat(
|
||||
conversationId: activeConversation.id,
|
||||
sessionId: effectiveSessionId,
|
||||
requireFocus: false,
|
||||
),
|
||||
),
|
||||
);
|
||||
).notifier,
|
||||
)
|
||||
.stream;
|
||||
|
||||
final channelEventsStream = ref.read(
|
||||
final channelEventsStream = ref
|
||||
.read(
|
||||
conversationDeltaStreamProvider(
|
||||
ConversationDeltaRequest.channel(
|
||||
conversationId: activeConversation.id,
|
||||
sessionId: effectiveSessionId,
|
||||
requireFocus: false,
|
||||
),
|
||||
),
|
||||
);
|
||||
).notifier,
|
||||
)
|
||||
.stream;
|
||||
|
||||
final activeStream = attachUnifiedChunkedStreaming(
|
||||
stream: stream,
|
||||
@@ -1897,25 +1901,29 @@ Future<void> _sendMessageInternal(
|
||||
});
|
||||
} catch (_) {}
|
||||
|
||||
final chatEventsStream = ref.read(
|
||||
final chatEventsStream = ref
|
||||
.read(
|
||||
conversationDeltaStreamProvider(
|
||||
ConversationDeltaRequest.chat(
|
||||
conversationId: activeConversation?.id,
|
||||
sessionId: effectiveSessionId,
|
||||
requireFocus: false,
|
||||
),
|
||||
),
|
||||
);
|
||||
).notifier,
|
||||
)
|
||||
.stream;
|
||||
|
||||
final channelEventsStream = ref.read(
|
||||
final channelEventsStream = ref
|
||||
.read(
|
||||
conversationDeltaStreamProvider(
|
||||
ConversationDeltaRequest.channel(
|
||||
conversationId: activeConversation?.id,
|
||||
sessionId: effectiveSessionId,
|
||||
requireFocus: false,
|
||||
),
|
||||
),
|
||||
);
|
||||
).notifier,
|
||||
)
|
||||
.stream;
|
||||
|
||||
final activeStream = attachUnifiedChunkedStreaming(
|
||||
stream: stream,
|
||||
|
||||
Submodule tmp/flutter_ai_repo deleted from 79187cf7e3
Reference in New Issue
Block a user