refactor: remove unused code

This commit is contained in:
cogwheel0
2025-10-01 00:35:56 +05:30
parent fc4430e8df
commit 03bb03446b
3 changed files with 85 additions and 124 deletions

View File

@@ -404,47 +404,18 @@ class SocketConnectionStream extends _$SocketConnectionStream {
}
@Riverpod(keepAlive: true)
class ConversationDeltaStream extends _$ConversationDeltaStream {
StreamController<ConversationDelta>? _controller;
ProviderSubscription<AsyncValue<SocketService?>>? _serviceSubscription;
SocketEventSubscription? _socketSubscription;
Stream<ConversationDelta> conversationDeltaStream(
Ref ref,
ConversationDeltaRequest request,
) {
final controller = StreamController<ConversationDelta>.broadcast(sync: true);
@override
Stream<ConversationDelta> build(ConversationDeltaRequest request) {
final controller = StreamController<ConversationDelta>.broadcast(
sync: true,
onCancel: _maybeTearDownSocket,
);
_controller = controller;
ProviderSubscription<AsyncValue<SocketService?>>? serviceSubscription;
SocketEventSubscription? socketSubscription;
final initialService = ref
.watch(socketServiceManagerProvider)
.maybeWhen(data: (service) => service, orElse: () => null);
_bindSocket(initialService, request);
_serviceSubscription = ref.listen<AsyncValue<SocketService?>>(
socketServiceManagerProvider,
(_, next) => _bindSocket(
next.maybeWhen(data: (service) => service, orElse: () => null),
request,
),
);
ref.onDispose(() {
_serviceSubscription?.close();
_serviceSubscription = null;
_socketSubscription?.dispose();
_socketSubscription = null;
_controller?.close();
_controller = null;
});
return controller.stream;
}
void _bindSocket(SocketService? service, ConversationDeltaRequest request) {
_socketSubscription?.dispose();
_socketSubscription = null;
void bindSocket(SocketService? service) {
socketSubscription?.dispose();
socketSubscription = null;
if (service == null) {
return;
@@ -452,68 +423,65 @@ class ConversationDeltaStream extends _$ConversationDeltaStream {
switch (request.source) {
case ConversationDeltaSource.chat:
_socketSubscription = service.addChatEventHandler(
socketSubscription = service.addChatEventHandler(
conversationId: request.conversationId,
sessionId: request.sessionId,
requireFocus: request.requireFocus,
handler: (event, ack) {
_controller?.add(
if (!controller.isClosed) {
controller.add(
ConversationDelta.fromSocketEvent(
ConversationDeltaSource.chat,
event,
ack,
),
);
}
},
);
break;
case ConversationDeltaSource.channel:
_socketSubscription = service.addChannelEventHandler(
socketSubscription = service.addChannelEventHandler(
conversationId: request.conversationId,
sessionId: request.sessionId,
requireFocus: request.requireFocus,
handler: (event, ack) {
_controller?.add(
if (!controller.isClosed) {
controller.add(
ConversationDelta.fromSocketEvent(
ConversationDeltaSource.channel,
event,
ack,
),
);
}
},
);
break;
}
}
void _maybeTearDownSocket() {
if (_controller?.hasListener == true) {
return;
}
_socketSubscription?.dispose();
_socketSubscription = null;
}
final initialService = ref
.watch(socketServiceManagerProvider)
.maybeWhen(data: (service) => service, orElse: () => null);
bindSocket(initialService);
/// Provides direct access to the underlying stream.
/// Note: This getter is necessary for compatibility with StreamProvider.
/// While Riverpod 3 discourages public getters on Notifiers, this is a
/// pragmatic exception for stream delegation patterns.
// ignore: avoid_public_notifier_properties
Stream<ConversationDelta> get stream =>
_controller?.stream ?? const Stream<ConversationDelta>.empty();
}
final conversationDeltaEventsProvider =
StreamProvider.family<ConversationDelta, ConversationDeltaRequest>((
ref,
request,
) {
final notifier = ref.watch(
conversationDeltaStreamProvider(request).notifier,
serviceSubscription = ref.listen<AsyncValue<SocketService?>>(
socketServiceManagerProvider,
(_, next) => bindSocket(
next.maybeWhen(data: (service) => service, orElse: () => null),
),
);
return notifier.stream;
ref.onDispose(() {
serviceSubscription?.close();
socketSubscription?.dispose();
controller.close();
});
return controller.stream;
}
// Attachment upload queue provider
final attachmentUploadQueueProvider = Provider<AttachmentUploadQueue?>((ref) {
final api = ref.watch(apiServiceProvider);

View File

@@ -1331,29 +1331,25 @@ Future<void> regenerateMessage(
});
} catch (_) {}
final chatEventsStream = ref
.read(
final chatEventsStream = ref.read(
conversationDeltaStreamProvider(
ConversationDeltaRequest.chat(
conversationId: activeConversation.id,
sessionId: effectiveSessionId,
requireFocus: false,
),
).notifier,
)
.stream;
),
);
final channelEventsStream = ref
.read(
final channelEventsStream = ref.read(
conversationDeltaStreamProvider(
ConversationDeltaRequest.channel(
conversationId: activeConversation.id,
sessionId: effectiveSessionId,
requireFocus: false,
),
).notifier,
)
.stream;
),
);
final activeStream = attachUnifiedChunkedStreaming(
stream: stream,
@@ -1901,29 +1897,25 @@ Future<void> _sendMessageInternal(
});
} catch (_) {}
final chatEventsStream = ref
.read(
final chatEventsStream = ref.read(
conversationDeltaStreamProvider(
ConversationDeltaRequest.chat(
conversationId: activeConversation?.id,
sessionId: effectiveSessionId,
requireFocus: false,
),
).notifier,
)
.stream;
),
);
final channelEventsStream = ref
.read(
final channelEventsStream = ref.read(
conversationDeltaStreamProvider(
ConversationDeltaRequest.channel(
conversationId: activeConversation?.id,
sessionId: effectiveSessionId,
requireFocus: false,
),
).notifier,
)
.stream;
),
);
final activeStream = attachUnifiedChunkedStreaming(
stream: stream,

1
tmp/flutter_ai_repo Submodule

Submodule tmp/flutter_ai_repo added at 79187cf7e3