diff --git a/lib/core/services/api_service.dart b/lib/core/services/api_service.dart index d735685..5e20901 100644 --- a/lib/core/services/api_service.dart +++ b/lib/core/services/api_service.dart @@ -2569,6 +2569,9 @@ class ApiService { } // Chat streaming with conversation context + // Track cancellable streaming requests by messageId for stop parity + final Map _streamCancelTokens = {}; + final Map _messagePersistentStreamIds = {}; // Send message with SSE streaming // Returns a record with (stream, messageId, sessionId) @@ -2801,6 +2804,28 @@ class ApiService { ); } + // === Tasks control (parity with Web client) === + Future stopTask(String taskId) async { + try { + await _dio.post('/api/tasks/stop/$taskId'); + } catch (e) { + rethrow; + } + } + + Future> getTaskIdsByChat(String chatId) async { + try { + final resp = await _dio.get('/api/tasks/chat/$chatId'); + final data = resp.data; + if (data is Map && data['task_ids'] is List) { + return (data['task_ids'] as List).map((e) => e.toString()).toList(); + } + return const []; + } catch (e) { + rethrow; + } + } + // Poll the server chat until the assistant message is populated with tool results, // then stream deltas to the UI and close. Future _pollChatForMessageUpdates({ @@ -2996,6 +3021,23 @@ class ApiService { } } + // Cancel an active streaming message by its messageId (client-side abort) + void cancelStreamingMessage(String messageId) { + try { + final token = _streamCancelTokens.remove(messageId); + if (token != null && !token.isCancelled) { + token.cancel('User cancelled'); + } + } catch (_) {} + + try { + final pid = _messagePersistentStreamIds.remove(messageId); + if (pid != null) { + PersistentStreamingService().unregisterStream(pid); + } + } catch (_) {} + } + // SSE streaming with persistent background support - Main Implementation void _streamSSE( Map data, @@ -3005,6 +3047,9 @@ class ApiService { final persistentService = PersistentStreamingService(); final recoveryService = StreamRecoveryService(); final streamId = DateTime.now().millisecondsSinceEpoch.toString(); + // Create a cancel token for this SSE request and store it by message + final cancelToken = CancelToken(); + _streamCancelTokens[messageId] = cancelToken; // Extract metadata for recovery final conversationId = data['conversation_id'] ?? data['chat_id'] ?? ''; @@ -3072,6 +3117,7 @@ class ApiService { responseType: ResponseType.stream, receiveTimeout: null, ), + cancelToken: cancelToken, ); debugPrint('DEBUG: SSE response status: ${response.statusCode}'); @@ -3245,12 +3291,27 @@ class ApiService { persistentService.unregisterStream(persistentStreamId); } recoveryService.unregisterStream(streamId); + _streamCancelTokens.remove(messageId); + _messagePersistentStreamIds.remove(messageId); if (!streamController.isClosed) { streamController.close(); } }, onError: (error) async { debugPrint('Persistent: SSE stream error: $error'); + // If this was a user cancellation, close quietly + if (error is DioException && error.type == DioExceptionType.cancel) { + if (persistentStreamId != null) { + persistentService.unregisterStream(persistentStreamId); + } + recoveryService.unregisterStream(streamId); + _streamCancelTokens.remove(messageId); + _messagePersistentStreamIds.remove(messageId); + if (!streamController.isClosed) { + streamController.close(); + } + return; + } // Try recovery through recovery service first final recoveredStream = await recoveryService.recoverStream(streamId); @@ -3303,12 +3364,16 @@ class ApiService { 'requestData': data, }, ); + // Track the persistent stream id by message for cancellation + _messagePersistentStreamIds[messageId] = persistentStreamId; } catch (e) { debugPrint('Persistent: Failed to create SSE stream: $e'); if (persistentStreamId != null) { persistentService.unregisterStream(persistentStreamId); } recoveryService.unregisterStream(streamId); + _streamCancelTokens.remove(messageId); + _messagePersistentStreamIds.remove(messageId); if (e is DioException && e.response?.statusCode == 401) { // Auth error - don't retry diff --git a/lib/features/chat/providers/chat_providers.dart b/lib/features/chat/providers/chat_providers.dart index fb83384..f3fbaaa 100644 --- a/lib/features/chat/providers/chat_providers.dart +++ b/lib/features/chat/providers/chat_providers.dart @@ -85,6 +85,11 @@ class ChatMessagesNotifier extends StateNotifier> { _messageStream = null; } + // Public wrapper to cancel the currently active stream (used by Stop) + void cancelActiveMessageStream() { + _cancelMessageStream(); + } + Future _updateModelForConversation(Conversation conversation) async { // Check if conversation has a model specified if (conversation.model == null || conversation.model!.isEmpty) { @@ -2514,8 +2519,46 @@ final regenerateLastMessageProvider = Provider Function()>((ref) { // Stop generation provider final stopGenerationProvider = Provider((ref) { return () { - // This would need to be implemented with proper cancellation support - // For now, just mark streaming as complete + try { + final messages = ref.read(chatMessagesProvider); + if (messages.isNotEmpty && + messages.last.role == 'assistant' && + messages.last.isStreaming) { + final lastId = messages.last.id; + + // Cancel the network stream (SSE) if active + final api = ref.read(apiServiceProvider); + api?.cancelStreamingMessage(lastId); + + // Stop any active socket listeners for chat/channel events + try { + final socketService = ref.read(socketServiceProvider); + socketService?.offChatEvents(); + socketService?.offChannelEvents(); + } catch (_) {} + + // Cancel local stream subscription to stop propagating further chunks + ref.read(chatMessagesProvider.notifier).cancelActiveMessageStream(); + } + } catch (_) {} + + // Best-effort: stop any background tasks associated with this chat (parity with web) + try { + final api = ref.read(apiServiceProvider); + final activeConv = ref.read(activeConversationProvider); + if (api != null && activeConv != null) { + unawaited(() async { + try { + final ids = await api.getTaskIdsByChat(activeConv.id); + for (final t in ids) { + try { await api.stopTask(t); } catch (_) {} + } + } catch (_) {} + }()); + } + } catch (_) {} + + // Ensure UI transitions out of streaming state ref.read(chatMessagesProvider.notifier).finishStreaming(); }; });