@@ -60,29 +60,12 @@ public final class ActorQueue<ActorType: Actor>: @unchecked Sendable {
6060 let ( taskStream, taskStreamContinuation) = AsyncStream< ActorTask> . makeStream( )
6161 self . taskStreamContinuation = taskStreamContinuation
6262
63- func beginExecuting(
64- _ operation: sending @escaping ( isolated ActorType) async -> Void ,
65- in context: isolated ActorType,
66- priority: TaskPriority ?
67- ) {
68- // In Swift 6, a `Task` enqueued from an actor begins executing immediately on that actor.
69- // Since we're running on our actor's context already, we can just dispatch a Task to get first-enqueued-first-start task execution.
70- Task ( priority: priority) {
71- await operation ( context)
72- }
73- }
74-
7563 Task {
7664 // In an ideal world, we would isolate this `for await` loop to the `ActorType`.
7765 // However, there's no good way to do that without retaining the actor and creating a cycle.
7866 for await actorTask in taskStream {
7967 // Await switching to the ActorType context.
80- await beginExecuting (
81- actorTask. task,
82- in: actorTask. executionContext,
83- priority: actorTask. priority
84- )
85- await actorTask. sempahore. signal ( )
68+ await actorTask. task ( actorTask. executionContext)
8669 }
8770 }
8871 }
@@ -120,17 +103,13 @@ public final class ActorQueue<ActorType: Actor>: @unchecked Sendable {
120103 fileprivate struct ActorTask : Sendable {
121104 init (
122105 executionContext: ActorType ,
123- priority: TaskPriority ? ,
124106 task: @escaping @Sendable ( isolated ActorType) async -> Void
125107 ) {
126108 self . executionContext = executionContext
127- self . priority = priority
128109 self . task = task
129110 }
130-
111+
131112 let executionContext : ActorType
132- let sempahore = Semaphore ( )
133- let priority : TaskPriority ?
134113 let task : @Sendable ( isolated ActorType) async -> Void
135114 }
136115
@@ -177,17 +156,25 @@ extension Task {
177156 operation: @Sendable @escaping ( isolated ActorType) async -> Success
178157 ) where Failure == Never {
179158 let delivery = Delivery < Success , Failure > ( )
159+ let semaphore = Semaphore ( )
180160 let task = ActorQueue< ActorType> . ActorTask(
181161 executionContext: actorQueue. executionContext,
182- priority: priority,
183162 task: { executionContext in
184- await delivery. sendValue ( operation ( executionContext) )
163+ await semaphore. wait ( )
164+ delivery. execute ( { @Sendable executionContext in
165+ await delivery. sendValue ( operation ( executionContext) )
166+ } , in: executionContext, priority: priority)
185167 }
186168 )
187169 actorQueue. taskStreamContinuation. yield ( task)
188170 self . init ( priority: priority) {
189- await task. sempahore. wait ( )
190- return await delivery. getValue ( )
171+ await withTaskCancellationHandler (
172+ operation: {
173+ await semaphore. signal ( )
174+ return await delivery. getValue ( )
175+ } ,
176+ onCancel: delivery. cancel
177+ )
191178 }
192179 }
193180
@@ -224,22 +211,29 @@ extension Task {
224211 operation: @escaping @Sendable ( isolated ActorType) async throws -> Success
225212 ) where Failure == any Error {
226213 let delivery = Delivery < Success , Failure > ( )
214+ let semaphore = Semaphore ( )
227215 let task = ActorQueue< ActorType> . ActorTask(
228216 executionContext: actorQueue. executionContext,
229- priority: priority,
230217 task: { executionContext in
231- do {
232- try await delivery. sendValue ( operation ( executionContext) )
233- } catch {
234- await delivery. sendFailure ( error)
235- }
218+ await semaphore. wait ( )
219+ delivery. execute ( { @Sendable executionContext in
220+ do {
221+ try await delivery. sendValue ( operation ( executionContext) )
222+ } catch {
223+ await delivery. sendFailure ( error)
224+ }
225+ } , in: executionContext, priority: priority)
236226 }
237227 )
238-
239228 actorQueue. taskStreamContinuation. yield ( task)
240229 self . init ( priority: priority) {
241- await task. sempahore. wait ( )
242- return try await delivery. getValue ( )
230+ try await withTaskCancellationHandler (
231+ operation: {
232+ await semaphore. signal ( )
233+ return try await delivery. getValue ( )
234+ } ,
235+ onCancel: delivery. cancel
236+ )
243237 }
244238 }
245239
@@ -276,17 +270,25 @@ extension Task {
276270 operation: @MainActor @escaping ( ) async -> Success
277271 ) where Failure == Never {
278272 let delivery = Delivery < Success , Failure > ( )
273+ let semaphore = Semaphore ( )
279274 let task = ActorQueue< MainActor> . ActorTask(
280275 executionContext: actorQueue. executionContext,
281- priority: priority,
282276 task: { executionContext in
283- await delivery. sendValue ( operation ( ) )
277+ await semaphore. wait ( )
278+ delivery. execute ( { @Sendable executionContext in
279+ await delivery. sendValue ( operation ( ) )
280+ } , in: executionContext, priority: priority)
284281 }
285282 )
286283 actorQueue. taskStreamContinuation. yield ( task)
287284 self . init ( priority: priority) {
288- await task. sempahore. wait ( )
289- return await delivery. getValue ( )
285+ return await withTaskCancellationHandler (
286+ operation: {
287+ await semaphore. signal ( )
288+ return await delivery. getValue ( )
289+ } ,
290+ onCancel: delivery. cancel
291+ )
290292 }
291293 }
292294
@@ -323,22 +325,29 @@ extension Task {
323325 operation: @escaping @MainActor ( ) async throws -> Success
324326 ) where Failure == any Error {
325327 let delivery = Delivery < Success , Failure > ( )
328+ let semaphore = Semaphore ( )
326329 let task = ActorQueue< MainActor> . ActorTask(
327330 executionContext: actorQueue. executionContext,
328- priority: priority,
329331 task: { executionContext in
330- do {
331- try await delivery. sendValue ( operation ( ) )
332- } catch {
333- await delivery. sendFailure ( error)
334- }
332+ await semaphore. wait ( )
333+ delivery. execute ( { @Sendable executionContext in
334+ do {
335+ try await delivery. sendValue ( operation ( ) )
336+ } catch {
337+ await delivery. sendFailure ( error)
338+ }
339+ } , in: executionContext, priority: priority)
335340 }
336341 )
337-
338342 actorQueue. taskStreamContinuation. yield ( task)
339343 self . init ( priority: priority) {
340- await task. sempahore. wait ( )
341- return try await delivery. getValue ( )
344+ try await withTaskCancellationHandler (
345+ operation: {
346+ await semaphore. signal ( )
347+ return try await delivery. getValue ( )
348+ } ,
349+ onCancel: delivery. cancel
350+ )
342351 }
343352 }
344353}
0 commit comments