Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/transformers/src/backends/utils/cacheWasm.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { env } from '../../env.js';
/**
* Loads and caches a file from the given URL.
* @param {string} url The URL of the file to load.
* @returns {Promise<Response|import('../../utils/hub/files.js').FileResponse|null|string>} The response object, or null if loading failed.
* @returns {Promise<Response|import('../../utils/hub/FileResponse.js').FileResponse|null|string>} The response object, or null if loading failed.
*/
async function loadAndCacheFile(url) {
const fileName = url.split('/').pop();
Expand Down
6 changes: 3 additions & 3 deletions packages/transformers/src/utils/cache.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { apis, env } from '../env.js';
import { FileCache } from './hub/files.js';
import { FileCache } from './cache/FileCache.js';
import { logger } from './logger.js';

/**
* @typedef {Object} CacheInterface
* @property {(request: string) => Promise<Response|import('./hub/files.js').FileResponse|undefined|string>} match
* @property {(request: string) => Promise<Response|import('./hub/FileResponse.js').FileResponse|undefined|string>} match
* Checks if a request is in the cache and returns the cached response if found.
* @property {(request: string, response: Response, progress_callback?: (data: {progress: number, loaded: number, total: number}) => void) => Promise<void>} put
* Adds a response to the cache.
Expand Down Expand Up @@ -70,7 +70,7 @@ export async function getCache(file_cache_dir = null) {
* Searches the cache for any of the provided names and returns the first match found.
* @param {CacheInterface} cache The cache to search
* @param {...string} names The names of the items to search for
* @returns {Promise<import('./hub/files.js').FileResponse|Response|undefined|string>} The item from the cache, or undefined if not found.
* @returns {Promise<import('./hub/FileResponse.js').FileResponse|Response|undefined|string>} The item from the cache, or undefined if not found.
*/
export async function tryCache(cache, ...names) {
for (let name of names) {
Expand Down
118 changes: 118 additions & 0 deletions packages/transformers/src/utils/cache/FileCache.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import fs from 'node:fs';
import path from 'node:path';

import { FileResponse } from '../hub/FileResponse.js';

/**
 * File system cache implementation that implements the CacheInterface.
 * Provides `match` and `put` methods compatible with the Web Cache API.
 */
export class FileCache {
    /**
     * Instantiate a `FileCache` object.
     * @param {string} path Directory that every cache key is joined onto.
     */
    constructor(path) {
        this.path = path;
    }

    /**
     * Checks whether the given request is in the cache.
     * @param {string} request Cache key, joined onto the cache directory to form the file path.
     * @returns {Promise<FileResponse | undefined>} A `FileResponse` for the cached file
     * when it reports that it exists, otherwise `undefined`.
     */
    async match(request) {
        const filePath = path.join(this.path, request);
        const file = new FileResponse(filePath);
        return file.exists ? file : undefined;
    }

    /**
     * Adds the given response to the cache.
     *
     * The body is streamed into a temporary file next to the destination and only
     * renamed onto the final path once every chunk has been written and the file
     * has been closed.
     * @param {string} request Cache key, joined onto the cache directory to form the file path.
     * @param {Response} response The response whose body should be stored.
     * @param {(data: {progress: number, loaded: number, total: number}) => void} [progress_callback] Optional.
     * The function to call with progress updates
     * @returns {Promise<void>}
     * @throws {TypeError} If the response has no body to read.
     * @throws {Error} Any error raised while reading the body or writing the file; the
     * temporary file is removed before the error is rethrown.
     */
    async put(request, response, progress_callback = undefined) {
        // Validate before touching the file system so an unusable response
        // leaves no directories or temp files behind.
        const body = response.body;
        if (!body) {
            throw new TypeError(`Unable to cache "${request}": the response has no body.`);
        }

        const filePath = path.join(this.path, request);
        // Include both PID and a random suffix so that concurrent put() calls within the same
        // process (e.g. multiple pipelines loading the same file in parallel) each get their
        // own temp file and don't corrupt each other's writes.
        const tmpPath = filePath + `.tmp.${process.pid}.${Math.random().toString(36).slice(2)}`;

        const total = parseInt(response.headers.get('Content-Length') ?? '0', 10);
        let loaded = 0;

        let reader;
        let fileStream;
        let streamClosed = false;
        let streamError;

        try {
            await fs.promises.mkdir(path.dirname(filePath), { recursive: true });

            fileStream = fs.createWriteStream(tmpPath);
            fileStream.once('close', () => {
                streamClosed = true;
            });
            // Without a listener, a failed open/write would surface as an unhandled
            // 'error' event instead of rejecting this call.
            fileStream.on('error', (err) => {
                if (streamError === undefined) {
                    streamError = err;
                }
            });

            reader = body.getReader();

            while (true) {
                const { done, value } = await reader.read();
                if (done) {
                    break;
                }

                await new Promise((resolve, reject) => {
                    // Prefer the stream's own error (e.g. the failed open) over the
                    // generic "stream destroyed" error a later write would report.
                    fileStream.write(value, (err) => (err ? reject(streamError ?? err) : resolve()));
                });

                loaded += value.length;
                const progress = total ? (loaded / total) * 100 : 0;

                progress_callback?.({ progress, loaded, total });
            }

            if (!streamClosed) {
                // The close callback is attached to the 'close' event and carries no
                // error, so failures are picked up from the 'error' listener below.
                await new Promise((resolve) => fileStream.close(resolve));
            }
            if (streamError !== undefined) {
                throw streamError;
            }

            // Atomically move the completed temp file to the final path so that
            // concurrent readers (other processes or other in-process calls)
            // never observe a partially-written file.
            await fs.promises.rename(tmpPath, filePath);
        } catch (error) {
            // Stop pulling data from the source; the outcome of the cancellation is irrelevant.
            try {
                reader?.cancel(error)?.catch(() => {});
            } catch {}

            // Release the file descriptor and wait until the stream is fully closed so
            // that the unlink below cannot race with a still-pending open.
            if (fileStream && !streamClosed) {
                await new Promise((resolve) => {
                    fileStream.once('close', resolve);
                    fileStream.destroy();
                });
            }

            // Clean up the temp file if an error occurred during download
            try {
                await fs.promises.unlink(tmpPath);
            } catch {}
            throw error;
        }
    }

    /**
     * Deletes the cache entry for the given request.
     * @param {string} request Cache key, joined onto the cache directory to form the file path.
     * @returns {Promise<boolean>} A Promise that resolves to `true` if the cache entry was deleted, `false` otherwise.
     */
    async delete(request) {
        const filePath = path.join(this.path, request);

        try {
            await fs.promises.unlink(filePath);
            return true;
        } catch {
            // File doesn't exist or couldn't be deleted
            return false;
        }
    }

    // TODO add the rest?
    // addAll(requests: RequestInfo[]): Promise<void>;
    // keys(request?: RequestInfo | URL, options?: CacheQueryOptions): Promise<ReadonlyArray<Request>>;
    // match(request: RequestInfo | URL, options?: CacheQueryOptions): Promise<Response | undefined>;
    // matchAll(request?: RequestInfo | URL, options?: CacheQueryOptions): Promise<ReadonlyArray<Response>>;
}
9 changes: 5 additions & 4 deletions packages/transformers/src/utils/hub.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@

import { apis, env } from '../env.js';
import { dispatchCallback } from './core.js';
import { FileResponse, FileCache } from './hub/files.js';
import { FileResponse } from './hub/FileResponse.js';
import { FileCache } from './cache/FileCache.js';
import { handleError, isValidUrl, pathJoin, isValidHfModelId, readResponse } from './hub/utils.js';
import { getCache, tryCache } from './cache.js';
import { get_file_metadata } from './model_registry/get_file_metadata.js';
Expand Down Expand Up @@ -160,7 +161,7 @@ export function buildResourcePaths(path_or_repo_id, filename, options = {}, cach
* @param {import('./cache.js').CacheInterface | null} cache The cache instance to check.
* @param {string} localPath The local path to try first.
* @param {string} proposedCacheKey The proposed cache key to try second.
* @returns {Promise<Response|import('./hub/files.js').FileResponse|undefined|string>}
* @returns {Promise<Response|import('./hub/FileResponse.js').FileResponse|undefined|string>}
* The cached response if found, undefined otherwise.
*/
export async function checkCachedResource(cache, localPath, proposedCacheKey) {
Expand All @@ -182,7 +183,7 @@ export async function checkCachedResource(cache, localPath, proposedCacheKey) {
* @param {string} filename The name of the file to cache.
* @param {import('./cache.js').CacheInterface} cache The cache instance to store in.
* @param {string} cacheKey The cache key to use.
* @param {Response|import('./hub/files.js').FileResponse} response The response to cache.
* @param {Response|import('./hub/FileResponse.js').FileResponse} response The response to cache.
* @param {Uint8Array} [result] The result buffer if already read.
* @param {PretrainedOptions} [options] Options containing progress callback and context for progress updates.
* @returns {Promise<void>}
Expand Down Expand Up @@ -259,7 +260,7 @@ export async function loadResourceFile(
// Whether to cache the final response in the end.
let toCacheResponse = false;

/** @type {Response|import('./hub/files.js').FileResponse|undefined|string} */
/** @type {Response|import('./hub/FileResponse.js').FileResponse|undefined|string} */
let response;

// Check cache
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import fs from 'node:fs';
import path from 'node:path';

/**
* Mapping from file extensions to MIME types.
Expand Down Expand Up @@ -120,107 +119,3 @@ export class FileResponse {
return JSON.parse(await this.text());
}
}

/**
 * Cache backed by the local file system, exposing the subset of the
 * Web Cache API (`match`, `put`, `delete`) required by the CacheInterface.
 */
export class FileCache {
    /**
     * Create a cache whose entries are stored relative to a directory.
     * @param {string} path Directory that every cache key is joined onto.
     */
    constructor(path) {
        this.path = path;
    }

    /**
     * Look up a cache entry.
     * @param {string} request Cache key, joined onto the cache directory.
     * @returns {Promise<FileResponse | undefined>} The `FileResponse` for the entry when it
     * reports that it exists, otherwise `undefined`.
     */
    async match(request) {
        const candidate = new FileResponse(path.join(this.path, request));
        return candidate.exists ? candidate : undefined;
    }

    /**
     * Stream a response body into the cache file for the given key.
     * @param {string} request Cache key, joined onto the cache directory.
     * @param {Response} response The response whose body is written to disk.
     * @param {(data: {progress: number, loaded: number, total: number}) => void} [progress_callback] Optional.
     * Invoked after each chunk has been written.
     * @returns {Promise<void>}
     */
    async put(request, response, progress_callback = undefined) {
        const target = path.join(this.path, request);

        try {
            // A missing Content-Length header is treated as "0" (unknown size).
            const total = parseInt(response.headers.get('Content-Length') ?? '0');

            await fs.promises.mkdir(path.dirname(target), { recursive: true });
            const sink = fs.createWriteStream(target);
            const source = response.body.getReader();

            let loaded = 0;
            for (let chunk = await source.read(); !chunk.done; chunk = await source.read()) {
                const bytes = chunk.value;

                // Wait for the chunk to be flushed before reading the next one.
                await new Promise((resolve, reject) => {
                    sink.write(bytes, (err) => (err ? reject(err) : resolve()));
                });

                loaded += bytes.length;
                // Progress stays at 0 while the total size is unknown.
                progress_callback?.({
                    progress: total ? (loaded / total) * 100 : 0,
                    loaded,
                    total,
                });
            }

            sink.close();
        } catch (error) {
            // Best-effort removal of the partially written file before rethrowing.
            await fs.promises.unlink(target).catch(() => {});
            throw error;
        }
    }

    /**
     * Remove the cache entry for the given key.
     * @param {string} request Cache key, joined onto the cache directory.
     * @returns {Promise<boolean>} Resolves to `true` when the file was removed and to `false`
     * when it did not exist or could not be deleted.
     */
    async delete(request) {
        const target = path.join(this.path, request);
        const removed = await fs.promises.unlink(target).then(
            () => true,
            () => false,
        );
        return removed;
    }

    // TODO add the rest?
    // addAll(requests: RequestInfo[]): Promise<void>;
    // keys(request?: RequestInfo | URL, options?: CacheQueryOptions): Promise<ReadonlyArray<Request>>;
    // matchAll(request?: RequestInfo | URL, options?: CacheQueryOptions): Promise<ReadonlyArray<Response>>;
}
2 changes: 1 addition & 1 deletion packages/transformers/src/utils/hub/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ export function handleError(status, remoteURL, fatal) {
/**
* Read and track progress when reading a Response object
*
* @param {Response|import('./files.js').FileResponse} response The Response object to read
* @param {Response|import('./FileResponse.js').FileResponse} response The Response object to read
* @param {(data: {progress: number, loaded: number, total: number}) => void} progress_callback The function to call with progress updates
* @param {number} [expectedSize] The expected size of the file (used when content-length header is missing)
* @returns {Promise<Uint8Array>} A Promise that resolves with the Uint8Array buffer
Expand Down