Skip to content

Commit aa84eef

Browse files
authored
Create a MainActorQueue (#14)
1 parent f9a4073 commit aa84eef

File tree

5 files changed

+330
-2
lines changed

5 files changed

+330
-2
lines changed

README.md

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,46 @@ func testActorQueueOrdering() async {
129129
}
130130
```
131131

132+
### Sending ordered asynchronous tasks to the `@MainActor` from a nonisolated context
133+
134+
Use a `MainActorQueue` to send ordered asynchronous tasks to the `@MainActor`’s isolated context from nonisolated or synchronous contexts. Tasks sent to this queue type are guaranteed to begin executing in the order in which they are enqueued. Like an `ActorQueue`, execution order is guaranteed only until the first [suspension point](https://docs.swift.org/swift-book/LanguageGuide/Concurrency.html#ID639) within the enqueued task. A `MainActorQueue` executes tasks within its adopted actor’s isolated context, resulting in `MainActorQueue` task execution having the same properties as a `@MainActor`'s' code execution: code between suspension points is executed atomically, and tasks sent to a single `MainActorQueue` can await results from the queue without deadlocking.
135+
136+
A `MainActorQueue` can easily execute asynchronous tasks from a nonisolated context in FIFO order:
137+
```swift
138+
@MainActor
139+
func testMainActorQueueOrdering() async {
140+
@MainActor
141+
final class Counter {
142+
nonisolated
143+
func incrementAndAssertCountEquals(_ expectedCount: Int) {
144+
MainActorQueue.shared.enqueue {
145+
self.increment()
146+
let incrementedCount = self.count
147+
XCTAssertEqual(incrementedCount, expectedCount) // always succeeds
148+
}
149+
}
150+
151+
nonisolated
152+
func flushQueue() async {
153+
await MainActorQueue.shared.enqueueAndWait { }
154+
}
155+
156+
func increment() {
157+
count += 1
158+
}
159+
160+
var count = 0
161+
}
162+
163+
let counter = Counter()
164+
for iteration in 1...100 {
165+
counter.incrementAndAssertCountEquals(iteration)
166+
}
167+
// Wait for all enqueued tasks to finish.
168+
await counter.flushQueue()
169+
}
170+
```
171+
132172
## Requirements
133173

134174
* Xcode 14.1 or later.

Sources/AsyncQueue/ActorQueue.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ public final class ActorQueue<ActorType: Actor>: @unchecked Sendable {
154154
}
155155

156156
extension Actor {
157-
func suspendUntilStarted(_ task: @escaping @Sendable (isolated Self) async -> Void) async {
157+
fileprivate func suspendUntilStarted(_ task: @escaping @Sendable (isolated Self) async -> Void) async {
158158
// Suspend the calling code until our enqueued task starts.
159159
await withUnsafeContinuation { continuation in
160160
// Utilize the serial (but not FIFO) Actor context to execute the task without requiring the calling method to wait for the task to complete.
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
// MIT License
2+
//
3+
// Copyright (c) 2023 Dan Federman
4+
//
5+
// Permission is hereby granted, free of charge, to any person obtaining a copy
6+
// of this software and associated documentation files (the "Software"), to deal
7+
// in the Software without restriction, including without limitation the rights
8+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
// copies of the Software, and to permit persons to whom the Software is
10+
// furnished to do so, subject to the following conditions:
11+
//
12+
// The above copyright notice and this permission notice shall be included in all
13+
// copies or substantial portions of the Software.
14+
//
15+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+
// SOFTWARE.
22+
23+
/// A queue that enables enqueing ordered asynchronous tasks from a nonisolated context onto the `@MainActor`'s isolated context.
24+
/// Tasks are guaranteed to begin executing in the order in which they are enqueued. However, if a task suspends it will allow subsequently enqueued tasks to begin executing.
25+
/// This queue exhibits the execution behavior of an actor: tasks sent to this queue can re-enter the queue, and tasks may execute in non-FIFO order when a task suspends.
26+
///
27+
/// A `MainActorQueue` ensures tasks sent from a nonisolated context to the `@MainActor`'s isolated context begin execution in order.
28+
public final class MainActorQueue: Sendable {
29+
30+
// MARK: Initialization
31+
32+
/// Instantiates a main actor queue.
33+
init() {
34+
var capturedTaskStreamContinuation: AsyncStream<@Sendable @MainActor () async -> Void>.Continuation? = nil
35+
let taskStream = AsyncStream<@Sendable @MainActor () async -> Void> { continuation in
36+
capturedTaskStreamContinuation = continuation
37+
}
38+
// Continuation will be captured during stream creation, so it is safe to force unwrap here.
39+
// If this force-unwrap fails, something is fundamentally broken in the Swift runtime.
40+
taskStreamContinuation = capturedTaskStreamContinuation!
41+
42+
Task.detached { @MainActor in
43+
for await task in taskStream {
44+
await MainActor.shared.suspendUntilStarted(task)
45+
}
46+
}
47+
}
48+
49+
deinit {
50+
taskStreamContinuation.finish()
51+
}
52+
53+
// MARK: Public
54+
55+
/// The global `MainActorQueue` instance.
56+
public static let shared = MainActorQueue()
57+
58+
/// Schedules an asynchronous task for execution and immediately returns.
59+
/// The scheduled task will not execute until all prior tasks have completed or suspended.
60+
/// - Parameter task: The task to enqueue.
61+
public func enqueue(_ task: @escaping @Sendable @MainActor () async -> Void) {
62+
taskStreamContinuation.yield(task)
63+
}
64+
65+
/// Schedules an asynchronous task and returns after the task is complete.
66+
/// The scheduled task will not execute until all prior tasks have completed or suspended.
67+
/// - Parameter task: The task to enqueue.
68+
/// - Returns: The value returned from the enqueued task.
69+
public func enqueueAndWait<T: Sendable>(_ task: @escaping @Sendable @MainActor () async -> T) async -> T {
70+
await withUnsafeContinuation { continuation in
71+
taskStreamContinuation.yield {
72+
continuation.resume(returning: await task())
73+
}
74+
}
75+
}
76+
77+
/// Schedules an asynchronous throwing task and returns after the task is complete.
78+
/// The scheduled task will not execute until all prior tasks have completed or suspended.
79+
/// - Parameter task: The task to enqueue.
80+
/// - Returns: The value returned from the enqueued task.
81+
public func enqueueAndWait<T: Sendable>(_ task: @escaping @Sendable @MainActor () async throws -> T) async throws -> T {
82+
try await withUnsafeThrowingContinuation { continuation in
83+
taskStreamContinuation.yield {
84+
do {
85+
continuation.resume(returning: try await task())
86+
} catch {
87+
continuation.resume(throwing: error)
88+
}
89+
}
90+
}
91+
}
92+
93+
// MARK: Private
94+
95+
private let taskStreamContinuation: AsyncStream<@Sendable @MainActor () async -> Void>.Continuation
96+
}
97+
98+
extension MainActor {
99+
@MainActor
100+
fileprivate func suspendUntilStarted(_ task: @escaping @Sendable @MainActor () async -> Void) async {
101+
// Suspend the calling code until our enqueued task starts.
102+
await withUnsafeContinuation { continuation in
103+
// Utilize the serial (but not FIFO) @MainActor context to execute the task without requiring the calling method to wait for the task to complete.
104+
Task { @MainActor in
105+
// Signal that the task has started. Since this `task` is executing on the main actor's execution context, the order of execution is guaranteed.
106+
continuation.resume()
107+
await task()
108+
}
109+
}
110+
}
111+
}

Tests/AsyncQueueTests/ActorQueueTests.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,5 +221,5 @@ final class ActorQueueTests: XCTestCase {
221221
// MARK: Private
222222

223223
private var systemUnderTest = ActorQueue<Counter>()
224-
private var counter: Counter = Counter()
224+
private var counter = Counter()
225225
}
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
// MIT License
2+
//
3+
// Copyright (c) 2023 Dan Federman
4+
//
5+
// Permission is hereby granted, free of charge, to any person obtaining a copy
6+
// of this software and associated documentation files (the "Software"), to deal
7+
// in the Software without restriction, including without limitation the rights
8+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
// copies of the Software, and to permit persons to whom the Software is
10+
// furnished to do so, subject to the following conditions:
11+
//
12+
// The above copyright notice and this permission notice shall be included in all
13+
// copies or substantial portions of the Software.
14+
//
15+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+
// SOFTWARE.
22+
23+
import XCTest
24+
25+
@testable import AsyncQueue
26+
27+
final class MainActorQueueTests: XCTestCase {
28+
29+
// MARK: XCTestCase
30+
31+
override func setUp() async throws {
32+
try await super.setUp()
33+
34+
systemUnderTest = MainActorQueue()
35+
counter = Counter()
36+
}
37+
38+
// MARK: Behavior Tests
39+
40+
func test_shared_returnsSameInstance() async {
41+
XCTAssertTrue(MainActorQueue.shared === MainActorQueue.shared)
42+
}
43+
44+
func test_enqueue_executesOnMainThread() async {
45+
systemUnderTest.enqueue {
46+
XCTAssertTrue(Thread.isMainThread)
47+
}
48+
await systemUnderTest.enqueueAndWait { /* Drain the queue */ }
49+
}
50+
51+
func test_enqueue_sendsEventsInOrder() async {
52+
for iteration in 1...1_000 {
53+
systemUnderTest.enqueue { [counter] in
54+
await counter.incrementAndExpectCount(equals: iteration)
55+
}
56+
}
57+
await systemUnderTest.enqueueAndWait { /* Drain the queue */ }
58+
}
59+
60+
func test_enqueue_startsExecutionOfNextTaskAfterSuspension() async {
61+
let semaphore = Semaphore()
62+
63+
systemUnderTest.enqueue {
64+
await semaphore.wait()
65+
}
66+
systemUnderTest.enqueue {
67+
// Signal the semaphore from the actor queue.
68+
// If the actor queue were FIFO, this test would hang since this code would never execute:
69+
// we'd still be waiting for the prior `wait()` tasks to finish.
70+
await semaphore.signal()
71+
}
72+
await systemUnderTest.enqueueAndWait { /* Drain the queue */ }
73+
}
74+
75+
func test_enqueueAndWait_executesOnMainThread() async {
76+
await systemUnderTest.enqueueAndWait {
77+
XCTAssertTrue(Thread.isMainThread)
78+
}
79+
}
80+
81+
func test_enqueueAndWait_allowsReentrancy() async {
82+
await systemUnderTest.enqueueAndWait { [systemUnderTest, counter] in
83+
await systemUnderTest.enqueueAndWait { [counter] in
84+
await counter.incrementAndExpectCount(equals: 1)
85+
}
86+
await counter.incrementAndExpectCount(equals: 2)
87+
}
88+
}
89+
90+
func test_enqueue_doesNotRetainTaskAfterExecution() async {
91+
final class Reference: Sendable {}
92+
final class ReferenceHolder: @unchecked Sendable {
93+
init() {
94+
reference = Reference()
95+
weakReference = reference
96+
}
97+
private(set) var reference: Reference?
98+
private(set) weak var weakReference: Reference?
99+
100+
func clearReference() {
101+
reference = nil
102+
}
103+
}
104+
let referenceHolder = ReferenceHolder()
105+
let asyncSemaphore = Semaphore()
106+
let syncSemaphore = Semaphore()
107+
let systemUnderTest = ActorQueue<Semaphore>()
108+
systemUnderTest.adoptExecutionContext(of: syncSemaphore)
109+
110+
let expectation = self.expectation(description: #function)
111+
systemUnderTest.enqueue { [reference = referenceHolder.reference] syncSemaphore in
112+
// Now that we've started the task and captured the reference, release the synchronous code.
113+
syncSemaphore.signal()
114+
// Wait for the synchronous setup to complete and the reference to be nil'd out.
115+
await asyncSemaphore.wait()
116+
// Retain the unsafe counter until the task is completed.
117+
_ = reference
118+
systemUnderTest.enqueue { _ in
119+
// Signal that this task has cleaned up.
120+
// This closure will not execute until the prior closure completes.
121+
expectation.fulfill()
122+
}
123+
}
124+
// Wait for the asynchronous task to start.
125+
await syncSemaphore.wait()
126+
referenceHolder.clearReference()
127+
XCTAssertNotNil(referenceHolder.weakReference)
128+
// Allow the enqueued task to complete.
129+
await asyncSemaphore.signal()
130+
// Make sure the task has completed.
131+
await waitForExpectations(timeout: 1.0)
132+
133+
XCTAssertNil(referenceHolder.weakReference)
134+
}
135+
136+
func test_enqueueAndWait_sendsEventsInOrder() async {
137+
for iteration in 1...1_000 {
138+
systemUnderTest.enqueue { [counter] in
139+
await counter.incrementAndExpectCount(equals: iteration)
140+
}
141+
142+
guard iteration % 25 == 0 else {
143+
// Keep sending async events to the queue.
144+
continue
145+
}
146+
147+
await systemUnderTest.enqueueAndWait { [counter] in
148+
let count = await counter.count
149+
XCTAssertEqual(count, iteration)
150+
}
151+
}
152+
await systemUnderTest.enqueueAndWait { /* Drain the queue */ }
153+
}
154+
155+
func test_enqueueAndWait_canReturn() async {
156+
let expectedValue = UUID()
157+
let returnedValue = await systemUnderTest.enqueueAndWait { expectedValue }
158+
XCTAssertEqual(expectedValue, returnedValue)
159+
}
160+
161+
func test_enqueueAndWait_canThrow() async {
162+
struct TestError: Error, Equatable {
163+
private let identifier = UUID()
164+
}
165+
let expectedError = TestError()
166+
do {
167+
try await systemUnderTest.enqueueAndWait { throw expectedError }
168+
} catch {
169+
XCTAssertEqual(error as? TestError, expectedError)
170+
}
171+
}
172+
173+
// MARK: Private
174+
175+
private var systemUnderTest = MainActorQueue()
176+
private var counter = Counter()
177+
}

0 commit comments

Comments
 (0)