Skip to content

Commit be86b3e

Browse files
committed
1.4.7
1 parent e3ffc0d commit be86b3e

3 files changed

Lines changed: 703 additions & 258 deletions

File tree

app/server.js

Lines changed: 167 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,18 @@ const READ_TIMEOUT_MS = 120000;
3535
const PAYLOAD_STATUS_CONNECT_TIMEOUT_MS = 5000;
3636
const PAYLOAD_STATUS_READ_TIMEOUT_MS = 10000;
3737
const UPLOAD_SOCKET_BUFFER_SIZE = 8 * 1024 * 1024;
38+
const UploadCmd = {
39+
StartUpload: 0x10,
40+
UploadChunk: 0x11,
41+
EndUpload: 0x12,
42+
};
43+
const UploadResp = {
44+
Ok: 0x01,
45+
Error: 0x02,
46+
Data: 0x03,
47+
Ready: 0x04,
48+
Progress: 0x05,
49+
};
3850
const LANE_CONNECTIONS = 4;
3951
const LANE_HUGE_FILE_BYTES = 20 * 1024 * 1024 * 1024;
4052
const LANE_LARGE_FILE_BYTES = 4 * 1024 * 1024 * 1024;
@@ -438,6 +450,9 @@ function tuneUploadSocket(socket) {
438450
if (!socket) return;
439451
socket.setNoDelay(true);
440452
socket.setKeepAlive(true, 1000);
453+
socket.setTimeout(15 * 60 * 1000, () => {
454+
try { socket.destroy(new Error('Upload socket timeout')); } catch {}
455+
});
441456
if (typeof socket.setSendBufferSize === 'function') {
442457
socket.setSendBufferSize(UPLOAD_SOCKET_BUFFER_SIZE);
443458
}
@@ -518,6 +533,118 @@ function createSocketLineReader(socket) {
518533
};
519534
}
520535

