Skip to content

Commit d2487e2

Browse files
authored
Create AsyncQueue (#2)
1 parent 10aad91 commit d2487e2

File tree

4 files changed

+297
-5
lines changed

4 files changed

+297
-5
lines changed

README.md

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,31 @@
11
# swift-async-queue
2-
A queue that enables ordered sending of events from synchronous to asynchronous code
2+
[![Swift Package Manager compatible](https://img.shields.io/badge/SPM-compatible-4BC51D.svg?style=flat)](https://github.com/apple/swift-package-manager)
3+
[![codecov](https://codecov.io/gh/dfed/swift-async-queue/branch/main/graph/badge.svg?token=nZBHcZZ63F)](https://codecov.io/gh/dfed/swift-async-queue)
4+
[![Version](https://img.shields.io/cocoapods/v/swift-async-queue.svg)](https://cocoapods.org/pods/swift-async-queue)
5+
[![License](https://img.shields.io/cocoapods/l/swift-async-queue.svg)](https://cocoapods.org/pods/swift-async-queue)
6+
[![Platform](https://img.shields.io/cocoapods/p/swift-async-queue.svg)](https://cocoapods.org/pods/swift-async-queue)
7+
8+
A queue that enables sending FIFO-ordered tasks from synchronous to asynchronous contexts.
9+
10+
## Usage
11+
12+
### Basic Initialization
13+
14+
```swift
15+
let asyncQueue = AsyncQueue()
16+
```
17+
18+
### Sending events from a synchronous context
19+
20+
```swift
21+
asyncQueue.async { /* awaitable context that executes after all other enqueued work is completed */ }
22+
```
23+
24+
### Awaiting work from an asynchronous context
25+
26+
```swift
27+
await asyncQueue.await { /* throw-able, return-able, awaitable context that executes after all other enqueued work is completed */ }
28+
```
329

430
## Requirements
531

Scripts/build.swift

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,8 @@ for rawPlatform in rawPlatforms {
159159
"-scheme", "swift-async-queue",
160160
"-sdk", platform.sdk,
161161
"-derivedDataPath", platform.derivedDataPath,
162-
"-PBXBuildsContinueAfterErrors=0"
162+
"-PBXBuildsContinueAfterErrors=0",
163+
"OTHER_SWIFT_FLAGS=-warnings-as-errors",
163164
]
164165

165166
if !platform.destination.isEmpty {
@@ -174,6 +175,9 @@ for rawPlatform in rawPlatforms {
174175
if platform.shouldTest {
175176
xcodeBuildArguments.append("test")
176177
}
178+
xcodeBuildArguments.append("-test-iterations")
179+
xcodeBuildArguments.append("100")
180+
xcodeBuildArguments.append("-run-tests-until-failure")
177181

178182
try execute(commandPath: "/usr/bin/xcodebuild", arguments: xcodeBuildArguments)
179183
isFirstRun = false

Sources/AsyncQueue/AsyncQueue.swift

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,72 @@
2020
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
2121
// SOFTWARE.
2222

23-
final class AsyncQueue {}
23+
/// A queue that enables sending FIFO-ordered tasks from synchronous to asynchronous contexts
24+
public final class AsyncQueue: Sendable {
25+
26+
// MARK: Initialization
27+
28+
/// Instantiates an asynchronous queue.
29+
/// - Parameter priority: The baseline priority of the tasks added to the asynchronous queue.
30+
public init(priority: TaskPriority? = nil) {
31+
var capturedTaskStreamContinuation: AsyncStream<@Sendable () async -> Void>.Continuation? = nil
32+
let taskStream = AsyncStream<@Sendable () async -> Void> { continuation in
33+
capturedTaskStreamContinuation = continuation
34+
}
35+
// Continuation will be captured during stream creation, so it is safe to force unwrap here.
36+
// If this force-unwrap fails, something is fundamentally broken in the Swift runtime.
37+
taskStreamContinuation = capturedTaskStreamContinuation!
38+
39+
streamTask = Task.detached(priority: priority) {
40+
for await task in taskStream {
41+
await task()
42+
}
43+
}
44+
}
45+
46+
deinit {
47+
taskStreamContinuation.finish()
48+
}
49+
50+
// MARK: Public
51+
52+
/// Schedules an asynchronous task for execution and immediately returns.
53+
/// The schedueled task will not execute until all prior tasks have completed.
54+
/// - Parameter task: The task to enqueue.
55+
public func async(_ task: @escaping @Sendable () async -> Void) {
56+
taskStreamContinuation.yield(task)
57+
}
58+
59+
/// Schedules an asynchronous throwing task and returns after the task is complete.
60+
/// The schedueled task will not execute until all prior tasks have completed.
61+
/// - Parameter task: The task to enqueue.
62+
/// - Returns: The value returned from the enqueued task.
63+
public func await<T>(_ task: @escaping @Sendable () async -> T) async -> T {
64+
await withUnsafeContinuation { continuation in
65+
taskStreamContinuation.yield {
66+
continuation.resume(returning: await task())
67+
}
68+
}
69+
}
70+
71+
/// Schedules an asynchronous task and returns after the task is complete.
72+
/// The schedueled task will not execute until all prior tasks have completed.
73+
/// - Parameter task: The task to enqueue.
74+
/// - Returns: The value returned from the enqueued task.
75+
public func await<T>(_ task: @escaping @Sendable () async throws -> T) async throws -> T {
76+
try await withUnsafeThrowingContinuation { continuation in
77+
taskStreamContinuation.yield {
78+
do {
79+
continuation.resume(returning: try await task())
80+
} catch {
81+
continuation.resume(throwing: error)
82+
}
83+
}
84+
}
85+
}
86+
87+
// MARK: Private
88+
89+
private let streamTask: Task<Void, Never>
90+
private let taskStreamContinuation: AsyncStream<@Sendable () async -> Void>.Continuation
91+
}

Tests/AsyncQueueTests/AsyncQueueTests.swift

Lines changed: 196 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,202 @@ import XCTest
2626

2727
final class AsyncQueueTests: XCTestCase {
2828

29-
func test_example() {
30-
_ = AsyncQueue()
29+
// MARK: XCTestCase
30+
31+
override func setUp() async throws {
32+
try await super.setUp()
33+
34+
systemUnderTest = AsyncQueue()
35+
}
36+
37+
// MARK: Behavior Tests
38+
39+
func test_async_sendsEventsInOrder() async {
40+
let counter = Counter()
41+
for iteration in 1...1_000 {
42+
systemUnderTest.async {
43+
await counter.incrementAndExpectCount(equals: iteration)
44+
}
45+
}
46+
await systemUnderTest.await { /* Drain the queue */ }
47+
}
48+
49+
func test_async_executesAsyncBlocksAtomically() async {
50+
let semaphore = Semaphore()
51+
for _ in 1...1_000 {
52+
systemUnderTest.async {
53+
let isWaiting = await semaphore.isWaiting
54+
// This test will fail occasionally if we aren't executing atomically.
55+
// You can prove this to yourself by replacing `systemUnderTest.async` above with `Task`.
56+
XCTAssertFalse(isWaiting)
57+
// Signal the semaphore before or after we wait – let the scheduler decide.
58+
Task {
59+
await semaphore.signal()
60+
}
61+
// Wait for the concurrent task to complete.
62+
await semaphore.wait()
63+
}
64+
}
65+
await systemUnderTest.await { /* Drain the queue */ }
66+
}
67+
68+
func test_async_isNotReentrant() async {
69+
let counter = Counter()
70+
await systemUnderTest.await { [systemUnderTest] in
71+
systemUnderTest.async {
72+
await counter.incrementAndExpectCount(equals: 2)
73+
}
74+
await counter.incrementAndExpectCount(equals: 1)
75+
systemUnderTest.async {
76+
await counter.incrementAndExpectCount(equals: 3)
77+
}
78+
}
79+
await systemUnderTest.await { /* Drain the queue */ }
80+
}
81+
82+
func test_async_retainsReceiverUntilFlushed() async {
83+
var systemUnderTest: AsyncQueue? = AsyncQueue()
84+
let counter = Counter()
85+
let expectation = self.expectation(description: #function)
86+
let semaphore = Semaphore()
87+
systemUnderTest?.async {
88+
// Make the queue wait.
89+
await semaphore.wait()
90+
await counter.incrementAndExpectCount(equals: 1)
91+
}
92+
systemUnderTest?.async {
93+
// This async task should not execute until the semaphore is released.
94+
await counter.incrementAndExpectCount(equals: 2)
95+
expectation.fulfill()
96+
}
97+
// Nil out our reference to the queue to show that the enqueued tasks will still complete
98+
systemUnderTest = nil
99+
// Signal the semaphore to unlock the remaining enqueued tasks.
100+
await semaphore.signal()
101+
102+
await waitForExpectations(timeout: 1.0)
103+
}
104+
105+
func test_async_doesNotRetainTaskAfterExecution() async {
106+
final class Reference: Sendable {}
107+
final class ReferenceHolder: @unchecked Sendable {
108+
var reference: Reference? = Reference()
109+
}
110+
let referenceHolder = ReferenceHolder()
111+
weak var weakReference = referenceHolder.reference
112+
let asyncSemaphore = Semaphore()
113+
let syncSemaphore = Semaphore()
114+
systemUnderTest.async { [reference = referenceHolder.reference] in
115+
// Now that we've started the task and captured the reference, release the synchronous code.
116+
await syncSemaphore.signal()
117+
// Wait for the synchronous setup to complete and the reference to be nil'd out.
118+
await asyncSemaphore.wait()
119+
// Retain the unsafe counter until the task is completed.
120+
_ = reference
121+
}
122+
// Wait for the asynchronous task to start.
123+
await syncSemaphore.wait()
124+
referenceHolder.reference = nil
125+
XCTAssertNotNil(weakReference)
126+
// Allow the enqueued task to complete.
127+
await asyncSemaphore.signal()
128+
// Make sure the task has completed.
129+
await systemUnderTest.await { /* Drain the queue */ }
130+
XCTAssertNil(weakReference)
131+
}
132+
133+
func test_await_sendsEventsInOrder() async {
134+
let counter = Counter()
135+
for iteration in 1...1_000 {
136+
systemUnderTest.async {
137+
await counter.incrementAndExpectCount(equals: iteration)
138+
}
139+
140+
guard iteration % 25 == 0 else {
141+
// Keep sending async events to the queue.
142+
continue
143+
}
144+
145+
await systemUnderTest.await {
146+
let count = await counter.count
147+
XCTAssertEqual(count, iteration)
148+
}
149+
}
150+
await systemUnderTest.await { /* Drain the queue */ }
151+
}
152+
153+
func test_await_canReturn() async {
154+
let expectedValue = UUID()
155+
let returnedValue = await systemUnderTest.await { expectedValue }
156+
XCTAssertEqual(expectedValue, returnedValue)
157+
}
158+
159+
func test_await_canThrow() async {
160+
struct TestError: Error, Equatable {
161+
private let identifier = UUID()
162+
}
163+
let expectedError = TestError()
164+
do {
165+
try await systemUnderTest.await { throw expectedError }
166+
} catch {
167+
XCTAssertEqual(error as? TestError, expectedError)
168+
}
169+
}
170+
171+
// MARK: Private
172+
173+
private var systemUnderTest = AsyncQueue()
174+
175+
// MARK: - Counter
176+
177+
private actor Counter {
178+
func incrementAndExpectCount(equals expectedCount: Int) {
179+
increment()
180+
XCTAssertEqual(expectedCount, count)
181+
}
182+
183+
func increment() {
184+
count += 1
185+
}
186+
187+
var count = 0
31188
}
32189

190+
// MARK: - Semaphore
191+
192+
private actor Semaphore {
193+
194+
func wait() async {
195+
count -= 1
196+
guard count < 0 else {
197+
// We don't need to wait because count is greater than or equal to zero.
198+
return
199+
}
200+
201+
await withCheckedContinuation { continuation in
202+
continuations.append(continuation)
203+
}
204+
}
205+
206+
func signal() {
207+
count += 1
208+
guard !isWaiting else {
209+
// Continue waiting.
210+
return
211+
}
212+
213+
for continuation in continuations {
214+
continuation.resume()
215+
}
216+
217+
continuations.removeAll()
218+
}
219+
220+
var isWaiting: Bool {
221+
count < 0
222+
}
223+
224+
private var continuations = [CheckedContinuation<Void, Never>]()
225+
private var count = 0
226+
}
33227
}

0 commit comments

Comments
 (0)