fix: stop button

This commit is contained in:
cogwheel0
2025-09-01 20:26:29 +05:30
parent 047a67a0c5
commit 53e16237df
2 changed files with 110 additions and 2 deletions

View File

@@ -2569,6 +2569,9 @@ class ApiService {
} }
// Chat streaming with conversation context // Chat streaming with conversation context
// Track cancellable streaming requests by messageId for stop parity
final Map<String, CancelToken> _streamCancelTokens = {};
final Map<String, String> _messagePersistentStreamIds = {};
// Send message with SSE streaming // Send message with SSE streaming
// Returns a record with (stream, messageId, sessionId) // Returns a record with (stream, messageId, sessionId)
@@ -2801,6 +2804,28 @@ class ApiService {
); );
} }
// === Tasks control (parity with Web client) ===
Future<void> stopTask(String taskId) async {
try {
await _dio.post('/api/tasks/stop/$taskId');
} catch (e) {
rethrow;
}
}
Future<List<String>> getTaskIdsByChat(String chatId) async {
try {
final resp = await _dio.get('/api/tasks/chat/$chatId');
final data = resp.data;
if (data is Map && data['task_ids'] is List) {
return (data['task_ids'] as List).map((e) => e.toString()).toList();
}
return const [];
} catch (e) {
rethrow;
}
}
// Poll the server chat until the assistant message is populated with tool results, // Poll the server chat until the assistant message is populated with tool results,
// then stream deltas to the UI and close. // then stream deltas to the UI and close.
Future<void> _pollChatForMessageUpdates({ Future<void> _pollChatForMessageUpdates({
@@ -2996,6 +3021,23 @@ class ApiService {
} }
} }
// Cancel an active streaming message by its messageId (client-side abort)
void cancelStreamingMessage(String messageId) {
try {
final token = _streamCancelTokens.remove(messageId);
if (token != null && !token.isCancelled) {
token.cancel('User cancelled');
}
} catch (_) {}
try {
final pid = _messagePersistentStreamIds.remove(messageId);
if (pid != null) {
PersistentStreamingService().unregisterStream(pid);
}
} catch (_) {}
}
// SSE streaming with persistent background support - Main Implementation // SSE streaming with persistent background support - Main Implementation
void _streamSSE( void _streamSSE(
Map<String, dynamic> data, Map<String, dynamic> data,
@@ -3005,6 +3047,9 @@ class ApiService {
final persistentService = PersistentStreamingService(); final persistentService = PersistentStreamingService();
final recoveryService = StreamRecoveryService(); final recoveryService = StreamRecoveryService();
final streamId = DateTime.now().millisecondsSinceEpoch.toString(); final streamId = DateTime.now().millisecondsSinceEpoch.toString();
// Create a cancel token for this SSE request and store it by message
final cancelToken = CancelToken();
_streamCancelTokens[messageId] = cancelToken;
// Extract metadata for recovery // Extract metadata for recovery
final conversationId = data['conversation_id'] ?? data['chat_id'] ?? ''; final conversationId = data['conversation_id'] ?? data['chat_id'] ?? '';
@@ -3072,6 +3117,7 @@ class ApiService {
responseType: ResponseType.stream, responseType: ResponseType.stream,
receiveTimeout: null, receiveTimeout: null,
), ),
cancelToken: cancelToken,
); );
debugPrint('DEBUG: SSE response status: ${response.statusCode}'); debugPrint('DEBUG: SSE response status: ${response.statusCode}');
@@ -3245,12 +3291,27 @@ class ApiService {
persistentService.unregisterStream(persistentStreamId); persistentService.unregisterStream(persistentStreamId);
} }
recoveryService.unregisterStream(streamId); recoveryService.unregisterStream(streamId);
_streamCancelTokens.remove(messageId);
_messagePersistentStreamIds.remove(messageId);
if (!streamController.isClosed) { if (!streamController.isClosed) {
streamController.close(); streamController.close();
} }
}, },
onError: (error) async { onError: (error) async {
debugPrint('Persistent: SSE stream error: $error'); debugPrint('Persistent: SSE stream error: $error');
// If this was a user cancellation, close quietly
if (error is DioException && error.type == DioExceptionType.cancel) {
if (persistentStreamId != null) {
persistentService.unregisterStream(persistentStreamId);
}
recoveryService.unregisterStream(streamId);
_streamCancelTokens.remove(messageId);
_messagePersistentStreamIds.remove(messageId);
if (!streamController.isClosed) {
streamController.close();
}
return;
}
// Try recovery through recovery service first // Try recovery through recovery service first
final recoveredStream = await recoveryService.recoverStream(streamId); final recoveredStream = await recoveryService.recoverStream(streamId);
@@ -3303,12 +3364,16 @@ class ApiService {
'requestData': data, 'requestData': data,
}, },
); );
// Track the persistent stream id by message for cancellation
_messagePersistentStreamIds[messageId] = persistentStreamId;
} catch (e) { } catch (e) {
debugPrint('Persistent: Failed to create SSE stream: $e'); debugPrint('Persistent: Failed to create SSE stream: $e');
if (persistentStreamId != null) { if (persistentStreamId != null) {
persistentService.unregisterStream(persistentStreamId); persistentService.unregisterStream(persistentStreamId);
} }
recoveryService.unregisterStream(streamId); recoveryService.unregisterStream(streamId);
_streamCancelTokens.remove(messageId);
_messagePersistentStreamIds.remove(messageId);
if (e is DioException && e.response?.statusCode == 401) { if (e is DioException && e.response?.statusCode == 401) {
// Auth error - don't retry // Auth error - don't retry

View File

@@ -85,6 +85,11 @@ class ChatMessagesNotifier extends StateNotifier<List<ChatMessage>> {
_messageStream = null; _messageStream = null;
} }
// Public wrapper to cancel the currently active stream (used by Stop)
void cancelActiveMessageStream() {
_cancelMessageStream();
}
Future<void> _updateModelForConversation(Conversation conversation) async { Future<void> _updateModelForConversation(Conversation conversation) async {
// Check if conversation has a model specified // Check if conversation has a model specified
if (conversation.model == null || conversation.model!.isEmpty) { if (conversation.model == null || conversation.model!.isEmpty) {
@@ -2514,8 +2519,46 @@ final regenerateLastMessageProvider = Provider<Future<void> Function()>((ref) {
// Stop generation provider // Stop generation provider
final stopGenerationProvider = Provider<void Function()>((ref) { final stopGenerationProvider = Provider<void Function()>((ref) {
return () { return () {
// This would need to be implemented with proper cancellation support try {
// For now, just mark streaming as complete final messages = ref.read(chatMessagesProvider);
if (messages.isNotEmpty &&
messages.last.role == 'assistant' &&
messages.last.isStreaming) {
final lastId = messages.last.id;
// Cancel the network stream (SSE) if active
final api = ref.read(apiServiceProvider);
api?.cancelStreamingMessage(lastId);
// Stop any active socket listeners for chat/channel events
try {
final socketService = ref.read(socketServiceProvider);
socketService?.offChatEvents();
socketService?.offChannelEvents();
} catch (_) {}
// Cancel local stream subscription to stop propagating further chunks
ref.read(chatMessagesProvider.notifier).cancelActiveMessageStream();
}
} catch (_) {}
// Best-effort: stop any background tasks associated with this chat (parity with web)
try {
final api = ref.read(apiServiceProvider);
final activeConv = ref.read(activeConversationProvider);
if (api != null && activeConv != null) {
unawaited(() async {
try {
final ids = await api.getTaskIdsByChat(activeConv.id);
for (final t in ids) {
try { await api.stopTask(t); } catch (_) {}
}
} catch (_) {}
}());
}
} catch (_) {}
// Ensure UI transitions out of streaming state
ref.read(chatMessagesProvider.notifier).finishStreaming(); ref.read(chatMessagesProvider.notifier).finishStreaming();
}; };
}); });