fix: socket issues

This commit is contained in:
cogwheel0
2025-09-01 16:47:41 +05:30
parent d801fe9371
commit 530c0d0efb

View File

@@ -14,6 +14,8 @@ import '../../../core/services/persistent_streaming_service.dart';
import '../../../core/utils/debug_logger.dart';
import '../services/reviewer_mode_service.dart';
const bool kSocketVerboseLogging = false;
// Chat messages for current conversation
final chatMessagesProvider =
StateNotifierProvider<ChatMessagesNotifier, List<ChatMessage>>((ref) {
@@ -1092,6 +1094,12 @@ Future<void> _sendMessageInternal(
final fn = call['function'];
final name = (fn is Map && fn['name'] is String) ? fn['name'] as String : null;
if (name is String && name.isNotEmpty) {
final msgs = ref.read(chatMessagesProvider);
final exists = (msgs.isNotEmpty) && RegExp(
r'<details\s+type=\"tool_calls\"[^>]*\bname=\"' + RegExp.escape(name) + r'\"',
multiLine: true,
).hasMatch(msgs.last.content);
if (!exists) {
final status = '\n<details type="tool_calls" done="false" name="$name"><summary>Executing...</summary>\n</details>\n';
ref.read(chatMessagesProvider.notifier).appendToLastMessage(status);
}
@@ -1099,6 +1107,7 @@ Future<void> _sendMessageInternal(
}
}
}
}
if (!suppressSocketContent && payload.containsKey('choices')) {
final choices = payload['choices'];
if (choices is List && choices.isNotEmpty) {
@@ -1114,6 +1123,12 @@ Future<void> _sendMessageInternal(
final fn = call['function'];
final name = (fn is Map && fn['name'] is String) ? fn['name'] as String : null;
if (name is String && name.isNotEmpty) {
final msgs = ref.read(chatMessagesProvider);
final exists = (msgs.isNotEmpty) && RegExp(
r'<details\s+type=\"tool_calls\"[^>]*\bname=\"' + RegExp.escape(name) + r'\"',
multiLine: true,
).hasMatch(msgs.last.content);
if (!exists) {
final status = '\n<details type="tool_calls" done="false" name="$name"><summary>Executing...</summary>\n</details>\n';
ref.read(chatMessagesProvider.notifier).appendToLastMessage(status);
}
@@ -1121,6 +1136,7 @@ Future<void> _sendMessageInternal(
}
}
}
}
final content = delta['content']?.toString() ?? '';
if (content.isNotEmpty) {
ref.read(chatMessagesProvider.notifier).appendToLastMessage(content);
@@ -1158,6 +1174,25 @@ Future<void> _sendMessageInternal(
// Stop listening to further socket events for this session.
try { socketService.offChatEvents(); } catch (_) {}
// Notify server that chat is completed (mirrors web client)
try {
final apiSvc = ref.read(apiServiceProvider);
final chatId = activeConversation?.id ?? '';
if (apiSvc != null && chatId.isNotEmpty) {
unawaited(apiSvc
.sendChatCompleted(
chatId: chatId,
messageId: assistantMessageId,
messages: const [],
model: selectedModel.id,
modelItem: modelItem,
sessionId: sessionId,
)
.timeout(const Duration(seconds: 3))
.catchError((_) {}));
}
} catch (_) {}
// If no content was rendered yet, fetch final assistant message from server
final msgs = ref.read(chatMessagesProvider);
if (msgs.isNotEmpty && msgs.last.role == 'assistant') {
@@ -1253,6 +1288,16 @@ Future<void> _sendMessageInternal(
if (s == '[DONE]' || s == 'DONE') {
socketService.offEvent(channel);
// Channel completed
try {
unawaited(api.sendChatCompleted(
chatId: activeConversation?.id ?? '',
messageId: assistantMessageId,
messages: const [],
model: selectedModel.id,
modelItem: modelItem,
sessionId: sessionId,
));
} catch (_) {}
ref.read(chatMessagesProvider.notifier).finishStreaming();
return;
}
@@ -1260,6 +1305,16 @@ Future<void> _sendMessageInternal(
final dataStr = s.substring(5).trim();
if (dataStr == '[DONE]') {
socketService.offEvent(channel);
try {
unawaited(api.sendChatCompleted(
chatId: activeConversation?.id ?? '',
messageId: assistantMessageId,
messages: const [],
model: selectedModel.id,
modelItem: modelItem,
sessionId: sessionId,
));
} catch (_) {}
ref.read(chatMessagesProvider.notifier).finishStreaming();
return;
}
@@ -1279,7 +1334,9 @@ Future<void> _sendMessageInternal(
}
// Surface tool_calls status
if (delta.containsKey('tool_calls')) {
if (kSocketVerboseLogging) {
DebugLogger.stream('Socket [' + channel + '] delta.tool_calls detected');
}
final tc = delta['tool_calls'];
if (tc is List) {
for (final call in tc) {
@@ -1289,6 +1346,12 @@ Future<void> _sendMessageInternal(
? fn['name'] as String
: null;
if (name is String && name.isNotEmpty) {
final msgs = ref.read(chatMessagesProvider);
final exists = (msgs.isNotEmpty) && RegExp(
r'<details\\s+type=\"tool_calls\"[^>]*\\bname=\"' + RegExp.escape(name) + r'\"',
multiLine: true,
).hasMatch(msgs.last.content);
if (!exists) {
final status = '\n<details type="tool_calls" done="false" name="$name"><summary>Executing...</summary>\n</details>\n';
ref.read(chatMessagesProvider.notifier).appendToLastMessage(status);
}
@@ -1296,6 +1359,7 @@ Future<void> _sendMessageInternal(
}
}
}
}
// Append streamed content
final content = delta['content']?.toString() ?? '';
if (content.isNotEmpty) {
@@ -1320,6 +1384,16 @@ Future<void> _sendMessageInternal(
final done = line['done'] == true;
if (done) {
socketService.offEvent(channel);
try {
unawaited(api.sendChatCompleted(
chatId: activeConversation?.id ?? '',
messageId: assistantMessageId,
messages: const [],
model: selectedModel.id,
modelItem: modelItem,
sessionId: sessionId,
));
} catch (_) {}
ref.read(chatMessagesProvider.notifier).finishStreaming();
return;
}