diff --git a/packages/appkit/src/type-generator/cache.ts b/packages/appkit/src/type-generator/cache.ts index 3135e2d0..8b22ca96 100644 --- a/packages/appkit/src/type-generator/cache.ts +++ b/packages/appkit/src/type-generator/cache.ts @@ -1,5 +1,5 @@ import crypto from "node:crypto"; -import fs from "node:fs"; +import fs from "node:fs/promises"; import path from "node:path"; import { createLogger } from "../logging/logger"; @@ -50,31 +50,29 @@ export function hashSQL(sql: string): string { * If the cache is not found, run the query explain * @returns - the cache */ -export function loadCache(): Cache { +export async function loadCache(): Promise<Cache> { const cachePath = path.join(CACHE_DIR, CACHE_FILE); try { - if (!fs.existsSync(CACHE_DIR)) { - fs.mkdirSync(CACHE_DIR, { recursive: true }); - } + await fs.mkdir(CACHE_DIR, { recursive: true }); - if (fs.existsSync(cachePath)) { - const cache = JSON.parse(fs.readFileSync(cachePath, "utf8")) as Cache; - if (cache.version === CACHE_VERSION) { - return cache; - } + const raw = await fs.readFile(cachePath, "utf8"); + const cache = JSON.parse(raw) as Cache; + if (cache.version === CACHE_VERSION) { + return cache; + } + } catch (err) { + if ((err as NodeJS.ErrnoException).code !== "ENOENT") { + logger.warn("Cache file is corrupted, flushing cache completely."); } - } catch { - logger.warn("Cache file is corrupted, flushing cache completely."); } return { version: CACHE_VERSION, queries: {} }; } /** * Save the cache to the file system - * The cache is saved as a JSON file, it is used to avoid running the query explain multiple times * @param cache - cache object to save */ -export function saveCache(cache: Cache): void { +export async function saveCache(cache: Cache): Promise<void> { const cachePath = path.join(CACHE_DIR, CACHE_FILE); - fs.writeFileSync(cachePath, JSON.stringify(cache, null, 2), "utf8"); + await fs.writeFile(cachePath, JSON.stringify(cache, null, 2), "utf8"); } diff --git a/packages/appkit/src/type-generator/index.ts 
b/packages/appkit/src/type-generator/index.ts index c41bc577..5cdb65c3 100644 --- a/packages/appkit/src/type-generator/index.ts +++ b/packages/appkit/src/type-generator/index.ts @@ -1,4 +1,4 @@ -import fs from "node:fs"; +import fs from "node:fs/promises"; import dotenv from "dotenv"; import { createLogger } from "../logging/logger"; import { generateQueriesFromDescribe } from "./query-registry"; @@ -67,7 +67,7 @@ export async function generateFromEntryPoint(options: { const typeDeclarations = generateTypeDeclarations(queryRegistry); - fs.writeFileSync(outFile, typeDeclarations, "utf-8"); + await fs.writeFile(outFile, typeDeclarations, "utf-8"); logger.debug("Type generation complete!"); } diff --git a/packages/appkit/src/type-generator/query-registry.ts b/packages/appkit/src/type-generator/query-registry.ts index 2ba27584..3ef12abf 100644 --- a/packages/appkit/src/type-generator/query-registry.ts +++ b/packages/appkit/src/type-generator/query-registry.ts @@ -1,6 +1,7 @@ -import fs from "node:fs"; +import fs from "node:fs/promises"; import path from "node:path"; import { WorkspaceClient } from "@databricks/sdk-experimental"; +import pc from "picocolors"; import { createLogger } from "../logging/logger"; import { CACHE_VERSION, hashSQL, loadCache, saveCache } from "./cache"; import { Spinner } from "./spinner"; @@ -13,6 +14,29 @@ import { const logger = createLogger("type-generator:query-registry"); +/** + * Parse a raw API/SDK error into a structured code + message. + * Handles Databricks-style JSON bodies embedded in the message string, + * e.g. `Response from server (Bad Request) {"error_code":"...","message":"..."}`. 
+ */ +function parseError(raw: string): { code?: string; message: string } { + const jsonMatch = raw.match(/\{[\s\S]*\}/); + if (jsonMatch) { + try { + const parsed = JSON.parse(jsonMatch[0]); + if (parsed.error_code || parsed.message) { + return { + code: parsed.error_code, + message: parsed.message || raw, + }; + } + } catch { + // not valid JSON, fall through + } + } + return { message: raw }; +} + /** * Extract parameters from a SQL query * @param sql - the SQL query to extract parameters from @@ -116,19 +140,6 @@ function generateUnknownResultQuery(sql: string, queryName: string): string { }`; } -function cacheFailedQuery( - cache: ReturnType<typeof loadCache>, - querySchemas: QuerySchema[], - sql: string, - queryName: string, - sqlHash: string, -): void { - const type = generateUnknownResultQuery(sql, queryName); - querySchemas.push({ name: queryName, type }); - cache.queries[queryName] = { hash: sqlHash, type, retry: true }; - saveCache(cache); -} - export function extractParameterTypes(sql: string): Record<string, string> { const paramTypes: Record<string, string> = {}; const regex = @@ -154,88 +165,246 @@ export function extractParameterTypes(sql: string): Record<string, string> { export async function generateQueriesFromDescribe( queryFolder: string, warehouseId: string, - options: { noCache?: boolean } = {}, + options: { noCache?: boolean; concurrency?: number } = {}, ): Promise<QuerySchema[]> { - const { noCache = false } = options; - - // read all query files in the folder - const queryFiles = fs - .readdirSync(queryFolder) - .filter((file) => file.endsWith(".sql")); - + const { noCache = false, concurrency: rawConcurrency = 10 } = options; + const concurrency = + typeof rawConcurrency === "number" && Number.isFinite(rawConcurrency) + ? Math.max(1, Math.floor(rawConcurrency)) + : 10; + + // read all query files and cache in parallel + const [allFiles, cache] = await Promise.all([ + fs.readdir(queryFolder), + noCache + ? 
({ version: CACHE_VERSION, queries: {} } as Awaited< + ReturnType<typeof loadCache> + >) + : loadCache(), + ]); + + const queryFiles = allFiles.filter((file) => file.endsWith(".sql")); logger.debug("Found %d SQL queries", queryFiles.length); - // load cache - const cache = noCache ? { version: CACHE_VERSION, queries: {} } : loadCache(); - const client = new WorkspaceClient({}); - const querySchemas: QuerySchema[] = []; const spinner = new Spinner(); - // process each query file + // Read all SQL files in parallel + const sqlContents = await Promise.all( + queryFiles.map((file) => fs.readFile(path.join(queryFolder, file), "utf8")), + ); + + const startTime = performance.now(); + + // Phase 1: Check cache, separate cached vs uncached + const cachedResults: Array<{ index: number; schema: QuerySchema }> = []; + const uncachedQueries: Array<{ + index: number; + queryName: string; + sql: string; + sqlHash: string; + cleanedSql: string; + }> = []; + const logEntries: Array<{ + queryName: string; + status: "HIT" | "MISS"; + failed?: boolean; + error?: { code?: string; message: string }; + }> = []; + for (let i = 0; i < queryFiles.length; i++) { const file = queryFiles[i]; const rawName = path.basename(file, ".sql"); const queryName = normalizeQueryName(rawName); - // read query file content - const sql = fs.readFileSync(path.join(queryFolder, file), "utf8"); + const sql = sqlContents[i]; const sqlHash = hashSQL(sql); - // check cache (skip if marked for retry after a failed DESCRIBE) const cached = cache.queries[queryName]; if (cached && cached.hash === sqlHash && !cached.retry) { - querySchemas.push({ name: queryName, type: cached.type }); - spinner.start(`Processing ${queryName} (${i + 1}/${queryFiles.length})`); - spinner.stop(`✓ ${queryName} (cached)`); - continue; + cachedResults.push({ + index: i, + schema: { name: queryName, type: cached.type }, + }); + logEntries.push({ queryName, status: "HIT" }); + } else { + const sqlWithDefaults = sql.replace(/:([a-zA-Z_]\w*)/g, "''"); + const 
cleanedSql = sqlWithDefaults.trim().replace(/;\s*$/, ""); + uncachedQueries.push({ index: i, queryName, sql, sqlHash, cleanedSql }); } + } - spinner.start(`Processing ${queryName} (${i + 1}/${queryFiles.length})`); - - const sqlWithDefaults = sql.replace(/:([a-zA-Z_]\w*)/g, "''"); - - // strip trailing semicolon for DESCRIBE QUERY - const cleanedSql = sqlWithDefaults.trim().replace(/;\s*$/, ""); - - // execute DESCRIBE QUERY to get schema without running the actual query - try { + // Phase 2: Execute all uncached DESCRIBE calls in parallel + type DescribeResult = + | { + status: "ok"; + index: number; + schema: QuerySchema; + cacheEntry: { hash: string; type: string; retry: boolean }; + } + | { + status: "fail"; + index: number; + schema: QuerySchema; + cacheEntry: { hash: string; type: string; retry: boolean }; + error: { code?: string; message: string }; + }; + + const freshResults: Array<{ index: number; schema: QuerySchema }> = []; + + if (uncachedQueries.length > 0) { + let completed = 0; + const total = uncachedQueries.length; + spinner.start( + `Describing ${total} ${total === 1 ? "query" : "queries"} (0/${total})`, + ); + + const describeOne = async ({ + index, + queryName, + sql, + sqlHash, + cleanedSql, + }: (typeof uncachedQueries)[number]): Promise<DescribeResult> => { const result = (await client.statementExecution.executeStatement({ statement: `DESCRIBE QUERY ${cleanedSql}`, warehouse_id: warehouseId, })) as DatabricksStatementExecutionResponse; + completed++; + spinner.update( + `Describing ${total} ${total === 1 ? "query" : "queries"} (${completed}/${total})`, + ); + + logger.debug( + "DESCRIBE result for %s: state=%s, rows=%d", + queryName, + result.status.state, + result.result?.data_array?.length ?? 
0, + ); + if (result.status.state === "FAILED") { const sqlError = result.status.error?.message || "Query execution failed"; - cacheFailedQuery(cache, querySchemas, sql, queryName, sqlHash); - spinner.stop(`✗ ${queryName} - failed`); - spinner.printDetail(`SQL Error: ${sqlError}`); - spinner.printDetail(`Query: ${cleanedSql.slice(0, 200)}`); - continue; + logger.warn("DESCRIBE failed for %s: %s", queryName, sqlError); + const type = generateUnknownResultQuery(sql, queryName); + return { + status: "fail", + index, + schema: { name: queryName, type }, + cacheEntry: { hash: sqlHash, type, retry: true }, + error: parseError(sqlError), + }; } - // convert result to query schema const { type, hasResults } = convertToQueryType(result, sql, queryName); - querySchemas.push({ name: queryName, type }); - - // update cache immediately so successful results survive partial failures - // retry if DESCRIBE returned no columns (result: unknown) - const retry = !hasResults; - cache.queries[queryName] = { hash: sqlHash, type, retry }; - saveCache(cache); - - spinner.stop(`✓ ${queryName}`); - } catch (error) { - const errorMessage = - error instanceof Error ? 
error.message : "Unknown error"; - spinner.stop(`✗ ${queryName}`); - spinner.printDetail(errorMessage); - cacheFailedQuery(cache, querySchemas, sql, queryName, sqlHash); + return { + status: "ok", + index, + schema: { name: queryName, type }, + cacheEntry: { hash: sqlHash, type, retry: !hasResults }, + }; + }; + + // Process in chunks, saving cache after each chunk + const processBatchResults = ( + settled: PromiseSettledResult<DescribeResult>[], + batchOffset: number, + ) => { + for (let i = 0; i < settled.length; i++) { + const entry = settled[i]; + const { queryName } = uncachedQueries[batchOffset + i]; + + if (entry.status === "fulfilled") { + const res = entry.value; + freshResults.push({ index: res.index, schema: res.schema }); + cache.queries[queryName] = res.cacheEntry; + logEntries.push({ + queryName, + status: "MISS", + failed: res.status === "fail", + error: res.status === "fail" ? res.error : undefined, + }); + } else { + const { sql, sqlHash, index } = uncachedQueries[batchOffset + i]; + const reason = + entry.reason instanceof Error + ? 
entry.reason.message + : String(entry.reason); + logger.warn("DESCRIBE rejected for %s: %s", queryName, reason); + const type = generateUnknownResultQuery(sql, queryName); + freshResults.push({ index, schema: { name: queryName, type } }); + cache.queries[queryName] = { hash: sqlHash, type, retry: true }; + logEntries.push({ + queryName, + status: "MISS", + failed: true, + error: parseError(reason), + }); + } + } + }; + + if (uncachedQueries.length > concurrency) { + for (let b = 0; b < uncachedQueries.length; b += concurrency) { + const batch = uncachedQueries.slice(b, b + concurrency); + const batchResults = await Promise.allSettled(batch.map(describeOne)); + processBatchResults(batchResults, b); + await saveCache(cache); + } + } else { + const settled = await Promise.allSettled( + uncachedQueries.map(describeOne), + ); + processBatchResults(settled, 0); + await saveCache(cache); + } + + spinner.stop(""); + } + + const elapsed = ((performance.now() - startTime) / 1000).toFixed(2); + + // Print formatted table + if (logEntries.length > 0) { + const maxNameLen = Math.max(...logEntries.map((e) => e.queryName.length)); + const separator = pc.dim("─".repeat(50)); + console.log(""); + console.log( + ` ${pc.bold("Typegen Queries")} ${pc.dim(`(${logEntries.length})`)}`, + ); + console.log(` ${separator}`); + for (const entry of logEntries) { + const tag = entry.failed + ? pc.bold(pc.red("ERROR")) + : entry.status === "HIT" + ? `cache ${pc.bold(pc.green("HIT "))}` + : `cache ${pc.bold(pc.yellow("MISS "))}`; + const rawName = entry.queryName.padEnd(maxNameLen); + const name = entry.failed ? pc.dim(pc.strikethrough(rawName)) : rawName; + const errorCode = entry.error?.message.match(/\[([^\]]+)\]/)?.[1]; + const reason = errorCode ? 
` ${pc.dim(errorCode)}` : ""; + console.log(` ${tag} ${name}${reason}`); } + const newCount = logEntries.filter( + (e) => e.status === "MISS" && !e.failed, + ).length; + const cacheCount = logEntries.filter( + (e) => e.status === "HIT" && !e.failed, + ).length; + const errorCount = logEntries.filter((e) => e.failed).length; + console.log(` ${separator}`); + const parts = [`${newCount} new`, `${cacheCount} from cache`]; + if (errorCount > 0) + parts.push(`${errorCount} ${errorCount === 1 ? "error" : "errors"}`); + console.log(` ${parts.join(", ")}. ${pc.dim(`${elapsed}s`)}`); + console.log(""); } - return querySchemas; + // Merge and sort by original file index for deterministic output + return [...cachedResults, ...freshResults] + .sort((a, b) => a.index - b.index) + .map((r) => r.schema); } /** diff --git a/packages/appkit/src/type-generator/spinner.ts b/packages/appkit/src/type-generator/spinner.ts index 5a6e04b6..d5607f55 100644 --- a/packages/appkit/src/type-generator/spinner.ts +++ b/packages/appkit/src/type-generator/spinner.ts @@ -17,6 +17,10 @@ export class Spinner { }, 300); } + update(text: string) { + this.text = text; + } + stop(finalText?: string) { if (this.interval) { clearInterval(this.interval); @@ -27,6 +31,10 @@ export class Spinner { } printDetail(text: string) { - process.stdout.write(`\x1b[2m ${text}\x1b[0m\n`); + // Clear spinner line, print detail, then redraw spinner + process.stdout.write(`\x1b[2K\r\x1b[2m ${text}\x1b[0m\n`); + if (this.interval) { + process.stdout.write(` ${this.text}${this.frames[this.current]}`); + } } } diff --git a/packages/appkit/src/type-generator/tests/generate-queries.test.ts b/packages/appkit/src/type-generator/tests/generate-queries.test.ts index f4948e3c..ac43ef9e 100644 --- a/packages/appkit/src/type-generator/tests/generate-queries.test.ts +++ b/packages/appkit/src/type-generator/tests/generate-queries.test.ts @@ -1,8 +1,8 @@ import { beforeEach, describe, expect, test, vi } from "vitest"; const mocks = 
vi.hoisted(() => ({ - readdirSync: vi.fn(), - readFileSync: vi.fn(), + readdir: vi.fn(), + readFile: vi.fn(), executeStatement: vi.fn(), spinnerStop: vi.fn(), spinnerPrintDetail: vi.fn(), @@ -10,10 +10,10 @@ const mocks = vi.hoisted(() => ({ saveCache: vi.fn(), })); -vi.mock("node:fs", () => ({ +vi.mock("node:fs/promises", () => ({ default: { - readdirSync: mocks.readdirSync, - readFileSync: mocks.readFileSync, + readdir: mocks.readdir, + readFile: mocks.readFile, }, })); @@ -26,6 +26,7 @@ vi.mock("@databricks/sdk-experimental", () => ({ vi.mock("../spinner", () => ({ Spinner: vi.fn(() => ({ start: vi.fn(), + update: vi.fn(), stop: mocks.spinnerStop, printDetail: mocks.spinnerPrintDetail, })), @@ -52,8 +53,8 @@ describe("generateQueriesFromDescribe", () => { }); test("success path — returns query schema", async () => { - mocks.readdirSync.mockReturnValue(["users.sql"]); - mocks.readFileSync.mockReturnValue( + mocks.readdir.mockResolvedValue(["users.sql"]); + mocks.readFile.mockResolvedValue( "SELECT id, name FROM users WHERE status = :status", ); mocks.executeStatement.mockResolvedValue( @@ -69,13 +70,13 @@ describe("generateQueriesFromDescribe", () => { expect(schemas[0].name).toBe("users"); expect(schemas[0].type).toContain("id: number"); expect(schemas[0].type).toContain("name: string"); - expect(mocks.spinnerStop).toHaveBeenCalledWith("✓ users"); + expect(mocks.spinnerStop).toHaveBeenCalledWith(""); expect(mocks.saveCache).toHaveBeenCalledTimes(1); }); test("FAILED status with error message — reports SQL error and produces unknown result type", async () => { - mocks.readdirSync.mockReturnValue(["bad_table.sql"]); - mocks.readFileSync.mockReturnValue("SELECT * FROM bad_table"); + mocks.readdir.mockResolvedValue(["bad_table.sql"]); + mocks.readFile.mockResolvedValue("SELECT * FROM bad_table"); mocks.executeStatement.mockResolvedValue({ statement_id: "stmt-2", status: { @@ -89,19 +90,13 @@ describe("generateQueriesFromDescribe", () => { 
expect(schemas).toHaveLength(1); expect(schemas[0].name).toBe("bad_table"); expect(schemas[0].type).toContain("result: unknown"); - expect(mocks.spinnerStop).toHaveBeenCalledWith("✗ bad_table - failed"); - expect(mocks.spinnerPrintDetail).toHaveBeenCalledWith( - "SQL Error: Table or view not found: bad_table", - ); - expect(mocks.spinnerPrintDetail).toHaveBeenCalledWith( - expect.stringContaining("Query:"), - ); + expect(mocks.spinnerStop).toHaveBeenCalledWith(""); expect(mocks.saveCache).toHaveBeenCalledTimes(1); }); test("FAILED status without error message — uses fallback message and produces unknown result type", async () => { - mocks.readdirSync.mockReturnValue(["query.sql"]); - mocks.readFileSync.mockReturnValue("SELECT 1"); + mocks.readdir.mockResolvedValue(["query.sql"]); + mocks.readFile.mockResolvedValue("SELECT 1"); mocks.executeStatement.mockResolvedValue({ statement_id: "stmt-3", status: { state: "FAILED" }, @@ -112,18 +107,15 @@ describe("generateQueriesFromDescribe", () => { expect(schemas).toHaveLength(1); expect(schemas[0].name).toBe("query"); expect(schemas[0].type).toContain("result: unknown"); - expect(mocks.spinnerStop).toHaveBeenCalledWith("✗ query - failed"); - expect(mocks.spinnerPrintDetail).toHaveBeenCalledWith( - "SQL Error: Query execution failed", - ); + expect(mocks.spinnerStop).toHaveBeenCalledWith(""); expect(mocks.saveCache).toHaveBeenCalledTimes(1); }); test("partial failure — caches success, unknown result for failure, output includes both", async () => { - mocks.readdirSync.mockReturnValue(["good.sql", "bad.sql"]); - mocks.readFileSync - .mockReturnValueOnce("SELECT id FROM good_table WHERE status = :status") - .mockReturnValueOnce("SELECT * FROM missing_table"); + mocks.readdir.mockResolvedValue(["good.sql", "bad.sql"]); + mocks.readFile + .mockResolvedValueOnce("SELECT id FROM good_table WHERE status = :status") + .mockResolvedValueOnce("SELECT * FROM missing_table"); mocks.executeStatement 
.mockResolvedValueOnce(succeededResult([["id", "INT", null]])) @@ -147,15 +139,15 @@ describe("generateQueriesFromDescribe", () => { expect(schemas[1].name).toBe("bad"); expect(schemas[1].type).toContain("result: unknown"); - // saveCache called for both queries (success + failure with retry: true) - expect(mocks.saveCache).toHaveBeenCalledTimes(2); + // saveCache called once after all parallel queries complete + expect(mocks.saveCache).toHaveBeenCalledTimes(1); }); test("all queries fail — caches with retry flag, all unknown result types", async () => { - mocks.readdirSync.mockReturnValue(["a.sql", "b.sql"]); - mocks.readFileSync - .mockReturnValueOnce("SELECT * FROM table_a") - .mockReturnValueOnce("SELECT * FROM table_b"); + mocks.readdir.mockResolvedValue(["a.sql", "b.sql"]); + mocks.readFile + .mockResolvedValueOnce("SELECT * FROM table_a") + .mockResolvedValueOnce("SELECT * FROM table_b"); mocks.executeStatement .mockRejectedValueOnce(new Error("Connection refused")) @@ -172,12 +164,39 @@ describe("generateQueriesFromDescribe", () => { expect(schemas[1].name).toBe("b"); expect(schemas[1].type).toContain("result: unknown"); + // saveCache called once after all parallel queries complete + expect(mocks.saveCache).toHaveBeenCalledTimes(1); + }); + + test("concurrency batching — saves cache after each batch", async () => { + // 3 queries with concurrency=2 → 2 batches (2 + 1), saveCache called twice + mocks.readdir.mockResolvedValue(["q1.sql", "q2.sql", "q3.sql"]); + mocks.readFile + .mockResolvedValueOnce("SELECT id FROM t1") + .mockResolvedValueOnce("SELECT id FROM t2") + .mockResolvedValueOnce("SELECT id FROM t3"); + + mocks.executeStatement + .mockResolvedValueOnce(succeededResult([["id", "INT", null]])) + .mockResolvedValueOnce(succeededResult([["id", "INT", null]])) + .mockResolvedValueOnce(succeededResult([["id", "INT", null]])); + + const schemas = await generateQueriesFromDescribe("/queries", "wh-123", { + concurrency: 2, + }); + + 
expect(schemas).toHaveLength(3); + expect(schemas[0].name).toBe("q1"); + expect(schemas[1].name).toBe("q2"); + expect(schemas[2].name).toBe("q3"); + + // 2 batches → 2 saveCache calls expect(mocks.saveCache).toHaveBeenCalledTimes(2); }); test("unknown result type includes parameters from SQL", async () => { - mocks.readdirSync.mockReturnValue(["parameterized.sql"]); - mocks.readFileSync.mockReturnValue( + mocks.readdir.mockResolvedValue(["parameterized.sql"]); + mocks.readFile.mockResolvedValue( "-- @param status STRING\nSELECT * FROM t WHERE status = :status AND org = :org", ); mocks.executeStatement.mockRejectedValueOnce(new Error("timeout")); diff --git a/packages/appkit/src/type-generator/vite-plugin.ts b/packages/appkit/src/type-generator/vite-plugin.ts index 998daf0a..741308f2 100644 --- a/packages/appkit/src/type-generator/vite-plugin.ts +++ b/packages/appkit/src/type-generator/vite-plugin.ts @@ -1,4 +1,4 @@ -import fs from "node:fs"; +import { existsSync } from "node:fs"; import path from "node:path"; import type { Plugin } from "vite"; import { createLogger } from "../logging/logger"; @@ -62,7 +62,7 @@ export function appKitTypesPlugin(options?: AppKitTypesPluginOptions): Plugin { return false; } - if (!fs.existsSync(path.join(process.cwd(), "config", "queries"))) { + if (!existsSync(path.join(process.cwd(), "config", "queries"))) { return false; }