import type { Client, ObjectMetadata, UploadedObjectInfo } from "./client.ts";import { getVersionId, sanitizeETag } from "./helpers.ts";import { parse as parseXML } from "./xml-parser.ts";
/**
 * A `WritableStream` that uploads every chunk written to it to
 * `bucketName`/`objectName` through `client.makeRequest`.
 *
 * - If the first chunk is smaller than `partSize`, it is sent as one plain
 *   `PUT` request (with `metadata` as request headers) and no multipart
 *   upload is started.
 * - Otherwise a multipart upload is initiated when the first chunk arrives and
 *   every chunk is uploaded as its own numbered part. Part requests are
 *   started in `write()` without being awaited there; `close()` waits for all
 *   of them and then completes the multipart upload.
 *
 * NOTE(review): this presumably expects the caller to re-chunk the data so
 * that every chunk except the last one is `partSize` bytes long — confirm
 * against the callers.
 */
export class ObjectUploader extends WritableStream<Uint8Array> {
  /**
   * Returns the info describing the uploaded object.
   *
   * @throws Error if no result is available yet, i.e. the upload has not
   *   finished successfully.
   */
  public readonly getResult: () => UploadedObjectInfo;

  /**
   * @param options.client Client whose `makeRequest` performs the HTTP calls.
   * @param options.bucketName Bucket to upload into.
   * @param options.objectName Name (key) of the object to create.
   * @param options.partSize Size threshold in bytes: a first chunk smaller
   *   than this is uploaded with a single request instead of a multipart upload.
   * @param options.metadata Sent as request headers on the single-request
   *   upload, or passed along when initiating the multipart upload.
   */
  constructor(
    { client, bucketName, objectName, partSize, metadata }: {
      client: Client;
      bucketName: string;
      objectName: string;
      partSize: number;
      metadata: Record<string, string>;
    },
  ) {
    // Set once the upload has fully completed (single PUT or multipart).
    let result: UploadedObjectInfo | undefined;
    let nextPartNumber = 1;
    // Only defined while a multipart upload is in progress.
    let uploadId: string | undefined;
    const etags: { part: number; etag: string }[] = [];
    // One promise per in-flight part; each settles after its ETag was recorded.
    const partsPromises: Promise<void>[] = [];

    super({
      async write(chunk) {
        const method = "PUT";
        const partNumber = nextPartNumber++;

        if (partNumber === 1 && chunk.length < partSize) {
          // Small object: upload everything with one request.
          const response = await client.makeRequest({
            method,
            headers: new Headers({
              ...metadata,
              "Content-Length": String(chunk.length),
            }),
            bucketName,
            objectName,
            payload: chunk,
          });
          result = {
            etag: sanitizeETag(response.headers.get("etag") ?? undefined),
            versionId: getVersionId(response.headers),
          };
          return;
        }

        if (partNumber === 1) {
          uploadId = (await initiateNewMultipartUpload({
            client,
            bucketName,
            objectName,
            metadata,
          })).uploadId;
        }
        if (uploadId === undefined) {
          // The first chunk was already stored as the complete object, so a
          // further chunk cannot be appended. Fail loudly instead of sending a
          // part request without an upload ID and silently dropping data.
          throw new Error(
            `Cannot upload part ${partNumber}: no multipart upload is in progress because the first chunk was smaller than partSize and was uploaded as a complete object.`,
          );
        }

        const partUpload = client.makeRequest({
          method,
          query: { partNumber: partNumber.toString(), uploadId },
          headers: new Headers({ "Content-Length": String(chunk.length) }),
          bucketName,
          objectName,
          payload: chunk,
        }).then((response: Response) => {
          // Strip one leading and one trailing double quote from the ETag.
          let etag = response.headers.get("etag") ?? "";
          if (etag) {
            etag = etag.replace(/^"/, "").replace(/"$/, "");
          }
          etags.push({ part: partNumber, etag });
        });
        // Mark the promise as handled so a part that fails before close() is
        // called does not raise an unhandled rejection. The failure is still
        // reported by the `Promise.all` in close().
        partUpload.catch(() => {});
        partsPromises.push(partUpload);
      },

      async close() {
        if (result !== undefined) {
          // The object was already uploaded with a single request.
          return;
        }
        if (uploadId === undefined) {
          throw new Error("Stream was closed without uploading any data.");
        }
        // Rejects if any part failed; in that case the upload is not completed.
        await Promise.all(partsPromises);
        // Parts may finish out of order; order them by ascending part number.
        etags.sort((a, b) => a.part - b.part);
        result = await completeMultipartUpload({
          client,
          bucketName,
          objectName,
          uploadId,
          etags,
        });
      },
    });

    this.getResult = () => {
      if (result === undefined) {
        throw new Error("Result is not ready. await the stream first.");
      }
      return result;
    };
  }
}
/**
 * Starts a new multipart upload for the given object and returns its upload ID.
 *
 * Sends a `POST` request with the `uploads` query and the optional metadata as
 * request headers, then reads the `UploadId` element out of the XML response.
 *
 * @param options.client Client whose `makeRequest` performs the HTTP call.
 * @param options.bucketName Bucket that will contain the object.
 * @param options.objectName Name (key) of the object being uploaded.
 * @param options.metadata Optional metadata, sent as request headers.
 * @returns The upload ID taken from the response.
 * @throws Error if the response root is not `InitiateMultipartUploadResult`
 *   or it contains no non-empty `UploadId`.
 */
