Skip to content

Commit a0ee132

Browse files
WatchedQuery Bug Fixes (#713)
1 parent ba72a58 commit a0ee132

File tree

10 files changed

+434
-40
lines changed

10 files changed

+434
-40
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/common': patch
3+
---
4+
5+
Fixed potential race conditions in WatchedQueries when updateSettings is called frequently.

.changeset/empty-glasses-camp.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
'@powersync/react': patch
3+
---
4+
5+
- Fixed bug where the `useQuery` reported `error` state would not clear after updating the query to a valid query.
6+
- Fixed bug where `useQuery` `isFetching` status would not immediately be reported as true when the query has changed.

.github/workflows/test-simulators.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,8 @@ jobs:
141141
- name: Set up XCode
142142
uses: maxim-lobanov/setup-xcode@v1
143143
with:
144-
xcode-version: latest-stable
144+
# TODO: Update to latest-stable once GH installs iOS 26 simulators
145+
xcode-version: '^16.4.0'
145146

146147
- name: CocoaPods Cache
147148
uses: actions/cache@v3

packages/common/src/client/watched/WatchedQuery.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,13 +71,15 @@ export enum WatchedQueryListenerEvent {
7171
ON_DATA = 'onData',
7272
ON_ERROR = 'onError',
7373
ON_STATE_CHANGE = 'onStateChange',
74+
SETTINGS_WILL_UPDATE = 'settingsWillUpdate',
7475
CLOSED = 'closed'
7576
}
7677

7778
export interface WatchedQueryListener<Data> extends BaseListener {
7879
[WatchedQueryListenerEvent.ON_DATA]?: (data: Data) => void | Promise<void>;
7980
[WatchedQueryListenerEvent.ON_ERROR]?: (error: Error) => void | Promise<void>;
8081
[WatchedQueryListenerEvent.ON_STATE_CHANGE]?: (state: WatchedQueryState<Data>) => void | Promise<void>;
82+
[WatchedQueryListenerEvent.SETTINGS_WILL_UPDATE]?: () => void;
8183
[WatchedQueryListenerEvent.CLOSED]?: () => void | Promise<void>;
8284
}
8385

packages/common/src/client/watched/processors/AbstractQueryProcessor.ts

Lines changed: 42 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
import { AbstractPowerSyncDatabase } from '../../../client/AbstractPowerSyncDatabase.js';
22
import { MetaBaseObserver } from '../../../utils/MetaBaseObserver.js';
3-
import { WatchedQuery, WatchedQueryListener, WatchedQueryOptions, WatchedQueryState } from '../WatchedQuery.js';
3+
import {
4+
WatchedQuery,
5+
WatchedQueryListener,
6+
WatchedQueryListenerEvent,
7+
WatchedQueryOptions,
8+
WatchedQueryState
9+
} from '../WatchedQuery.js';
410

511
/**
612
* @internal
@@ -62,7 +68,7 @@ export abstract class AbstractQueryProcessor<
6268
this._closed = false;
6369
this.state = this.constructInitialState();
6470
this.disposeListeners = null;
65-
this.initialized = this.init();
71+
this.initialized = this.init(this.abortController.signal);
6672
}
6773

6874
protected constructInitialState(): WatchedQueryState<Data> {
@@ -79,37 +85,57 @@ export abstract class AbstractQueryProcessor<
7985
return this.options.watchOptions.reportFetching ?? true;
8086
}
8187

82-
/**
83-
* Updates the underlying query.
84-
*/
85-
async updateSettings(settings: Settings) {
86-
this.abortController.abort();
87-
await this.initialized;
88+
protected async updateSettingsInternal(settings: Settings, signal: AbortSignal) {
89+
// This may have been aborted while awaiting or if multiple calls to `updateSettings` were made
90+
if (this._closed || signal.aborted) {
91+
return;
92+
}
93+
94+
this.options.watchOptions = settings;
95+
96+
this.iterateListeners((l) => l[WatchedQueryListenerEvent.SETTINGS_WILL_UPDATE]?.());
8897

8998
if (!this.state.isFetching && this.reportFetching) {
9099
await this.updateState({
91100
isFetching: true
92101
});
93102
}
94103

95-
this.options.watchOptions = settings;
96-
97-
this.abortController = new AbortController();
98104
await this.runWithReporting(() =>
99105
this.linkQuery({
100-
abortSignal: this.abortController.signal,
106+
abortSignal: signal,
101107
settings
102108
})
103109
);
104110
}
105111

112+
/**
113+
* Updates the underlying query.
114+
*/
115+
async updateSettings(settings: Settings) {
116+
// Abort the previous request
117+
this.abortController.abort();
118+
119+
// Keep track of this controller's abort status
120+
const abortController = new AbortController();
121+
// Allow this to be aborted externally
122+
this.abortController = abortController;
123+
124+
await this.initialized;
125+
return this.updateSettingsInternal(settings, abortController.signal);
126+
}
127+
106128
/**
107129
* This method is used to link a query to the subscribers of this listener class.
108130
* This method should perform actual query watching and report results via {@link updateState} method.
109131
*/
110132
protected abstract linkQuery(options: LinkQueryOptions<Data>): Promise<void>;
111133

112134
protected async updateState(update: Partial<MutableWatchedQueryState<Data>>) {
135+
if (this._closed) {
136+
return;
137+
}
138+
113139
if (typeof update.error !== 'undefined') {
114140
await this.iterateAsyncListenersWithError(async (l) => l.onError?.(update.error!));
115141
// An error always stops for the current fetching state
@@ -128,7 +154,7 @@ export abstract class AbstractQueryProcessor<
128154
/**
129155
* Configures base DB listeners and links the query to listeners.
130156
*/
131-
protected async init() {
157+
protected async init(signal: AbortSignal) {
132158
const { db } = this.options;
133159

134160
const disposeCloseListener = db.registerListener({
@@ -153,17 +179,16 @@ export abstract class AbstractQueryProcessor<
153179
};
154180

155181
// Initial setup
156-
this.runWithReporting(async () => {
157-
await this.updateSettings(this.options.watchOptions);
182+
await this.runWithReporting(async () => {
183+
await this.updateSettingsInternal(this.options.watchOptions, signal);
158184
});
159185
}
160186

161187
async close() {
162-
await this.initialized;
188+
this._closed = true;
163189
this.abortController.abort();
164190
this.disposeListeners?.();
165191
this.disposeListeners = null;
166-
this._closed = true;
167192
this.iterateListeners((l) => l.closed?.());
168193
this.listeners.clear();
169194
}

packages/common/src/client/watched/processors/DifferentialQueryProcessor.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,10 @@ export class DifferentialQueryProcessor<RowType>
279279
});
280280
}
281281

282+
if (this.state.error) {
283+
partialStateUpdate.error = null;
284+
}
285+
282286
if (Object.keys(partialStateUpdate).length > 0) {
283287
await this.updateState(partialStateUpdate);
284288
}

packages/common/src/client/watched/processors/OnChangeQueryProcessor.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,10 @@ export class OnChangeQueryProcessor<Data> extends AbstractQueryProcessor<Data, W
9696
});
9797
}
9898

99+
if (this.state.error) {
100+
partialStateUpdate.error = null;
101+
}
102+
99103
if (Object.keys(partialStateUpdate).length > 0) {
100104
await this.updateState(partialStateUpdate);
101105
}

packages/react/src/hooks/watched/useWatchedQuery.ts

Lines changed: 45 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,6 @@ export const useWatchedQuery = <RowType = unknown>(
1616
): QueryResult<RowType> | ReadonlyQueryResult<RowType> => {
1717
const { query, powerSync, queryChanged, options: hookOptions, active } = options;
1818

19-
// This ref is used to protect against cases where `queryChanged` changes multiple times too quickly to be
20-
// picked up by the useEffect below. This typically happens when React.StrictMode is enabled.
21-
const queryChangeRef = React.useRef(false);
22-
if (queryChanged && !queryChangeRef.current) {
23-
queryChangeRef.current = true;
24-
}
25-
2619
function createWatchedQuery() {
2720
if (!active) {
2821
return null;
@@ -42,24 +35,55 @@ export const useWatchedQuery = <RowType = unknown>(
4235
}
4336

4437
const [watchedQuery, setWatchedQuery] = React.useState(createWatchedQuery);
38+
const disposePendingUpdateListener = React.useRef<() => void | null>(null);
4539

4640
React.useEffect(() => {
4741
watchedQuery?.close();
48-
setWatchedQuery(createWatchedQuery);
42+
const newQuery = createWatchedQuery();
43+
setWatchedQuery(newQuery);
44+
45+
return () => {
46+
disposePendingUpdateListener.current?.();
47+
newQuery?.close();
48+
};
4949
}, [powerSync, active]);
5050

51-
// Indicates that the query will be re-fetched due to a change in the query.
52-
// Used when `isFetching` hasn't been set to true yet due to React execution.
53-
React.useEffect(() => {
54-
if (queryChangeRef.current) {
55-
watchedQuery?.updateSettings({
56-
query,
57-
throttleMs: hookOptions.throttleMs,
58-
reportFetching: hookOptions.reportFetching
59-
});
60-
queryChangeRef.current = false;
61-
}
62-
}, [queryChangeRef.current]);
51+
/**
52+
* Indicates that the query will be re-fetched due to a change in the query.
53+
* We execute this in-line (not using an effect) since effects are delayed till after the hook returns.
54+
* The `queryChanged` value should only be true for a single render.
55+
* The `updateSettings` method is asynchronous, thus it will update the state asynchronously.
56+
* In the React hooks we'd like to report that we are fetching the data for an updated query
57+
* as soon as the query has been updated. This prevents a result flow where e.g. the hook:
58+
* - already returned a result: isLoading, isFetching are both false
59+
* - the query is updated, but the state is still isFetching=false from the previous state
60+
* We override the isFetching status while the `updateSettings` method is running (if we report `isFetching`),
61+
* we override this value just until the `updateSettings` method itself will update the `isFetching` status.
62+
* We achieve this by registering a `settingsWillUpdate` listener on the `WatchedQuery`. This will fire
63+
* just before the `isFetching` status is updated.
64+
*/
65+
if (queryChanged) {
66+
// Keep track of this pending operation
67+
watchedQuery?.updateSettings({
68+
query,
69+
throttleMs: hookOptions.throttleMs,
70+
reportFetching: hookOptions.reportFetching
71+
});
72+
// This could have been called multiple times, clear any old listeners.
73+
disposePendingUpdateListener.current?.();
74+
disposePendingUpdateListener.current = watchedQuery?.registerListener({
75+
settingsWillUpdate: () => {
76+
// We'll use the fact that we have a listener at all as an indication
77+
disposePendingUpdateListener.current?.();
78+
disposePendingUpdateListener.current = null;
79+
}
80+
});
81+
}
6382

64-
return useNullableWatchedQuerySubscription(watchedQuery);
83+
const shouldReportCurrentlyFetching = (hookOptions.reportFetching ?? true) && !!disposePendingUpdateListener.current;
84+
const result = useNullableWatchedQuerySubscription(watchedQuery);
85+
return {
86+
...result,
87+
isFetching: result?.isFetching || shouldReportCurrentlyFetching
88+
};
6589
};

0 commit comments

Comments
 (0)