Skip to content

Commit 5d6da85

Browse files
added lock
1 parent ada5d13 commit 5d6da85

1 file changed

Lines changed: 97 additions & 0 deletions

File tree

src/state-persistence.js

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,103 @@ const parallelPersistedPushData = async (items, options = {}) => {
197197
}
198198
};
199199

200+
/**
201+
* Locking mechanism for resources shared accross actor runs.
202+
* This lock doesn't provide 100% guarante of safety from race condition
203+
* which is not possible due to asynchronous and distributed nature of Apify platform.
204+
* The lock relies on wait times before acquiring the lock so in case of dead slow Apify API
205+
* it can malfunction
206+
*
207+
* @example
208+
* const lock = new Lock();
209+
* await lock.init();
210+
* const criticalSection = async () => {
211+
* // Do something that no one else can touch now
212+
* // At the end of this function, lock gets released
213+
* }
214+
* await lock.lockAndRunSection(criticalSection);
215+
*/
216+
class Lock {
217+
constructor(options = {}) {
218+
const {
219+
storeName = 'LOCK',
220+
instanceId = Apify.getEnv().actorRunId,
221+
pollIntervalMs = 30000,
222+
candidateWaitTimeMs = 10000,
223+
} = options;
224+
this.storeName = storeName;
225+
this.instanceId = instanceId;
226+
this.pollIntervalMs = pollIntervalMs;
227+
this.candidateWaitTimeMs = candidateWaitTimeMs;
228+
this.store = null;
229+
this.isMigrating = false;
230+
this.ourLocked = false;
231+
}
232+
233+
async init() {
234+
this.store = await Apify.openKeyValueStore(this.storeName);
235+
Apify.events.on('migrating', async () => { await this.handleMigration(); });
236+
Apify.events.on('aborting', async () => { await this.handleMigration(); });
237+
}
238+
239+
async handleMigration() {
240+
this.isMigrating = true;
241+
if (this.ourLocked) {
242+
await this.unlock();
243+
}
244+
}
245+
246+
async isLocked() {
247+
const { locked } = await this.store.getValue('LOCKED');
248+
return locked;
249+
}
250+
251+
async unlock() {
252+
await this.store.setValue('LOCKED', { locked: false });
253+
}
254+
255+
async waitAsCandidate() {
256+
await this.store.setValue('CANDIDATE', { instanceId: this.instanceId });
257+
// We wait to see if no other instance acquired a candidate meanwhile
258+
await Apify.utils.sleep(this.candidateWaitTimeMs);
259+
const { instanceId } = await this.store.getValue('CANDIDATE');
260+
return instanceId === this.instanceId;
261+
}
262+
263+
async acquireLock() {
264+
if (await this.isLocked()) {
265+
return false;
266+
}
267+
if (!await this.waitAsCandidate()) {
268+
return false;
269+
}
270+
if (this.isMigrating) {
271+
await Apify.utils.sleep(99999);
272+
}
273+
this.ourLocked = true;
274+
await this.store.setValue('LOCKED', { locked: true });
275+
return true;
276+
}
277+
278+
async lockAndRunSection(criticalSection) {
279+
// We do linear backoff to prevent deadlock
280+
let lockAttempts = 1;
281+
for (;;) {
282+
if (this.isMigrating) {
283+
await Apify.utils.sleep(99999);
284+
}
285+
if (await this.acquireLock()) {
286+
break;
287+
}
288+
await Apify.utils.sleep(this.pollIntervalMs * lockAttempts);
289+
lockAttempts++;
290+
}
291+
// We have the lock now
292+
await criticalSection();
293+
await this.unlock();
294+
}
295+
}
296+
200297
module.exports = {
201298
persistedCall,
202299
createPersistedMap,

0 commit comments

Comments
 (0)