feat: proper socket implementation

This commit is contained in:
cogwheel0
2025-08-31 14:02:44 +05:30
parent e7494d0408
commit e752a27781
8 changed files with 1321 additions and 160 deletions

View File

@@ -20,6 +20,7 @@ import '../models/file_info.dart';
import '../models/knowledge_base.dart';
import '../services/settings_service.dart';
import '../services/optimized_storage_service.dart';
import '../services/socket_service.dart';
import '../utils/debug_logger.dart';
// Storage providers
@@ -188,6 +189,27 @@ final apiServiceProvider = Provider<ApiService?>((ref) {
);
});
// Socket.IO service provider
final socketServiceProvider = Provider<SocketService?>((ref) {
final reviewerMode = ref.watch(reviewerModeProvider);
if (reviewerMode) return null;
final activeServer = ref.watch(activeServerProvider);
final token = ref.watch(authTokenProvider3);
return activeServer.maybeWhen(
data: (server) {
if (server == null) return null;
final s = SocketService(serverConfig: server, authToken: token);
// best-effort connect; errors handled internally
// ignore unawaited_futures
s.connect();
return s;
},
orElse: () => null,
);
});
// Attachment upload queue provider
final attachmentUploadQueueProvider = Provider<AttachmentUploadQueue?>((ref) {
final api = ref.watch(apiServiceProvider);

View File

@@ -15,6 +15,7 @@ import '../auth/api_auth_interceptor.dart';
import '../validation/validation_interceptor.dart';
import '../error/api_error_interceptor.dart';
import 'sse_parser.dart';
// Tool-call details are parsed in the UI layer to render collapsible blocks
import 'stream_recovery_service.dart';
import 'persistent_streaming_service.dart';
import '../utils/debug_logger.dart';
@@ -2355,12 +2356,17 @@ class ApiService {
bool enableWebSearch = false,
bool enableImageGeneration = false,
Map<String, dynamic>? modelItem,
String? sessionIdOverride,
List<Map<String, dynamic>>? toolServers,
Map<String, dynamic>? backgroundTasks,
}) {
final streamController = StreamController<String>();
// Generate unique IDs
final messageId = const Uuid().v4();
final sessionId = const Uuid().v4().substring(0, 20);
final sessionId = (sessionIdOverride != null && sessionIdOverride.isNotEmpty)
? sessionIdOverride
: const Uuid().v4().substring(0, 20);
// NOTE: Previously used to branch for Gemini-specific handling; not needed now.
@@ -2455,6 +2461,23 @@ class ApiService {
if (toolIds != null && toolIds.isNotEmpty) {
data['tool_ids'] = toolIds;
debugPrint('DEBUG: Including tool_ids in SSE request: $toolIds');
// Hint server to use native function calling when tools are selected
// This enables provider-native tool execution paths and consistent UI events
try {
final params = (data['params'] as Map<String, dynamic>? ) ?? <String, dynamic>{};
params['function_calling'] = 'native';
data['params'] = params;
debugPrint('DEBUG: Set params.function_calling = native');
} catch (_) {
// Non-fatal; continue without forcing native mode
}
}
// Include tool_servers if provided (for native function calling with OpenAPI servers)
if (toolServers != null && toolServers.isNotEmpty) {
data['tool_servers'] = toolServers;
debugPrint('DEBUG: Including tool_servers in request (${toolServers.length})');
}
// Include non-image files at the top level as expected by Open WebUI
@@ -2482,8 +2505,53 @@ class ApiService {
debugPrint('DEBUG: session_id value: ${data['session_id']}');
debugPrint('DEBUG: id value: ${data['id']}');
// Use SSE streaming with proper parser
_streamSSE(data, streamController, messageId);
// If tools are requested, use background task flow to allow server-side execution.
// Open WebUI executes tools and continues the response outside of the
// provider SSE. That path requires background task mode (session_id + id + chat_id).
if (conversationId != null) {
// Attach identifiers to trigger background task processing on the server
data['session_id'] = sessionId;
data['id'] = messageId;
data['chat_id'] = conversationId;
// Attach background_tasks if provided
if (backgroundTasks != null && backgroundTasks.isNotEmpty) {
data['background_tasks'] = backgroundTasks;
}
debugPrint('DEBUG: Initiating background tools flow (task-based)');
debugPrint('DEBUG: Posting to /api/chat/completions (no SSE)');
// Fire in background; poll chat for updates and stream deltas to UI
() async {
try {
final resp = await _dio.post('/api/chat/completions', data: data);
final respData = resp.data;
final taskId = (respData is Map) ? (respData['task_id']?.toString()) : null;
debugPrint('DEBUG: Background task created: $taskId');
// If no session/socket provided, fall back to polling for updates.
if (sessionIdOverride == null || sessionIdOverride.isEmpty) {
await _pollChatForMessageUpdates(
chatId: conversationId!,
messageId: messageId,
streamController: streamController,
);
} else {
// Close the controller so listeners don't hang waiting for chunks
if (!streamController.isClosed) {
streamController.close();
}
}
} catch (e) {
debugPrint('DEBUG: Background tools flow failed: $e');
if (!streamController.isClosed) streamController.close();
}
}();
} else {
// Use SSE streaming with proper parser
_streamSSE(data, streamController, messageId);
}
return (
stream: streamController.stream,
@@ -2492,6 +2560,189 @@ class ApiService {
);
}
// Poll the server chat until the assistant message is populated with tool results,
// then stream deltas to the UI and close.
Future<void> _pollChatForMessageUpdates({
required String chatId,
required String messageId,
required StreamController<String> streamController,
}) async {
String last = '';
final started = DateTime.now();
bool containsDone(String s) =>
s.contains('<details type="tool_calls"') && s.contains('done="true"');
while (DateTime.now().difference(started).inSeconds < 60) {
try {
// Small delay between polls
await Future.delayed(const Duration(milliseconds: 900));
final resp = await _dio.get('/api/v1/chats/$chatId');
final data = resp.data as Map<String, dynamic>;
// Locate assistant content from multiple shapes
String content = '';
Map<String, dynamic>? chatObj =
(data['chat'] is Map<String, dynamic>) ? data['chat'] as Map<String, dynamic> : null;
// 1) Preferred: chat.messages (list) try exact id first
if (chatObj != null && chatObj['messages'] is List) {
final List messagesList = chatObj['messages'] as List;
final target = messagesList.firstWhere(
(m) => (m is Map && (m['id']?.toString() == messageId)),
orElse: () => null,
);
if (target != null) {
final rawContent = (target as Map)['content'];
if (rawContent is List) {
final textItem = rawContent.firstWhere(
(i) => i is Map && i['type'] == 'text',
orElse: () => null,
);
if (textItem != null) {
content = textItem['text']?.toString() ?? '';
}
} else if (rawContent is String) {
content = rawContent;
}
}
}
// 2) Fallback: chat.history.messages (map) try exact id
if (content.isEmpty && chatObj != null) {
final history = chatObj['history'];
if (history is Map && history['messages'] is Map) {
final Map<String, dynamic> messagesMap =
(history['messages'] as Map).cast<String, dynamic>();
final msg = messagesMap[messageId];
if (msg is Map) {
final rawContent = msg['content'];
if (rawContent is String) {
content = rawContent;
} else if (rawContent is List) {
final textItem = rawContent.firstWhere(
(i) => i is Map && i['type'] == 'text',
orElse: () => null,
);
if (textItem != null) {
content = textItem['text']?.toString() ?? '';
}
}
}
}
}
// 3) Last resort: top-level messages (list) try exact id
if (content.isEmpty && data['messages'] is List) {
final List topMessages = data['messages'] as List;
final target = topMessages.firstWhere(
(m) => (m is Map && (m['id']?.toString() == messageId)),
orElse: () => null,
);
if (target != null) {
final rawContent = (target as Map)['content'];
if (rawContent is String) {
content = rawContent;
} else if (rawContent is List) {
final textItem = rawContent.firstWhere(
(i) => i is Map && i['type'] == 'text',
orElse: () => null,
);
if (textItem != null) {
content = textItem['text']?.toString() ?? '';
}
}
}
}
// 4) If nothing found by id, fall back to the latest assistant message
if (content.isEmpty) {
// Prefer chat.messages list
if (chatObj != null && chatObj['messages'] is List) {
final List messagesList = chatObj['messages'] as List;
// Find last assistant
for (int i = messagesList.length - 1; i >= 0; i--) {
final m = messagesList[i];
if (m is Map && (m['role']?.toString() == 'assistant')) {
final rawContent = m['content'];
if (rawContent is String) {
content = rawContent;
} else if (rawContent is List) {
final textItem = rawContent.firstWhere(
(i) => i is Map && i['type'] == 'text',
orElse: () => null,
);
if (textItem != null) {
content = textItem['text']?.toString() ?? '';
}
}
if (content.isNotEmpty) break;
}
}
}
// Try history map if still empty
if (content.isEmpty && chatObj != null) {
final history = chatObj['history'];
if (history is Map && history['messages'] is Map) {
final Map<dynamic, dynamic> msgMapDyn = history['messages'] as Map;
// Iterate by values; no guaranteed ordering, but often sufficient
for (final entry in msgMapDyn.values) {
if (entry is Map && (entry['role']?.toString() == 'assistant')) {
final rawContent = entry['content'];
if (rawContent is String) {
content = rawContent;
} else if (rawContent is List) {
final textItem = rawContent.firstWhere(
(i) => i is Map && i['type'] == 'text',
orElse: () => null,
);
if (textItem != null) {
content = textItem['text']?.toString() ?? '';
}
}
if (content.isNotEmpty) break;
}
}
}
}
}
if (content.isEmpty) {
continue;
}
// Stream only the delta when content grows monotonically
if (content.startsWith(last)) {
final delta = content.substring(last.length);
if (delta.isNotEmpty && !streamController.isClosed) {
streamController.add(delta);
}
} else {
// Fallback: replace entire content by emitting a separator + full content
if (!streamController.isClosed) {
streamController.add('\n');
streamController.add(content);
}
}
last = content;
// Stop when we detect done=true on tool_calls or when content stabilizes
if (containsDone(content)) {
break;
}
} catch (e) {
// Ignore transient errors and continue polling
}
}
if (!streamController.isClosed) {
streamController.close();
}
}
// SSE streaming with persistent background support - Main Implementation
void _streamSSE(
Map<String, dynamic> data,
@@ -2873,6 +3124,26 @@ class ApiService {
// We do NOT return here; model can send content alongside reasoning later
}
// 1a) Surface tool call deltas as lightweight status updates
// Some providers stream tool_calls without content; show a hint so UI isn't stuck
if (delta.containsKey('tool_calls')) {
final tc = delta['tool_calls'];
if (tc is List) {
for (final call in tc) {
if (call is Map<String, dynamic>) {
final fn = call['function'];
final name = (fn is Map && fn['name'] is String) ? fn['name'] as String : null;
if (name is String && name.isNotEmpty) {
final status = '\n<details type="tool_calls" done="false" name="$name"><summary>Executing...</summary>\n</details>\n';
if (!streamController.isClosed) {
streamController.add(status);
}
}
}
}
}
}
// Extract content
if (delta.containsKey('content')) {
final content = delta['content'] as String?;
@@ -2904,17 +3175,19 @@ class ApiService {
debugPrint(
'Persistent: Stream finished with reason: $finishReason',
);
// Ensure reasoning block is closed when finishing
_closeReasoningBlockIfOpen(streamController, persistentStreamId);
if (!streamController.isClosed) {
streamController.close();
// Do NOT close on tool_calls; server will continue with tool execution updates
if (finishReason != 'tool_calls') {
_closeReasoningBlockIfOpen(streamController, persistentStreamId);
if (!streamController.isClosed) {
streamController.close();
}
return;
}
return;
}
} else if (choice.containsKey('finish_reason')) {
// Check for completion at choice level
final finishReason = choice['finish_reason'];
if (finishReason != null) {
if (finishReason != null && finishReason != 'tool_calls') {
debugPrint(
'Persistent: Stream finished with reason: $finishReason',
);
@@ -2969,14 +3242,103 @@ class ApiService {
_closeReasoningBlockIfOpen(streamController, persistentStreamId);
if (!streamController.isClosed) {
streamController.add(content);
// Emit only the delta when server sends cumulative content
try {
final meta =
persistentService.getStreamMetadata(persistentStreamId);
final last = (meta != null && meta['lastContent'] is String)
? (meta['lastContent'] as String)
: '';
String toEmit;
if (content.startsWith(last)) {
toEmit = content.substring(last.length);
} else {
// Fallback: emit suffix after longest common prefix
int i = 0;
final minLen = last.length < content.length
? last.length
: content.length;
while (i < minLen && last.codeUnitAt(i) == content.codeUnitAt(i)) {
i++;
}
toEmit = content.substring(i);
}
if (toEmit.isNotEmpty && !streamController.isClosed) {
streamController.add(toEmit);
}
// Update persistent progress with the full content snapshot
persistentService.updateStreamProgress(
persistentStreamId,
chunkSequence: chunkSequence,
content: content,
);
} catch (_) {
// Best-effort fallback: append as-is
if (!streamController.isClosed) {
streamController.add(content);
}
persistentService.updateStreamProgress(
persistentStreamId,
chunkSequence: chunkSequence,
content: content,
);
}
}
}
}
// Handle Open WebUI aggregated content blocks
// Server emits top-level { content: "...serialized blocks..." } updates
if (json.containsKey('content')) {
final contentVal = json['content'];
if (contentVal is String && contentVal.isNotEmpty) {
// Close reasoning section before appending rich content
_closeReasoningBlockIfOpen(streamController, persistentStreamId);
// Emit only the delta when server sends cumulative content
try {
final meta =
persistentService.getStreamMetadata(persistentStreamId);
final last = (meta != null && meta['lastContent'] is String)
? (meta['lastContent'] as String)
: '';
String toEmit;
if ((contentVal as String).startsWith(last)) {
toEmit = contentVal.substring(last.length);
} else {
// Fallback: emit suffix after longest common prefix
int i = 0;
final s = contentVal as String;
final minLen = last.length < s.length ? last.length : s.length;
while (i < minLen && last.codeUnitAt(i) == s.codeUnitAt(i)) {
i++;
}
toEmit = s.substring(i);
}
if (toEmit.isNotEmpty && !streamController.isClosed) {
streamController.add(toEmit);
}
// Update persistent progress with the full content snapshot
persistentService.updateStreamProgress(
persistentStreamId,
chunkSequence: chunkSequence,
content: content, // Full content, not appended
content: contentVal,
);
} catch (_) {
// Best-effort fallback: append as-is
if (!streamController.isClosed) {
streamController.add(contentVal);
}
persistentService.updateStreamProgress(
persistentStreamId,
chunkSequence: chunkSequence,
content: contentVal,
);
}
}

View File

@@ -0,0 +1,82 @@
import 'package:socket_io_client/socket_io_client.dart' as IO;
import 'package:flutter/foundation.dart';
import '../models/server_config.dart';
class SocketService {
final ServerConfig serverConfig;
final String? authToken;
IO.Socket? _socket;
SocketService({required this.serverConfig, required this.authToken});
String? get sessionId => _socket?.id;
IO.Socket? get socket => _socket;
bool get isConnected => _socket?.connected == true;
Future<void> connect({bool force = false}) async {
if (_socket != null && _socket!.connected && !force) return;
try {
_socket?.dispose();
} catch (_) {}
final base = serverConfig.url.replaceFirst(RegExp(r'/+$'), '');
final path = '/ws/socket.io';
_socket = IO.io(
base,
IO.OptionBuilder()
.setTransports(['websocket'])
.setPath(path)
.setExtraHeaders(
authToken != null && authToken!.isNotEmpty
? {
'Authorization': 'Bearer $authToken',
}
: {},
)
.build(),
);
_socket!.on('connect', (_) {
debugPrint('Socket connected: ${_socket!.id}');
if (authToken != null && authToken!.isNotEmpty) {
_socket!.emit('user-join', {
'auth': {'token': authToken}
});
}
});
_socket!.on('connect_error', (err) {
debugPrint('Socket connect_error: $err');
});
_socket!.on('disconnect', (reason) {
debugPrint('Socket disconnected: $reason');
});
}
void onChatEvents(void Function(Map<String, dynamic> event) handler) {
_socket?.on('chat-events', (data) {
try {
if (data is Map<String, dynamic>) {
handler(data);
} else if (data is Map) {
handler(Map<String, dynamic>.from(data));
}
} catch (_) {}
});
}
void offChatEvents() {
_socket?.off('chat-events');
}
void dispose() {
try {
_socket?.dispose();
} catch (_) {}
_socket = null;
}
}

View File

@@ -0,0 +1,227 @@
import 'dart:convert';
/// Parsed representation of one tool call emitted as a `<details type="tool_calls" ...>` block
class ToolCallEntry {
final String id;
final String name;
final bool done;
final dynamic arguments; // decoded JSON when possible, else String
final dynamic result; // decoded JSON when possible, else String
final List<dynamic>? files; // decoded JSON array when present
const ToolCallEntry({
required this.id,
required this.name,
required this.done,
this.arguments,
this.result,
this.files,
});
}
/// Container for extracted tool calls and the remaining main content
class ToolCallsContent {
final List<ToolCallEntry> toolCalls;
final String mainContent;
final String originalContent;
const ToolCallsContent({
required this.toolCalls,
required this.mainContent,
required this.originalContent,
});
}
/// Utility to parse <details type="tool_calls"> blocks from content
class ToolCallsParser {
/// Represents a mixed stream of text and tool-call entries in original order
/// as they appeared in the content.
static List<ToolCallsSegment>? segments(String content) {
if (content.isEmpty || !content.contains('<details')) return null;
final detailsRegex = RegExp(
r'<details\b([^>]*)>\s*<summary>[^<]*<\/summary>\s*<\/details>',
multiLine: true,
dotAll: true,
);
final matches = detailsRegex.allMatches(content).toList();
if (matches.isEmpty) return null;
final segs = <ToolCallsSegment>[];
int lastEnd = 0;
for (final m in matches) {
// Text before this block
if (m.start > lastEnd) {
segs.add(ToolCallsSegment.text(content.substring(lastEnd, m.start)));
}
final fullMatch = m.group(0) ?? '';
final attrs = m.group(1) ?? '';
if (attrs.contains('type="tool_calls"')) {
String? _attr(String name) {
final r = RegExp('$name="([^"]*)"');
final mm = r.firstMatch(attrs);
return mm != null ? _unescapeHtml(mm.group(1) ?? '') : null;
}
final id = _attr('id') ?? '';
final name = _attr('name') ?? 'tool';
final done = (_attr('done') == 'true');
final args = _tryDecodeJson(_attr('arguments'));
final result = _tryDecodeJson(_attr('result'));
final files = _tryDecodeJson(_attr('files'));
final entry = ToolCallEntry(
id: id.isNotEmpty ? id : '${name}_${m.start}',
name: name,
done: done,
arguments: args,
result: result,
files: (files is List) ? files : null,
);
segs.add(ToolCallsSegment.entry(entry));
} else {
// Not a tool_calls block: keep it as text
segs.add(ToolCallsSegment.text(fullMatch));
}
lastEnd = m.end;
}
// Tail text
if (lastEnd < content.length) {
segs.add(ToolCallsSegment.text(content.substring(lastEnd)));
}
return segs;
}
/// Extracts tool call blocks and returns the remaining content with those blocks removed.
static ToolCallsContent? parse(String content) {
if (content.isEmpty || !content.contains('<details')) return null;
final detailsRegex = RegExp(
r'<details\b([^>]*)>\s*<summary>[^<]*<\/summary>\s*<\/details>',
multiLine: true,
dotAll: true,
);
final matches = detailsRegex.allMatches(content).toList();
if (matches.isEmpty) return null;
final calls = <ToolCallEntry>[];
for (final m in matches) {
final attrs = m.group(1) ?? '';
if (!attrs.contains('type="tool_calls"')) continue;
String? _attr(String name) {
final r = RegExp('$name="([^"]*)"');
final mm = r.firstMatch(attrs);
return mm != null ? _unescapeHtml(mm.group(1) ?? '') : null;
}
final id = _attr('id') ?? '';
final name = _attr('name') ?? 'tool';
final done = (_attr('done') == 'true');
final args = _tryDecodeJson(_attr('arguments'));
final result = _tryDecodeJson(_attr('result'));
final files = _tryDecodeJson(_attr('files'));
calls.add(
ToolCallEntry(
id: id.isNotEmpty ? id : '${name}_${m.start}',
name: name,
done: done,
arguments: args,
result: result,
files: (files is List) ? files : null,
),
);
}
if (calls.isEmpty) return null;
final main = content.replaceAll(detailsRegex, '').trim();
return ToolCallsContent(toolCalls: calls, mainContent: main, originalContent: content);
}
/// Legacy helper that summarizes tool blocks to text (kept for fallback)
static String summarize(String content) {
final parsed = parse(content);
if (parsed == null) return content;
final buf = StringBuffer();
for (final c in parsed.toolCalls) {
buf.writeln(c.done ? 'Tool Executed: ${c.name}' : 'Running tool: ${c.name}');
final args = _prettyMaybe(c.arguments, max: 400);
final res = _prettyMaybe(c.result, max: 800);
if (args.isNotEmpty) {
buf.writeln('\nArguments:\n```json');
buf.writeln(args);
buf.writeln('```');
}
if (res.isNotEmpty) {
buf.writeln('\nResult:\n```json');
buf.writeln(res);
buf.writeln('```');
}
buf.writeln();
}
buf.writeln(parsed.mainContent);
return buf.toString().trim();
}
static dynamic _tryDecodeJson(String? raw) {
if (raw == null || raw.trim().isEmpty) return null;
try {
dynamic decoded = json.decode(raw);
if (decoded is String) {
final s = decoded.trim();
if ((s.startsWith('{') && s.endsWith('}')) || (s.startsWith('[') && s.endsWith(']'))) {
try {
decoded = json.decode(s);
} catch (_) {}
}
}
return decoded;
} catch (_) {
return raw;
}
}
static String _prettyMaybe(dynamic value, {int max = 600}) {
if (value == null) return '';
try {
final pretty = const JsonEncoder.withIndent(' ').convert(value);
return pretty.length > max ? pretty.substring(0, max) + '\n' : pretty;
} catch (_) {
final raw = value.toString();
return raw.length > max ? raw.substring(0, max) + '' : raw;
}
}
static String _unescapeHtml(String input) {
return input
.replaceAll('&quot;', '"')
.replaceAll('&#34;', '"')
.replaceAll('&apos;', "'")
.replaceAll('&#39;', "'")
.replaceAll('&lt;', '<')
.replaceAll('&gt;', '>')
.replaceAll('&amp;', '&');
}
}
/// Ordered piece of content: either plain text or a tool-call entry
class ToolCallsSegment {
final String? text;
final ToolCallEntry? entry;
const ToolCallsSegment._({this.text, this.entry});
factory ToolCallsSegment.text(String text) => ToolCallsSegment._(text: text);
factory ToolCallsSegment.entry(ToolCallEntry entry) =>
ToolCallsSegment._(entry: entry);
bool get isToolCall => entry != null;
}