From 33fc26d75529bbcb23c2fe43bfb774caf144e066 Mon Sep 17 00:00:00 2001 From: cogwheel0 <172976095+cogwheel0@users.noreply.github.com> Date: Sat, 16 Aug 2025 17:36:02 +0530 Subject: [PATCH] feat: title generation --- lib/core/services/api_service.dart | 590 ++++++++++++++++-- lib/core/services/sse_parser.dart | 179 ++++++ .../validation/validation_interceptor.dart | 11 +- .../chat/providers/chat_providers.dart | 72 ++- 4 files changed, 789 insertions(+), 63 deletions(-) create mode 100644 lib/core/services/sse_parser.dart diff --git a/lib/core/services/api_service.dart b/lib/core/services/api_service.dart index fd89d6b..662f14f 100644 --- a/lib/core/services/api_service.dart +++ b/lib/core/services/api_service.dart @@ -1,6 +1,7 @@ import 'dart:async'; import 'dart:convert'; import 'dart:io'; +import 'dart:typed_data'; import 'package:flutter/foundation.dart'; import 'package:dio/dio.dart'; import 'package:http_parser/http_parser.dart'; @@ -15,6 +16,7 @@ import '../models/chat_message.dart'; import '../auth/api_auth_interceptor.dart'; import '../validation/validation_interceptor.dart'; import '../error/api_error_interceptor.dart'; +import 'sse_parser.dart'; class ApiService { final Dio _dio; @@ -60,13 +62,15 @@ class ApiService { _dio.interceptors.add(_authInterceptor); // 2. Validation interceptor (validates requests/responses against OpenAPI schema) + // Disable for now to ensure parameters aren't being filtered final validationInterceptor = ValidationInterceptor( - enableRequestValidation: true, - enableResponseValidation: true, - throwOnValidationError: false, // Don't throw, just log validation issues + enableRequestValidation: false, // Disabled to preserve all parameters + enableResponseValidation: false, // Disabled for SSE streams + throwOnValidationError: false, logValidationResults: kDebugMode, ); - _dio.interceptors.add(validationInterceptor); + // Comment out to disable completely + // _dio.interceptors.add(validationInterceptor); // 3. Error handling interceptor (transforms errors to standardized format) _dio.interceptors.add( @@ -76,8 +80,41 @@ class ApiService { ), ); - // 4. Logging interceptor for debugging (should be last to see final requests/responses) + // 4. Custom debug interceptor to log exactly what we're sending if (kDebugMode) { + _dio.interceptors.add( + InterceptorsWrapper( + onRequest: (options, handler) { + if (options.path == '/api/chat/completions') { + debugPrint('===== SSE REQUEST DEBUG ====='); + debugPrint('Path: ${options.path}'); + debugPrint('Method: ${options.method}'); + debugPrint('Headers: ${options.headers}'); + debugPrint('Content-Type: ${options.contentType}'); + + // Log the raw data being sent + if (options.data != null) { + if (options.data is Map) { + final dataMap = options.data as Map; + debugPrint('Data type: Map'); + debugPrint('Data keys: ${dataMap.keys.toList()}'); + debugPrint('Has background_tasks: ${dataMap.containsKey('background_tasks')}'); + debugPrint('Has session_id: ${dataMap.containsKey('session_id')}'); + debugPrint('Has id: ${dataMap.containsKey('id')}'); + debugPrint('Full data: ${jsonEncode(dataMap)}'); + } else { + debugPrint('Data type: ${options.data.runtimeType}'); + debugPrint('Data: ${options.data}'); + } + } + debugPrint('===== END SSE REQUEST DEBUG ====='); + } + handler.next(options); + }, + ), + ); + + // 5. Standard logging interceptor _dio.interceptors.add( LogInterceptor( requestBody: true, @@ -751,6 +788,7 @@ class ApiService { // Create the chat data structure matching OpenWebUI format exactly final chatData = { 'chat': { + if (title != null) 'title': title, // Include the title if provided 'models': model != null ? [model] : [], 'messages': messagesArray, 'history': { @@ -1403,12 +1441,13 @@ class ApiService { return formatted; }).toList(); - // Include the message ID at the top level - server expects this + // Include the message ID and session ID at the top level - server expects these final requestData = { 'id': messageId, // The server expects the assistant message ID here 'chat_id': chatId, 'model': model, 'messages': formattedMessages, + 'session_id': sessionId ?? const Uuid().v4().substring(0, 20), // Add session_id // Don't include model_item as it might not be expected }; @@ -2268,9 +2307,10 @@ class ApiService { return weekdays[DateTime.now().weekday - 1]; } + // Send message with SSE streaming // Returns a record with (stream, messageId, sessionId) ({Stream stream, String messageId, String sessionId}) - sendMessageDirect({ + sendMessage({ required List> messages, required String model, String? conversationId, @@ -2333,41 +2373,36 @@ class ApiService { } } - // Build request data (exactly like OpenWebUI) + // Build request data - minimal params for SSE to work + // OpenWebUI server doesn't support SSE with session_id/id parameters final data = { 'stream': true, 'model': model, 'messages': processedMessages, - 'params': {}, - 'tool_servers': [], - 'features': { - 'image_generation': false, - 'code_interpreter': false, - 'web_search': enableWebSearch, - 'memory': false, - }, - 'variables': { - '{{USER_NAME}}': 'User', - '{{USER_LOCATION}}': 'Unknown', - '{{CURRENT_DATETIME}}': DateTime.now().toIso8601String().substring(0, 19).replaceAll('T', ' '), - '{{CURRENT_DATE}}': DateTime.now().toIso8601String().substring(0, 10), - '{{CURRENT_TIME}}': DateTime.now().toIso8601String().substring(11, 19), - '{{CURRENT_WEEKDAY}}': _getCurrentWeekday(), - '{{CURRENT_TIMEZONE}}': DateTime.now().timeZoneName, - '{{USER_LANGUAGE}}': 'en-US', - }, - if (modelItem != null) 'model_item': modelItem, - if (conversationId != null) 'chat_id': conversationId, - if (tools != null && tools.isNotEmpty) 'tools': tools, - if (allFiles.isNotEmpty) 'files': allFiles, }; + + // Add only essential parameters + if (conversationId != null) { + data['chat_id'] = conversationId; + } + + // Don't add session_id or id - they break SSE streaming! + // The server falls back to task-based async when these are present debugPrint('DEBUG: Starting SSE streaming request'); debugPrint('DEBUG: Model: $model'); debugPrint('DEBUG: Message count: ${processedMessages.length}'); - // Use SSE streaming exactly like OpenWebUI frontend - _streamChatCompletion(data, streamController, messageId); + // Debug the data being sent + debugPrint('DEBUG: SSE request data keys: ${data.keys.toList()}'); + debugPrint('DEBUG: Has background_tasks: ${data.containsKey('background_tasks')}'); + debugPrint('DEBUG: Has session_id: ${data.containsKey('session_id')}'); + debugPrint('DEBUG: background_tasks value: ${data['background_tasks']}'); + debugPrint('DEBUG: session_id value: ${data['session_id']}'); + debugPrint('DEBUG: id value: ${data['id']}'); + + // Use SSE streaming with proper parser + _streamSSE(data, streamController, messageId); return ( stream: streamController.stream, @@ -2376,16 +2411,260 @@ class ApiService { ); } - // SSE streaming implementation that matches OpenWebUI exactly - void _streamChatCompletion( + // SSE streaming with proper EventSource parser - Main Implementation + void _streamSSE( Map data, StreamController streamController, String messageId, ) async { try { - debugPrint('DEBUG: Making SSE request to /api/chat/completions'); + debugPrint('DEBUG: Making SSE request with parser to /api/chat/completions'); + + // Create a fresh Dio instance without interceptors for SSE streaming + // This avoids any interference from validation or other interceptors + final streamDio = Dio(BaseOptions( + baseUrl: serverConfig.url, + connectTimeout: const Duration(seconds: 30), + receiveTimeout: null, // No timeout for streaming + headers: { + 'Authorization': 'Bearer ${_authInterceptor.authToken}', + 'Accept': 'text/event-stream', + 'Cache-Control': 'no-cache', + 'Connection': 'keep-alive', + }, + )); + + debugPrint('DEBUG: Sending SSE request with data: ${jsonEncode(data)}'); + + final response = await streamDio.post( + '/api/chat/completions', + data: data, // Pass data directly as Map + options: Options( + responseType: ResponseType.stream, + receiveTimeout: null, + ), + ); + + debugPrint('DEBUG: SSE response status: ${response.statusCode}'); + debugPrint('DEBUG: SSE response headers: ${response.headers}'); + debugPrint('DEBUG: SSE content-type: ${response.headers.value('content-type')}'); + + if (response.statusCode != 200) { + throw Exception('HTTP ${response.statusCode}: Failed to start streaming'); + } + + // Check if we got SSE or JSON response + final contentType = response.headers.value('content-type') ?? ''; + if (!contentType.contains('text/event-stream')) { + debugPrint('WARNING: Expected SSE but got content-type: $contentType'); + debugPrint('WARNING: This usually means the server didn\'t receive the streaming parameters'); + + // Try to read the response to see what we got + final stream = response.data.stream as Stream>; + final bytes = await stream.toList(); + final fullBytes = bytes.expand((x) => x).toList(); + final responseText = utf8.decode(fullBytes); + debugPrint('DEBUG: Non-SSE response length: ${responseText.length}'); + + // If it's JSON, parse and handle it + if (contentType.contains('application/json')) { + try { + final json = jsonDecode(responseText); + + // Check if it's an error + if (json is Map && json.containsKey('error')) { + debugPrint('ERROR: Server returned error: ${json['error']}'); + streamController.addError('Server error: ${json['error']}'); + return; + } + + // Try to extract content from non-streaming response + if (json is Map && json.containsKey('choices')) { + final choices = json['choices'] as List?; + if (choices != null && choices.isNotEmpty) { + final choice = choices[0] as Map; + if (choice.containsKey('message')) { + final message = choice['message'] as Map; + final content = message['content']?.toString() ?? ''; + if (content.isNotEmpty) { + debugPrint('DEBUG: Successfully extracted content from JSON response'); + // Stream the content word by word for better UX + final words = content.split(' '); + for (final word in words) { + streamController.add('$word '); + await Future.delayed(const Duration(milliseconds: 20)); + } + } + } + } + } + + // Log what we got if we couldn't extract content + if (!streamController.isClosed) { + debugPrint('DEBUG: JSON response structure: ${json.keys}'); + debugPrint('DEBUG: Full JSON response: $json'); + + // Check if it's a task-based response + if (json is Map && json.containsKey('task_id')) { + debugPrint('DEBUG: Got task-based response with task_id: ${json['task_id']}'); + debugPrint('DEBUG: Status: ${json['status']}'); + // This might be a polling-based async pattern + // TODO: Implement polling for task completion + } + } + } catch (e) { + debugPrint('ERROR: Failed to parse JSON response: $e'); + // Try to show something to the user + streamController.add('Response received but could not be parsed properly.'); + } + } else { + // Not JSON, might be plain text + debugPrint('DEBUG: Got non-JSON response, treating as plain text'); + if (responseText.isNotEmpty && responseText.length < 10000) { + streamController.add(responseText); + } + } + + streamController.close(); + return; + } + + // Parse SSE stream using our parser + final rawStream = response.data.stream; + + // Handle the stream properly based on its actual type + Stream> byteStream; + if (rawStream is Stream) { + // Convert Uint8List to List + byteStream = rawStream.map((uint8list) => uint8list.toList()); + } else { + byteStream = rawStream as Stream>; + } + + // Convert byte stream to string stream + final stringStream = byteStream.transform(utf8.decoder); + + // Parse SSE events from the string stream + final sseParser = SSEParser(); + stringStream.listen( + (chunk) { + sseParser.feed(chunk); + }, + onDone: () { + sseParser.close(); + }, + onError: (error) { + debugPrint('DEBUG: SSE stream decode error: $error'); + streamController.addError(error); + }, + ); + + final sseEvents = sseParser.stream; + + debugPrint('DEBUG: Starting to process SSE events'); + + await for (final event in sseEvents) { + debugPrint('DEBUG: SSE event - type: ${event.event}, data: ${event.data}'); + + if (event.data == '[DONE]') { + debugPrint('DEBUG: SSE stream finished with [DONE]'); + streamController.close(); + return; + } + + try { + final json = jsonDecode(event.data) as Map; + + // Handle errors + if (json.containsKey('error')) { + final error = json['error']; + debugPrint('DEBUG: SSE error: $error'); + streamController.addError('Server error: $error'); + return; + } + + // Handle content streaming + if (json.containsKey('choices')) { + final choices = json['choices'] as List?; + if (choices != null && choices.isNotEmpty) { + final choice = choices[0] as Map; + + if (choice.containsKey('delta')) { + final delta = choice['delta'] as Map; + + // Extract content + if (delta.containsKey('content')) { + final content = delta['content'] as String?; + if (content != null && content.isNotEmpty) { + debugPrint('DEBUG: SSE content chunk: "$content"'); + streamController.add(content); + } + } + + // Handle tool calls + if (delta.containsKey('tool_calls')) { + final toolCalls = delta['tool_calls'] as List?; + if (toolCalls != null && toolCalls.isNotEmpty) { + debugPrint('DEBUG: SSE tool calls: $toolCalls'); + } + } + } + + // Handle finish reason + if (choice.containsKey('finish_reason')) { + final finishReason = choice['finish_reason']; + if (finishReason != null) { + debugPrint('DEBUG: SSE finished with reason: $finishReason'); + streamController.close(); + return; + } + } + } + } + + // Handle other event types + if (json.containsKey('sources')) { + debugPrint('DEBUG: SSE sources: ${json['sources']}'); + } + + if (json.containsKey('usage')) { + debugPrint('DEBUG: SSE usage: ${json['usage']}'); + } + + } catch (e) { + debugPrint('DEBUG: Error parsing SSE event data: $e'); + // Continue processing + } + } + + debugPrint('DEBUG: SSE stream ended'); + streamController.close(); + + } catch (e) { + debugPrint('DEBUG: SSE streaming error: $e'); + if (e is DioException) { + debugPrint('DEBUG: DioException details:'); + debugPrint(' - Type: ${e.type}'); + debugPrint(' - Message: ${e.message}'); + debugPrint(' - Response: ${e.response}'); + if (e.response != null) { + debugPrint(' - Status code: ${e.response!.statusCode}'); + debugPrint(' - Headers: ${e.response!.headers}'); + } + } + streamController.addError(e); + } + } + + // Enhanced SSE parser that matches OpenWebUI's EventSourceParserStream approach + void _streamChatCompletionEnhanced( + Map data, + StreamController streamController, + String messageId, + ) async { + try { + debugPrint('DEBUG: Making enhanced SSE request to /api/chat/completions'); - // Make the request with proper SSE headers (exactly like OpenWebUI) final response = await _dio.post( '/api/chat/completions', data: data, @@ -2396,26 +2675,227 @@ class ApiService { 'Cache-Control': 'no-cache', 'Connection': 'keep-alive', }, - // Disable response timeout to allow streaming receiveTimeout: null, ), ); - debugPrint( - 'DEBUG: SSE response received, status: ${response.statusCode}', - ); + debugPrint('DEBUG: Enhanced SSE response received, status: ${response.statusCode}'); if (response.statusCode != 200) { - throw Exception( - 'HTTP ${response.statusCode}: Failed to start streaming', - ); + throw Exception('HTTP ${response.statusCode}: Failed to start streaming'); + } + + // Transform raw stream through SSE parser (like OpenWebUI's pipeline) + final rawStream = response.data.stream as Stream>; + final textStream = StreamController(); + + // Convert bytes to text manually (like TextDecoderStream) + rawStream.listen( + (chunk) { + try { + final text = utf8.decode(chunk); + textStream.add(text); + } catch (e) { + debugPrint('DEBUG: Enhanced SSE decode error: $e'); + } + }, + onDone: () => textStream.close(), + onError: (error) => textStream.addError(error), + ); + + // Apply SSE parsing (like EventSourceParserStream) + textStream.stream + .transform(_createEventSourceTransformer()) // Text → ParsedEvent + .listen( + (event) => _handleSSEEvent(event, streamController), + onDone: () { + debugPrint('DEBUG: Enhanced SSE stream completed'); + streamController.close(); + }, + onError: (error) { + debugPrint('DEBUG: Enhanced SSE stream error: $error'); + streamController.addError(error); + }, + ); + } catch (e) { + debugPrint('DEBUG: Enhanced SSE streaming error: $e'); + streamController.addError(e); + } + } + + // Create a stream transformer that parses SSE events (like EventSourceParserStream) + StreamTransformer> _createEventSourceTransformer() { + String buffer = ''; + + return StreamTransformer>.fromHandlers( + handleData: (chunk, sink) { + buffer += chunk; + final lines = buffer.split('\n'); + buffer = lines.removeLast(); // Keep incomplete line + + String? eventType; + String? data; + String? id; + + for (final line in lines) { + final trimmed = line.trim(); + if (trimmed.isEmpty) { + // Empty line indicates end of event - emit it + if (data != null) { + sink.add({ + 'type': eventType ?? 'message', + 'data': data, + if (id != null) 'id': id, + }); + } + // Reset for next event + eventType = null; + data = null; + id = null; + } else if (trimmed.startsWith('data: ')) { + final eventData = trimmed.substring(6); + data = data == null ? eventData : '$data\n$eventData'; + } else if (trimmed.startsWith('event: ')) { + eventType = trimmed.substring(7); + } else if (trimmed.startsWith('id: ')) { + id = trimmed.substring(4); + } + // Ignore retry: and other fields + } + }, + handleDone: (sink) { + // Handle any remaining data + if (buffer.trim().isNotEmpty) { + sink.add({ + 'type': 'message', + 'data': buffer.trim(), + }); + } + sink.close(); + }, + ); + } + + // Handle individual SSE events (like OpenWebUI's event handler) + void _handleSSEEvent(Map event, StreamController streamController) { + final data = event['data']; + if (data == null) return; + + debugPrint('DEBUG: Enhanced SSE event: ${event['type']}, data: $data'); + + if (data == '[DONE]') { + debugPrint('DEBUG: Enhanced SSE stream finished with [DONE]'); + streamController.close(); + return; + } + + try { + final json = jsonDecode(data) as Map; + + // Handle errors (like OpenWebUI) + if (json.containsKey('error')) { + final error = json['error']; + debugPrint('DEBUG: Enhanced SSE error: $error'); + streamController.addError('Server error: $error'); + return; + } + + // Handle content streaming (like OpenWebUI's choices processing) + if (json.containsKey('choices')) { + final choices = json['choices'] as List?; + if (choices != null && choices.isNotEmpty) { + final choice = choices[0] as Map; + + if (choice.containsKey('delta')) { + final delta = choice['delta'] as Map; + + // Extract content (like OpenWebUI's delta.content) + if (delta.containsKey('content')) { + final content = delta['content'] as String?; + if (content != null && content.isNotEmpty) { + debugPrint('DEBUG: Enhanced SSE content chunk: "$content"'); + streamController.add(content); + } + } + + // Handle tool calls if present + if (delta.containsKey('tool_calls')) { + final toolCalls = delta['tool_calls'] as List?; + if (toolCalls != null && toolCalls.isNotEmpty) { + debugPrint('DEBUG: Enhanced SSE tool calls: $toolCalls'); + // Could emit special events for tool calls if needed + } + } + } + + // Handle finish reason + if (choice.containsKey('finish_reason')) { + final finishReason = choice['finish_reason']; + if (finishReason != null) { + debugPrint('DEBUG: Enhanced SSE finished with reason: $finishReason'); + streamController.close(); + return; + } + } + } + } + + // Handle other event types (sources, usage, etc.) like OpenWebUI + if (json.containsKey('sources')) { + debugPrint('DEBUG: Enhanced SSE sources: ${json['sources']}'); + // Could emit sources events if needed + } + + if (json.containsKey('usage')) { + debugPrint('DEBUG: Enhanced SSE usage: ${json['usage']}'); + // Could emit usage events if needed + } + + } catch (e) { + debugPrint('DEBUG: Enhanced SSE JSON parse error: $e'); + // Continue processing - don't fail the entire stream + } + } + + // Original working SSE streaming implementation + void _streamChatCompletionOriginal( + Map data, + StreamController streamController, + String messageId, + ) async { + try { + debugPrint('DEBUG: Making SSE request to /api/chat/completions'); + + final response = await _dio.post( + '/api/chat/completions', + data: data, + options: Options( + responseType: ResponseType.stream, + headers: { + 'Accept': 'text/event-stream', + 'Cache-Control': 'no-cache', + 'Connection': 'keep-alive', + }, + receiveTimeout: null, + ), + ); + + debugPrint('DEBUG: SSE response received, status: ${response.statusCode}'); + debugPrint('DEBUG: SSE response headers: ${response.headers}'); + debugPrint('DEBUG: SSE response content-type: ${response.headers.value('content-type')}'); + + if (response.statusCode != 200) { + throw Exception('HTTP ${response.statusCode}: Failed to start streaming'); } // Process the SSE stream exactly like OpenWebUI frontend final stream = response.data.stream as Stream>; String buffer = ''; + + debugPrint('DEBUG: Starting to process SSE stream chunks'); await for (final chunk in stream) { + debugPrint('DEBUG: Received SSE chunk of size: ${chunk.length}'); try { // Decode chunk to string final chunkStr = utf8.decode(chunk); @@ -2476,9 +2956,7 @@ class ApiService { if (choice.containsKey('finish_reason')) { final finishReason = choice['finish_reason']; if (finishReason != null) { - debugPrint( - 'DEBUG: Stream finished with reason: $finishReason', - ); + debugPrint('DEBUG: Stream finished with reason: $finishReason'); streamController.close(); return; } @@ -2571,7 +3049,7 @@ class ApiService { // Socket.IO streaming method that listens to real-time events ({Stream stream, String messageId, String sessionId}) - sendMessageWithSocketIO({ + sendMessageWithSocketIOLegacy({ required List> messages, required String model, String? conversationId, @@ -2843,7 +3321,7 @@ class ApiService { // Enhanced SSE streaming method that matches OpenWebUI implementation ({Stream stream, String messageId, String sessionId}) - sendMessageWithImprovedSSE({ + sendMessageWithImprovedSSELegacy({ required List> messages, required String model, String? conversationId, @@ -2948,9 +3426,9 @@ class ApiService { if (conversationId != null) 'chat_id': conversationId, if (modelItem != null) 'model_item': modelItem, 'background_tasks': { - 'title_generation': true, - 'tags_generation': true, - 'follow_up_generation': true, + 'TITLE_GENERATION': true, + 'TAGS_GENERATION': true, + 'FOLLOW_UP_GENERATION': true, }, 'session_id': sessionId, 'id': messageId, @@ -3511,7 +3989,7 @@ class ApiService { // Enhanced streaming method that uses improved SSE (like OpenWebUI) and Socket.IO fallback ({Stream stream, String messageId, String sessionId}) - sendMessageWithStreaming({ + sendMessageWithStreamingLegacy({ required List> messages, required String model, String? conversationId, @@ -3523,7 +4001,7 @@ class ApiService { debugPrint('DEBUG: Starting streaming with SSE as primary method'); // Use improved SSE streaming as primary method (matches OpenWebUI exactly) - return sendMessageDirect( + return sendMessage( messages: messages, model: model, conversationId: conversationId, @@ -3535,7 +4013,7 @@ class ApiService { // Enhanced streaming method with Socket.IO preference ({Stream stream, String messageId, String sessionId}) - sendMessageWithEnhancedStreaming({ + sendMessageWithEnhancedStreamingLegacy({ required List> messages, required String model, String? conversationId, @@ -3552,7 +4030,7 @@ class ApiService { if (preferSocketIO) { try { debugPrint('DEBUG: Attempting Socket.IO streaming...'); - return sendMessageWithSocketIO( + return sendMessageWithSocketIOLegacy( messages: messages, model: model, conversationId: conversationId, @@ -3570,7 +4048,7 @@ class ApiService { // Use SSE streaming as fallback debugPrint('DEBUG: Using SSE streaming as fallback'); - return sendMessageDirect( + return sendMessage( messages: messages, model: model, conversationId: conversationId, diff --git a/lib/core/services/sse_parser.dart b/lib/core/services/sse_parser.dart new file mode 100644 index 0000000..ade0152 --- /dev/null +++ b/lib/core/services/sse_parser.dart @@ -0,0 +1,179 @@ +import 'dart:async'; +import 'dart:convert'; + +/// Event data from Server-Sent Events stream +class SSEEvent { + final String? id; + final String? event; + final String data; + final int? retry; + + SSEEvent({ + this.id, + this.event, + required this.data, + this.retry, + }); +} + +/// Parser for Server-Sent Events +class SSEParser { + final _controller = StreamController.broadcast(); + + String _buffer = ''; + String? _currentId; + String? _currentEvent; + String _currentData = ''; + int? _currentRetry; + + Stream get stream => _controller.stream; + + /// Feed raw text data to the parser + void feed(String chunk) { + _buffer += chunk; + _processBuffer(); + } + + /// Process buffered data and emit events + void _processBuffer() { + // Split by newlines but keep the last incomplete line + final lines = _buffer.split('\n'); + + // Keep the last line in buffer if it doesn't end with newline + if (!_buffer.endsWith('\n')) { + _buffer = lines.removeLast(); + } else { + _buffer = ''; + } + + for (final line in lines) { + _processLine(line); + } + } + + /// Process a single line according to SSE spec + void _processLine(String line) { + // Empty line signals end of event + if (line.trim().isEmpty) { + if (_currentData.isNotEmpty) { + _emitEvent(); + } + _resetCurrentEvent(); + return; + } + + // Comment line (starts with :) + // OpenRouter sends ": OPENROUTER PROCESSING" messages + if (line.startsWith(':')) { + // Log but ignore comments + if (line.contains('OPENROUTER')) { + // OpenRouter processing indicator - ignore silently + } + return; // Ignore comments + } + + // Parse field and value + final colonIndex = line.indexOf(':'); + String field; + String value; + + if (colonIndex == -1) { + field = line; + value = ''; + } else { + field = line.substring(0, colonIndex); + value = line.substring(colonIndex + 1); + // Remove leading space from value if present + if (value.startsWith(' ')) { + value = value.substring(1); + } + } + + // Process field according to SSE spec + switch (field) { + case 'data': + if (_currentData.isNotEmpty) { + _currentData += '\n'; + } + _currentData += value; + break; + + case 'event': + _currentEvent = value; + break; + + case 'id': + _currentId = value; + break; + + case 'retry': + final retryValue = int.tryParse(value); + if (retryValue != null) { + _currentRetry = retryValue; + } + break; + + default: + // Ignore unknown fields + break; + } + } + + /// Emit the current event + void _emitEvent() { + _controller.add(SSEEvent( + id: _currentId, + event: _currentEvent, + data: _currentData, + retry: _currentRetry, + )); + } + + /// Reset current event state + void _resetCurrentEvent() { + _currentEvent = null; + _currentData = ''; + // Note: id and retry are not reset per SSE spec + } + + /// Close the parser + void close() { + // Emit any remaining data + if (_currentData.isNotEmpty) { + _emitEvent(); + } + _controller.close(); + } + + /// Parse SSE events from a stream of bytes + static Stream parseStream(Stream> byteStream) { + final parser = SSEParser(); + + // Convert bytes to text and feed to parser + byteStream + .transform(utf8.decoder) + .listen( + (chunk) => parser.feed(chunk), + onDone: () => parser.close(), + onError: (error) => parser._controller.addError(error), + ); + + return parser.stream; + } +} + +/// Transform a text stream into SSE events +class SSETransformer extends StreamTransformerBase { + @override + Stream bind(Stream stream) { + final parser = SSEParser(); + + stream.listen( + (chunk) => parser.feed(chunk), + onDone: () => parser.close(), + onError: (error) => parser._controller.addError(error), + ); + + return parser.stream; + } +} \ No newline at end of file diff --git a/lib/core/validation/validation_interceptor.dart b/lib/core/validation/validation_interceptor.dart index 7654dd0..fb621db 100644 --- a/lib/core/validation/validation_interceptor.dart +++ b/lib/core/validation/validation_interceptor.dart @@ -43,11 +43,12 @@ class ValidationInterceptor extends Interceptor { } // Transform data if validation succeeded - if (result.isValid && options.data is Map) { - options.data = _validator.transformForApi( - options.data as Map, - ); - } + // Temporarily disabled to preserve background_tasks and session_id parameters + // if (result.isValid && options.data is Map) { + // options.data = _validator.transformForApi( + // options.data as Map, + // ); + // } } catch (e) { if (e is ValidationException) { handler.reject( diff --git a/lib/features/chat/providers/chat_providers.dart b/lib/features/chat/providers/chat_providers.dart index 4d4eafe..19cb438 100644 --- a/lib/features/chat/providers/chat_providers.dart +++ b/lib/features/chat/providers/chat_providers.dart @@ -702,8 +702,8 @@ Future _sendMessageInternal( } } - // Stream response using chat completions endpoint directly - final response = await api.sendMessageWithStreaming( + // Stream response using SSE + final response = await api.sendMessage( messages: conversationMessages, model: selectedModel.id, conversationId: activeConversation?.id, @@ -837,6 +837,12 @@ Future _sendMessageInternal( messages.length <= 2 && updatedConv.title != 'New Chat' && updatedConv.title.isNotEmpty; + + // If title is still "New Chat" and this is the first exchange, trigger title generation + if (messages.length <= 2 && updatedConv.title == 'New Chat') { + debugPrint('DEBUG: Triggering title generation for conversation ${activeConversation.id}'); + _triggerTitleGeneration(ref, activeConversation.id, formattedMessages, selectedModel.id); + } // Always combine current local messages with updated server content final currentMessages = ref.read(chatMessagesProvider); @@ -1175,6 +1181,68 @@ Please try sending the message again, or try without attachments.''', } } +// Trigger title generation using the dedicated endpoint +Future _triggerTitleGeneration( + dynamic ref, + String conversationId, + List> messages, + String model, +) async { + try { + final api = ref.read(apiServiceProvider); + if (api == null) return; + + debugPrint('DEBUG: Requesting title generation for conversation $conversationId'); + + // Call the title generation endpoint + final generatedTitle = await api.generateTitle( + conversationId: conversationId, + messages: messages, + model: model, + ); + + if (generatedTitle != null && generatedTitle.isNotEmpty && generatedTitle != 'New Chat') { + debugPrint('DEBUG: Title generated successfully: $generatedTitle'); + + // Update the active conversation with the new title + final activeConversation = ref.read(activeConversationProvider); + if (activeConversation?.id == conversationId) { + final updated = activeConversation!.copyWith( + title: generatedTitle, + updatedAt: DateTime.now(), + ); + ref.read(activeConversationProvider.notifier).state = updated; + + // Save the updated title to the server + try { + debugPrint('DEBUG: Saving generated title to server: $generatedTitle'); + final currentMessages = ref.read(chatMessagesProvider); + await api.updateConversationWithMessages( + conversationId, + currentMessages, + title: generatedTitle, + model: model, + ); + debugPrint('DEBUG: Title saved to server successfully'); + } catch (e) { + debugPrint('DEBUG: Failed to save title to server: $e'); + } + + // Refresh the conversations list + ref.invalidate(conversationsProvider); + } + } else { + debugPrint('DEBUG: Title generation did not return a valid title'); + // Fall back to background checking + _checkForTitleInBackground(ref, conversationId); + } + } catch (e) { + debugPrint('DEBUG: Title generation failed: $e'); + // Fall back to background checking + _checkForTitleInBackground(ref, conversationId); + } +} + // Background function to check for title updates without blocking UI Future _checkForTitleInBackground(dynamic ref, String conversationId) async { try {