async function initiateNewMultipartUpload(
  options: {
    client: Client;
    bucketName: string;
    objectName: string;
    metadata?: ObjectMetadata;
  },
): Promise<{ uploadId: string }> {
  const { client, bucketName, objectName, metadata } = options;

  const response = await client.makeRequest({
    method: "POST",
    bucketName,
    objectName,
    query: "uploads",
    headers: new Headers(metadata),
    returnBody: true,
  });
  const body = await response.text();

  const rootNode = parseXML(body).root;
  if (!rootNode || rootNode.name !== "InitiateMultipartUploadResult") {
    throw new Error(`Unexpected response: ${body}`);
  }

  const uploadIdNode = rootNode.children.find((child) =>
    child.name === "UploadId"
  );
  const uploadId = uploadIdNode?.content;
  if (!uploadId) {
    throw new Error(`Unable to get UploadId from response: ${body}`);
  }

  return { uploadId };
}
/**
 * Completes a multipart upload by sending the list of uploaded parts.
 *
 * Builds a `CompleteMultipartUpload` XML document from `etags` (in the order
 * given), sends it with a `POST` request carrying the `uploadId` query, and
 * reads the final `ETag` from the XML response.
 *
 * @param client Client whose `makeRequest` performs the HTTP call.
 * @param bucketName Bucket that contains the object.
 * @param objectName Name (key) of the object being uploaded.
 * @param uploadId ID of the multipart upload to complete.
 * @param etags Part number / ETag pairs of the uploaded parts.
 * @returns The sanitized ETag from the response body plus the version ID
 *   derived from the response headers.
 * @throws Error if the response root is not `CompleteMultipartUploadResult`
 *   or it contains no non-empty `ETag`.
 */
async function completeMultipartUpload(
  { client, bucketName, objectName, uploadId, etags }: {
    client: Client;
    bucketName: string;
    objectName: string;
    uploadId: string;
    etags: { part: number; etag: string }[];
  },
): Promise<UploadedObjectInfo> {
  // One <Part> element per uploaded part, joined by newlines.
  const partsXml = etags
    .map(({ part, etag }) => ` <Part><PartNumber>${part}</PartNumber><ETag>${etag}</ETag></Part>`)
    .join("\n");
  const payload = ` <CompleteMultipartUpload xmlns="http://s3.amazonaws.com/doc/2006-03-01/"> ${partsXml} </CompleteMultipartUpload> `;

  const response = await client.makeRequest({
    method: "POST",
    bucketName,
    objectName,
    query: `uploadId=${encodeURIComponent(uploadId)}`,
    payload: new TextEncoder().encode(payload),
    returnBody: true,
  });
  const body = await response.text();

  const rootNode = parseXML(body).root;
  if (!rootNode || rootNode.name !== "CompleteMultipartUploadResult") {
    throw new Error(`Unexpected response: ${body}`);
  }

  const etagNode = rootNode.children.find((child) => child.name === "ETag");
  const etagRaw = etagNode?.content;
  if (!etagRaw) {
    throw new Error(`Unable to get ETag from response: ${body}`);
  }

  const versionId = getVersionId(response.headers);
  const etag = sanitizeETag(etagRaw);
  return { etag, versionId };
}