diff --git a/lib/core/services/api_service.dart b/lib/core/services/api_service.dart index 566fce8..5ca0672 100644 --- a/lib/core/services/api_service.dart +++ b/lib/core/services/api_service.dart @@ -4,8 +4,7 @@ import 'dart:io'; import 'package:flutter/foundation.dart'; import 'package:dio/dio.dart'; import 'package:http_parser/http_parser.dart'; -import 'package:web_socket_channel/web_socket_channel.dart'; -import 'package:socket_io_client/socket_io_client.dart' as io; +// Removed legacy websocket/socket.io imports import 'package:uuid/uuid.dart'; import '../models/server_config.dart'; import '../models/user.dart'; @@ -23,8 +22,7 @@ class ApiService { final Dio _dio; final ServerConfig serverConfig; late final ApiAuthInterceptor _authInterceptor; - WebSocketChannel? _wsChannel; - io.Socket? _socket; + // Removed legacy websocket/socket.io fields // Callback to notify when auth token becomes invalid void Function()? onAuthTokenInvalid; @@ -2378,18 +2376,6 @@ class ApiService { } // Chat streaming with conversation context - String _getCurrentWeekday() { - final weekdays = [ - 'Monday', - 'Tuesday', - 'Wednesday', - 'Thursday', - 'Friday', - 'Saturday', - 'Sunday', - ]; - return weekdays[DateTime.now().weekday - 1]; - } // Send message with SSE streaming // Returns a record with (stream, messageId, sessionId) @@ -2942,545 +2928,7 @@ class ApiService { } } - - - - - // Initialize Socket.IO connection - Future _initializeSocket() async { - if (_socket != null && _socket!.connected) { - return; // Already connected - } - - try { - debugPrint( - 'DEBUG: Initializing Socket.IO connection to ${serverConfig.url}', - ); - - _socket = io.io( - serverConfig.url, - io.OptionBuilder() - .setTransports(['websocket', 'polling']) - .enableReconnection() - .setReconnectionDelay(1000) - .setReconnectionDelayMax(5000) - .setPath('/ws/socket.io') - .setAuth({'token': _authInterceptor.authToken}) - .build(), - ); - - _socket!.onConnect((_) { - debugPrint('DEBUG: Socket.IO connected with ID: ${_socket!.id}'); - - // Emit user-join event with auth token - _socket!.emit('user-join', { - 'auth': {'token': _authInterceptor.authToken}, - }); - }); - - _socket!.onDisconnect((_) { - debugPrint('DEBUG: Socket.IO disconnected'); - }); - - _socket!.onError((error) { - debugPrint('DEBUG: Socket.IO error: $error'); - }); - - _socket!.onReconnect((_) { - debugPrint('DEBUG: Socket.IO reconnected'); - }); - } catch (e) { - debugPrint('DEBUG: Failed to initialize Socket.IO: $e'); - } - } - - // Socket.IO streaming method that listens to real-time events - ({Stream stream, String messageId, String sessionId}) - sendMessageWithSocketIOLegacy({ - required List> messages, - required String model, - String? conversationId, - List>? tools, - bool enableWebSearch = false, - Map? modelItem, - }) { - final streamController = StreamController(); - - // Generate unique IDs - final messageId = const Uuid().v4(); - final sessionId = const Uuid().v4().substring(0, 20); - - debugPrint('DEBUG: Starting Socket.IO streaming for message: $messageId'); - - // Initialize socket connection - _initializeSocket() - .then((_) { - _handleSocketIOStreamingResponse(messageId, streamController); - - // Send the chat completion request via API - // This will trigger the server to emit Socket.IO events - _sendChatCompletionForSocketIO( - messages: messages, - model: model, - conversationId: conversationId, - messageId: messageId, - tools: tools, - enableWebSearch: enableWebSearch, - modelItem: modelItem, - ); - }) - .catchError((error) { - debugPrint('DEBUG: Socket.IO initialization failed: $error'); - streamController.addError('Failed to initialize Socket.IO: $error'); - }); - - return ( - stream: streamController.stream, - messageId: messageId, - sessionId: sessionId, - ); - } - - // Handle Socket.IO real-time streaming events - void _handleSocketIOStreamingResponse( - String messageId, - StreamController streamController, - ) async { - // Check if socket is available - if (_socket == null || !_socket!.connected) { - debugPrint( - 'DEBUG: Socket not available for real-time streaming, falling back to polling', - ); - streamController.addError('Socket.IO not connected'); - streamController.close(); - return; - } - - debugPrint( - 'DEBUG: Setting up Socket.IO real-time streaming for message: $messageId', - ); - bool streamCompleted = false; - Timer? timeoutTimer; - - // Set up timeout to prevent hanging - timeoutTimer = Timer(const Duration(seconds: 30), () { - if (!streamCompleted) { - debugPrint( - 'DEBUG: Socket.IO streaming timeout for message: $messageId', - ); - streamCompleted = true; - streamController.addError('Streaming timeout'); - streamController.close(); - } - }); - - // Set up listener for chat-events from the server (OpenWebUI pattern) - void handleChatEvent(dynamic data) { - try { - if (streamCompleted) return; - - debugPrint('DEBUG: Received Socket.IO chat event: $data'); - - final Map eventData = data is Map - ? data - : (data as Map).cast(); - - final chatId = eventData['chat_id']?.toString(); - final eventMessageId = eventData['message_id']?.toString(); - final eventDetails = eventData['data'] as Map? ?? {}; - - final eventType = eventDetails['type']?.toString(); - final eventDataContent = - eventDetails['data'] as Map? ?? {}; - - debugPrint( - 'DEBUG: Event type: $eventType, chat_id: $chatId, message_id: $eventMessageId', - ); - - // Only process events for our message - if (eventMessageId != messageId && eventMessageId != null) { - return; - } - - switch (eventType) { - case 'message': - // Incremental content streaming - add the new chunk - final content = eventDataContent['content']?.toString() ?? ''; - if (content.isNotEmpty) { - debugPrint('DEBUG: Adding Socket.IO content chunk: "$content"'); - streamController.add(content); - } - break; - - case 'replace': - // Full content replacement - replace entire content - final content = eventDataContent['content']?.toString() ?? ''; - debugPrint('DEBUG: Replacing Socket.IO content: "$content"'); - streamController.add('__REPLACE_CONTENT__$content'); - break; - - case 'status': - // Status update (like "generating", "thinking", etc.) - final status = eventDataContent['status']?.toString() ?? ''; - if (status.isNotEmpty) { - debugPrint('DEBUG: Socket.IO Status update: $status'); - // Optionally emit status as a special event - streamController.add('__STATUS__$status'); - } - break; - - case 'error': - // Error occurred during generation - final error = - eventDataContent['error']?.toString() ?? 'Unknown error'; - debugPrint('DEBUG: Socket.IO streaming error: $error'); - streamCompleted = true; - timeoutTimer?.cancel(); - _socket?.off('chat-events', handleChatEvent); - streamController.addError(error); - streamController.close(); - break; - - case 'done': - // Streaming completed successfully - debugPrint( - 'DEBUG: Socket.IO streaming completed for message: $messageId', - ); - streamCompleted = true; - timeoutTimer?.cancel(); - _socket?.off('chat-events', handleChatEvent); - streamController.close(); - break; - - default: - debugPrint('DEBUG: Unknown Socket.IO event type: $eventType'); - break; - } - } catch (e, stackTrace) { - debugPrint('DEBUG: Error handling Socket.IO event: $e'); - debugPrint('DEBUG: Stack trace: $stackTrace'); - if (!streamCompleted) { - streamCompleted = true; - timeoutTimer?.cancel(); - _socket?.off('chat-events', handleChatEvent); - streamController.addError('Error processing streaming event: $e'); - streamController.close(); - } - } - } - - // Listen for chat-events - _socket!.on('chat-events', handleChatEvent); - - // Clean up when stream is closed - streamController.onCancel = () { - debugPrint( - 'DEBUG: Cleaning up Socket.IO listeners for message: $messageId', - ); - streamCompleted = true; - timeoutTimer?.cancel(); - _socket?.off('chat-events', handleChatEvent); - }; - } - - // Send chat completion request that will trigger Socket.IO events - Future _sendChatCompletionForSocketIO({ - required List> messages, - required String model, - String? conversationId, - required String messageId, - List>? tools, - bool enableWebSearch = false, - Map? modelItem, - }) async { - try { - // Process messages same as SSE version - final processedMessages = messages.map((message) { - final role = message['role'] as String; - final content = message['content']; - final files = message['files'] as List>?; - - final isContentArray = content is List; - final hasImages = - files?.any((file) => file['type'] == 'image') ?? false; - - if (isContentArray) { - return {'role': role, 'content': content}; - } else if (hasImages && role == 'user') { - final imageFiles = files! - .where((file) => file['type'] == 'image') - .toList(); - final contentText = content is String ? content : ''; - final contentArray = >[ - {'type': 'text', 'text': contentText}, - ]; - - for (final file in imageFiles) { - contentArray.add({ - 'type': 'image_url', - 'image_url': {'url': file['url']}, - }); - } - - return {'role': role, 'content': contentArray}; - } else { - final contentText = content is String ? content : ''; - return {'role': role, 'content': contentText}; - } - }).toList(); - - // Separate files from messages - final allFiles = >[]; - for (final message in messages) { - final files = message['files'] as List>?; - if (files != null) { - final nonImageFiles = files - .where((file) => file['type'] != 'image') - .toList(); - allFiles.addAll(nonImageFiles); - } - } - - // Create request data - final data = { - 'model': model, - 'messages': processedMessages, - 'stream': true, // Enable streaming - 'message_id': messageId, // Include message ID for Socket.IO events - if (conversationId != null) 'chat_id': conversationId, - if (tools != null && tools.isNotEmpty) 'tools': tools, - if (allFiles.isNotEmpty) 'files': allFiles, - if (enableWebSearch) 'web_search': enableWebSearch, - 'session_id': _socket?.id, // Include Socket.IO session ID - }; - - debugPrint('DEBUG: Sending Socket.IO-enabled chat completion request'); - debugPrint('DEBUG: Message ID: $messageId'); - debugPrint('DEBUG: Socket ID: ${_socket?.id}'); - - // Send the request - server should emit Socket.IO events in response - await _dio.post('/api/chat/completions', data: data); - } catch (e) { - debugPrint('DEBUG: Error sending Socket.IO chat completion request: $e'); - rethrow; - } - } - - // Enhanced SSE streaming method that matches OpenWebUI implementation - ({Stream stream, String messageId, String sessionId}) - sendMessageWithImprovedSSELegacy({ - required List> messages, - required String model, - String? conversationId, - List>? tools, - bool enableWebSearch = false, - Map? modelItem, - }) { - final streamController = StreamController(); - - // Generate a unique message ID and session ID for the request - final messageId = const Uuid().v4(); - final sessionId = const Uuid().v4().substring(0, 20); // Match WebUI format - - // Check if this is a Gemini model that requires special handling - final isGeminiModel = model.toLowerCase().contains('gemini'); - debugPrint('DEBUG: Is Gemini model in API: $isGeminiModel'); - debugPrint('DEBUG: Model ID in API: $model'); - - // Process messages to match OpenWebUI format - final processedMessages = messages.map((message) { - final role = message['role'] as String; - final content = message['content']; - final files = message['files'] as List>?; - - // Check if content is already a List (content array format) - final isContentArray = content is List; - - // Check if this message has image files - final hasImages = files?.any((file) => file['type'] == 'image') ?? false; - - if (isContentArray) { - // Content is already in the correct array format - return {'role': role, 'content': content}; - } else if (hasImages && role == 'user') { - // For user messages with images, use OpenWebUI's content array format - final imageFiles = files! - .where((file) => file['type'] == 'image') - .toList(); - - final contentText = content is String ? content : ''; - final contentArray = >[ - {'type': 'text', 'text': contentText}, - ]; - - for (final file in imageFiles) { - contentArray.add({ - 'type': 'image_url', - 'image_url': {'url': file['url']}, - }); - } - - return {'role': role, 'content': contentArray}; - } else { - // For messages without images or non-user messages, use regular format - final contentText = content is String ? content : ''; - return {'role': role, 'content': contentText}; - } - }).toList(); - - // Separate files from messages (OpenWebUI format) - final allFiles = >[]; - for (final message in messages) { - final files = message['files'] as List>?; - if (files != null) { - // Only include non-image files in the files array - final nonImageFiles = files - .where((file) => file['type'] != 'image') - .toList(); - allFiles.addAll(nonImageFiles); - } - } - - // Prepare the request in OpenWebUI format - final data = { - 'stream': true, - 'model': model, - 'messages': processedMessages, - 'params': { - 'temperature': 0.7, - 'top_p': 1.0, - 'max_tokens': 4096, - 'stream_response': true, - }, - 'files': allFiles.isNotEmpty ? allFiles : null, - 'tool_servers': [], - 'features': { - 'image_generation': false, - 'code_interpreter': false, - 'web_search': enableWebSearch, - 'memory': false, - }, - 'variables': { - '{{USER_NAME}}': 'User', - '{{USER_LOCATION}}': 'Unknown', - '{{CURRENT_DATETIME}}': DateTime.now().toString().substring(0, 19), - '{{CURRENT_DATE}}': DateTime.now().toString().substring(0, 10), - '{{CURRENT_TIME}}': DateTime.now().toString().substring(11, 19), - '{{CURRENT_WEEKDAY}}': _getCurrentWeekday(), - '{{CURRENT_TIMEZONE}}': DateTime.now().timeZoneName, - '{{USER_LANGUAGE}}': 'en-US', - }, - if (conversationId != null) 'chat_id': conversationId, - if (modelItem != null) 'model_item': modelItem, - 'background_tasks': { - 'TITLE_GENERATION': true, - 'TAGS_GENERATION': true, - 'FOLLOW_UP_GENERATION': true, - }, - 'session_id': sessionId, - 'id': messageId, - }; - - debugPrint('DEBUG: Sending chat completion request:'); - debugPrint('DEBUG: Model: $model'); - debugPrint('DEBUG: Messages count: ${processedMessages.length}'); - debugPrint('DEBUG: Files count: ${allFiles.length}'); - debugPrint('DEBUG: Web search enabled: $enableWebSearch'); - - // Use Server-Sent Events for streaming - const url = '/api/chat/completions'; - - _dio - .post( - url, - data: data, - options: Options( - responseType: ResponseType.stream, - headers: {'Accept': 'text/event-stream'}, - // Increase timeout for streaming responses - receiveTimeout: const Duration(minutes: 5), - ), - ) - .then((response) { - final stream = response.data.stream; - - stream.listen( - (data) { - final decodedData = utf8.decode(data); - debugPrint('DEBUG: SSE Raw data: $decodedData'); - final lines = decodedData.split('\n'); - for (final line in lines) { - if (line.startsWith('data: ')) { - final jsonStr = line.substring(6); - debugPrint('DEBUG: SSE JSON: $jsonStr'); - if (jsonStr == '[DONE]') { - debugPrint('DEBUG: Stream finished with [DONE]'); - streamController.close(); - return; - } - try { - final json = jsonDecode(jsonStr); - if (json is Map) { - final choices = json['choices']; - if (choices is List && choices.isNotEmpty) { - final delta = choices[0]['delta']; - if (delta is Map) { - // Handle regular content - final content = delta['content']; - if (content is String && content.isNotEmpty) { - debugPrint( - 'DEBUG: Adding content chunk: "$content"', - ); - streamController.add(content); - } - - // Handle function calls - final toolCalls = delta['tool_calls']; - if (toolCalls is List && toolCalls.isNotEmpty) { - for (final toolCall in toolCalls) { - if (toolCall is Map) { - final function = toolCall['function']; - if (function is Map) { - final name = function['name']; - final arguments = function['arguments']; - debugPrint( - 'DEBUG: Function call - Name: $name, Arguments: $arguments', - ); - } - } - } - } - } - } - } - } catch (e) { - debugPrint('DEBUG: Error parsing SSE data: $e'); - } - } - } - }, - onError: (error) { - debugPrint('DEBUG: Stream error: $error'); - debugPrint('DEBUG: Stream error type: ${error.runtimeType}'); - streamController.addError(error); - }, - onDone: () { - debugPrint('DEBUG: Stream completed'); - streamController.close(); - }, - ); - }) - .catchError((error) { - debugPrint('DEBUG: Request error: $error'); - streamController.addError(error); - }); - - return ( - stream: streamController.stream, - messageId: messageId, - sessionId: sessionId, - ); - } + // Legacy Socket.IO and older SSE methods removed // File upload for RAG Future uploadFile(String filePath, String fileName) async { @@ -3584,10 +3032,7 @@ class ApiService { debugPrint('=== END API DOCS CHECK ==='); } - void dispose() { - _wsChannel?.sink.close(); - _wsChannel = null; - } + // dispose() removed – no legacy websocket resources to clean up // Helper method to get current weekday name // ==================== ADVANCED CHAT FEATURES ==================== @@ -3934,74 +3379,5 @@ class ApiService { // ==================== END ADVANCED CHAT FEATURES ==================== - // Enhanced streaming method that uses improved SSE (like OpenWebUI) and Socket.IO fallback - ({Stream stream, String messageId, String sessionId}) - sendMessageWithStreamingLegacy({ - required List> messages, - required String model, - String? conversationId, - List>? tools, - bool enableWebSearch = false, - Map? modelItem, - bool preferSocketIO = false, // Changed default to false - SSE is primary - }) { - debugPrint('DEBUG: Starting streaming with SSE as primary method'); - - // Use improved SSE streaming as primary method (matches OpenWebUI exactly) - return sendMessage( - messages: messages, - model: model, - conversationId: conversationId, - tools: tools, - enableWebSearch: enableWebSearch, - modelItem: modelItem, - ); - } - - // Enhanced streaming method with Socket.IO preference - ({Stream stream, String messageId, String sessionId}) - sendMessageWithEnhancedStreamingLegacy({ - required List> messages, - required String model, - String? conversationId, - List>? tools, - bool enableWebSearch = false, - Map? modelItem, - bool preferSocketIO = true, - }) { - debugPrint( - 'DEBUG: Starting enhanced streaming with preferSocketIO: $preferSocketIO', - ); - - // Try Socket.IO first if preferred and available - if (preferSocketIO) { - try { - debugPrint('DEBUG: Attempting Socket.IO streaming...'); - return sendMessageWithSocketIOLegacy( - messages: messages, - model: model, - conversationId: conversationId, - tools: tools, - enableWebSearch: enableWebSearch, - modelItem: modelItem, - ); - } catch (e) { - debugPrint( - 'DEBUG: Socket.IO streaming failed, falling back to SSE: $e', - ); - // Fall through to SSE - } - } - - // Use SSE streaming as fallback - debugPrint('DEBUG: Using SSE streaming as fallback'); - return sendMessage( - messages: messages, - model: model, - conversationId: conversationId, - tools: tools, - enableWebSearch: enableWebSearch, - modelItem: modelItem, - ); - } + // Legacy streaming wrapper methods removed } diff --git a/pubspec.lock b/pubspec.lock index eb1616c..9f96cf2 100644 --- a/pubspec.lock +++ b/pubspec.lock @@ -1053,22 +1053,6 @@ packages: description: flutter source: sdk version: "0.0.0" - socket_io_client: - dependency: "direct main" - description: - name: socket_io_client - sha256: c8471c2c6843cf308a5532ff653f2bcdb7fa9ae79d84d1179920578a06624f0d - url: "https://pub.dev" - source: hosted - version: "3.1.2" - socket_io_common: - dependency: transitive - description: - name: socket_io_common - sha256: "162fbaecbf4bf9a9372a62a341b3550b51dcef2f02f3e5830a297fd48203d45b" - url: "https://pub.dev" - source: hosted - version: "3.1.1" source_gen: dependency: transitive description: @@ -1141,14 +1125,6 @@ packages: url: "https://pub.dev" source: hosted version: "2.4.0" - sse: - dependency: "direct main" - description: - name: sse - sha256: fcc97470240bb37377f298e2bd816f09fd7216c07928641c0560719f50603643 - url: "https://pub.dev" - source: hosted - version: "4.1.8" stack_trace: dependency: transitive description: @@ -1398,7 +1374,7 @@ packages: source: hosted version: "1.0.1" web_socket_channel: - dependency: "direct main" + dependency: transitive description: name: web_socket_channel sha256: d645757fb0f4773d602444000a8131ff5d48c9e47adfe9772652dd1a4f2d45c8 diff --git a/pubspec.yaml b/pubspec.yaml index 91b6e6b..aaffac9 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -16,9 +16,7 @@ dependencies: # Network & API dio: ^5.5.0 http_parser: ^4.0.2 - web_socket_channel: ^3.0.1 - socket_io_client: ^3.0.4 - sse: ^4.1.2 + # Storage flutter_secure_storage: ^9.2.2