chore: remove unused legacy methods

This commit is contained in:
cogwheel0
2025-08-19 13:33:31 +05:30
parent 8c6a4dd2a2
commit dbe66ece8c
3 changed files with 7 additions and 657 deletions

View File

@@ -4,8 +4,7 @@ import 'dart:io';
import 'package:flutter/foundation.dart';
import 'package:dio/dio.dart';
import 'package:http_parser/http_parser.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:socket_io_client/socket_io_client.dart' as io;
// Removed legacy websocket/socket.io imports
import 'package:uuid/uuid.dart';
import '../models/server_config.dart';
import '../models/user.dart';
@@ -23,8 +22,7 @@ class ApiService {
final Dio _dio;
final ServerConfig serverConfig;
late final ApiAuthInterceptor _authInterceptor;
WebSocketChannel? _wsChannel;
io.Socket? _socket;
// Removed legacy websocket/socket.io fields
// Callback to notify when auth token becomes invalid
void Function()? onAuthTokenInvalid;
@@ -2378,18 +2376,6 @@ class ApiService {
}
// Chat streaming with conversation context
String _getCurrentWeekday() {
final weekdays = [
'Monday',
'Tuesday',
'Wednesday',
'Thursday',
'Friday',
'Saturday',
'Sunday',
];
return weekdays[DateTime.now().weekday - 1];
}
// Send message with SSE streaming
// Returns a record with (stream, messageId, sessionId)
@@ -2942,545 +2928,7 @@ class ApiService {
}
}
// Initialize Socket.IO connection
Future<void> _initializeSocket() async {
if (_socket != null && _socket!.connected) {
return; // Already connected
}
try {
debugPrint(
'DEBUG: Initializing Socket.IO connection to ${serverConfig.url}',
);
_socket = io.io(
serverConfig.url,
io.OptionBuilder()
.setTransports(['websocket', 'polling'])
.enableReconnection()
.setReconnectionDelay(1000)
.setReconnectionDelayMax(5000)
.setPath('/ws/socket.io')
.setAuth({'token': _authInterceptor.authToken})
.build(),
);
_socket!.onConnect((_) {
debugPrint('DEBUG: Socket.IO connected with ID: ${_socket!.id}');
// Emit user-join event with auth token
_socket!.emit('user-join', {
'auth': {'token': _authInterceptor.authToken},
});
});
_socket!.onDisconnect((_) {
debugPrint('DEBUG: Socket.IO disconnected');
});
_socket!.onError((error) {
debugPrint('DEBUG: Socket.IO error: $error');
});
_socket!.onReconnect((_) {
debugPrint('DEBUG: Socket.IO reconnected');
});
} catch (e) {
debugPrint('DEBUG: Failed to initialize Socket.IO: $e');
}
}
// Socket.IO streaming method that listens to real-time events
({Stream<String> stream, String messageId, String sessionId})
sendMessageWithSocketIOLegacy({
required List<Map<String, dynamic>> messages,
required String model,
String? conversationId,
List<Map<String, dynamic>>? tools,
bool enableWebSearch = false,
Map<String, dynamic>? modelItem,
}) {
final streamController = StreamController<String>();
// Generate unique IDs
final messageId = const Uuid().v4();
final sessionId = const Uuid().v4().substring(0, 20);
debugPrint('DEBUG: Starting Socket.IO streaming for message: $messageId');
// Initialize socket connection
_initializeSocket()
.then((_) {
_handleSocketIOStreamingResponse(messageId, streamController);
// Send the chat completion request via API
// This will trigger the server to emit Socket.IO events
_sendChatCompletionForSocketIO(
messages: messages,
model: model,
conversationId: conversationId,
messageId: messageId,
tools: tools,
enableWebSearch: enableWebSearch,
modelItem: modelItem,
);
})
.catchError((error) {
debugPrint('DEBUG: Socket.IO initialization failed: $error');
streamController.addError('Failed to initialize Socket.IO: $error');
});
return (
stream: streamController.stream,
messageId: messageId,
sessionId: sessionId,
);
}
// Handle Socket.IO real-time streaming events
void _handleSocketIOStreamingResponse(
String messageId,
StreamController<String> streamController,
) async {
// Check if socket is available
if (_socket == null || !_socket!.connected) {
debugPrint(
'DEBUG: Socket not available for real-time streaming, falling back to polling',
);
streamController.addError('Socket.IO not connected');
streamController.close();
return;
}
debugPrint(
'DEBUG: Setting up Socket.IO real-time streaming for message: $messageId',
);
bool streamCompleted = false;
Timer? timeoutTimer;
// Set up timeout to prevent hanging
timeoutTimer = Timer(const Duration(seconds: 30), () {
if (!streamCompleted) {
debugPrint(
'DEBUG: Socket.IO streaming timeout for message: $messageId',
);
streamCompleted = true;
streamController.addError('Streaming timeout');
streamController.close();
}
});
// Set up listener for chat-events from the server (OpenWebUI pattern)
void handleChatEvent(dynamic data) {
try {
if (streamCompleted) return;
debugPrint('DEBUG: Received Socket.IO chat event: $data');
final Map<String, dynamic> eventData = data is Map<String, dynamic>
? data
: (data as Map).cast<String, dynamic>();
final chatId = eventData['chat_id']?.toString();
final eventMessageId = eventData['message_id']?.toString();
final eventDetails = eventData['data'] as Map<String, dynamic>? ?? {};
final eventType = eventDetails['type']?.toString();
final eventDataContent =
eventDetails['data'] as Map<String, dynamic>? ?? {};
debugPrint(
'DEBUG: Event type: $eventType, chat_id: $chatId, message_id: $eventMessageId',
);
// Only process events for our message
if (eventMessageId != messageId && eventMessageId != null) {
return;
}
switch (eventType) {
case 'message':
// Incremental content streaming - add the new chunk
final content = eventDataContent['content']?.toString() ?? '';
if (content.isNotEmpty) {
debugPrint('DEBUG: Adding Socket.IO content chunk: "$content"');
streamController.add(content);
}
break;
case 'replace':
// Full content replacement - replace entire content
final content = eventDataContent['content']?.toString() ?? '';
debugPrint('DEBUG: Replacing Socket.IO content: "$content"');
streamController.add('__REPLACE_CONTENT__$content');
break;
case 'status':
// Status update (like "generating", "thinking", etc.)
final status = eventDataContent['status']?.toString() ?? '';
if (status.isNotEmpty) {
debugPrint('DEBUG: Socket.IO Status update: $status');
// Optionally emit status as a special event
streamController.add('__STATUS__$status');
}
break;
case 'error':
// Error occurred during generation
final error =
eventDataContent['error']?.toString() ?? 'Unknown error';
debugPrint('DEBUG: Socket.IO streaming error: $error');
streamCompleted = true;
timeoutTimer?.cancel();
_socket?.off('chat-events', handleChatEvent);
streamController.addError(error);
streamController.close();
break;
case 'done':
// Streaming completed successfully
debugPrint(
'DEBUG: Socket.IO streaming completed for message: $messageId',
);
streamCompleted = true;
timeoutTimer?.cancel();
_socket?.off('chat-events', handleChatEvent);
streamController.close();
break;
default:
debugPrint('DEBUG: Unknown Socket.IO event type: $eventType');
break;
}
} catch (e, stackTrace) {
debugPrint('DEBUG: Error handling Socket.IO event: $e');
debugPrint('DEBUG: Stack trace: $stackTrace');
if (!streamCompleted) {
streamCompleted = true;
timeoutTimer?.cancel();
_socket?.off('chat-events', handleChatEvent);
streamController.addError('Error processing streaming event: $e');
streamController.close();
}
}
}
// Listen for chat-events
_socket!.on('chat-events', handleChatEvent);
// Clean up when stream is closed
streamController.onCancel = () {
debugPrint(
'DEBUG: Cleaning up Socket.IO listeners for message: $messageId',
);
streamCompleted = true;
timeoutTimer?.cancel();
_socket?.off('chat-events', handleChatEvent);
};
}
// Send chat completion request that will trigger Socket.IO events
Future<void> _sendChatCompletionForSocketIO({
required List<Map<String, dynamic>> messages,
required String model,
String? conversationId,
required String messageId,
List<Map<String, dynamic>>? tools,
bool enableWebSearch = false,
Map<String, dynamic>? modelItem,
}) async {
try {
// Process messages same as SSE version
final processedMessages = messages.map((message) {
final role = message['role'] as String;
final content = message['content'];
final files = message['files'] as List<Map<String, dynamic>>?;
final isContentArray = content is List;
final hasImages =
files?.any((file) => file['type'] == 'image') ?? false;
if (isContentArray) {
return {'role': role, 'content': content};
} else if (hasImages && role == 'user') {
final imageFiles = files!
.where((file) => file['type'] == 'image')
.toList();
final contentText = content is String ? content : '';
final contentArray = <Map<String, dynamic>>[
{'type': 'text', 'text': contentText},
];
for (final file in imageFiles) {
contentArray.add({
'type': 'image_url',
'image_url': {'url': file['url']},
});
}
return {'role': role, 'content': contentArray};
} else {
final contentText = content is String ? content : '';
return {'role': role, 'content': contentText};
}
}).toList();
// Separate files from messages
final allFiles = <Map<String, dynamic>>[];
for (final message in messages) {
final files = message['files'] as List<Map<String, dynamic>>?;
if (files != null) {
final nonImageFiles = files
.where((file) => file['type'] != 'image')
.toList();
allFiles.addAll(nonImageFiles);
}
}
// Create request data
final data = {
'model': model,
'messages': processedMessages,
'stream': true, // Enable streaming
'message_id': messageId, // Include message ID for Socket.IO events
if (conversationId != null) 'chat_id': conversationId,
if (tools != null && tools.isNotEmpty) 'tools': tools,
if (allFiles.isNotEmpty) 'files': allFiles,
if (enableWebSearch) 'web_search': enableWebSearch,
'session_id': _socket?.id, // Include Socket.IO session ID
};
debugPrint('DEBUG: Sending Socket.IO-enabled chat completion request');
debugPrint('DEBUG: Message ID: $messageId');
debugPrint('DEBUG: Socket ID: ${_socket?.id}');
// Send the request - server should emit Socket.IO events in response
await _dio.post('/api/chat/completions', data: data);
} catch (e) {
debugPrint('DEBUG: Error sending Socket.IO chat completion request: $e');
rethrow;
}
}
// Enhanced SSE streaming method that matches OpenWebUI implementation
({Stream<String> stream, String messageId, String sessionId})
sendMessageWithImprovedSSELegacy({
required List<Map<String, dynamic>> messages,
required String model,
String? conversationId,
List<Map<String, dynamic>>? tools,
bool enableWebSearch = false,
Map<String, dynamic>? modelItem,
}) {
final streamController = StreamController<String>();
// Generate a unique message ID and session ID for the request
final messageId = const Uuid().v4();
final sessionId = const Uuid().v4().substring(0, 20); // Match WebUI format
// Check if this is a Gemini model that requires special handling
final isGeminiModel = model.toLowerCase().contains('gemini');
debugPrint('DEBUG: Is Gemini model in API: $isGeminiModel');
debugPrint('DEBUG: Model ID in API: $model');
// Process messages to match OpenWebUI format
final processedMessages = messages.map((message) {
final role = message['role'] as String;
final content = message['content'];
final files = message['files'] as List<Map<String, dynamic>>?;
// Check if content is already a List (content array format)
final isContentArray = content is List;
// Check if this message has image files
final hasImages = files?.any((file) => file['type'] == 'image') ?? false;
if (isContentArray) {
// Content is already in the correct array format
return {'role': role, 'content': content};
} else if (hasImages && role == 'user') {
// For user messages with images, use OpenWebUI's content array format
final imageFiles = files!
.where((file) => file['type'] == 'image')
.toList();
final contentText = content is String ? content : '';
final contentArray = <Map<String, dynamic>>[
{'type': 'text', 'text': contentText},
];
for (final file in imageFiles) {
contentArray.add({
'type': 'image_url',
'image_url': {'url': file['url']},
});
}
return {'role': role, 'content': contentArray};
} else {
// For messages without images or non-user messages, use regular format
final contentText = content is String ? content : '';
return {'role': role, 'content': contentText};
}
}).toList();
// Separate files from messages (OpenWebUI format)
final allFiles = <Map<String, dynamic>>[];
for (final message in messages) {
final files = message['files'] as List<Map<String, dynamic>>?;
if (files != null) {
// Only include non-image files in the files array
final nonImageFiles = files
.where((file) => file['type'] != 'image')
.toList();
allFiles.addAll(nonImageFiles);
}
}
// Prepare the request in OpenWebUI format
final data = {
'stream': true,
'model': model,
'messages': processedMessages,
'params': {
'temperature': 0.7,
'top_p': 1.0,
'max_tokens': 4096,
'stream_response': true,
},
'files': allFiles.isNotEmpty ? allFiles : null,
'tool_servers': [],
'features': {
'image_generation': false,
'code_interpreter': false,
'web_search': enableWebSearch,
'memory': false,
},
'variables': {
'{{USER_NAME}}': 'User',
'{{USER_LOCATION}}': 'Unknown',
'{{CURRENT_DATETIME}}': DateTime.now().toString().substring(0, 19),
'{{CURRENT_DATE}}': DateTime.now().toString().substring(0, 10),
'{{CURRENT_TIME}}': DateTime.now().toString().substring(11, 19),
'{{CURRENT_WEEKDAY}}': _getCurrentWeekday(),
'{{CURRENT_TIMEZONE}}': DateTime.now().timeZoneName,
'{{USER_LANGUAGE}}': 'en-US',
},
if (conversationId != null) 'chat_id': conversationId,
if (modelItem != null) 'model_item': modelItem,
'background_tasks': {
'TITLE_GENERATION': true,
'TAGS_GENERATION': true,
'FOLLOW_UP_GENERATION': true,
},
'session_id': sessionId,
'id': messageId,
};
debugPrint('DEBUG: Sending chat completion request:');
debugPrint('DEBUG: Model: $model');
debugPrint('DEBUG: Messages count: ${processedMessages.length}');
debugPrint('DEBUG: Files count: ${allFiles.length}');
debugPrint('DEBUG: Web search enabled: $enableWebSearch');
// Use Server-Sent Events for streaming
const url = '/api/chat/completions';
_dio
.post(
url,
data: data,
options: Options(
responseType: ResponseType.stream,
headers: {'Accept': 'text/event-stream'},
// Increase timeout for streaming responses
receiveTimeout: const Duration(minutes: 5),
),
)
.then((response) {
final stream = response.data.stream;
stream.listen(
(data) {
final decodedData = utf8.decode(data);
debugPrint('DEBUG: SSE Raw data: $decodedData');
final lines = decodedData.split('\n');
for (final line in lines) {
if (line.startsWith('data: ')) {
final jsonStr = line.substring(6);
debugPrint('DEBUG: SSE JSON: $jsonStr');
if (jsonStr == '[DONE]') {
debugPrint('DEBUG: Stream finished with [DONE]');
streamController.close();
return;
}
try {
final json = jsonDecode(jsonStr);
if (json is Map<String, dynamic>) {
final choices = json['choices'];
if (choices is List && choices.isNotEmpty) {
final delta = choices[0]['delta'];
if (delta is Map<String, dynamic>) {
// Handle regular content
final content = delta['content'];
if (content is String && content.isNotEmpty) {
debugPrint(
'DEBUG: Adding content chunk: "$content"',
);
streamController.add(content);
}
// Handle function calls
final toolCalls = delta['tool_calls'];
if (toolCalls is List && toolCalls.isNotEmpty) {
for (final toolCall in toolCalls) {
if (toolCall is Map<String, dynamic>) {
final function = toolCall['function'];
if (function is Map<String, dynamic>) {
final name = function['name'];
final arguments = function['arguments'];
debugPrint(
'DEBUG: Function call - Name: $name, Arguments: $arguments',
);
}
}
}
}
}
}
}
} catch (e) {
debugPrint('DEBUG: Error parsing SSE data: $e');
}
}
}
},
onError: (error) {
debugPrint('DEBUG: Stream error: $error');
debugPrint('DEBUG: Stream error type: ${error.runtimeType}');
streamController.addError(error);
},
onDone: () {
debugPrint('DEBUG: Stream completed');
streamController.close();
},
);
})
.catchError((error) {
debugPrint('DEBUG: Request error: $error');
streamController.addError(error);
});
return (
stream: streamController.stream,
messageId: messageId,
sessionId: sessionId,
);
}
// Legacy Socket.IO and older SSE methods removed
// File upload for RAG
Future<String> uploadFile(String filePath, String fileName) async {
@@ -3584,10 +3032,7 @@ class ApiService {
debugPrint('=== END API DOCS CHECK ===');
}
void dispose() {
_wsChannel?.sink.close();
_wsChannel = null;
}
// dispose() removed no legacy websocket resources to clean up
// Helper method to get current weekday name
// ==================== ADVANCED CHAT FEATURES ====================
@@ -3934,74 +3379,5 @@ class ApiService {
// ==================== END ADVANCED CHAT FEATURES ====================
// Enhanced streaming method that uses improved SSE (like OpenWebUI) and Socket.IO fallback
({Stream<String> stream, String messageId, String sessionId})
sendMessageWithStreamingLegacy({
required List<Map<String, dynamic>> messages,
required String model,
String? conversationId,
List<Map<String, dynamic>>? tools,
bool enableWebSearch = false,
Map<String, dynamic>? modelItem,
bool preferSocketIO = false, // Changed default to false - SSE is primary
}) {
debugPrint('DEBUG: Starting streaming with SSE as primary method');
// Use improved SSE streaming as primary method (matches OpenWebUI exactly)
return sendMessage(
messages: messages,
model: model,
conversationId: conversationId,
tools: tools,
enableWebSearch: enableWebSearch,
modelItem: modelItem,
);
}
// Enhanced streaming method with Socket.IO preference
({Stream<String> stream, String messageId, String sessionId})
sendMessageWithEnhancedStreamingLegacy({
required List<Map<String, dynamic>> messages,
required String model,
String? conversationId,
List<Map<String, dynamic>>? tools,
bool enableWebSearch = false,
Map<String, dynamic>? modelItem,
bool preferSocketIO = true,
}) {
debugPrint(
'DEBUG: Starting enhanced streaming with preferSocketIO: $preferSocketIO',
);
// Try Socket.IO first if preferred and available
if (preferSocketIO) {
try {
debugPrint('DEBUG: Attempting Socket.IO streaming...');
return sendMessageWithSocketIOLegacy(
messages: messages,
model: model,
conversationId: conversationId,
tools: tools,
enableWebSearch: enableWebSearch,
modelItem: modelItem,
);
} catch (e) {
debugPrint(
'DEBUG: Socket.IO streaming failed, falling back to SSE: $e',
);
// Fall through to SSE
}
}
// Use SSE streaming as fallback
debugPrint('DEBUG: Using SSE streaming as fallback');
return sendMessage(
messages: messages,
model: model,
conversationId: conversationId,
tools: tools,
enableWebSearch: enableWebSearch,
modelItem: modelItem,
);
}
// Legacy streaming wrapper methods removed
}