refactor: streamline conversation delta stream handling

- Replaced direct stream access in the `regenerateMessage` and `_sendMessageInternal` methods with a new helper function `_conversationDeltaStream` for better encapsulation and reusability.
- Improved the management of chat and channel event streams by utilizing the new helper function, enhancing code clarity and maintainability.
- Removed the previous `conversationDeltaEventsProvider` as it is no longer necessary with the new implementation, simplifying the provider structure.
This commit is contained in:
cogwheel0
2025-10-01 19:20:46 +05:30
parent 21ef8bf68e
commit f15a8eda79
2 changed files with 84 additions and 75 deletions

View File

@@ -462,27 +462,8 @@ class ConversationDeltaStream extends _$ConversationDeltaStream {
_socketSubscription?.dispose();
_socketSubscription = null;
}
/// Provides direct access to the underlying stream.
/// Note: This getter is necessary for compatibility with StreamProvider.
/// While Riverpod 3 discourages public getters on Notifiers, this is a
/// pragmatic exception for stream delegation patterns.
// ignore: avoid_public_notifier_properties
Stream<ConversationDelta> get stream =>
_controller?.stream ?? const Stream<ConversationDelta>.empty();
}
final conversationDeltaEventsProvider =
StreamProvider.family<ConversationDelta, ConversationDeltaRequest>((
ref,
request,
) {
final notifier = ref.watch(
conversationDeltaStreamProvider(request).notifier,
);
return notifier.stream;
});
// Attachment upload queue provider
final attachmentUploadQueueProvider = Provider<AttachmentUploadQueue?>((ref) {
final api = ref.watch(apiServiceProvider);

View File

@@ -1331,29 +1331,23 @@ Future<void> regenerateMessage(
});
} catch (_) {}
final chatEventsStream = ref
.read(
conversationDeltaStreamProvider(
final chatEventsHandle = _conversationDeltaStream(
ref,
ConversationDeltaRequest.chat(
conversationId: activeConversation.id,
sessionId: effectiveSessionId,
requireFocus: false,
),
).notifier,
)
.stream;
);
final channelEventsStream = ref
.read(
conversationDeltaStreamProvider(
final channelEventsHandle = _conversationDeltaStream(
ref,
ConversationDeltaRequest.channel(
conversationId: activeConversation.id,
sessionId: effectiveSessionId,
requireFocus: false,
),
).notifier,
)
.stream;
);
final activeStream = attachUnifiedChunkedStreaming(
stream: stream,
@@ -1365,8 +1359,8 @@ Future<void> regenerateMessage(
activeConversationId: activeConversation.id,
api: api,
socketService: socketService,
chatEvents: chatEventsStream,
channelEvents: channelEventsStream,
chatEvents: chatEventsHandle.stream,
channelEvents: channelEventsHandle.stream,
appendToLastMessage: (c) =>
ref.read(chatMessagesProvider.notifier).appendToLastMessage(c),
replaceLastMessageContent: (c) =>
@@ -1417,10 +1411,11 @@ Future<void> regenerateMessage(
);
ref.read(chatMessagesProvider.notifier)
..setMessageStream(activeStream.controller)
..setSocketSubscriptions(
activeStream.socketSubscriptions,
onDispose: activeStream.disposeWatchdog,
);
..setSocketSubscriptions([
...activeStream.socketSubscriptions,
chatEventsHandle.dispose,
channelEventsHandle.dispose,
], onDispose: activeStream.disposeWatchdog);
return;
} catch (e) {
rethrow;
@@ -1456,6 +1451,44 @@ Future<void> sendMessageWithContainer(
await _sendMessageInternal(container, message, attachments, toolIds);
}
({Stream<ConversationDelta> stream, void Function() dispose})
_conversationDeltaStream(dynamic ref, ConversationDeltaRequest request) {
final controller = StreamController<ConversationDelta>.broadcast();
var isDisposed = false;
void close() {
if (isDisposed) return;
isDisposed = true;
controller.close();
}
final subscription = ref.listen<AsyncValue<ConversationDelta>>(
conversationDeltaStreamProvider(request),
(previous, next) {
if (next is AsyncData<ConversationDelta>) {
if (!controller.isClosed) controller.add(next.value);
return;
}
if (next is AsyncError<ConversationDelta>) {
if (!controller.isClosed) {
controller.addError(next.error, next.stackTrace);
}
}
},
fireImmediately: false,
);
void dispose() {
if (isDisposed) return;
subscription.close();
close();
}
controller.onCancel = dispose;
return (stream: controller.stream, dispose: dispose);
}
// Internal send message implementation
Future<void> _sendMessageInternal(
dynamic ref,
@@ -1901,29 +1934,23 @@ Future<void> _sendMessageInternal(
});
} catch (_) {}
final chatEventsStream = ref
.read(
conversationDeltaStreamProvider(
final chatEventsHandle = _conversationDeltaStream(
ref,
ConversationDeltaRequest.chat(
conversationId: activeConversation?.id,
sessionId: effectiveSessionId,
requireFocus: false,
),
).notifier,
)
.stream;
);
final channelEventsStream = ref
.read(
conversationDeltaStreamProvider(
final channelEventsHandle = _conversationDeltaStream(
ref,
ConversationDeltaRequest.channel(
conversationId: activeConversation?.id,
sessionId: effectiveSessionId,
requireFocus: false,
),
).notifier,
)
.stream;
);
final activeStream = attachUnifiedChunkedStreaming(
stream: stream,
@@ -1935,8 +1962,8 @@ Future<void> _sendMessageInternal(
activeConversationId: activeConversation?.id,
api: api,
socketService: socketService,
chatEvents: chatEventsStream,
channelEvents: channelEventsStream,
chatEvents: chatEventsHandle.stream,
channelEvents: channelEventsHandle.stream,
appendToLastMessage: (c) =>
ref.read(chatMessagesProvider.notifier).appendToLastMessage(c),
replaceLastMessageContent: (c) =>
@@ -1988,10 +2015,11 @@ Future<void> _sendMessageInternal(
ref.read(chatMessagesProvider.notifier)
..setMessageStream(activeStream.controller)
..setSocketSubscriptions(
activeStream.socketSubscriptions,
onDispose: activeStream.disposeWatchdog,
);
..setSocketSubscriptions([
...activeStream.socketSubscriptions,
chatEventsHandle.dispose,
channelEventsHandle.dispose,
], onDispose: activeStream.disposeWatchdog);
return;
} catch (e) {
// Handle error - remove the assistant message placeholder