refactor: update streaming helper to use conversation delta listener

- Replaced direct stream handling for chat and channel events in `attachUnifiedChunkedStreaming` with a new `RegisterConversationDeltaListener` parameter for improved encapsulation and flexibility.
- Removed the previous chat and channel event stream parameters, streamlining the function signature and enhancing code clarity.
- Updated the `regenerateMessage` and `_sendMessageInternal` methods to utilize the new listener registration, improving the management of conversation delta events.
- Enhanced error handling for delta listeners, ensuring robust logging of errors during event processing.
This commit is contained in:
cogwheel0
2025-10-01 19:46:21 +05:30
parent f15a8eda79
commit d899ca5f70
3 changed files with 194 additions and 107 deletions

View File

@@ -0,0 +1,136 @@
import 'package:flutter/foundation.dart';
import 'package:flutter_riverpod/flutter_riverpod.dart';
import '../models/socket_event.dart';
import '../providers/app_providers.dart';
/// Signature for callbacks that receive conversation delta updates.
typedef ConversationDeltaDataCallback = void Function(ConversationDelta delta);
/// Signature for callbacks that handle errors emitted by the delta stream.
typedef ConversationDeltaErrorCallback =
void Function(Object error, StackTrace stackTrace);
/// Registers a listener for [ConversationDelta] updates behind Riverpod's
/// listening API and exposes explicit lifecycle control.
class ConversationDeltaListener {
ConversationDeltaListener({
required dynamic ref,
required ConversationDeltaRequest request,
required ConversationDeltaDataCallback onDelta,
required ConversationDeltaErrorCallback onError,
}) : _ref = ref,
_request = request,
_onDelta = onDelta,
_onError = onError;
final dynamic _ref;
final ConversationDeltaRequest _request;
final ConversationDeltaDataCallback _onDelta;
final ConversationDeltaErrorCallback _onError;
ProviderSubscription<AsyncValue<ConversationDelta>>? _subscription;
bool _disposed = false;
/// Returns `true` when a Riverpod subscription is currently active.
bool get isActive => _subscription != null;
/// Starts listening for [ConversationDelta] updates. Subsequent calls are
/// no-ops while the listener is already active.
void start() {
if (_disposed || isActive) {
return;
}
void handleNext(
AsyncValue<ConversationDelta>? previous,
AsyncValue<ConversationDelta> next,
) {
if (!_isMounted) {
stop();
return;
}
switch (next) {
case AsyncData(value: final delta):
_onDelta(delta);
case AsyncError(:final error, :final stackTrace):
_onError(error, stackTrace);
default:
}
}
final ref = _ref;
if (ref is Ref) {
_subscription = ref.listen(
conversationDeltaStreamProvider(_request),
handleNext,
fireImmediately: false,
);
return;
}
if (ref is ProviderContainer) {
_subscription = ref.listen(
conversationDeltaStreamProvider(_request),
handleNext,
fireImmediately: false,
);
return;
}
throw ArgumentError('Unsupported ref type: ${ref.runtimeType}');
}
/// Stops listening for deltas and releases resources. Safe to call multiple
/// times.
void stop() {
_subscription?.close();
_subscription = null;
}
/// Disposes the listener permanently and ensures the subscription is closed.
void dispose() {
if (_disposed) {
return;
}
_disposed = true;
stop();
}
bool get _isMounted {
final ref = _ref;
if (ref is Ref) {
return ref.mounted;
}
return !_disposed;
}
}
/// Type signature for registering delta listeners within helper utilities.
typedef RegisterConversationDeltaListener =
VoidCallback Function({
required ConversationDeltaRequest request,
required ConversationDeltaDataCallback onDelta,
required ConversationDeltaErrorCallback onError,
});
/// Convenience factory that wires up [ConversationDeltaListener] and returns
/// the disposer callback expected by streaming helpers.
RegisterConversationDeltaListener createConversationDeltaRegistrar(
dynamic ref,
) {
return ({
required ConversationDeltaRequest request,
required ConversationDeltaDataCallback onDelta,
required ConversationDeltaErrorCallback onError,
}) {
final listener = ConversationDeltaListener(
ref: ref,
request: request,
onDelta: onDelta,
onError: onError,
)..start();
return listener.dispose;
};
}

