diff --git a/packages/transformers/src/backends/utils/cacheWasm.js b/packages/transformers/src/backends/utils/cacheWasm.js index ddf4fb970..d75b3d18c 100644 --- a/packages/transformers/src/backends/utils/cacheWasm.js +++ b/packages/transformers/src/backends/utils/cacheWasm.js @@ -6,7 +6,7 @@ import { env } from '../../env.js'; /** * Loads and caches a file from the given URL. * @param {string} url The URL of the file to load. - * @returns {Promise} The response object, or null if loading failed. + * @returns {Promise} The response object, or null if loading failed. */ async function loadAndCacheFile(url) { const fileName = url.split('/').pop(); diff --git a/packages/transformers/src/utils/cache.js b/packages/transformers/src/utils/cache.js index ef10ec55c..1ae907f82 100644 --- a/packages/transformers/src/utils/cache.js +++ b/packages/transformers/src/utils/cache.js @@ -1,10 +1,10 @@ import { apis, env } from '../env.js'; -import { FileCache } from './hub/files.js'; +import { FileCache } from './cache/FileCache.js'; import { logger } from './logger.js'; /** * @typedef {Object} CacheInterface - * @property {(request: string) => Promise} match + * @property {(request: string) => Promise} match * Checks if a request is in the cache and returns the cached response if found. * @property {(request: string, response: Response, progress_callback?: (data: {progress: number, loaded: number, total: number}) => void) => Promise} put * Adds a response to the cache. @@ -70,7 +70,7 @@ export async function getCache(file_cache_dir = null) { * Searches the cache for any of the provided names and returns the first match found. * @param {CacheInterface} cache The cache to search * @param {...string} names The names of the items to search for - * @returns {Promise} The item from the cache, or undefined if not found. + * @returns {Promise} The item from the cache, or undefined if not found. 
*/ export async function tryCache(cache, ...names) { for (let name of names) { diff --git a/packages/transformers/src/utils/cache/FileCache.js b/packages/transformers/src/utils/cache/FileCache.js new file mode 100644 index 000000000..8ea2b86e3 --- /dev/null +++ b/packages/transformers/src/utils/cache/FileCache.js @@ -0,0 +1,118 @@ +import fs from 'node:fs'; +import path from 'node:path'; + +import { FileResponse } from '../hub/FileResponse.js'; + +/** + * File system cache implementation that implements the CacheInterface. + * Provides `match` and `put` methods compatible with the Web Cache API. + */ +export class FileCache { + /** + * Instantiate a `FileCache` object. + * @param {string} path + */ + constructor(path) { + this.path = path; + } + + /** + * Checks whether the given request is in the cache. + * @param {string} request + * @returns {Promise} + */ + async match(request) { + let filePath = path.join(this.path, request); + let file = new FileResponse(filePath); + + if (file.exists) { + return file; + } else { + return undefined; + } + } + + /** + * Adds the given response to the cache. + * @param {string} request + * @param {Response} response + * @param {(data: {progress: number, loaded: number, total: number}) => void} [progress_callback] Optional. + * The function to call with progress updates + * @returns {Promise} + */ + async put(request, response, progress_callback = undefined) { + let filePath = path.join(this.path, request); + // Include both PID and a random suffix so that concurrent put() calls within the same process (e.g. multiple pipelines loading the same file in parallel) each get their own temp file and don't corrupt each other's writes. + let tmpPath = filePath + `.tmp.${process.pid}.${Math.random().toString(36).slice(2)}`; + + try { + const contentLength = response.headers.get('Content-Length'); + const total = parseInt(contentLength ??
'0'); + let loaded = 0; + + await fs.promises.mkdir(path.dirname(filePath), { recursive: true }); + const fileStream = fs.createWriteStream(tmpPath); + const reader = response.body.getReader(); + + while (true) { + const { done, value } = await reader.read(); + if (done) { + break; + } + + await new Promise((resolve, reject) => { + fileStream.write(value, (err) => { + if (err) { + reject(err); + return; + } + resolve(); + }); + }); + + loaded += value.length; + const progress = total ? (loaded / total) * 100 : 0; + + progress_callback?.({ progress, loaded, total }); + } + + await new Promise((resolve, reject) => { + fileStream.close((err) => (err ? reject(err) : resolve())); + }); + + // Atomically move the completed temp file to the final path so that + // concurrent readers (other processes or other in-process calls) + // never observe a partially-written file. + await fs.promises.rename(tmpPath, filePath); + } catch (error) { + // Clean up the temp file if an error occurred during download + try { + await fs.promises.unlink(tmpPath); + } catch {} + throw error; + } + } + + /** + * Deletes the cache entry for the given request. + * @param {string} request + * @returns {Promise} A Promise that resolves to `true` if the cache entry was deleted, `false` otherwise. + */ + async delete(request) { + let filePath = path.join(this.path, request); + + try { + await fs.promises.unlink(filePath); + return true; + } catch (error) { + // File doesn't exist or couldn't be deleted + return false; + } + } + + // TODO add the rest? 
+ // addAll(requests: RequestInfo[]): Promise; + // keys(request?: RequestInfo | URL, options?: CacheQueryOptions): Promise>; + // match(request: RequestInfo | URL, options?: CacheQueryOptions): Promise; + // matchAll(request?: RequestInfo | URL, options?: CacheQueryOptions): Promise>; +} diff --git a/packages/transformers/src/utils/hub.js b/packages/transformers/src/utils/hub.js index 4e3971675..6945a9302 100755 --- a/packages/transformers/src/utils/hub.js +++ b/packages/transformers/src/utils/hub.js @@ -6,7 +6,8 @@ import { apis, env } from '../env.js'; import { dispatchCallback } from './core.js'; -import { FileResponse, FileCache } from './hub/files.js'; +import { FileResponse } from './hub/FileResponse.js'; +import { FileCache } from './cache/FileCache.js'; import { handleError, isValidUrl, pathJoin, isValidHfModelId, readResponse } from './hub/utils.js'; import { getCache, tryCache } from './cache.js'; import { get_file_metadata } from './model_registry/get_file_metadata.js'; @@ -160,7 +161,7 @@ export function buildResourcePaths(path_or_repo_id, filename, options = {}, cach * @param {import('./cache.js').CacheInterface | null} cache The cache instance to check. * @param {string} localPath The local path to try first. * @param {string} proposedCacheKey The proposed cache key to try second. - * @returns {Promise} + * @returns {Promise} * The cached response if found, undefined otherwise. */ export async function checkCachedResource(cache, localPath, proposedCacheKey) { @@ -182,7 +183,7 @@ export async function checkCachedResource(cache, localPath, proposedCacheKey) { * @param {string} filename The name of the file to cache. * @param {import('./cache.js').CacheInterface} cache The cache instance to store in. * @param {string} cacheKey The cache key to use. - * @param {Response|import('./hub/files.js').FileResponse} response The response to cache. + * @param {Response|import('./hub/FileResponse.js').FileResponse} response The response to cache. 
* @param {Uint8Array} [result] The result buffer if already read. * @param {PretrainedOptions} [options] Options containing progress callback and context for progress updates. * @returns {Promise} @@ -259,7 +260,7 @@ export async function loadResourceFile( // Whether to cache the final response in the end. let toCacheResponse = false; - /** @type {Response|import('./hub/files.js').FileResponse|undefined|string} */ + /** @type {Response|import('./hub/FileResponse.js').FileResponse|undefined|string} */ let response; // Check cache diff --git a/packages/transformers/src/utils/hub/files.js b/packages/transformers/src/utils/hub/FileResponse.js similarity index 56% rename from packages/transformers/src/utils/hub/files.js rename to packages/transformers/src/utils/hub/FileResponse.js index 1ddc7d874..d0102dd79 100644 --- a/packages/transformers/src/utils/hub/files.js +++ b/packages/transformers/src/utils/hub/FileResponse.js @@ -1,5 +1,4 @@ import fs from 'node:fs'; -import path from 'node:path'; /** * Mapping from file extensions to MIME types. @@ -120,107 +119,3 @@ export class FileResponse { return JSON.parse(await this.text()); } } - -/** - * File system cache implementation that implements the CacheInterface. - * Provides `match` and `put` methods compatible with the Web Cache API. - */ -export class FileCache { - /** - * Instantiate a `FileCache` object. - * @param {string} path - */ - constructor(path) { - this.path = path; - } - - /** - * Checks whether the given request is in the cache. - * @param {string} request - * @returns {Promise} - */ - async match(request) { - let filePath = path.join(this.path, request); - let file = new FileResponse(filePath); - - if (file.exists) { - return file; - } else { - return undefined; - } - } - - /** - * Adds the given response to the cache. - * @param {string} request - * @param {Response} response - * @param {(data: {progress: number, loaded: number, total: number}) => void} [progress_callback] Optional. 
- * The function to call with progress updates - * @returns {Promise} - */ - async put(request, response, progress_callback = undefined) { - let filePath = path.join(this.path, request); - - try { - const contentLength = response.headers.get('Content-Length'); - const total = parseInt(contentLength ?? '0'); - let loaded = 0; - - await fs.promises.mkdir(path.dirname(filePath), { recursive: true }); - const fileStream = fs.createWriteStream(filePath); - const reader = response.body.getReader(); - - while (true) { - const { done, value } = await reader.read(); - if (done) { - break; - } - - await new Promise((resolve, reject) => { - fileStream.write(value, (err) => { - if (err) { - reject(err); - return; - } - resolve(); - }); - }); - - loaded += value.length; - const progress = total ? (loaded / total) * 100 : 0; - - progress_callback?.({ progress, loaded, total }); - } - - fileStream.close(); - } catch (error) { - // Clean up the file if an error occurred during download - try { - await fs.promises.unlink(filePath); - } catch {} - throw error; - } - } - - /** - * Deletes the cache entry for the given request. - * @param {string} request - * @returns {Promise} A Promise that resolves to `true` if the cache entry was deleted, `false` otherwise. - */ - async delete(request) { - let filePath = path.join(this.path, request); - - try { - await fs.promises.unlink(filePath); - return true; - } catch (error) { - // File doesn't exist or couldn't be deleted - return false; - } - } - - // TODO add the rest? 
- // addAll(requests: RequestInfo[]): Promise; - // keys(request?: RequestInfo | URL, options?: CacheQueryOptions): Promise>; - // matchAll(request?: RequestInfo | URL, options?: CacheQueryOptions): Promise>; -} diff --git a/packages/transformers/src/utils/hub/utils.js b/packages/transformers/src/utils/hub/utils.js index 71f5cb415..f95573ca0 100644 --- a/packages/transformers/src/utils/hub/utils.js +++ b/packages/transformers/src/utils/hub/utils.js @@ -80,7 +80,7 @@ export function handleError(status, remoteURL, fatal) { /** * Read and track progress when reading a Response object * - * @param {Response|import('./files.js').FileResponse} response The Response object to read + * @param {Response|import('./FileResponse.js').FileResponse} response The Response object to read * @param {(data: {progress: number, loaded: number, total: number}) => void} progress_callback The function to call with progress updates * @param {number} [expectedSize] The expected size of the file (used when content-length header is missing) * @returns {Promise} A Promise that resolves with the Uint8Array buffer