Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 27 additions & 27 deletions lib/partner/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import type {
Balance,
Claim as TransferRequest,
} from "@/lib/vercel/schemas";
import { kv } from "@vercel/kv";
import { compact } from "lodash";
import { redis} from '../redis';
import {
getInvoice,
importResource as importResourceToVercelApi,
Expand Down Expand Up @@ -92,7 +92,7 @@ export async function installIntegration(
installationId: string,
request: InstallIntegrationRequest & { type: "marketplace" | "external" },
): Promise<void> {
const pipeline = kv.pipeline();
const pipeline = redis.pipeline();
await pipeline.set(installationId, request);
await pipeline.lrem("installations", 0, installationId);
await pipeline.lpush("installations", installationId);
Expand All @@ -104,7 +104,7 @@ export async function updateInstallation(
billingPlanId: string,
): Promise<void> {
const installation = await getInstallation(installationId);
const pipeline = kv.pipeline();
const pipeline = redis.pipeline();
await pipeline.set(installationId, { ...installation, billingPlanId });
await pipeline.exec();
}
Expand All @@ -116,7 +116,7 @@ export async function uninstallInstallation(
if (!installation || installation.deletedAt) {
return undefined;
}
const pipeline = kv.pipeline();
const pipeline = redis.pipeline();
await pipeline.set(installationId, {
...installation,
deletedAt: Date.now(),
Expand All @@ -130,7 +130,7 @@ export async function uninstallInstallation(
}

export async function listInstallations(): Promise<string[]> {
const installationIds = await kv.lrange("installations", 0, -1);
const installationIds = await redis.lrange("installations", 0, -1);
return installationIds;
}

Expand All @@ -151,11 +151,11 @@ export async function provisionResource(
productId: request.productId,
} satisfies Resource;

await kv.set(
await redis.set(
`${installationId}:resource:${resource.id}`,
serializeResource(resource),
);
await kv.lpush(`${installationId}:resources`, resource.id);
await redis.lpush(`${installationId}:resources`, resource.id);
await updateInstallation(installationId, request.billingPlanId);

const currentDate = new Date().toISOString();
Expand Down Expand Up @@ -200,7 +200,7 @@ export async function updateResource(
: resource.billingPlan,
};

await kv.set(
await redis.set(
`${installationId}:resource:${resourceId}`,
serializeResource(nextResource),
);
Expand All @@ -215,11 +215,11 @@ export async function transferResource(installationId: string, resourceId: strin
throw new Error(`Cannot find resource ${resourceId}`);
}

await kv.set(
await redis.set(
`${targetInstallationId}:resource:${resourceId}`,
serializeResource(resource),
);
await kv.del(`${installationId}:resource:${resourceId}`);
await redis.del(`${installationId}:resource:${resourceId}`);
}

export async function updateResourceNotification(
Expand All @@ -233,7 +233,7 @@ export async function updateResourceNotification(
throw new Error(`Cannot find resource ${resourceId}`);
}

await kv.set(
await redis.set(
`${installationId}:resource:${resourceId}`,
serializeResource({
...resource,
Expand All @@ -253,7 +253,7 @@ export async function deleteResource(
installationId: string,
resourceId: string,
): Promise<void> {
const pipeline = kv.pipeline();
const pipeline = redis.pipeline();
pipeline.del(`${installationId}:resource:${resourceId}`);
pipeline.lrem(`${installationId}:resources`, 0, resourceId);
await pipeline.exec();
Expand All @@ -265,13 +265,13 @@ export async function listResources(
): Promise<ListResourcesResponse> {
const resourceIds = targetResourceIds?.length
? targetResourceIds
: await kv.lrange(`${installationId}:resources`, 0, -1);
: await redis.lrange(`${installationId}:resources`, 0, -1);

if (resourceIds.length === 0) {
return { resources: [] };
}

const pipeline = kv.pipeline();
const pipeline = redis.pipeline();

for (const resourceId of resourceIds) {
pipeline.get(`${installationId}:resource:${resourceId}`);
Expand All @@ -288,7 +288,7 @@ export async function getResource(
installationId: string,
resourceId: string,
): Promise<GetResourceResponse | null> {
const resource = await kv.get<SerializedResource>(
const resource = await redis.get<SerializedResource>(
`${installationId}:resource:${resourceId}`,
);

Expand Down Expand Up @@ -366,7 +366,7 @@ export async function provisionPurchase(
const balances: Record<string, Balance> = {};

for (const item of invoice.items ?? []) {
const amountInCents = Math.floor(parseFloat(item.total) * 100);
const amountInCents = Math.floor(Number.parseFloat(item.total) * 100);
if (item.resourceId) {
const balance = await addResourceBalanceInternal(
installationId,
Expand All @@ -393,7 +393,7 @@ export async function addInstallationBalanceInternal(
installationId: string,
currencyValueInCents: number,
): Promise<Balance> {
const result = await kv.incrby(
const result = await redis.incrby(
`${installationId}:balance`,
currencyValueInCents,
);
Expand All @@ -407,7 +407,7 @@ export async function addInstallationBalanceInternal(
export async function getInstallationBalance(
installationId: string,
): Promise<Balance | null> {
const result = await kv.get<number>(`${installationId}:balance`);
const result = await redis.get<number>(`${installationId}:balance`);
if (result === null) {
return null;
}
Expand All @@ -423,7 +423,7 @@ export async function addResourceBalanceInternal(
resourceId: string,
currencyValueInCents: number,
): Promise<Balance> {
const result = await kv.incrby(
const result = await redis.incrby(
`${installationId}:${resourceId}:balance`,
currencyValueInCents,
);
Expand All @@ -439,7 +439,7 @@ export async function getResourceBalance(
installationId: string,
resourceId: string,
): Promise<Balance | null> {
const result = await kv.get<number>(
const result = await redis.get<number>(
`${installationId}:${resourceId}:balance`,
);
if (result === null) {
Expand Down Expand Up @@ -524,7 +524,7 @@ export async function getInstallation(installationId: string): Promise<
notification?: Notification;
}
> {
const installation = await kv.get<
const installation = await redis.get<
InstallIntegrationRequest & {
type: "marketplace" | "external";
billingPlanId: string;
Expand All @@ -545,7 +545,7 @@ export async function setInstallationNotification(
notification: Notification | undefined | null,
): Promise<void> {
const installation = await getInstallation(installationId);
const pipeline = kv.pipeline();
const pipeline = redis.pipeline();
await pipeline.set(installationId, {
...installation,
notification: notification ?? undefined,
Expand All @@ -556,30 +556,30 @@ export async function setInstallationNotification(
export async function storeWebhookEvent(
event: WebhookEvent | UnknownWebhookEvent,
): Promise<void> {
const pipeline = kv.pipeline();
const pipeline = redis.pipeline();
await pipeline.lpush("webhook_events", event);
await pipeline.ltrim("webhook_events", 0, 100);
await pipeline.exec();
}

export async function getWebhookEvents(limit = 100): Promise<WebhookEvent[]> {
return (await kv.lrange<WebhookEvent>("webhook_events", 0, limit)).sort(
return (await redis.lrange<WebhookEvent>("webhook_events", 0, limit)).sort(
(a, b) => b.createdAt - a.createdAt,
);
}

export async function getTransferRequest(
transferId: string,
): Promise<TransferRequest | null> {
return await kv.get<TransferRequest>(
return await redis.get<TransferRequest>(
`transfer-request:${transferId}`,
);
}

export async function setTransferRequest(
transferRequest: TransferRequest,
): Promise<'OK' | TransferRequest | null> {
return kv.set<TransferRequest>(
return redis.set<TransferRequest>(
`transfer-request:${transferRequest.transferId}`,
transferRequest,
);
Expand All @@ -588,7 +588,7 @@ export async function setTransferRequest(
export async function daleteTransferRequest(
transferRequest: TransferRequest,
): Promise<number> {
return kv.del(
return redis.del(
`transfer-request:${transferRequest.transferId}`,
);
}
9 changes: 9 additions & 0 deletions lib/redis.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { Redis } from "@upstash/redis";

// Redis initialization may vary based on provider or other considerations
// See, for instance Redis.fromEnv() on the upstash client
// which defaults to `UPSTASH_REDIS_REST_URL` and `UPSTASH_REDIS_REST_TOKEN`
export const redis = new Redis({
url: process.env.KV_REST_API_URL,
token: process.env.KV_REST_API_TOKEN,
});
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"lint": "next lint"
},
"dependencies": {
"@vercel/kv": "^1.0.1",
"@upstash/redis": "^1.35.3",
"jose": "^5.2.4",
"lodash": "^4.17.21",
"nanoid": "^5.0.7",
Expand Down
Loading