View File

@@ -10,6 +10,7 @@ import '../../core/services/socket_service.dart';
import '../../core/utils/inactivity_watchdog.dart';
import '../../core/utils/tool_calls_parser.dart';
import 'navigation_service.dart';
import 'conversation_delta_listener.dart';
import '../../shared/widgets/themed_dialogs.dart';
import '../../shared/theme/theme_extensions.dart';
import '../utils/debug_logger.dart';
@@ -46,8 +47,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
required String? activeConversationId,
required dynamic api,
required SocketService? socketService,
Stream<ConversationDelta>? chatEvents,
Stream<ConversationDelta>? channelEvents,
RegisterConversationDeltaListener? registerDeltaListener,
// Message update callbacks
required void Function(String) appendToLastMessage,
required void Function(String) replaceLastMessageContent,
@@ -97,7 +97,7 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
InactivityWatchdog? socketWatchdog;
final socketSubscriptions = <VoidCallback>[];
final hasSocketSignals =
socketService != null || chatEvents != null || channelEvents != null;
socketService != null || registerDeltaListener != null;
if (hasSocketSignals) {
// Increase timeout to match OpenWebUI's more generous timeouts for long responses
socketWatchdog = InactivityWatchdog(
@@ -991,14 +991,27 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
} catch (_) {}
}
if (chatEvents != null) {
final subscription = chatEvents.listen((event) {
socketWatchdog?.ping();
chatHandler(event.raw, event.ack);
});
socketSubscriptions.add(() {
unawaited(subscription.cancel());
});
if (registerDeltaListener != null) {
final chatDisposer = registerDeltaListener(
request: ConversationDeltaRequest.chat(
conversationId: activeConversationId,
sessionId: sessionId,
requireFocus: false,
),
onDelta: (event) {
socketWatchdog?.ping();
chatHandler(event.raw, event.ack);
},
onError: (error, stackTrace) {
DebugLogger.error(
'Chat delta listener error',
scope: 'streaming/helper',
error: error,
stackTrace: stackTrace,
);
},
);
socketSubscriptions.add(chatDisposer);
} else if (socketService != null) {
final chatSub = socketService.addChatEventHandler(
conversationId: activeConversationId,
@@ -1008,15 +1021,27 @@ ActiveSocketStream attachUnifiedChunkedStreaming({
);
socketSubscriptions.add(chatSub.dispose);
}
if (channelEvents != null) {
final subscription = channelEvents.listen((event) {
socketWatchdog?.ping();
channelEventsHandler(event.raw, event.ack);
});
socketSubscriptions.add(() {
unawaited(subscription.cancel());
});
if (registerDeltaListener != null) {
final channelDisposer = registerDeltaListener(
request: ConversationDeltaRequest.channel(
conversationId: activeConversationId,
sessionId: sessionId,
requireFocus: false,
),
onDelta: (event) {
socketWatchdog?.ping();
channelEventsHandler(event.raw, event.ack);
},
onError: (error, stackTrace) {
DebugLogger.error(
'Channel delta listener error',
scope: 'streaming/helper',
error: error,
stackTrace: stackTrace,
);
},
);
socketSubscriptions.add(channelDisposer);
} else if (socketService != null) {
final channelSub = socketService.addChannelEventHandler(
conversationId: activeConversationId,

View File

@@ -10,8 +10,8 @@ import 'package:yaml/yaml.dart' as yaml;
import '../../../core/auth/auth_state_manager.dart';
import '../../../core/models/chat_message.dart';
import '../../../core/models/conversation.dart';
import '../../../core/models/socket_event.dart';
import '../../../core/providers/app_providers.dart';
import '../../../core/services/conversation_delta_listener.dart';
import '../../../core/services/streaming_helper.dart';
import '../../../core/services/streaming_response_controller.dart';
import '../../../core/utils/debug_logger.dart';
@@ -1331,23 +1331,7 @@ Future<void> regenerateMessage(
});
} catch (_) {}
final chatEventsHandle = _conversationDeltaStream(
ref,
ConversationDeltaRequest.chat(
conversationId: activeConversation.id,
sessionId: effectiveSessionId,
requireFocus: false,
),
);
final channelEventsHandle = _conversationDeltaStream(
ref,
ConversationDeltaRequest.channel(
conversationId: activeConversation.id,
sessionId: effectiveSessionId,
requireFocus: false,
),
);
final registerDeltaListener = createConversationDeltaRegistrar(ref);
final activeStream = attachUnifiedChunkedStreaming(
stream: stream,
@@ -1359,8 +1343,7 @@ Future<void> regenerateMessage(
activeConversationId: activeConversation.id,
api: api,
socketService: socketService,
chatEvents: chatEventsHandle.stream,
channelEvents: channelEventsHandle.stream,
registerDeltaListener: registerDeltaListener,
appendToLastMessage: (c) =>
ref.read(chatMessagesProvider.notifier).appendToLastMessage(c),
replaceLastMessageContent: (c) =>
@@ -1411,11 +1394,10 @@ Future<void> regenerateMessage(
);
ref.read(chatMessagesProvider.notifier)
..setMessageStream(activeStream.controller)
..setSocketSubscriptions([
...activeStream.socketSubscriptions,
chatEventsHandle.dispose,
channelEventsHandle.dispose,
], onDispose: activeStream.disposeWatchdog);
..setSocketSubscriptions(
activeStream.socketSubscriptions,
onDispose: activeStream.disposeWatchdog,
);
return;
} catch (e) {
rethrow;
@@ -1451,44 +1433,6 @@ Future<void> sendMessageWithContainer(
await _sendMessageInternal(container, message, attachments, toolIds);
}
({Stream<ConversationDelta> stream, void Function() dispose})
_conversationDeltaStream(dynamic ref, ConversationDeltaRequest request) {
final controller = StreamController<ConversationDelta>.broadcast();
var isDisposed = false;
void close() {
if (isDisposed) return;
isDisposed = true;
controller.close();
}
final subscription = ref.listen<AsyncValue<ConversationDelta>>(
conversationDeltaStreamProvider(request),
(previous, next) {
if (next is AsyncData<ConversationDelta>) {
if (!controller.isClosed) controller.add(next.value);
return;
}
if (next is AsyncError<ConversationDelta>) {
if (!controller.isClosed) {
controller.addError(next.error, next.stackTrace);
}
}
},
fireImmediately: false,
);
void dispose() {
if (isDisposed) return;
subscription.close();
close();
}
controller.onCancel = dispose;
return (stream: controller.stream, dispose: dispose);
}
// Internal send message implementation
Future<void> _sendMessageInternal(
dynamic ref,
@@ -1934,23 +1878,7 @@ Future<void> _sendMessageInternal(
});
} catch (_) {}
final chatEventsHandle = _conversationDeltaStream(
ref,
ConversationDeltaRequest.chat(
conversationId: activeConversation?.id,
sessionId: effectiveSessionId,
requireFocus: false,
),
);
final channelEventsHandle = _conversationDeltaStream(
ref,
ConversationDeltaRequest.channel(
conversationId: activeConversation?.id,
sessionId: effectiveSessionId,
requireFocus: false,
),
);
final registerDeltaListener = createConversationDeltaRegistrar(ref);
final activeStream = attachUnifiedChunkedStreaming(
stream: stream,
@@ -1962,8 +1890,7 @@ Future<void> _sendMessageInternal(
activeConversationId: activeConversation?.id,
api: api,
socketService: socketService,
chatEvents: chatEventsHandle.stream,
channelEvents: channelEventsHandle.stream,
registerDeltaListener: registerDeltaListener,
appendToLastMessage: (c) =>
ref.read(chatMessagesProvider.notifier).appendToLastMessage(c),
replaceLastMessageContent: (c) =>
@@ -2015,11 +1942,10 @@ Future<void> _sendMessageInternal(
ref.read(chatMessagesProvider.notifier)
..setMessageStream(activeStream.controller)
..setSocketSubscriptions([
...activeStream.socketSubscriptions,
chatEventsHandle.dispose,
channelEventsHandle.dispose,
], onDispose: activeStream.disposeWatchdog);
..setSocketSubscriptions(
activeStream.socketSubscriptions,
onDispose: activeStream.disposeWatchdog,
);
return;
} catch (e) {
// Handle error - remove the assistant message placeholder