-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathqueue-sketch.diff
More file actions
150 lines (141 loc) · 5.65 KB
/
queue-sketch.diff
File metadata and controls
150 lines (141 loc) · 5.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
diff --git a/TODO.org b/TODO.org
index cd0d7a0..9cc1583 100644
--- a/TODO.org
+++ b/TODO.org
@@ -13,4 +13,6 @@
* TODO написать тесты на работу с nullable аргументами
+* TODO написать тесты на queue
+
* TODO потестить, что работает кейс, когда в start передаётся аргумент равный =null=
diff --git a/sample/src/main/kotlin/ru/kode/newremothing/Main.kt b/sample/src/main/kotlin/ru/kode/newremothing/Main.kt
index 87427bd..72faff5 100644
--- a/sample/src/main/kotlin/ru/kode/newremothing/Main.kt
+++ b/sample/src/main/kotlin/ru/kode/newremothing/Main.kt
@@ -3,6 +3,7 @@ package ru.kode.newremothing
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.CountDownLatch
+import java.util.concurrent.ConcurrentLinkedDeque
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineExceptionHandler
@@ -51,10 +52,11 @@ class TaskHandle<A, R>(
val name: String,
val id: Uuid,
internal val scope: CoroutineScope,
+ internal val queue: DefaultTaskQueue<A>,
internal val body: suspend (A) -> R,
) {
companion object {
- val NIL = TaskHandle<Any?, Any?>("", Uuid.NIL, CoroutineScope(EmptyCoroutineContext)) {}
+ val NIL = TaskHandle<Any?, Any?>("", Uuid.NIL, CoroutineScope(EmptyCoroutineContext), DefaultTaskQueue<Any?>()) {}
}
override fun toString(): String {
@@ -104,9 +106,42 @@ data class TaskState(
@OptIn(ExperimentalUuidApi::class)
interface TaskStateChangeListener {
suspend fun onTaskStateChanged(state: TaskState)
+ suspend fun onTaskQueueEntryScheduled(taskId: Uuid, startHandle: StartHandle, argument: Any)
+
fun onTaskCancelled(taskId: Uuid, startId: Uuid)
}
+interface Queue<A> {
+ fun addFirst(argument: A): Queue<A>
+ fun addLast(argument: A): Queue<A>
+
+ val isEmpty: Boolean
+
+ // TODO add start() here, so that user can manually start queued tasks without first starting task
+ // by normal means (which would trigger queue scheduling on completion). I.e. something like this should work:
+ // (no scheduler.start(handde) prior to this at all)
+ // scheduler.queue(handle).addLast(11)
+ // scheduler.queue(handle).addLast(12)
+ // scheduler.queue(handle).addLast(13)
+ // scheduler.queue(handle).start() OR scheduler.startQueued(handle)
+}
+
+class DefaultTaskQueue<A> : Queue<A> {
+ internal val impl = ConcurrentLinkedDeque<A>()
+
+ override fun addFirst(argument: A): Queue<A> {
+ impl.addFirst(argument)
+ return this
+ }
+
+ override fun addLast(argument: A): Queue<A> {
+ impl.addLast(argument)
+ return this
+ }
+
+ override val isEmpty: Boolean get() = impl.isEmpty()
+}
+
@OptIn(ExperimentalUuidApi::class)
class Scheduler(
val scope: CoroutineScope,
@@ -149,11 +184,16 @@ class Scheduler(
return TaskHandle(
name = name,
id = taskId,
- body = body,
scope = scope,
+ queue = DefaultTaskQueue(),
+ body = body,
)
}
+ fun <A> queue(handle: TaskHandle<A, *>): Queue<A> {
+ return handle.queue
+ }
+
fun <A> start(handle: TaskHandle<A, *>, argument: A): StartHandle {
return startInternal(handle, argument, cancelPrevious = false)
}
@@ -191,6 +231,12 @@ class Scheduler(
taskState[startId] = state
stateChangeListeners.forEach { it.onTaskStateChanged(state) }
}
+ } finally {
+ val nextQueuedArgument = handle.queue.impl.pollFirst()
+ if (nextQueuedArgument != null) {
+ val nextQueuedStartHandle = startInternal(handle, nextQueuedArgument, cancelPrevious)
+ stateChangeListeners.forEach { it.onTaskQueueEntryScheduled(handle.id, nextQueuedStartHandle, nextQueuedArgument) }
+ }
}
}
job.invokeOnCompletion { cause ->
@@ -290,6 +336,10 @@ fun testBasicStart(scheduler: Scheduler) {
}
}
+ override suspend fun onTaskQueueEntryScheduled(taskId: Uuid, startHandle: StartHandle, argument: Any) {
+ println("task queue entry scheduled: startId = ${startHandle.id}, taskId = $taskId, argument = $argument")
+ }
+
// TODO mention in docs that implementation should be fast: copy from invokeOnCompletion docs
override fun onTaskCancelled(taskId: Uuid, startId: Uuid) {
println("task is cancelled: startId = $startId, taskId = $taskId")
@@ -325,6 +375,11 @@ fun testStartLatest(scheduler: Scheduler) {
}
}
+ override suspend fun onTaskQueueEntryScheduled(taskId: Uuid, startHandle: StartHandle, argument: Any) {
+ println("task queue entry scheduled: startId = ${startHandle.id}, taskId = $taskId, argument = $argument")
+ }
+
+
// TODO mention in docs that implementation should be fast: copy from invokeOnCompletion docs
override fun onTaskCancelled(taskId: Uuid, startId: Uuid) {
println("task is cancelled: startId = $startId, taskId = $taskId")
@@ -343,7 +398,6 @@ fun testStartLatest(scheduler: Scheduler) {
scheduler.startLatest(queryHandle, "final-query")
}
latch.await()
-
}
@OptIn(ExperimentalUuidApi::class)
@@ -353,7 +407,7 @@ fun main() {
val scheduler = Scheduler(scope = schedulerScope)
println("ready to start")
- val mode = "startlatest"
+ val mode = "basic"
if (mode == "basic") {
testBasicStart(scheduler)
} else {