536+
function createSocketReader(socket) {
537+
let buffer = Buffer.alloc(0);
538+
let ended = false;
539+
let error = null;
540+
const waiters = new Set();
541+
542+
const notify = () => {
543+
for (const waiter of Array.from(waiters)) waiter();
544+
};
545+
546+
const onData = (chunk) => {
547+
buffer = Buffer.concat([buffer, chunk]);
548+
notify();
549+
};
550+
const onError = (err) => {
551+
error = err;
552+
notify();
553+
};
554+
const onClose = () => {
555+
ended = true;
556+
notify();
557+
};
558+
559+
socket.on('data', onData);
560+
socket.on('error', onError);
561+
socket.on('close', onClose);
562+
socket.on('end', onClose);
563+
564+
const awaitCondition = (predicate, timeoutMs) => new Promise((resolve, reject) => {
565+
if (error) return reject(error);
566+
if (predicate()) return resolve();
567+
if (ended) return reject(new Error('Connection closed'));
568+
569+
const waiter = () => {
570+
if (error) {
571+
cleanup();
572+
reject(error);
573+
return;
574+
}
575+
if (predicate()) {
576+
cleanup();
577+
resolve();
578+
return;
579+
}
580+
if (ended) {
581+
cleanup();
582+
reject(new Error('Connection closed'));
583+
}
584+
};
585+
586+
let timeout = null;
587+
if (timeoutMs) {
588+
timeout = setTimeout(() => {
589+
cleanup();
590+
reject(new Error('Read timeout'));
591+
}, timeoutMs);
592+
}
593+
594+
const cleanup = () => {
595+
waiters.delete(waiter);
596+
if (timeout) clearTimeout(timeout);
597+
};
598+
599+
waiters.add(waiter);
600+
});
601+
602+
return {
603+
readExact: async (length, timeoutMs) => {
604+
if (length === 0) return Buffer.alloc(0);
605+
await awaitCondition(() => buffer.length >= length, timeoutMs);
606+
const out = buffer.slice(0, length);
607+
buffer = buffer.slice(length);
608+
return out;
609+
},
610+
close: () => {
611+
socket.removeListener('data', onData);
612+
socket.removeListener('error', onError);
613+
socket.removeListener('close', onClose);
614+
socket.removeListener('end', onClose);
615+
waiters.clear();
616+
},
617+
};
618+
}
619+
620+
function buildUploadStartPayload(remotePath, totalSize, offset) {
621+
const pathBuf = Buffer.from(String(remotePath || ''), 'utf8');
622+
const payload = Buffer.alloc(pathBuf.length + 1 + 8 + 8);
623+
pathBuf.copy(payload, 0);
624+
payload.writeBigUInt64LE(BigInt(totalSize), pathBuf.length + 1);
625+
payload.writeBigUInt64LE(BigInt(offset), pathBuf.length + 9);
626+
return payload;
627+
}
628+
629+
async function readBinaryResponse(reader, timeoutMs = READ_TIMEOUT_MS) {
630+
const header = await reader.readExact(5, timeoutMs);
631+
const code = header.readUInt8(0);
632+
const len = header.readUInt32LE(1);
633+
const data = len > 0 ? await reader.readExact(len, timeoutMs) : Buffer.alloc(0);
634+
return { code, data };
635+
}
636+
637+
async function writeBinaryCommand(socket, cmd, payload) {
638+
const body = payload || Buffer.alloc(0);
639+
const header = Buffer.alloc(5);
640+
header[0] = cmd;
641+
header.writeUInt32LE(body.length, 1);
642+
await writeAll(socket, header);
643+
if (body.length > 0) {
644+
await writeAll(socket, body);
645+
}
646+
}
647+
521648
function writeAll(socket, buffer) {
522649
return new Promise((resolve, reject) => {
523650
const onError = (err) => {
@@ -1211,27 +1338,30 @@ async function uploadFastOneFile(ip, destRoot, file, options = {}) {
12111338
const chmodAfterUpload = Boolean(options.chmodAfterUpload);
12121339
const socket = await createSocketWithTimeout(ip, TRANSFER_PORT);
12131340
tuneUploadSocket(socket);
1214-
const lineReader = createSocketLineReader(socket);
1341+
const reader = createSocketReader(socket);
12151342
try {
1216-
const chmodToken = chmodAfterUpload ? 'CHMOD_END' : '0';
1217-
const cmd = `UPLOAD_FAST ${escapeCommandPath(destRoot)} ${escapeCommandPath(file.rel_path)} ${file.size} DIRECT ${chmodToken}\n`;
1218-
socket.write(cmd);
1219-
const ready = await lineReader.readLine(READ_TIMEOUT_MS);
1220-
if (!ready.startsWith('READY')) {
1221-
throw new Error(`Upload rejected: ${ready}`);
1343+
const remotePath = joinRemotePath(destRoot, file.rel_path);
1344+
const startPayload = buildUploadStartPayload(remotePath, file.size, 0);
1345+
await writeBinaryCommand(socket, UploadCmd.StartUpload, startPayload);
1346+
const readyResp = await readBinaryResponse(reader, READ_TIMEOUT_MS);
1347+
if (readyResp.code !== UploadResp.Ready) {
1348+
const msg = readyResp.data?.length ? readyResp.data.toString('utf8') : 'no response';
1349+
throw new Error(`Upload rejected: ${msg}`);
12221350
}
12231351
if (Number(file.size) > 0) {
12241352
const fd = await fs.promises.open(file.abs_path, 'r');
12251353
try {
1226-
const buf = Buffer.allocUnsafe(8 * 1024 * 1024);
1354+
const buf = Buffer.allocUnsafe(5 + 8 * 1024 * 1024);
12271355
let remaining = Number(file.size);
12281356
let pos = 0;
12291357
while (remaining > 0) {
12301358
if (shouldCancel && shouldCancel()) throw new Error('Transfer cancelled');
1231-
const take = Math.min(buf.length, remaining);
1232-
const { bytesRead } = await fd.read(buf, 0, take, pos);
1359+
const take = Math.min(buf.length - 5, remaining);
1360+
const { bytesRead } = await fd.read(buf, 5, take, pos);
12331361
if (bytesRead <= 0) throw new Error('Read failed');
1234-
await writeAll(socket, buf.subarray(0, bytesRead));
1362+
buf[0] = UploadCmd.UploadChunk;
1363+
buf.writeUInt32LE(bytesRead, 1);
1364+
await writeAll(socket, buf.subarray(0, 5 + bytesRead));
12351365
remaining -= bytesRead;
12361366
pos += bytesRead;
12371367
if (onProgress) onProgress(bytesRead);
@@ -1240,12 +1370,15 @@ async function uploadFastOneFile(ip, destRoot, file, options = {}) {
12401370
await fd.close().catch(() => {});
12411371
}
12421372
}
1243-
const result = await lineReader.readLine(READ_TIMEOUT_MS);
1244-
if (!result.startsWith('OK')) {
1245-
throw new Error(`Upload failed: ${result}`);
1373+
await writeBinaryCommand(socket, UploadCmd.EndUpload, Buffer.alloc(0));
1374+
const endResp = await readBinaryResponse(reader, READ_TIMEOUT_MS);
1375+
if (endResp.code !== UploadResp.Ok) {
1376+
const msg = endResp.data?.length ? endResp.data.toString('utf8') : 'unknown response';
1377+
throw new Error(`Upload failed: ${msg}`);
12461378
}
12471379
return true;
12481380
} finally {
1381+
try { reader.close(); } catch {}
12491382
try { socket.destroy(); } catch {}
12501383
}
12511384
}
@@ -1313,20 +1446,20 @@ async function uploadLaneSingleFile(ip, destRoot, file, options = {}) {
13131446
const runWorker = async (queue, idx) => {
13141447
const socket = await createSocketWithTimeout(ip, TRANSFER_PORT);
13151448
tuneUploadSocket(socket);
1316-
const lineReader = createSocketLineReader(socket);
1449+
const reader = createSocketReader(socket);
13171450
const fd = await fs.promises.open(file.abs_path, 'r');
13181451
try {
13191452
for (const chunk of queue) {
13201453
if (shouldCancel && shouldCancel()) throw new Error('Transfer cancelled');
13211454
if (chunk.offset !== 0) await waitForPrealloc();
13221455

1323-
const chmodToken = chmodAfterUpload ? 'CHMOD_END' : '0';
1324-
const cmd = `UPLOAD_FAST_OFFSET ${escapeCommandPath(destRoot)} ${escapeCommandPath(file.rel_path)} ${chunk.offset} ${totalSize} ${chunk.len} ${chmodToken}\n`;
1325-
socket.write(cmd);
1326-
1327-
const ready = await lineReader.readLine(READ_TIMEOUT_MS);
1328-
if (!ready.startsWith('READY')) {
1329-
throw new Error(`Lane rejected: ${ready}`);
1456+
const remotePath = joinRemotePath(destRoot, file.rel_path);
1457+
const startPayload = buildUploadStartPayload(remotePath, totalSize, chunk.offset);
1458+
await writeBinaryCommand(socket, UploadCmd.StartUpload, startPayload);
1459+
const readyResp = await readBinaryResponse(reader, READ_TIMEOUT_MS);
1460+
if (readyResp.code !== UploadResp.Ready) {
1461+
const msg = readyResp.data?.length ? readyResp.data.toString('utf8') : 'no response';
1462+
throw new Error(`Lane rejected: ${msg}`);
13301463
}
13311464
if (chunk.offset === 0 && !preallocResolved) {
13321465
preallocResolved = true;
@@ -1335,20 +1468,24 @@ async function uploadLaneSingleFile(ip, destRoot, file, options = {}) {
13351468

13361469
let remaining = chunk.len;
13371470
let pos = chunk.offset;
1338-
const buf = Buffer.allocUnsafe(8 * 1024 * 1024);
1471+
const buf = Buffer.allocUnsafe(5 + 8 * 1024 * 1024);
13391472
while (remaining > 0) {
13401473
if (shouldCancel && shouldCancel()) throw new Error('Transfer cancelled');
1341-
const take = Math.min(buf.length, remaining);
1342-
const { bytesRead } = await fd.read(buf, 0, take, pos);
1474+
const take = Math.min(buf.length - 5, remaining);
1475+
const { bytesRead } = await fd.read(buf, 5, take, pos);
13431476
if (bytesRead <= 0) throw new Error('Read failed');
1344-
await writeAll(socket, buf.subarray(0, bytesRead));
1477+
buf[0] = UploadCmd.UploadChunk;
1478+
buf.writeUInt32LE(bytesRead, 1);
1479+
await writeAll(socket, buf.subarray(0, 5 + bytesRead));
13451480
remaining -= bytesRead;
13461481
pos += bytesRead;
13471482
}
13481483

1349-
const result = await lineReader.readLine(READ_TIMEOUT_MS);
1350-
if (!result.startsWith('OK')) {
1351-
throw new Error(`Lane failed: ${result}`);
1484+
await writeBinaryCommand(socket, UploadCmd.EndUpload, Buffer.alloc(0));
1485+
const endResp = await readBinaryResponse(reader, READ_TIMEOUT_MS);
1486+
if (endResp.code !== UploadResp.Ok) {
1487+
const msg = endResp.data?.length ? endResp.data.toString('utf8') : 'unknown response';
1488+
throw new Error(`Lane failed: ${msg}`);
13521489
}
13531490

13541491
workerProgress[idx] += chunk.len;
@@ -1357,6 +1494,7 @@ async function uploadLaneSingleFile(ip, destRoot, file, options = {}) {
13571494
}
13581495
} finally {
13591496
await fd.close().catch(() => {});
1497+
try { reader.close(); } catch {}
13601498
try { socket.destroy(); } catch {}
13611499
}
13621500
};

0 commit comments

Comments
 (0)