refactor: migrate to riverpod 3

This commit is contained in:
cogwheel0
2025-09-21 22:31:44 +05:30
parent 37e5633c5c
commit 462bf4cde2
20 changed files with 834 additions and 453 deletions

View File

@@ -10,18 +10,23 @@ import 'outbound_task.dart';
import 'task_worker.dart';
final taskQueueProvider =
StateNotifierProvider<TaskQueueNotifier, List<OutboundTask>>((ref) {
return TaskQueueNotifier(ref);
});
class TaskQueueNotifier extends StateNotifier<List<OutboundTask>> {
TaskQueueNotifier(this._ref) : super(const []) {
_load();
}
NotifierProvider<TaskQueueNotifier, List<OutboundTask>>(
TaskQueueNotifier.new,
);
class TaskQueueNotifier extends Notifier<List<OutboundTask>> {
static const _prefsKey = 'outbound_task_queue_v1';
final Ref _ref;
final _uuid = const Uuid();
bool _bootstrapScheduled = false;
@override
List<OutboundTask> build() {
if (!_bootstrapScheduled) {
_bootstrapScheduled = true;
Future.microtask(_load);
}
return const [];
}
bool _processing = false;
final Set<String> _activeThreads = <String>{};
@@ -29,19 +34,24 @@ class TaskQueueNotifier extends StateNotifier<List<OutboundTask>> {
Future<void> _load() async {
try {
final prefs = _ref.read(sharedPreferencesProvider);
final prefs = ref.read(sharedPreferencesProvider);
final jsonStr = prefs.getString(_prefsKey);
if (jsonStr == null || jsonStr.isEmpty) return;
final raw = (jsonDecode(jsonStr) as List).cast<Map<String, dynamic>>();
final tasks = raw.map(OutboundTask.fromJson).toList();
// Only restore non-completed tasks
state = tasks
.where((t) => t.status == TaskStatus.queued || t.status == TaskStatus.running)
.map((t) => t.copyWith(
status: TaskStatus.queued,
startedAt: null,
completedAt: null,
))
.where(
(t) =>
t.status == TaskStatus.queued || t.status == TaskStatus.running,
)
.map(
(t) => t.copyWith(
status: TaskStatus.queued,
startedAt: null,
completedAt: null,
),
)
.toList();
// Kick processing after load
_process();
@@ -52,7 +62,7 @@ class TaskQueueNotifier extends StateNotifier<List<OutboundTask>> {
Future<void> _save() async {
try {
final prefs = _ref.read(sharedPreferencesProvider);
final prefs = ref.read(sharedPreferencesProvider);
final raw = state.map((t) => t.toJson()).toList(growable: false);
await prefs.setString(_prefsKey, jsonEncode(raw));
} catch (e) {
@@ -87,10 +97,7 @@ class TaskQueueNotifier extends StateNotifier<List<OutboundTask>> {
state = [
for (final t in state)
if (t.id == id)
t.copyWith(
status: TaskStatus.cancelled,
completedAt: DateTime.now(),
)
t.copyWith(status: TaskStatus.cancelled, completedAt: DateTime.now())
else
t,
];
@@ -102,10 +109,7 @@ class TaskQueueNotifier extends StateNotifier<List<OutboundTask>> {
for (final t in state)
if ((t.maybeConversationId ?? '') == conversationId &&
(t.status == TaskStatus.queued || t.status == TaskStatus.running))
t.copyWith(
status: TaskStatus.cancelled,
completedAt: DateTime.now(),
)
t.copyWith(status: TaskStatus.cancelled, completedAt: DateTime.now())
else
t,
];
@@ -177,7 +181,9 @@ class TaskQueueNotifier extends StateNotifier<List<OutboundTask>> {
// Pump while there is capacity and queued tasks remain
while (true) {
// Filter runnable tasks
final queued = state.where((t) => t.status == TaskStatus.queued).toList();
final queued = state
.where((t) => t.status == TaskStatus.queued)
.toList();
if (queued.isEmpty) break;
// Respect parallelism and one-per-thread
@@ -202,18 +208,23 @@ class TaskQueueNotifier extends StateNotifier<List<OutboundTask>> {
state = [
for (final t in state)
if (t.id == next.id)
next.copyWith(status: TaskStatus.running, startedAt: DateTime.now())
next.copyWith(
status: TaskStatus.running,
startedAt: DateTime.now(),
)
else
t,
];
await _save();
// Launch worker
unawaited(_run(next).whenComplete(() {
_activeThreads.remove(threadKey);
// After a task completes, try to schedule more
_process();
}));
unawaited(
_run(next).whenComplete(() {
_activeThreads.remove(threadKey);
// After a task completes, try to schedule more
_process();
}),
);
}
} finally {
_processing = false;
@@ -222,11 +233,14 @@ class TaskQueueNotifier extends StateNotifier<List<OutboundTask>> {
Future<void> _run(OutboundTask task) async {
try {
await TaskWorker(_ref).perform(task);
await TaskWorker(ref).perform(task);
state = [
for (final t in state)
if (t.id == task.id)
t.copyWith(status: TaskStatus.succeeded, completedAt: DateTime.now())
t.copyWith(
status: TaskStatus.succeeded,
completedAt: DateTime.now(),
)
else
t,
];