Skip to content

Commit 66218b2

Browse files
chore: Improve racy tests (#769)
1 parent 1530190 commit 66218b2

File tree

5 files changed

+55
-16
lines changed

5 files changed

+55
-16
lines changed

.changeset/angry-planets-clean.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/common': patch
3+
---
4+
5+
Improved potential race condition when closing HTTP stream connections.

packages/common/src/client/sync/stream/AbstractRemote.ts

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@ import PACKAGE from '../../../../package.json' with { type: 'json' };
66
import { AbortOperation } from '../../../utils/AbortOperation.js';
77
import { DataStream } from '../../../utils/DataStream.js';
88
import { PowerSyncCredentials } from '../../connection/PowerSyncCredentials.js';
9-
import { StreamingSyncRequest } from './streaming-sync-types.js';
109
import { WebsocketClientTransport } from './WebsocketClientTransport.js';
10+
import { StreamingSyncRequest } from './streaming-sync-types.js';
11+
1112

1213
export type BSONImplementation = typeof BSON;
1314

@@ -557,30 +558,39 @@ export abstract class AbstractRemote {
557558
// Create a new stream splitting the response at line endings while also handling cancellations
558559
// by closing the reader.
559560
const reader = res.body.getReader();
561+
let readerReleased = false;
560562
// This will close the network request and read stream
561563
const closeReader = async () => {
562564
try {
565+
readerReleased = true;
563566
await reader.cancel();
564567
} catch (ex) {
565568
// an error will throw if the reader hasn't been used yet
566569
}
567570
reader.releaseLock();
568571
};
569572

573+
574+
const stream = new DataStream<T, string>({
575+
logger: this.logger,
576+
mapLine: mapLine
577+
});
578+
570579
abortSignal?.addEventListener('abort', () => {
571580
closeReader();
581+
stream.close();
572582
});
573583

574584
const decoder = this.createTextDecoder();
575585
let buffer = '';
576586

577-
const stream = new DataStream<T, string>({
578-
logger: this.logger,
579-
mapLine: mapLine
580-
});
587+
581588

582589
const l = stream.registerListener({
583590
lowWater: async () => {
591+
if (stream.closed || abortSignal?.aborted || readerReleased) {
592+
return
593+
}
584594
try {
585595
let didCompleteLine = false;
586596
while (!didCompleteLine) {

packages/node/tests/sync.test.ts

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
1-
import { describe, vi, expect, beforeEach } from 'vitest';
21
import util from 'node:util';
2+
import { beforeEach, describe, expect, vi } from 'vitest';
33

4-
import { bucket, MockSyncService, mockSyncServiceTest, TestConnector, waitForSyncStatus } from './utils';
54
import {
65
AbstractPowerSyncDatabase,
76
BucketChecksum,
@@ -14,6 +13,7 @@ import {
1413
SyncStreamConnectionMethod
1514
} from '@powersync/common';
1615
import Logger from 'js-logger';
16+
import { bucket, MockSyncService, mockSyncServiceTest, TestConnector, waitForSyncStatus } from './utils';
1717

1818
describe('Sync', () => {
1919
describe('js client', () => {
@@ -484,6 +484,7 @@ function defineSyncTests(impl: SyncClientImplementation) {
484484

485485
// Re-open database
486486
await database.close();
487+
487488
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(0));
488489
database = await syncService.createDatabase();
489490
database.connect(new TestConnector(), options);
@@ -821,10 +822,11 @@ function defineSyncTests(impl: SyncClientImplementation) {
821822
const powersync = await syncService.createDatabase({ schema: customSchema, logger });
822823
powersync.connect(new TestConnector(), options);
823824

824-
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
825-
expect(logMessages).toEqual(
826-
expect.arrayContaining([expect.stringContaining('Raw tables require the Rust-based sync client')])
827-
);
825+
await vi.waitFor(() => {
826+
expect(logMessages).toEqual(
827+
expect.arrayContaining([expect.stringContaining('Raw tables require the Rust-based sync client')])
828+
);
829+
});
828830
});
829831

830832
mockSyncServiceTest(`does not warn about raw tables if they're not used`, async ({ syncService }) => {

packages/node/tests/utils.ts

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,16 @@ import { onTestFinished, test } from 'vitest';
99
import {
1010
AbstractPowerSyncDatabase,
1111
BucketChecksum,
12-
column,
1312
NodePowerSyncDatabaseOptions,
1413
PowerSyncBackendConnector,
1514
PowerSyncCredentials,
1615
PowerSyncDatabase,
17-
PowerSyncDatabaseOptions,
1816
Schema,
1917
StreamingSyncCheckpoint,
2018
StreamingSyncLine,
2119
SyncStatus,
22-
Table
20+
Table,
21+
column
2322
} from '../lib';
2423

2524
export async function createTempDir() {
@@ -67,7 +66,7 @@ export async function createDatabase(
6766
const database = new PowerSyncDatabase({
6867
schema: AppSchema,
6968
...options,
70-
logger: defaultLogger,
69+
logger: options.logger ?? defaultLogger,
7170
database: {
7271
dbFilename: 'test.db',
7372
dbLocation: tmpdir,
@@ -103,6 +102,9 @@ export const mockSyncServiceTest = tempDirectoryTest.extend<{
103102
stream: ReadableStreamDefaultController<StreamingSyncLine>;
104103
}
105104

105+
// Uses a unique database name per mockSyncServiceTest to avoid conflicts with other tests.
106+
const databaseName = `test-${crypto.randomUUID()}.db`;
107+
106108
const listeners: Listener[] = [];
107109

108110
const inMemoryFetch: typeof fetch = async (info, init?) => {
@@ -149,6 +151,10 @@ export const mockSyncServiceTest = tempDirectoryTest.extend<{
149151
const newConnection = async (options?: Partial<NodePowerSyncDatabaseOptions>) => {
150152
const db = await createDatabase(tmpdir, {
151153
...options,
154+
database: {
155+
dbFilename: databaseName,
156+
...options?.database
157+
},
152158
remoteOptions: {
153159
fetchImplementation: inMemoryFetch
154160
}

packages/web/tests/watch.test.ts

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -658,7 +658,11 @@ describe('Watch Tests', { sequential: true }, () => {
658658
parameters: ['test']
659659
})
660660
.watch({
661-
reportFetching: false
661+
reportFetching: false,
662+
// Comparisons require a comparator to be set
663+
comparator: new ArrayComparator({
664+
compareBy: (item) => JSON.stringify(item)
665+
})
662666
});
663667

664668
expect(watch.state.isFetching).false;
@@ -671,9 +675,21 @@ describe('Watch Tests', { sequential: true }, () => {
671675
});
672676
onTestFinished(dispose);
673677

678+
// Wait for the initial load to complete
679+
await vi.waitFor(() => {
680+
expect(notificationCount).equals(1);
681+
});
682+
683+
notificationCount = 0; // We want to count the number of state changes after the initial load
684+
674685
// Should only a state change trigger for this operation
675686
await powersync.execute('INSERT INTO assets(id, make, customer_id) VALUES (uuid(), ?, ?)', ['test', uuid()]);
676687

688+
// We should get an update for the change above
689+
await vi.waitFor(() => {
690+
expect(notificationCount).equals(1);
691+
});
692+
677693
// Should not trigger any state change for these operations
678694
await powersync.execute('INSERT INTO assets(id, make, customer_id) VALUES (uuid(), ?, ?)', ['make1', uuid()]);
679695
await powersync.execute('INSERT INTO assets(id, make, customer_id) VALUES (uuid(), ?, ?)', ['make2', uuid()]);

0 commit comments

Comments
 (0)