Skip to content

Commit 7a5aaf5

Browse files
authored
React hooks for sync streams (#717)
1 parent d8236aa commit 7a5aaf5

File tree

15 files changed

+392
-38
lines changed

15 files changed

+392
-38
lines changed

.changeset/angry-ducks-sneeze.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
'@powersync/react-native': minor
33
'@powersync/common': minor
44
'@powersync/web': minor
5+
'@powersync/node': minor
56
---
67

78
Add alpha support for sync streams, allowing different sets of data to be synced dynamically.

.changeset/mighty-colts-rule.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/react': minor
3+
---
4+
5+
Add hooks for sync streams

.github/workflows/test-simulators.yaml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,9 @@ jobs:
130130
test-ios:
131131
name: Test iOS
132132
needs: check-changes
133-
if: ${{ needs.check-changes.outputs.should_run == 'true' }}
133+
# TODO: Re-enable iOS tests. They have been disabled because they are failing extremely frequently without
134+
# any apparent cause. In particular, it seems like even starting the simulator times out most of the time.
135+
if: ${{ false && needs.check-changes.outputs.should_run == 'true' }}
134136
runs-on: macOS-15
135137

136138
steps:

packages/common/src/client/AbstractPowerSyncDatabase.ts

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,10 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
355355
return this.waitForStatus(statusMatches, signal);
356356
}
357357

358-
private async waitForStatus(predicate: (status: SyncStatus) => any, signal?: AbortSignal): Promise<void> {
358+
/**
359+
* Waits for the first sync status for which the `status` callback returns a truthy value.
360+
*/
361+
async waitForStatus(predicate: (status: SyncStatus) => any, signal?: AbortSignal): Promise<void> {
359362
if (predicate(this.currentStatus)) {
360363
return;
361364
}
@@ -364,16 +367,21 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
364367
const dispose = this.registerListener({
365368
statusChanged: (status) => {
366369
if (predicate(status)) {
367-
dispose();
368-
resolve();
370+
abort();
369371
}
370372
}
371373
});
372374

373-
signal?.addEventListener('abort', () => {
375+
function abort() {
374376
dispose();
375377
resolve();
376-
});
378+
}
379+
380+
if (signal?.aborted) {
381+
abort();
382+
} else {
383+
signal?.addEventListener('abort', abort);
384+
}
377385
});
378386
}
379387

packages/common/src/client/ConnectionManager.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -324,14 +324,15 @@ export class ConnectionManager extends BaseObserver<ConnectionManagerListener> {
324324
};
325325
}
326326

327-
private get activeStreams() {
327+
/**
328+
* @internal exposed for testing
329+
*/
330+
get activeStreams() {
328331
return [...this.locallyActiveSubscriptions.values()].map((a) => ({ name: a.name, params: a.parameters }));
329332
}
330333

331334
private subscriptionsMayHaveChanged() {
332-
if (this.syncStreamImplementation) {
333-
this.syncStreamImplementation.updateSubscriptions(this.activeStreams);
334-
}
335+
this.syncStreamImplementation?.updateSubscriptions(this.activeStreams);
335336
}
336337
}
337338

