| 
1 | 1 | import type { ComposableCacheEntry, ComposableCacheHandler } from "types/cache";  | 
2 |  | -import type { CacheValue } from "types/overrides";  | 
3 | 2 | import { writeTags } from "utils/cache";  | 
4 | 3 | import { fromReadableStream, toReadableStream } from "utils/stream";  | 
5 | 4 | import { debug } from "./logger";  | 
6 | 5 | 
 
  | 
7 |  | -const pendingWritePromiseMap = new Map<  | 
8 |  | -  string,  | 
9 |  | -  Promise<CacheValue<"composable">>  | 
10 |  | ->();  | 
 | 6 | +const pendingWritePromiseMap = new Map<string, Promise<ComposableCacheEntry>>();  | 
11 | 7 | 
 
  | 
12 | 8 | export default {  | 
13 | 9 |   async get(cacheKey: string) {  | 
14 | 10 |     try {  | 
15 | 11 |       // We first check if we have a pending write for this cache key  | 
16 | 12 |       // If we do, we return the pending promise instead of fetching the cache  | 
17 |  | -      if (pendingWritePromiseMap.has(cacheKey)) {  | 
18 |  | -        const stored = pendingWritePromiseMap.get(cacheKey);  | 
19 |  | -        if (stored) {  | 
20 |  | -          return stored.then((entry) => ({  | 
21 |  | -            ...entry,  | 
22 |  | -            value: toReadableStream(entry.value),  | 
23 |  | -          }));  | 
24 |  | -        }  | 
25 |  | -      }  | 
 | 13 | +      const stored = pendingWritePromiseMap.get(cacheKey);  | 
 | 14 | +      if (stored) return stored;  | 
 | 15 | + | 
26 | 16 |       const result = await globalThis.incrementalCache.get(  | 
27 | 17 |         cacheKey,  | 
28 | 18 |         "composable",  | 
@@ -69,28 +59,45 @@ export default {  | 
69 | 59 |   },  | 
70 | 60 | 
 
  | 
71 | 61 |   async set(cacheKey: string, pendingEntry: Promise<ComposableCacheEntry>) {  | 
72 |  | -    const promiseEntry = pendingEntry.then(async (entry) => ({  | 
73 |  | -      ...entry,  | 
74 |  | -      value: await fromReadableStream(entry.value),  | 
75 |  | -    }));  | 
76 |  | -    pendingWritePromiseMap.set(cacheKey, promiseEntry);  | 
 | 62 | +    const teedPromise = pendingEntry.then((entry) => {  | 
 | 63 | +      // Optimization: We avoid consuming and stringifying the stream here,  | 
 | 64 | +      // because it creates double copies just to be discarded when this function  | 
 | 65 | +      // ends. This avoids unnecessary memory usage, and reduces GC pressure.  | 
 | 66 | +      const [stream1, stream2] = entry.value.tee();  | 
 | 67 | +      return [  | 
 | 68 | +        { ...entry, value: stream1 },  | 
 | 69 | +        { ...entry, value: stream2 },  | 
 | 70 | +      ] as const;  | 
 | 71 | +    });  | 
 | 72 | + | 
 | 73 | +    pendingWritePromiseMap.set(  | 
 | 74 | +      cacheKey,  | 
 | 75 | +      teedPromise.then(([entry]) => entry),  | 
 | 76 | +    );  | 
77 | 77 | 
 
  | 
78 |  | -    const entry = await promiseEntry.finally(() => {  | 
 | 78 | +    const [, entryForStorage] = await teedPromise.finally(() => {  | 
79 | 79 |       pendingWritePromiseMap.delete(cacheKey);  | 
80 | 80 |     });  | 
 | 81 | + | 
81 | 82 |     await globalThis.incrementalCache.set(  | 
82 | 83 |       cacheKey,  | 
83 | 84 |       {  | 
84 |  | -        ...entry,  | 
85 |  | -        value: entry.value,  | 
 | 85 | +        ...entryForStorage,  | 
 | 86 | +        value: await fromReadableStream(entryForStorage.value),  | 
86 | 87 |       },  | 
87 | 88 |       "composable",  | 
88 | 89 |     );  | 
 | 90 | + | 
89 | 91 |     if (globalThis.tagCache.mode === "original") {  | 
90 | 92 |       const storedTags = await globalThis.tagCache.getByPath(cacheKey);  | 
91 |  | -      const tagsToWrite = entry.tags.filter((tag) => !storedTags.includes(tag));  | 
 | 93 | +      const tagsToWrite = [];  | 
 | 94 | +      for (const tag of entryForStorage.tags) {  | 
 | 95 | +        if (!storedTags.includes(tag)) {  | 
 | 96 | +          tagsToWrite.push({ tag, path: cacheKey });  | 
 | 97 | +        }  | 
 | 98 | +      }  | 
92 | 99 |       if (tagsToWrite.length > 0) {  | 
93 |  | -        await writeTags(tagsToWrite.map((tag) => ({ tag, path: cacheKey })));  | 
 | 100 | +        await writeTags(tagsToWrite);  | 
94 | 101 |       }  | 
95 | 102 |     }  | 
96 | 103 |   },  | 
@@ -125,17 +132,14 @@ export default {  | 
125 | 132 |         }));  | 
126 | 133 |       }),  | 
127 | 134 |     );  | 
128 |  | -    // We need to deduplicate paths, we use a set for that  | 
129 |  | -    const setToWrite = new Set<{ path: string; tag: string }>();  | 
 | 135 | + | 
 | 136 | +    const dedupeMap = new Map();  | 
130 | 137 |     for (const entry of pathsToUpdate.flat()) {  | 
131 |  | -      setToWrite.add(entry);  | 
 | 138 | +      dedupeMap.set(`${entry.path}|${entry.tag}`, entry);  | 
132 | 139 |     }  | 
133 |  | -    await writeTags(Array.from(setToWrite));  | 
 | 140 | +    await writeTags(Array.from(dedupeMap.values()));  | 
134 | 141 |   },  | 
135 | 142 | 
 
  | 
136 | 143 |   // This one is necessary for older versions of next  | 
137 |  | -  async receiveExpiredTags(...tags: string[]) {  | 
138 |  | -    // This function does absolutely nothing  | 
139 |  | -    return;  | 
140 |  | -  },  | 
 | 144 | +  async receiveExpiredTags() {},  | 
141 | 145 | } satisfies ComposableCacheHandler;  | 
0 commit comments