From 7daf331daf9b75cda28f1b6d2f14e0abea2b852f Mon Sep 17 00:00:00 2001 From: cogwheel0 <172976095+cogwheel0@users.noreply.github.com> Date: Sun, 31 Aug 2025 19:07:19 +0530 Subject: [PATCH] feat: enhance message parsing and tool call handling in ApiService and ChatProviders --- lib/core/services/api_service.dart | 262 +++++++++++++++++- .../chat/providers/chat_providers.dart | 177 ++++++------ openwebui-src | 1 + 3 files changed, 332 insertions(+), 108 deletions(-) create mode 160000 openwebui-src diff --git a/lib/core/services/api_service.dart b/lib/core/services/api_service.dart index 467d583..b059192 100644 --- a/lib/core/services/api_service.dart +++ b/lib/core/services/api_service.dart @@ -572,6 +572,7 @@ class ApiService { // Try multiple locations for messages - prefer list format to avoid duplication List? messagesList; + Map? historyMessagesMap; if (chatObject != null) { // Check for messages in chat.messages (list format) - PREFERRED @@ -580,11 +581,17 @@ class ApiService { debugPrint( 'DEBUG: Found ${messagesList.length} messages in chat.messages', ); + // Also capture history map for richer assistant entries (tool_calls, files) + final history = chatObject['history'] as Map?; + if (history != null && history['messages'] is Map) { + historyMessagesMap = history['messages'] as Map; + } } else { // Fallback: Check for messages in chat.history.messages (map format) final history = chatObject['history'] as Map?; if (history != null && history['messages'] != null) { final messagesMap = history['messages'] as Map; + historyMessagesMap = messagesMap; debugPrint( 'DEBUG: Found ${messagesMap.length} messages in chat.history.messages (converting to list)', ); @@ -609,13 +616,67 @@ class ApiService { // Parse messages from list format only (avoiding duplication) if (messagesList != null) { - for (final msgData in messagesList) { + for (int idx = 0; idx < messagesList.length; idx++) { + final msgData = messagesList[idx] as Map; try { debugPrint( 'DEBUG: Parsing message: ${msgData['id']} - role: ${msgData['role']} - content length: ${msgData['content']?.toString().length ?? 0}', ); - // Convert OpenWebUI message format to our ChatMessage format - final message = _parseOpenWebUIMessage(msgData); + + // If this assistant message includes tool_calls, merge following tool results + final historyMsg = historyMessagesMap != null + ? (historyMessagesMap![msgData['id']] as Map?) + : null; + + final toolCalls = (msgData['tool_calls'] is List) + ? (msgData['tool_calls'] as List) + : (historyMsg != null && historyMsg['tool_calls'] is List) + ? (historyMsg['tool_calls'] as List) + : null; + + if ((msgData['role']?.toString() == 'assistant') && toolCalls is List) { + // Collect subsequent tool results associated with this assistant turn + final List> results = []; + int j = idx + 1; + while (j < messagesList.length) { + final next = messagesList[j] as Map; + if ((next['role']?.toString() ?? '') != 'tool') break; + final toolCallId = next['tool_call_id']?.toString(); + final resContent = next['content']; + final resFiles = next['files']; + results.add({ + 'tool_call_id': toolCallId, + 'content': resContent, + if (resFiles != null) 'files': resFiles, + }); + j++; + } + + // Synthesize content from tool_calls and results + final synthesized = _synthesizeToolDetailsFromToolCallsWithResults( + toolCalls, + results, + ); + + final mergedAssistant = Map.from(msgData); + mergedAssistant['content'] = synthesized; + + final message = _parseOpenWebUIMessage( + mergedAssistant, + historyMsg: historyMsg, + ); + messages.add(message); + + // Skip the tool messages we just merged + idx = j - 1; + debugPrint( + 'DEBUG: Successfully parsed tool_call assistant turn: ${message.id}', + ); + continue; + } + + // Default path: parse message as-is + final message = _parseOpenWebUIMessage(msgData, historyMsg: historyMsg); messages.add(message); debugPrint( 'DEBUG: Successfully parsed message: ${message.id} - ${message.role}', @@ -643,25 +704,52 @@ class ApiService { } // Parse OpenWebUI message format to our ChatMessage format - ChatMessage _parseOpenWebUIMessage(Map msgData) { + ChatMessage _parseOpenWebUIMessage( + Map msgData, { + Map? historyMsg, + }) { // OpenWebUI message format may vary, but typically: // { "role": "user|assistant", "content": "text", ... } // Create a single UUID instance to reuse const uuid = Uuid(); - // Handle content that could be either String or List (for content arrays) - final content = msgData['content']; + // Prefer richer content from history entry if present + dynamic content = msgData['content']; + if ((content == null || (content is String && content.isEmpty)) && + historyMsg != null && historyMsg['content'] != null) { + content = historyMsg['content']; + } String contentString; if (content is List) { - // For content arrays, extract the text content + // Extract text content from array; if none, build from tool-like items later final textContent = content.firstWhere( (item) => item is Map && item['type'] == 'text', orElse: () => {'text': ''}, ); - contentString = textContent['text'] as String? ?? ''; + contentString = (textContent['text'] as String?) ?? ''; + if (contentString.trim().isEmpty) { + // Fallback: look for tool-related entries in the array and synthesize details blocks + final synthesized = _synthesizeToolDetailsFromContentArray(content); + if (synthesized.isNotEmpty) { + contentString = synthesized; + } + } } else { - contentString = content as String? ?? ''; + contentString = (content as String?) ?? ''; + } + + // Final fallback: some servers store tool calls under tool_calls instead of content + final toolCallsList = (msgData['tool_calls'] is List) + ? (msgData['tool_calls'] as List) + : (historyMsg != null && historyMsg['tool_calls'] is List) + ? (historyMsg['tool_calls'] as List) + : null; + if (contentString.trim().isEmpty && toolCallsList is List) { + final synthesized = _synthesizeToolDetailsFromToolCalls(toolCallsList); + if (synthesized.isNotEmpty) { + contentString = synthesized; + } } // Determine role based on available fields @@ -680,8 +768,9 @@ class ApiService { List? attachmentIds; List>? files; - if (msgData['files'] != null) { - final filesList = msgData['files'] as List; + final effectiveFiles = msgData['files'] ?? historyMsg?['files']; + if (effectiveFiles != null) { + final filesList = effectiveFiles as List; // Separate user uploads (with file_id) from generated images (with type and url) final userAttachments = []; @@ -714,6 +803,137 @@ class ApiService { ); } + // ===== Helpers to synthesize tool-call details blocks for UI parsing ===== + String _escapeHtmlAttr(String s) { + return s + .replaceAll('&', '&') + .replaceAll('"', '"') + .replaceAll("'", ''') + .replaceAll('<', '<') + .replaceAll('>', '>'); + } + + String _jsonStringify(dynamic v) { + try { + return jsonEncode(v); + } catch (_) { + return v?.toString() ?? ''; + } + } + + String _synthesizeToolDetailsFromToolCalls(List toolCalls) { + final buf = StringBuffer(); + for (final c in toolCalls) { + if (c is! Map) continue; + final func = c['function'] as Map?; + final name = (func != null ? func['name'] : c['name'])?.toString() ?? 'tool'; + final id = (c['id']?.toString() ?? 'call_${DateTime.now().millisecondsSinceEpoch}'); + final done = (c['done']?.toString() ?? 'true'); + final argsRaw = func != null ? func['arguments'] : c['arguments']; + final resRaw = c['result'] ?? c['output'] ?? (func != null ? func['result'] : null); + final argsStr = _jsonStringify(argsRaw); + final resStr = resRaw != null ? _jsonStringify(resRaw) : null; + final attrs = StringBuffer() + ..write('type="tool_calls"') + ..write(' done="${_escapeHtmlAttr(done)}"') + ..write(' id="${_escapeHtmlAttr(id)}"') + ..write(' name="${_escapeHtmlAttr(name)}"') + ..write(' arguments="${_escapeHtmlAttr(argsStr)}"'); + if (resStr != null && resStr.isNotEmpty) { + attrs.write(' result="${_escapeHtmlAttr(resStr)}"'); + } + buf.writeln('
Tool Executed'); + buf.writeln('
'); + } + return buf.toString().trim(); + } + + String _synthesizeToolDetailsFromToolCallsWithResults( + List toolCalls, + List> results, + ) { + final buf = StringBuffer(); + Map> resultsMap = {}; + for (final r in results) { + final id = r['tool_call_id']?.toString(); + if (id != null) resultsMap[id] = r; + } + + for (final c in toolCalls) { + if (c is! Map) continue; + final func = c['function'] as Map?; + final name = (func != null ? func['name'] : c['name'])?.toString() ?? 'tool'; + final id = (c['id']?.toString() ?? 'call_${DateTime.now().millisecondsSinceEpoch}'); + final argsRaw = func != null ? func['arguments'] : c['arguments']; + final argsStr = _jsonStringify(argsRaw); + final resultEntry = resultsMap[id]; + final resRaw = resultEntry != null ? resultEntry['content'] : null; + final filesRaw = resultEntry != null ? resultEntry['files'] : null; + final resStr = resRaw != null ? _jsonStringify(resRaw) : null; + final filesStr = filesRaw != null ? _jsonStringify(filesRaw) : null; + + final attrs = StringBuffer() + ..write('type="tool_calls"') + ..write(' done="${_escapeHtmlAttr(resultEntry != null ? 'true' : 'false')}"') + ..write(' id="${_escapeHtmlAttr(id)}"') + ..write(' name="${_escapeHtmlAttr(name)}"') + ..write(' arguments="${_escapeHtmlAttr(argsStr)}"'); + if (resStr != null && resStr.isNotEmpty) { + attrs.write(' result="${_escapeHtmlAttr(resStr)}"'); + } + if (filesStr != null && filesStr.isNotEmpty) { + attrs.write(' files="${_escapeHtmlAttr(filesStr)}"'); + } + + buf.writeln('
${resultEntry != null ? 'Tool Executed' : 'Executing...'}'); + buf.writeln('
'); + } + return buf.toString().trim(); + } + + String _synthesizeToolDetailsFromContentArray(List content) { + final buf = StringBuffer(); + for (final item in content) { + if (item is! Map) continue; + final type = item['type']?.toString(); + if (type == null) continue; + // OpenWebUI content-blocks shape: { type: 'tool_calls', content: [...], results: [...] } + if (type == 'tool_calls') { + final calls = (item['content'] is List) ? (item['content'] as List) : []; + final results = >[]; + if (item['results'] is List) { + for (final r in (item['results'] as List)) { + if (r is Map) results.add(r); + } + } + final synthesized = _synthesizeToolDetailsFromToolCallsWithResults(calls, results); + if (synthesized.isNotEmpty) buf.writeln(synthesized); + continue; + } + + // Heuristics: handle other variants (single tool/function call entries) + if (type == 'tool_call' || type == 'function_call') { + final name = (item['name'] ?? item['tool'] ?? 'tool').toString(); + final id = (item['id']?.toString() ?? 'call_${DateTime.now().millisecondsSinceEpoch}'); + final argsStr = _jsonStringify(item['arguments'] ?? item['args']); + final resStr = item['result'] ?? item['output'] ?? item['response']; + final attrs = StringBuffer() + ..write('type="tool_calls"') + ..write(' done="${_escapeHtmlAttr(resStr != null ? 'true' : 'false')}"') + ..write(' id="${_escapeHtmlAttr(id)}"') + ..write(' name="${_escapeHtmlAttr(name)}"') + ..write(' arguments="${_escapeHtmlAttr(argsStr)}"'); + if (resStr != null) { + final r = _jsonStringify(resStr); + if (r.isNotEmpty) attrs.write(' result="${_escapeHtmlAttr(r)}"'); + } + buf.writeln('
${resStr != null ? 'Tool Executed' : 'Executing...'}'); + buf.writeln('
'); + } + } + return buf.toString().trim(); + } + // Create new conversation using OpenWebUI API Future createConversation({ required String title, @@ -1565,6 +1785,10 @@ class ApiService { final response = await _dio.post( '/api/chat/completed', data: requestData, + options: Options( + sendTimeout: const Duration(seconds: 4), + receiveTimeout: const Duration(seconds: 4), + ), ); debugPrint('DEBUG: Chat completed response: ${response.statusCode}'); } catch (e) { @@ -2568,6 +2792,7 @@ class ApiService { required StreamController streamController, }) async { String last = ''; + int stableCount = 0; final started = DateTime.now(); bool containsDone(String s) => @@ -2727,12 +2952,23 @@ class ApiService { streamController.add(content); } } - last = content; - // Stop when we detect done=true on tool_calls or when content stabilizes if (containsDone(content)) { break; } + + // If content hasn't changed for a few polls, assume completion + final prev = last; + if (content == prev) { + stableCount++; + } else { + stableCount = 0; + } + if (stableCount >= 3) { + break; + } + + last = content; } catch (e) { // Ignore transient errors and continue polling } diff --git a/lib/features/chat/providers/chat_providers.dart b/lib/features/chat/providers/chat_providers.dart index fc86f3d..8838eb0 100644 --- a/lib/features/chat/providers/chat_providers.dart +++ b/lib/features/chat/providers/chat_providers.dart @@ -188,6 +188,10 @@ class ChatMessagesNotifier extends StateNotifier> { if (lastMessage.role != 'assistant') { return; } + if (!lastMessage.isStreaming) { + // Ignore late chunks when streaming already finished + return; + } // Strip a leading typing indicator if present, then append delta const ti = '[TYPING_INDICATOR]'; @@ -397,7 +401,7 @@ Future regenerateMessage( final assistantMessage = ChatMessage( id: const Uuid().v4(), role: 'assistant', - content: '[TYPING_INDICATOR]', + content: '', timestamp: DateTime.now(), model: selectedModel.name, isStreaming: true, @@ -464,7 +468,7 @@ Future regenerateMessage( final assistantMessage = ChatMessage( id: assistantMessageId, role: 'assistant', - content: '[TYPING_INDICATOR]', + content: '', timestamp: DateTime.now(), model: selectedModel.name, isStreaming: true, @@ -636,7 +640,7 @@ Future _sendMessageInternal( final assistantMessage = ChatMessage( id: const Uuid().v4(), role: 'assistant', - content: '[TYPING_INDICATOR]', + content: '', timestamp: DateTime.now(), model: selectedModel.name, isStreaming: true, @@ -1050,7 +1054,7 @@ Future _sendMessageInternal( final assistantMessage = ChatMessage( id: assistantMessageId, role: 'assistant', - content: '[TYPING_INDICATOR]', // Show typing indicator immediately + content: '', timestamp: DateTime.now(), model: selectedModel.name, isStreaming: true, @@ -1058,6 +1062,9 @@ Future _sendMessageInternal( ref.read(chatMessagesProvider.notifier).addMessage(assistantMessage); // If socket is available, start listening for chat-events immediately + // For background-tools flow (when socket session is present), socket is the primary stream. + // In that case, do NOT suppress socket content. + bool suppressSocketContent = (socketSessionId == null); // only suppress when using SSE stream if (socketService != null) { void chatHandler(Map ev) { try { @@ -1067,24 +1074,58 @@ Future _sendMessageInternal( final payload = data['data']; if (type == 'chat:completion' && payload != null) { if (payload is Map) { - if (payload.containsKey('choices')) { + // Provider may emit tool_calls at the top level + if (!suppressSocketContent && payload.containsKey('tool_calls')) { + final tc = payload['tool_calls']; + if (tc is List) { + for (final call in tc) { + if (call is Map) { + final fn = call['function']; + final name = (fn is Map && fn['name'] is String) ? fn['name'] as String : null; + if (name is String && name.isNotEmpty) { + final status = '\n
Executing...\n
\n'; + ref.read(chatMessagesProvider.notifier).appendToLastMessage(status); + } + } + } + } + } + if (!suppressSocketContent && payload.containsKey('choices')) { final choices = payload['choices']; if (choices is List && choices.isNotEmpty) { final choice = choices.first; final delta = choice is Map ? choice['delta'] : null; - final content = (delta is Map) ? (delta['content']?.toString() ?? '') : ''; - if (content.isNotEmpty) { - ref.read(chatMessagesProvider.notifier).appendToLastMessage(content); + if (delta is Map) { + // Surface tool_calls status like SSE path + if (delta.containsKey('tool_calls')) { + final tc = delta['tool_calls']; + if (tc is List) { + for (final call in tc) { + if (call is Map) { + final fn = call['function']; + final name = (fn is Map && fn['name'] is String) ? fn['name'] as String : null; + if (name is String && name.isNotEmpty) { + final status = '\n
Executing...\n
\n'; + ref.read(chatMessagesProvider.notifier).appendToLastMessage(status); + } + } + } + } + } + final content = delta['content']?.toString() ?? ''; + if (content.isNotEmpty) { + ref.read(chatMessagesProvider.notifier).appendToLastMessage(content); + } } } } - if (payload.containsKey('content')) { + if (!suppressSocketContent && payload.containsKey('content')) { final content = payload['content']?.toString() ?? ''; if (content.isNotEmpty) { final msgs = ref.read(chatMessagesProvider); if (msgs.isNotEmpty && msgs.last.role == 'assistant') { final prev = msgs.last.content; - if (prev == '[TYPING_INDICATOR]') { + if (prev.isEmpty || prev == '[TYPING_INDICATOR]') { ref .read(chatMessagesProvider.notifier) .replaceLastMessageContent(content); @@ -1105,8 +1146,9 @@ Future _sendMessageInternal( } } if (payload['done'] == true) { - ref.read(chatMessagesProvider.notifier).finishStreaming(); - socketService.offChatEvents(); + // Do not force finish here to avoid cutting off active streams. + // Just stop listening to further socket events for this session. + try { socketService.offChatEvents(); } catch (_) {} } } } @@ -1384,6 +1426,12 @@ Future _sendMessageInternal( onDone: () async { // Unregister from persistent service persistentService.unregisterStream(streamId); + // Stop socket events now that streaming finished only for SSE-driven streams + if (socketService != null && suppressSocketContent == true) { + try { socketService.offChatEvents(); } catch (_) {} + } + // Allow socket content again for future sessions (harmless if already false) + suppressSocketContent = false; // Mark streaming as complete immediately for better UX ref.read(chatMessagesProvider.notifier).finishStreaming(); @@ -1425,17 +1473,24 @@ Future _sendMessageInternal( } // Send chat completed notification to OpenWebUI first + // Fire-and-forget with a short timeout; non-critical endpoint try { - await api.sendChatCompleted( - chatId: activeConversation.id, - messageId: assistantMessageId, // Use message ID from response - messages: formattedMessages, - model: selectedModel.id, - modelItem: modelItem, // Include model metadata - sessionId: sessionId, // Include session ID + unawaited( + api + .sendChatCompleted( + chatId: activeConversation.id, + messageId: + assistantMessageId, // Use message ID from response + messages: formattedMessages, + model: selectedModel.id, + modelItem: modelItem, // Include model metadata + sessionId: sessionId, // Include session ID + ) + .timeout(const Duration(seconds: 3)) + .catchError((_) {}), ); - } catch (e) { - // Continue even if this fails - it's non-critical + } catch (_) { + // Ignore } // Fetch the latest conversation state @@ -1452,71 +1507,6 @@ Future _sendMessageInternal( updatedConv.title != 'New Chat' && updatedConv.title.isNotEmpty; - // Always combine current local messages with updated server content - final currentMessages = ref.read(chatMessagesProvider); - final serverMessages = updatedConv.messages; - - // Create a map of server messages by ID for quick lookup - final serverMessageMap = {}; - for (final serverMsg in serverMessages) { - serverMessageMap[serverMsg.id] = serverMsg; - } - - // Update local messages with server content while preserving all messages - final updatedMessages = []; - final lastLocal = currentMessages.isNotEmpty ? currentMessages.last : null; - for (final localMsg in currentMessages) { - final serverMsg = serverMessageMap[localMsg.id]; - - if (serverMsg != null && serverMsg.content.isNotEmpty && - lastLocal != null && localMsg.id == lastLocal.id && - localMsg.role == 'assistant') { - // Prefer non-disruptive merge to avoid flashing typing indicator - final oldContent = localMsg.content; - final newContent = serverMsg.content; - - if (oldContent.trim().isEmpty || oldContent == '[TYPING_INDICATOR]') { - // Direct replacement without toggling streaming - ref - .read(chatMessagesProvider.notifier) - .replaceLastMessageContent(newContent); - ref.read(chatMessagesProvider.notifier).finishStreaming(); - updatedMessages.add( - localMsg.copyWith(content: newContent, isStreaming: false), - ); - } else if (newContent == oldContent) { - // Already in sync - updatedMessages.add(localMsg.copyWith(isStreaming: false)); - } else if (newContent.startsWith(oldContent)) { - // Append only the delta - final delta = newContent.substring(oldContent.length); - if (delta.isNotEmpty) { - ref.read(chatMessagesProvider.notifier).appendToLastMessage(delta); - } - ref.read(chatMessagesProvider.notifier).finishStreaming(); - updatedMessages.add( - localMsg.copyWith(content: newContent, isStreaming: false), - ); - } else { - // Fallback: replace full content without re-streaming - ref - .read(chatMessagesProvider.notifier) - .replaceLastMessageContent(newContent); - ref.read(chatMessagesProvider.notifier).finishStreaming(); - updatedMessages.add( - localMsg.copyWith(content: newContent, isStreaming: false), - ); - } - } else { - // Keep local message as-is (strip typing indicator if it slipped through) - if (localMsg.content == '[TYPING_INDICATOR]') { - updatedMessages.add(localMsg.copyWith(content: '', isStreaming: false)); - } else { - updatedMessages.add(localMsg); - } - } - } - if (shouldUpdateTitle) { // Ensure the title is reasonable (not too long) final cleanTitle = updatedConv.title.length > 100 @@ -1526,21 +1516,14 @@ Future _sendMessageInternal( // Update the conversation with title and combined messages final updatedConversation = activeConversation.copyWith( title: cleanTitle, - messages: updatedMessages, // Use combined messages! updatedAt: DateTime.now(), ); ref.read(activeConversationProvider.notifier).state = updatedConversation; } else { - // Update just the messages without changing title - final updatedConversation = activeConversation.copyWith( - messages: updatedMessages, // Use combined messages! - updatedAt: DateTime.now(), - ); - - ref.read(activeConversationProvider.notifier).state = - updatedConversation; + // Keep local messages and only refresh conversations list + ref.invalidate(conversationsProvider); } // Streaming already marked as complete when stream ended @@ -1566,6 +1549,10 @@ Future _sendMessageInternal( onError: (error) { // Mark streaming as complete on error ref.read(chatMessagesProvider.notifier).finishStreaming(); + // Stop socket events to avoid duplicates after error (only for SSE-driven) + if (socketService != null && suppressSocketContent == true) { + try { socketService.offChatEvents(); } catch (_) {} + } // Special handling for Socket.IO streaming failures // These indicate the server generated a response but we couldn't stream it diff --git a/openwebui-src b/openwebui-src new file mode 160000 index 0000000..2407d9b --- /dev/null +++ b/openwebui-src @@ -0,0 +1 @@ +Subproject commit 2407d9b905978d68619bdce4021e424046ec8df9