packages/node/vitest.config.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ import { defineConfig } from 'vitest/config';
33
// We need to define an empty config to be part of the vitest works
44
export default defineConfig({
55
test: {
6-
silent: false
6+
silent: false,
7+
// This doesn't make the tests considerably slower. It may improve reliability for GH actions.
8+
fileParallelism: false
79
}
810
});
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
import { useEffect, useMemo, useState } from 'react';
2+
import { usePowerSync } from './PowerSyncContext.js';
3+
import {
4+
AbstractPowerSyncDatabase,
5+
SyncStatus,
6+
SyncStreamStatus,
7+
SyncStreamSubscribeOptions,
8+
SyncStreamSubscription
9+
} from '@powersync/common';
10+
import { useStatus } from './useStatus.js';
11+
import { QuerySyncStreamOptions } from './watched/watch-types.js';
12+
13+
/**
14+
* A sync stream to subscribe to in {@link useSyncStream}.
15+
*
16+
* For more details on sync streams, see the [documentation](https://docs.powersync.com/usage/sync-streams).
17+
*/
18+
export interface UseSyncStreamOptions extends SyncStreamSubscribeOptions {
19+
/**
20+
* The name of the stream to subscribe to.
21+
*/
22+
name: string;
23+
/**
24+
* Parameters for the stream subscription. A single stream can have multiple subscriptions with different parameter
25+
* sets.
26+
*/
27+
parameters?: Record<string, any>;
28+
}
29+
30+
/**
31+
* Creates a PowerSync stream subscription. The subscription is kept alive as long as the React component calling this
32+
* function. When it unmounts, {@link SyncStreamSubscription.unsubscribe} is called
33+
*
34+
* For more details on sync streams, see the [documentation](https://docs.powersync.com/usage/sync-streams).
35+
*
36+
* @returns The status for that stream, or `null` if the stream is currently being resolved.
37+
*/
38+
export function useSyncStream(options: UseSyncStreamOptions): SyncStreamStatus | null {
39+
const { name, parameters } = options;
40+
const db = usePowerSync();
41+
const status = useStatus();
42+
const [subscription, setSubscription] = useState<SyncStreamSubscription | null>(null);
43+
44+
useEffect(() => {
45+
let active = true;
46+
let subscription: SyncStreamSubscription | null = null;
47+
48+
db.syncStream(name, parameters)
49+
.subscribe(options)
50+
.then((sub) => {
51+
if (active) {
52+
subscription = sub;
53+
setSubscription(sub);
54+
} else {
55+
// The cleanup function already ran, unsubscribe immediately.
56+
sub.unsubscribe();
57+
}
58+
});
59+
60+
return () => {
61+
active = false;
62+
// If we don't have a subscription yet, it'll still get cleaned up once the promise resolves because we've set
63+
// active to false.
64+
subscription?.unsubscribe();
65+
};
66+
}, [name, parameters]);
67+
68+
return subscription && status.forStream(subscription);
69+
}
70+
71+
/**
72+
* @internal
73+
*/
74+
export function useAllSyncStreamsHaveSynced(
75+
db: AbstractPowerSyncDatabase,
76+
streams: QuerySyncStreamOptions[] | undefined
77+
): boolean {
78+
// Since streams are a user-supplied array, they will likely be different each time this function is called. We don't
79+
// want to update underlying subscriptions each time, though.
80+
const hash = useMemo(() => streams && JSON.stringify(streams), [streams]);
81+
const [synced, setHasSynced] = useState(streams == null || streams.every((e) => e.waitForStream != true));
82+
83+
useEffect(() => {
84+
if (streams) {
85+
setHasSynced(false);
86+
87+
const promises: Promise<SyncStreamSubscription>[] = [];
88+
const abort = new AbortController();
89+
for (const stream of streams) {
90+
promises.push(db.syncStream(stream.name, stream.parameters).subscribe(stream));
91+
}
92+
93+
// First, wait for all subscribe() calls to make all subscriptions active.
94+
Promise.all(promises).then(async (resolvedStreams) => {
95+
function allHaveSynced(status: SyncStatus) {
96+
return resolvedStreams.every((s, i) => {
97+
const request = streams[i];
98+
return !request.waitForStream || status.forStream(s)?.subscription?.hasSynced;
99+
});
100+
}
101+
102+
// Wait for the effect to be cancelled or all streams having synced.
103+
await db.waitForStatus(allHaveSynced, abort.signal);
104+
if (abort.signal.aborted) {
105+
// Was cancelled
106+
} else {
107+
// Has synced, update public state.
108+
setHasSynced(true);
109+
110+
// Wait for cancellation before clearing subscriptions.
111+
await new Promise<void>((resolve) => {
112+
abort.signal.addEventListener('abort', () => resolve());
113+
});
114+
}
115+
116+
// Effect was definitely cancelled at this point, so drop the subscriptions.
117+
for (const stream of resolvedStreams) {
118+
stream.unsubscribe();
119+
}
120+
});
121+
122+
return () => abort.abort();
123+
} else {
124+
// There are no streams, so all of them have synced.
125+
setHasSynced(true);
126+
return undefined;
127+
}
128+
}, [hash]);
129+
130+
return synced;
131+
}

packages/react/src/hooks/watched/useQuery.ts

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { useSingleQuery } from './useSingleQuery.js';
44
import { useWatchedQuery } from './useWatchedQuery.js';
55
import { AdditionalOptions, DifferentialHookOptions, QueryResult, ReadonlyQueryResult } from './watch-types.js';
66
import { constructCompatibleQuery } from './watch-utils.js';
7+
import { useAllSyncStreamsHaveSynced } from '../streams.js';
78

