chore: formatting

This commit is contained in:
cogwheel0
2025-08-19 13:35:32 +05:30
parent dbe66ece8c
commit d3742944bc

View File

@@ -47,7 +47,7 @@ class ApiService {
) { ) {
// Use API key from server config if provided and no explicit auth token // Use API key from server config if provided and no explicit auth token
final effectiveAuthToken = authToken ?? serverConfig.apiKey; final effectiveAuthToken = authToken ?? serverConfig.apiKey;
// Initialize the consistent auth interceptor // Initialize the consistent auth interceptor
_authInterceptor = ApiAuthInterceptor( _authInterceptor = ApiAuthInterceptor(
authToken: effectiveAuthToken, authToken: effectiveAuthToken,
@@ -90,15 +90,19 @@ class ApiService {
debugPrint('Method: ${options.method}'); debugPrint('Method: ${options.method}');
debugPrint('Headers: ${options.headers}'); debugPrint('Headers: ${options.headers}');
debugPrint('Content-Type: ${options.contentType}'); debugPrint('Content-Type: ${options.contentType}');
// Log the raw data being sent // Log the raw data being sent
if (options.data != null) { if (options.data != null) {
if (options.data is Map) { if (options.data is Map) {
final dataMap = options.data as Map<String, dynamic>; final dataMap = options.data as Map<String, dynamic>;
debugPrint('Data type: Map'); debugPrint('Data type: Map');
debugPrint('Data keys: ${dataMap.keys.toList()}'); debugPrint('Data keys: ${dataMap.keys.toList()}');
debugPrint('Has background_tasks: ${dataMap.containsKey('background_tasks')}'); debugPrint(
debugPrint('Has session_id: ${dataMap.containsKey('session_id')}'); 'Has background_tasks: ${dataMap.containsKey('background_tasks')}',
);
debugPrint(
'Has session_id: ${dataMap.containsKey('session_id')}',
);
debugPrint('Has id: ${dataMap.containsKey('id')}'); debugPrint('Has id: ${dataMap.containsKey('id')}');
debugPrint('Full data: ${jsonEncode(dataMap)}'); debugPrint('Full data: ${jsonEncode(dataMap)}');
} else { } else {
@@ -112,7 +116,7 @@ class ApiService {
}, },
), ),
); );
// 5. Standard logging interceptor // 5. Standard logging interceptor
_dio.interceptors.add( _dio.interceptors.add(
LogInterceptor( LogInterceptor(
@@ -324,56 +328,63 @@ class ApiService {
debugPrint('DEBUG: Auth token present: ${authToken != null}'); debugPrint('DEBUG: Auth token present: ${authToken != null}');
List<dynamic> allRegularChats = []; List<dynamic> allRegularChats = [];
if (limit == null) { if (limit == null) {
// Fetch all conversations using pagination // Fetch all conversations using pagination
debugPrint('DEBUG: Fetching ALL conversations using pagination'); debugPrint('DEBUG: Fetching ALL conversations using pagination');
int currentPage = 0; int currentPage = 0;
while (true) { while (true) {
debugPrint('DEBUG: Fetching page $currentPage'); debugPrint('DEBUG: Fetching page $currentPage');
final response = await _dio.get( final response = await _dio.get(
'/api/v1/chats/', '/api/v1/chats/',
queryParameters: {'page': currentPage}, queryParameters: {'page': currentPage},
); );
if (response.data is! List) { if (response.data is! List) {
throw Exception('Expected array of chats, got ${response.data.runtimeType}'); throw Exception(
'Expected array of chats, got ${response.data.runtimeType}',
);
} }
final pageChats = response.data as List; final pageChats = response.data as List;
debugPrint('DEBUG: Page $currentPage returned ${pageChats.length} conversations'); debugPrint(
'DEBUG: Page $currentPage returned ${pageChats.length} conversations',
);
if (pageChats.isEmpty) { if (pageChats.isEmpty) {
debugPrint('DEBUG: No more conversations, stopping pagination'); debugPrint('DEBUG: No more conversations, stopping pagination');
break; break;
} }
allRegularChats.addAll(pageChats); allRegularChats.addAll(pageChats);
currentPage++; currentPage++;
// Safety break to avoid infinite loops (adjust as needed) // Safety break to avoid infinite loops (adjust as needed)
if (currentPage > 100) { if (currentPage > 100) {
debugPrint('WARNING: Reached maximum page limit (100), stopping pagination'); debugPrint(
'WARNING: Reached maximum page limit (100), stopping pagination',
);
break; break;
} }
} }
debugPrint('DEBUG: Fetched total of ${allRegularChats.length} conversations across $currentPage pages'); debugPrint(
'DEBUG: Fetched total of ${allRegularChats.length} conversations across $currentPage pages',
);
} else { } else {
// Original single page fetch // Original single page fetch
final regularResponse = await _dio.get( final regularResponse = await _dio.get(
'/api/v1/chats/', '/api/v1/chats/',
queryParameters: { queryParameters: {if (limit > 0) 'page': ((skip ?? 0) / limit).floor()},
if (limit > 0)
'page': ((skip ?? 0) / limit).floor(),
},
); );
if (regularResponse.data is! List) { if (regularResponse.data is! List) {
throw Exception('Expected array of chats, got ${regularResponse.data.runtimeType}'); throw Exception(
'Expected array of chats, got ${regularResponse.data.runtimeType}',
);
} }
allRegularChats = regularResponse.data as List; allRegularChats = regularResponse.data as List;
} }
@@ -381,7 +392,9 @@ class ApiService {
final archivedResponse = await _dio.get('/api/v1/chats/all/archived'); final archivedResponse = await _dio.get('/api/v1/chats/all/archived');
debugPrint('DEBUG: Pinned response status: ${pinnedResponse.statusCode}'); debugPrint('DEBUG: Pinned response status: ${pinnedResponse.statusCode}');
debugPrint('DEBUG: Archived response status: ${archivedResponse.statusCode}'); debugPrint(
'DEBUG: Archived response status: ${archivedResponse.statusCode}',
);
if (pinnedResponse.data is! List) { if (pinnedResponse.data is! List) {
throw Exception( throw Exception(
@@ -438,16 +451,23 @@ class ApiService {
for (final chatData in regularChatList) { for (final chatData in regularChatList) {
try { try {
// Debug: Check if conversation has folder_id in raw data // Debug: Check if conversation has folder_id in raw data
if (chatData.containsKey('folder_id') && chatData['folder_id'] != null) { if (chatData.containsKey('folder_id') &&
debugPrint('🔍 DEBUG: Found conversation with folder_id in raw data: ${chatData['id']} -> ${chatData['folder_id']}'); chatData['folder_id'] != null) {
debugPrint(
'🔍 DEBUG: Found conversation with folder_id in raw data: ${chatData['id']} -> ${chatData['folder_id']}',
);
} }
// Debug: Check what fields are available in the chat data // Debug: Check what fields are available in the chat data
if (regularChatList.indexOf(chatData) == 0) { if (regularChatList.indexOf(chatData) == 0) {
debugPrint('🔍 DEBUG: Sample chat data fields: ${chatData.keys.toList()}'); debugPrint(
debugPrint('🔍 DEBUG: Sample chat data: ${chatData.toString().substring(0, 200)}...'); '🔍 DEBUG: Sample chat data fields: ${chatData.keys.toList()}',
);
debugPrint(
'🔍 DEBUG: Sample chat data: ${chatData.toString().substring(0, 200)}...',
);
} }
final conversation = _parseOpenWebUIChat(chatData); final conversation = _parseOpenWebUIChat(chatData);
// Only add if not already added as pinned or archived // Only add if not already added as pinned or archived
if (!pinnedIds.contains(conversation.id) && if (!pinnedIds.contains(conversation.id) &&
@@ -521,10 +541,12 @@ class ApiService {
final archived = chatData['archived'] as bool? ?? false; final archived = chatData['archived'] as bool? ?? false;
final shareId = chatData['share_id'] as String?; final shareId = chatData['share_id'] as String?;
final folderId = chatData['folder_id'] as String?; final folderId = chatData['folder_id'] as String?;
// Debug logging for folder assignment // Debug logging for folder assignment
if (folderId != null) { if (folderId != null) {
debugPrint('🔍 DEBUG: Conversation ${id.substring(0, 8)} has folderId: $folderId'); debugPrint(
'🔍 DEBUG: Conversation ${id.substring(0, 8)} has folderId: $folderId',
);
} }
debugPrint( debugPrint(
@@ -561,10 +583,10 @@ class ApiService {
Conversation _parseFullOpenWebUIChat(Map<String, dynamic> chatData) { Conversation _parseFullOpenWebUIChat(Map<String, dynamic> chatData) {
debugPrint('DEBUG: Parsing full OpenWebUI chat data'); debugPrint('DEBUG: Parsing full OpenWebUI chat data');
debugPrint('DEBUG: Chat data keys: ${chatData.keys.toList()}'); debugPrint('DEBUG: Chat data keys: ${chatData.keys.toList()}');
final id = chatData['id'] as String; final id = chatData['id'] as String;
final title = chatData['title'] as String; final title = chatData['title'] as String;
debugPrint('DEBUG: Parsed chat ID: $id'); debugPrint('DEBUG: Parsed chat ID: $id');
debugPrint('DEBUG: Parsed chat title: $title'); debugPrint('DEBUG: Parsed chat title: $title');
@@ -581,7 +603,7 @@ class ApiService {
// Parse messages from the 'chat' object or top-level messages // Parse messages from the 'chat' object or top-level messages
final chatObject = chatData['chat'] as Map<String, dynamic>?; final chatObject = chatData['chat'] as Map<String, dynamic>?;
final messages = <ChatMessage>[]; final messages = <ChatMessage>[];
// Extract model from chat.models array // Extract model from chat.models array
String? model; String? model;
if (chatObject != null && chatObject['models'] != null) { if (chatObject != null && chatObject['models'] != null) {
@@ -610,11 +632,13 @@ class ApiService {
debugPrint( debugPrint(
'DEBUG: Found ${messagesMap.length} messages in chat.history.messages (converting to list)', 'DEBUG: Found ${messagesMap.length} messages in chat.history.messages (converting to list)',
); );
// Convert map to list format to use common parsing logic // Convert map to list format to use common parsing logic
messagesList = []; messagesList = [];
for (final entry in messagesMap.entries) { for (final entry in messagesMap.entries) {
final msgData = Map<String, dynamic>.from(entry.value as Map<String, dynamic>); final msgData = Map<String, dynamic>.from(
entry.value as Map<String, dynamic>,
);
msgData['id'] = entry.key; // Use the key as the message ID msgData['id'] = entry.key; // Use the key as the message ID
messagesList.add(msgData); messagesList.add(msgData);
} }
@@ -704,7 +728,7 @@ class ApiService {
.where((file) => file is Map && file['file_id'] != null) .where((file) => file is Map && file['file_id'] != null)
.map((file) => file['file_id'] as String) .map((file) => file['file_id'] as String)
.toList(); .toList();
if (attachmentIds.isEmpty) { if (attachmentIds.isEmpty) {
attachmentIds = null; attachmentIds = null;
} }
@@ -738,7 +762,7 @@ class ApiService {
for (final msg in messages) { for (final msg in messages) {
final messageId = msg.id; final messageId = msg.id;
// Build message for history.messages map // Build message for history.messages map
messagesMap[messageId] = { messagesMap[messageId] = {
'id': messageId, 'id': messageId,
@@ -749,12 +773,12 @@ class ApiService {
'timestamp': msg.timestamp.millisecondsSinceEpoch ~/ 1000, 'timestamp': msg.timestamp.millisecondsSinceEpoch ~/ 1000,
if (msg.role == 'user' && model != null) 'models': [model], if (msg.role == 'user' && model != null) 'models': [model],
}; };
// Update parent's childrenIds if there's a previous message // Update parent's childrenIds if there's a previous message
if (previousId != null && messagesMap.containsKey(previousId)) { if (previousId != null && messagesMap.containsKey(previousId)) {
(messagesMap[previousId]['childrenIds'] as List).add(messageId); (messagesMap[previousId]['childrenIds'] as List).add(messageId);
} }
// Build message for messages array // Build message for messages array
messagesArray.add({ messagesArray.add({
'id': messageId, 'id': messageId,
@@ -765,7 +789,7 @@ class ApiService {
'timestamp': msg.timestamp.millisecondsSinceEpoch ~/ 1000, 'timestamp': msg.timestamp.millisecondsSinceEpoch ~/ 1000,
if (msg.role == 'user' && model != null) 'models': [model], if (msg.role == 'user' && model != null) 'models': [model],
}); });
previousId = messageId; previousId = messageId;
currentId = messageId; currentId = messageId;
} }
@@ -793,7 +817,9 @@ class ApiService {
final response = await _dio.post('/api/v1/chats/new', data: chatData); final response = await _dio.post('/api/v1/chats/new', data: chatData);
debugPrint('DEBUG: Create conversation response status: ${response.statusCode}'); debugPrint(
'DEBUG: Create conversation response status: ${response.statusCode}',
);
debugPrint('DEBUG: Create conversation response data: ${response.data}'); debugPrint('DEBUG: Create conversation response data: ${response.data}');
// Parse the response // Parse the response
@@ -821,7 +847,7 @@ class ApiService {
for (final msg in messages) { for (final msg in messages) {
final messageId = msg.id; final messageId = msg.id;
// Build message for messages map (history.messages) // Build message for messages map (history.messages)
messagesMap[messageId] = { messagesMap[messageId] = {
'id': messageId, 'id': messageId,
@@ -831,19 +857,20 @@ class ApiService {
'content': msg.content, 'content': msg.content,
'timestamp': msg.timestamp.millisecondsSinceEpoch ~/ 1000, 'timestamp': msg.timestamp.millisecondsSinceEpoch ~/ 1000,
if (msg.role == 'assistant' && msg.model != null) 'model': msg.model, if (msg.role == 'assistant' && msg.model != null) 'model': msg.model,
if (msg.role == 'assistant' && msg.model != null) 'modelName': msg.model, if (msg.role == 'assistant' && msg.model != null)
'modelName': msg.model,
if (msg.role == 'assistant') 'modelIdx': 0, if (msg.role == 'assistant') 'modelIdx': 0,
if (msg.role == 'assistant') 'done': true, if (msg.role == 'assistant') 'done': true,
if (msg.role == 'user' && model != null) 'models': [model], if (msg.role == 'user' && model != null) 'models': [model],
if (msg.attachmentIds != null && msg.attachmentIds!.isNotEmpty) if (msg.attachmentIds != null && msg.attachmentIds!.isNotEmpty)
'files': msg.attachmentIds!.map((id) => {'file_id': id}).toList(), 'files': msg.attachmentIds!.map((id) => {'file_id': id}).toList(),
}; };
// Update parent's childrenIds // Update parent's childrenIds
if (previousId != null && messagesMap.containsKey(previousId)) { if (previousId != null && messagesMap.containsKey(previousId)) {
(messagesMap[previousId]['childrenIds'] as List).add(messageId); (messagesMap[previousId]['childrenIds'] as List).add(messageId);
} }
// Build message for messages array // Build message for messages array
messagesArray.add({ messagesArray.add({
'id': messageId, 'id': messageId,
@@ -853,14 +880,15 @@ class ApiService {
'content': msg.content, 'content': msg.content,
'timestamp': msg.timestamp.millisecondsSinceEpoch ~/ 1000, 'timestamp': msg.timestamp.millisecondsSinceEpoch ~/ 1000,
if (msg.role == 'assistant' && msg.model != null) 'model': msg.model, if (msg.role == 'assistant' && msg.model != null) 'model': msg.model,
if (msg.role == 'assistant' && msg.model != null) 'modelName': msg.model, if (msg.role == 'assistant' && msg.model != null)
'modelName': msg.model,
if (msg.role == 'assistant') 'modelIdx': 0, if (msg.role == 'assistant') 'modelIdx': 0,
if (msg.role == 'assistant') 'done': true, if (msg.role == 'assistant') 'done': true,
if (msg.role == 'user' && model != null) 'models': [model], if (msg.role == 'user' && model != null) 'models': [model],
if (msg.attachmentIds != null && msg.attachmentIds!.isNotEmpty) if (msg.attachmentIds != null && msg.attachmentIds!.isNotEmpty)
'files': msg.attachmentIds!.map((id) => {'file_id': id}).toList(), 'files': msg.attachmentIds!.map((id) => {'file_id': id}).toList(),
}); });
previousId = messageId; previousId = messageId;
currentId = messageId; currentId = messageId;
} }
@@ -868,7 +896,7 @@ class ApiService {
// Create the chat data structure matching OpenWebUI format exactly // Create the chat data structure matching OpenWebUI format exactly
final chatData = { final chatData = {
'chat': { 'chat': {
if (title != null) 'title': title, // Include the title if provided if (title != null) 'title': title, // Include the title if provided
'models': model != null ? [model] : [], 'models': model != null ? [model] : [],
'messages': messagesArray, 'messages': messagesArray,
'history': { 'history': {
@@ -950,8 +978,6 @@ class ApiService {
await _dio.post('/api/v1/users/user/settings', data: settings); await _dio.post('/api/v1/users/user/settings', data: settings);
} }
// Suggestions // Suggestions
Future<List<String>> getSuggestions() async { Future<List<String>> getSuggestions() async {
debugPrint('DEBUG: Fetching conversation suggestions'); debugPrint('DEBUG: Fetching conversation suggestions');
@@ -985,7 +1011,7 @@ class ApiService {
final response = await _dio.get('/api/v1/folders/'); final response = await _dio.get('/api/v1/folders/');
debugPrint('DEBUG: Folders response status: ${response.statusCode}'); debugPrint('DEBUG: Folders response status: ${response.statusCode}');
debugPrint('DEBUG: Folders response data: ${response.data}'); debugPrint('DEBUG: Folders response data: ${response.data}');
final data = response.data; final data = response.data;
if (data is List) { if (data is List) {
debugPrint('DEBUG: Found ${data.length} folders'); debugPrint('DEBUG: Found ${data.length} folders');
@@ -1496,12 +1522,14 @@ class ApiService {
Map<String, dynamic>? modelItem, Map<String, dynamic>? modelItem,
String? sessionId, String? sessionId,
}) async { }) async {
debugPrint('DEBUG: Sending chat completed notification (optional endpoint)'); debugPrint(
'DEBUG: Sending chat completed notification (optional endpoint)',
);
// This endpoint appears to be optional or deprecated in newer OpenWebUI versions // This endpoint appears to be optional or deprecated in newer OpenWebUI versions
// The main chat synchronization happens through /api/v1/chats/{id} updates // The main chat synchronization happens through /api/v1/chats/{id} updates
// We'll still try to call it but won't fail if it doesn't work // We'll still try to call it but won't fail if it doesn't work
// Format messages to match OpenWebUI expected structure // Format messages to match OpenWebUI expected structure
// Note: Removing 'id' field as it causes 400 error // Note: Removing 'id' field as it causes 400 error
final formattedMessages = messages.map((msg) { final formattedMessages = messages.map((msg) {
@@ -1509,9 +1537,10 @@ class ApiService {
// Don't include 'id' - it causes 400 error with detail: 'id' // Don't include 'id' - it causes 400 error with detail: 'id'
'role': msg['role'], 'role': msg['role'],
'content': msg['content'], 'content': msg['content'],
'timestamp': msg['timestamp'] ?? DateTime.now().millisecondsSinceEpoch ~/ 1000, 'timestamp':
msg['timestamp'] ?? DateTime.now().millisecondsSinceEpoch ~/ 1000,
}; };
// Add model info for assistant messages // Add model info for assistant messages
if (msg['role'] == 'assistant') { if (msg['role'] == 'assistant') {
formatted['model'] = model; formatted['model'] = model;
@@ -1519,17 +1548,18 @@ class ApiService {
formatted['usage'] = msg['usage']; formatted['usage'] = msg['usage'];
} }
} }
return formatted; return formatted;
}).toList(); }).toList();
// Include the message ID and session ID at the top level - server expects these // Include the message ID and session ID at the top level - server expects these
final requestData = { final requestData = {
'id': messageId, // The server expects the assistant message ID here 'id': messageId, // The server expects the assistant message ID here
'chat_id': chatId, 'chat_id': chatId,
'model': model, 'model': model,
'messages': formattedMessages, 'messages': formattedMessages,
'session_id': sessionId ?? const Uuid().v4().substring(0, 20), // Add session_id 'session_id':
sessionId ?? const Uuid().v4().substring(0, 20), // Add session_id
// Don't include model_item as it might not be expected // Don't include model_item as it might not be expected
}; };
@@ -1541,7 +1571,9 @@ class ApiService {
debugPrint('DEBUG: Chat completed response: ${response.statusCode}'); debugPrint('DEBUG: Chat completed response: ${response.statusCode}');
} catch (e) { } catch (e) {
// This is a non-critical endpoint - main sync happens via /api/v1/chats/{id} // This is a non-critical endpoint - main sync happens via /api/v1/chats/{id}
debugPrint('DEBUG: Chat completed endpoint not available or failed (non-critical): $e'); debugPrint(
'DEBUG: Chat completed endpoint not available or failed (non-critical): $e',
);
} }
} }
@@ -2379,8 +2411,7 @@ class ApiService {
// Send message with SSE streaming // Send message with SSE streaming
// Returns a record with (stream, messageId, sessionId) // Returns a record with (stream, messageId, sessionId)
({Stream<String> stream, String messageId, String sessionId}) ({Stream<String> stream, String messageId, String sessionId}) sendMessage({
sendMessage({
required List<Map<String, dynamic>> messages, required List<Map<String, dynamic>> messages,
required String model, required String model,
String? conversationId, String? conversationId,
@@ -2450,12 +2481,12 @@ class ApiService {
'model': model, 'model': model,
'messages': processedMessages, 'messages': processedMessages,
}; };
// Add only essential parameters // Add only essential parameters
if (conversationId != null) { if (conversationId != null) {
data['chat_id'] = conversationId; data['chat_id'] = conversationId;
} }
// Add web search flag if enabled // Add web search flag if enabled
if (enableWebSearch) { if (enableWebSearch) {
data['web_search'] = true; data['web_search'] = true;
@@ -2468,7 +2499,7 @@ class ApiService {
}; };
debugPrint('DEBUG: Web search enabled in SSE request'); debugPrint('DEBUG: Web search enabled in SSE request');
} }
// Don't add session_id or id - they break SSE streaming! // Don't add session_id or id - they break SSE streaming!
// The server falls back to task-based async when these are present // The server falls back to task-based async when these are present
@@ -2478,12 +2509,14 @@ class ApiService {
// Debug the data being sent // Debug the data being sent
debugPrint('DEBUG: SSE request data keys: ${data.keys.toList()}'); debugPrint('DEBUG: SSE request data keys: ${data.keys.toList()}');
debugPrint('DEBUG: Has background_tasks: ${data.containsKey('background_tasks')}'); debugPrint(
'DEBUG: Has background_tasks: ${data.containsKey('background_tasks')}',
);
debugPrint('DEBUG: Has session_id: ${data.containsKey('session_id')}'); debugPrint('DEBUG: Has session_id: ${data.containsKey('session_id')}');
debugPrint('DEBUG: background_tasks value: ${data['background_tasks']}'); debugPrint('DEBUG: background_tasks value: ${data['background_tasks']}');
debugPrint('DEBUG: session_id value: ${data['session_id']}'); debugPrint('DEBUG: session_id value: ${data['session_id']}');
debugPrint('DEBUG: id value: ${data['id']}'); debugPrint('DEBUG: id value: ${data['id']}');
// Use SSE streaming with proper parser // Use SSE streaming with proper parser
_streamSSE(data, streamController, messageId); _streamSSE(data, streamController, messageId);
@@ -2503,11 +2536,11 @@ class ApiService {
final persistentService = PersistentStreamingService(); final persistentService = PersistentStreamingService();
final recoveryService = StreamRecoveryService(); final recoveryService = StreamRecoveryService();
final streamId = DateTime.now().millisecondsSinceEpoch.toString(); final streamId = DateTime.now().millisecondsSinceEpoch.toString();
// Extract metadata for recovery // Extract metadata for recovery
final conversationId = data['conversation_id'] ?? data['chat_id'] ?? ''; final conversationId = data['conversation_id'] ?? data['chat_id'] ?? '';
final sessionId = data['session_id'] ?? const Uuid().v4().substring(0, 20); final sessionId = data['session_id'] ?? const Uuid().v4().substring(0, 20);
// Register stream for recovery // Register stream for recovery
recoveryService.registerStream( recoveryService.registerStream(
streamId, streamId,
@@ -2523,40 +2556,46 @@ class ApiService {
}, },
), ),
); );
// Recovery callback for persistent service // Recovery callback for persistent service
Future<void> recoveryCallback() async { Future<void> recoveryCallback() async {
debugPrint('Persistent: Attempting to recover stream $streamId'); debugPrint('Persistent: Attempting to recover stream $streamId');
// Restart the streaming request // Restart the streaming request
_streamSSE(data, streamController, messageId); _streamSSE(data, streamController, messageId);
} }
// Declare variables that need to be accessible in catch block // Declare variables that need to be accessible in catch block
String? persistentStreamId; String? persistentStreamId;
try { try {
debugPrint('DEBUG: Making SSE request with parser to /api/chat/completions'); debugPrint(
'DEBUG: Making SSE request with parser to /api/chat/completions',
);
// Create a fresh Dio instance optimized for SSE streaming // Create a fresh Dio instance optimized for SSE streaming
final streamDio = Dio(BaseOptions( final streamDio = Dio(
baseUrl: serverConfig.url, BaseOptions(
connectTimeout: const Duration(seconds: 60), // Longer for initial connection baseUrl: serverConfig.url,
receiveTimeout: null, // No timeout for streaming connectTimeout: const Duration(
sendTimeout: const Duration(seconds: 30), seconds: 60,
headers: { ), // Longer for initial connection
'Authorization': 'Bearer ${_authInterceptor.authToken}', receiveTimeout: null, // No timeout for streaming
'Accept': 'text/event-stream', sendTimeout: const Duration(seconds: 30),
'Cache-Control': 'no-cache', headers: {
'Connection': 'keep-alive', 'Authorization': 'Bearer ${_authInterceptor.authToken}',
...serverConfig.customHeaders, // Include any custom headers 'Accept': 'text/event-stream',
}, 'Cache-Control': 'no-cache',
validateStatus: (status) => status != null && status < 400, 'Connection': 'keep-alive',
followRedirects: true, ...serverConfig.customHeaders, // Include any custom headers
maxRedirects: 3, },
)); validateStatus: (status) => status != null && status < 400,
followRedirects: true,
maxRedirects: 3,
),
);
debugPrint('DEBUG: Sending SSE request with data: ${jsonEncode(data)}'); debugPrint('DEBUG: Sending SSE request with data: ${jsonEncode(data)}');
final response = await streamDio.post( final response = await streamDio.post(
'/api/chat/completions', '/api/chat/completions',
data: data, // Pass data directly as Map data: data, // Pass data directly as Map
@@ -2568,37 +2607,43 @@ class ApiService {
debugPrint('DEBUG: SSE response status: ${response.statusCode}'); debugPrint('DEBUG: SSE response status: ${response.statusCode}');
debugPrint('DEBUG: SSE response headers: ${response.headers}'); debugPrint('DEBUG: SSE response headers: ${response.headers}');
debugPrint('DEBUG: SSE content-type: ${response.headers.value('content-type')}'); debugPrint(
'DEBUG: SSE content-type: ${response.headers.value('content-type')}',
);
if (response.statusCode != 200) { if (response.statusCode != 200) {
throw Exception('HTTP ${response.statusCode}: Failed to start streaming'); throw Exception(
'HTTP ${response.statusCode}: Failed to start streaming',
);
} }
// Check if we got SSE or JSON response // Check if we got SSE or JSON response
final contentType = response.headers.value('content-type') ?? ''; final contentType = response.headers.value('content-type') ?? '';
if (!contentType.contains('text/event-stream')) { if (!contentType.contains('text/event-stream')) {
debugPrint('WARNING: Expected SSE but got content-type: $contentType'); debugPrint('WARNING: Expected SSE but got content-type: $contentType');
debugPrint('WARNING: This usually means the server didn\'t receive the streaming parameters'); debugPrint(
'WARNING: This usually means the server didn\'t receive the streaming parameters',
);
// Try to read the response to see what we got // Try to read the response to see what we got
final stream = response.data.stream as Stream<List<int>>; final stream = response.data.stream as Stream<List<int>>;
final bytes = await stream.toList(); final bytes = await stream.toList();
final fullBytes = bytes.expand((x) => x).toList(); final fullBytes = bytes.expand((x) => x).toList();
final responseText = utf8.decode(fullBytes); final responseText = utf8.decode(fullBytes);
debugPrint('DEBUG: Non-SSE response length: ${responseText.length}'); debugPrint('DEBUG: Non-SSE response length: ${responseText.length}');
// If it's JSON, parse and handle it // If it's JSON, parse and handle it
if (contentType.contains('application/json')) { if (contentType.contains('application/json')) {
try { try {
final json = jsonDecode(responseText); final json = jsonDecode(responseText);
// Check if it's an error // Check if it's an error
if (json is Map && json.containsKey('error')) { if (json is Map && json.containsKey('error')) {
debugPrint('ERROR: Server returned error: ${json['error']}'); debugPrint('ERROR: Server returned error: ${json['error']}');
streamController.addError('Server error: ${json['error']}'); streamController.addError('Server error: ${json['error']}');
return; return;
} }
// Try to extract content from non-streaming response // Try to extract content from non-streaming response
if (json is Map && json.containsKey('choices')) { if (json is Map && json.containsKey('choices')) {
final choices = json['choices'] as List?; final choices = json['choices'] as List?;
@@ -2608,7 +2653,9 @@ class ApiService {
final message = choice['message'] as Map<String, dynamic>; final message = choice['message'] as Map<String, dynamic>;
final content = message['content']?.toString() ?? ''; final content = message['content']?.toString() ?? '';
if (content.isNotEmpty) { if (content.isNotEmpty) {
debugPrint('DEBUG: Successfully extracted content from JSON response'); debugPrint(
'DEBUG: Successfully extracted content from JSON response',
);
// Stream the content word by word for better UX // Stream the content word by word for better UX
final words = content.split(' '); final words = content.split(' ');
for (final word in words) { for (final word in words) {
@@ -2619,15 +2666,17 @@ class ApiService {
} }
} }
} }
// Log what we got if we couldn't extract content // Log what we got if we couldn't extract content
if (!streamController.isClosed) { if (!streamController.isClosed) {
debugPrint('DEBUG: JSON response structure: ${json.keys}'); debugPrint('DEBUG: JSON response structure: ${json.keys}');
debugPrint('DEBUG: Full JSON response: $json'); debugPrint('DEBUG: Full JSON response: $json');
// Check if it's a task-based response // Check if it's a task-based response
if (json is Map && json.containsKey('task_id')) { if (json is Map && json.containsKey('task_id')) {
debugPrint('DEBUG: Got task-based response with task_id: ${json['task_id']}'); debugPrint(
'DEBUG: Got task-based response with task_id: ${json['task_id']}',
);
debugPrint('DEBUG: Status: ${json['status']}'); debugPrint('DEBUG: Status: ${json['status']}');
// This might be a polling-based async pattern // This might be a polling-based async pattern
// TODO: Implement polling for task completion // TODO: Implement polling for task completion
@@ -2636,7 +2685,9 @@ class ApiService {
} catch (e) { } catch (e) {
debugPrint('ERROR: Failed to parse JSON response: $e'); debugPrint('ERROR: Failed to parse JSON response: $e');
// Try to show something to the user // Try to show something to the user
streamController.add('Response received but could not be parsed properly.'); streamController.add(
'Response received but could not be parsed properly.',
);
} }
} else { } else {
// Not JSON, might be plain text // Not JSON, might be plain text
@@ -2645,14 +2696,14 @@ class ApiService {
streamController.add(responseText); streamController.add(responseText);
} }
} }
streamController.close(); streamController.close();
return; return;
} }
// Parse SSE stream using enhanced parser with heartbeat monitoring // Parse SSE stream using enhanced parser with heartbeat monitoring
final rawStream = response.data.stream; final rawStream = response.data.stream;
// Handle the stream properly based on its actual type // Handle the stream properly based on its actual type
Stream<List<int>> byteStream; Stream<List<int>> byteStream;
if (rawStream is Stream<Uint8List>) { if (rawStream is Stream<Uint8List>) {
@@ -2660,38 +2711,42 @@ class ApiService {
} else { } else {
byteStream = rawStream as Stream<List<int>>; byteStream = rawStream as Stream<List<int>>;
} }
// Parse SSE events with enhanced parser (includes heartbeat monitoring) // Parse SSE events with enhanced parser (includes heartbeat monitoring)
final sseParser = SSEParser(heartbeatTimeout: const Duration(seconds: 45)); final sseParser = SSEParser(
heartbeatTimeout: const Duration(seconds: 45),
);
int contentIndex = 0; int contentIndex = 0;
int chunkSequence = 0; int chunkSequence = 0;
String accumulatedContent = ''; String accumulatedContent = '';
// Monitor parser heartbeat for reconnection // Monitor parser heartbeat for reconnection
sseParser.heartbeat.listen((_) { sseParser.heartbeat.listen((_) {
debugPrint('Persistent: SSE heartbeat timeout detected'); debugPrint('Persistent: SSE heartbeat timeout detected');
}); });
sseParser.reconnectRequests.listen((lastEventId) { sseParser.reconnectRequests.listen((lastEventId) {
debugPrint('Persistent: SSE reconnection requested, lastEventId: $lastEventId'); debugPrint(
'Persistent: SSE reconnection requested, lastEventId: $lastEventId',
);
// The persistent service will handle the reconnection // The persistent service will handle the reconnection
}); });
// Convert bytes to SSE events // Convert bytes to SSE events
final sseEventStream = SSEParser.parseStream( final sseEventStream = SSEParser.parseStream(
byteStream, byteStream,
heartbeatTimeout: const Duration(seconds: 45), heartbeatTimeout: const Duration(seconds: 45),
); );
// Listen to the SSE event stream // Listen to the SSE event stream
final streamSubscription = sseEventStream.listen( final streamSubscription = sseEventStream.listen(
(event) { (event) {
try { try {
chunkSequence++; chunkSequence++;
// Update parser with chunk data for heartbeat monitoring // Update parser with chunk data for heartbeat monitoring
sseParser.feed(''); // Reset heartbeat timer sseParser.feed(''); // Reset heartbeat timer
// Process the event data // Process the event data
if (persistentStreamId != null) { if (persistentStreamId != null) {
_processSseEvent( _processSseEvent(
@@ -2703,10 +2758,13 @@ class ApiService {
persistentStreamId, persistentStreamId,
); );
} }
// Update recovery state // Update recovery state
recoveryService.updateStreamProgress(streamId, event.data, contentIndex++); recoveryService.updateStreamProgress(
streamId,
event.data,
contentIndex++,
);
} catch (e) { } catch (e) {
debugPrint('Persistent: Error processing SSE event: $e'); debugPrint('Persistent: Error processing SSE event: $e');
streamController.addError(e); streamController.addError(e);
@@ -2724,10 +2782,10 @@ class ApiService {
}, },
onError: (error) async { onError: (error) async {
debugPrint('Persistent: SSE stream error: $error'); debugPrint('Persistent: SSE stream error: $error');
// Try recovery through recovery service first // Try recovery through recovery service first
final recoveredStream = await recoveryService.recoverStream(streamId); final recoveredStream = await recoveryService.recoverStream(streamId);
if (recoveredStream != null) { if (recoveredStream != null) {
debugPrint('Persistent: Successfully recovered SSE stream'); debugPrint('Persistent: Successfully recovered SSE stream');
recoveredStream.listen( recoveredStream.listen(
@@ -2757,9 +2815,10 @@ class ApiService {
streamController.addError(error); streamController.addError(error);
} }
}, },
cancelOnError: false, // Continue processing despite individual event errors cancelOnError:
false, // Continue processing despite individual event errors
); );
// Register with persistent streaming service now that subscription is created // Register with persistent streaming service now that subscription is created
persistentStreamId = persistentService.registerStream( persistentStreamId = persistentService.registerStream(
subscription: streamSubscription, subscription: streamSubscription,
@@ -2775,14 +2834,13 @@ class ApiService {
'requestData': data, 'requestData': data,
}, },
); );
} catch (e) { } catch (e) {
debugPrint('Persistent: Failed to create SSE stream: $e'); debugPrint('Persistent: Failed to create SSE stream: $e');
if (persistentStreamId != null) { if (persistentStreamId != null) {
persistentService.unregisterStream(persistentStreamId); persistentService.unregisterStream(persistentStreamId);
} }
recoveryService.unregisterStream(streamId); recoveryService.unregisterStream(streamId);
if (e is DioException && e.response?.statusCode == 401) { if (e is DioException && e.response?.statusCode == 401) {
// Auth error - don't retry // Auth error - don't retry
streamController.addError('Authentication failed'); streamController.addError('Authentication failed');
@@ -2792,7 +2850,7 @@ class ApiService {
} }
} }
} }
/// Process individual SSE events with content extraction and progress tracking /// Process individual SSE events with content extraction and progress tracking
void _processSseEvent( void _processSseEvent(
SSEEvent event, SSEEvent event,
@@ -2802,8 +2860,10 @@ class ApiService {
PersistentStreamingService persistentService, PersistentStreamingService persistentService,
String persistentStreamId, String persistentStreamId,
) { ) {
debugPrint('Persistent: SSE event - type: ${event.event}, data: ${event.data}'); debugPrint(
'Persistent: SSE event - type: ${event.event}, data: ${event.data}',
);
// Handle completion signal // Handle completion signal
if (event.data == '[DONE]') { if (event.data == '[DONE]') {
debugPrint('Persistent: SSE stream finished with [DONE]'); debugPrint('Persistent: SSE stream finished with [DONE]');
@@ -2812,10 +2872,10 @@ class ApiService {
} }
return; return;
} }
try { try {
final json = jsonDecode(event.data) as Map<String, dynamic>; final json = jsonDecode(event.data) as Map<String, dynamic>;
// Handle errors // Handle errors
if (json.containsKey('error')) { if (json.containsKey('error')) {
final error = json['error']; final error = json['error'];
@@ -2823,110 +2883,113 @@ class ApiService {
streamController.addError('Server error: $error'); streamController.addError('Server error: $error');
return; return;
} }
// Handle content streaming // Handle content streaming
if (json.containsKey('choices')) { if (json.containsKey('choices')) {
final choices = json['choices'] as List?; final choices = json['choices'] as List?;
if (choices != null && choices.isNotEmpty) { if (choices != null && choices.isNotEmpty) {
final choice = choices[0] as Map<String, dynamic>; final choice = choices[0] as Map<String, dynamic>;
if (choice.containsKey('delta')) { if (choice.containsKey('delta')) {
final delta = choice['delta'] as Map<String, dynamic>; final delta = choice['delta'] as Map<String, dynamic>;
// Extract content // Extract content
if (delta.containsKey('content')) { if (delta.containsKey('content')) {
final content = delta['content'] as String?; final content = delta['content'] as String?;
if (content != null && content.isNotEmpty) { if (content != null && content.isNotEmpty) {
debugPrint('Persistent: SSE content chunk: "$content"'); debugPrint('Persistent: SSE content chunk: "$content"');
// Add content to stream // Add content to stream
if (!streamController.isClosed) { if (!streamController.isClosed) {
streamController.add(content); streamController.add(content);
} }
// Update persistent service progress // Update persistent service progress
persistentService.updateStreamProgress( persistentService.updateStreamProgress(
persistentStreamId, persistentStreamId,
chunkSequence: chunkSequence, chunkSequence: chunkSequence,
appendedContent: content, appendedContent: content,
); );
accumulatedContent += content; accumulatedContent += content;
} }
} }
// Check for completion in delta // Check for completion in delta
if (delta.containsKey('finish_reason')) { if (delta.containsKey('finish_reason')) {
final finishReason = delta['finish_reason']; final finishReason = delta['finish_reason'];
debugPrint('Persistent: Stream finished with reason: $finishReason'); debugPrint(
if (!streamController.isClosed) { 'Persistent: Stream finished with reason: $finishReason',
streamController.close(); );
} if (!streamController.isClosed) {
return; streamController.close();
} }
} else if (choice.containsKey('finish_reason')) { return;
// Check for completion at choice level }
final finishReason = choice['finish_reason']; } else if (choice.containsKey('finish_reason')) {
if (finishReason != null) { // Check for completion at choice level
debugPrint('Persistent: Stream finished with reason: $finishReason'); final finishReason = choice['finish_reason'];
if (!streamController.isClosed) { if (finishReason != null) {
streamController.close(); debugPrint(
} 'Persistent: Stream finished with reason: $finishReason',
return; );
} if (!streamController.isClosed) {
} streamController.close();
} }
} return;
}
// Handle streaming chat/completions format variations }
if (json.containsKey('delta')) { }
final delta = json['delta'] as Map<String, dynamic>;
if (delta.containsKey('content')) {
final content = delta['content'] as String?;
if (content != null && content.isNotEmpty) {
debugPrint('Persistent: Direct delta content: "$content"');
if (!streamController.isClosed) {
streamController.add(content);
}
persistentService.updateStreamProgress(
persistentStreamId,
chunkSequence: chunkSequence,
appendedContent: content,
);
accumulatedContent += content;
}
}
}
// Handle OpenRouter-style streaming
if (json.containsKey('message')) {
final message = json['message'] as Map<String, dynamic>;
if (message.containsKey('content')) {
final content = message['content'] as String?;
if (content != null && content.isNotEmpty) {
debugPrint('Persistent: Message content: "$content"');
if (!streamController.isClosed) {
streamController.add(content);
}
persistentService.updateStreamProgress(
persistentStreamId,
chunkSequence: chunkSequence,
content: content, // Full content, not appended
);
}
}
}
} catch (e) {
debugPrint('Persistent: Error parsing SSE event data: $e');
// Don't fail the entire stream for one bad event
} }
// Handle streaming chat/completions format variations
if (json.containsKey('delta')) {
final delta = json['delta'] as Map<String, dynamic>;
if (delta.containsKey('content')) {
final content = delta['content'] as String?;
if (content != null && content.isNotEmpty) {
debugPrint('Persistent: Direct delta content: "$content"');
if (!streamController.isClosed) {
streamController.add(content);
}
persistentService.updateStreamProgress(
persistentStreamId,
chunkSequence: chunkSequence,
appendedContent: content,
);
accumulatedContent += content;
}
}
}
// Handle OpenRouter-style streaming
if (json.containsKey('message')) {
final message = json['message'] as Map<String, dynamic>;
if (message.containsKey('content')) {
final content = message['content'] as String?;
if (content != null && content.isNotEmpty) {
debugPrint('Persistent: Message content: "$content"');
if (!streamController.isClosed) {
streamController.add(content);
}
persistentService.updateStreamProgress(
persistentStreamId,
chunkSequence: chunkSequence,
content: content, // Full content, not appended
);
}
}
}
} catch (e) {
debugPrint('Persistent: Error parsing SSE event data: $e');
// Don't fail the entire stream for one bad event
} }
}
// Legacy Socket.IO and older SSE methods removed // Legacy Socket.IO and older SSE methods removed