89
/**
910
* A hook to access the results of a watched query.
@@ -58,15 +59,20 @@ export function useQuery<RowType = any>(
5859
) {
5960
const powerSync = usePowerSync();
6061
if (!powerSync) {
61-
return { isLoading: false, isFetching: false, data: [], error: new Error('PowerSync not configured.') };
62+
return {
63+
..._loadingState,
64+
isLoading: false,
65+
error: new Error('PowerSync not configured.')
66+
};
6267
}
6368
const { parsedQuery, queryChanged } = constructCompatibleQuery(query, parameters, options);
69+
const streamsHaveSynced = useAllSyncStreamsHaveSynced(powerSync, options?.streams);
6470
const runOnce = options?.runQueryOnce == true;
6571
const single = useSingleQuery<RowType>({
6672
query: parsedQuery,
6773
powerSync,
6874
queryChanged,
69-
active: runOnce
75+
active: runOnce && streamsHaveSynced
7076
});
7177
const watched = useWatchedQuery<RowType>({
7278
query: parsedQuery,
@@ -79,8 +85,14 @@ export function useQuery<RowType = any>(
7985
// We emit new data for each table change by default.
8086
rowComparator: options.rowComparator
8187
},
82-
active: !runOnce
88+
active: !runOnce && streamsHaveSynced
8389
});
8490

85-
return runOnce ? single : watched;
91+
if (!streamsHaveSynced) {
92+
return { ..._loadingState };
93+
}
94+
95+
return (runOnce ? single : watched) ?? _loadingState;
8696
}
97+
98+
const _loadingState = { isLoading: true, isFetching: false, data: [], error: undefined };

packages/react/src/hooks/watched/watch-types.ts

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,35 @@
1-
import { DifferentialWatchedQueryComparator, SQLOnChangeOptions } from '@powersync/common';
1+
import { DifferentialWatchedQueryComparator, SQLOnChangeOptions, SyncSubscriptionDescription } from '@powersync/common';
2+
import { UseSyncStreamOptions } from '../streams.js';
23

34
export interface HookWatchOptions extends Omit<SQLOnChangeOptions, 'signal'> {
5+
/**
6+
* An optional array of sync streams (with names and parameters) backing the query.
7+
*
8+
* When set, `useQuery` will subscribe to those streams (and automatically handle unsubscribing from them, too).
9+
*
10+
* If {@link QuerySyncStreamOptions} is set on a stream, `useQuery` will remain in a loading state until that stream
11+
* is marked as {@link SyncSubscriptionDescription.hasSynced}. This ensures the query is not missing rows that haven't
12+
* been downloaded.
13+
* Note however that after an initial sync, the query will not block itself while new rows are downloading. Instead,
14+
* consistent sync snapshots will be made available as they've been processed by PowerSync.
15+
*
16+
* @experimental Sync streams are currently in alpha.
17+
*/
18+
streams?: QuerySyncStreamOptions[];
419
reportFetching?: boolean;
520
}
621

22+
/**
23+
* Additional options to control how `useQuery` behaves when subscribing to a stream.
24+
*/
25+
export interface QuerySyncStreamOptions extends UseSyncStreamOptions {
26+
/**
27+
* When set to `true`, a `useQuery` hook will remain in a loading state as long as the stream is resolving or
28+
* downloading for the first time (in other words, until {@link SyncSubscriptionDescription.hasSynced} is true).
29+
*/
30+
waitForStream?: boolean;
31+
}
32+
733
export interface AdditionalOptions extends HookWatchOptions {
834
runQueryOnce?: boolean;
935
}

packages/react/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ export * from './hooks/PowerSyncContext.js';
55
export { SuspenseQueryResult } from './hooks/suspense/SuspenseQueryResult.js';
66
export { useSuspenseQuery } from './hooks/suspense/useSuspenseQuery.js';
77
export { useWatchedQuerySuspenseSubscription } from './hooks/suspense/useWatchedQuerySuspenseSubscription.js';
8+
export { useSyncStream, UseSyncStreamOptions } from './hooks/streams.js';
89
export { useStatus } from './hooks/useStatus.js';
910
export { useQuery } from './hooks/watched/useQuery.js';
1011
export { useWatchedQuerySubscription } from './hooks/watched/useWatchedQuerySubscription.js';

0 commit comments

Comments
 (0)