import type { Application } from "./application.ts";import type { Context } from "./context.ts";import { assert, BufWriter } from "./deps.ts";import { NativeRequest } from "./http_server_native.ts";import type { ServerRequest } from "./http_server_std.ts";
const encoder = new TextEncoder();
export interface ServerSentEventInit extends EventInit { id?: number;
replacer?: | (string | number)[] | ((this: any, key: string, value: any) => any);
space?: string | number;}
export interface ServerSentEventTargetOptions { headers?: Headers;}
class CloseEvent extends Event { constructor(eventInit: EventInit) { super("close", eventInit); }}
export class ServerSentEvent extends Event { #data: string; #id?: number; #type: string;
constructor( type: string, data: any, { replacer, space, ...eventInit }: ServerSentEventInit = {}, ) { super(type, eventInit); this.#type = type; try { this.#data = typeof data === "string" ? data : JSON.stringify(data, replacer as (string | number)[], space); } catch (e) { assert(e instanceof Error); throw new TypeError( `data could not be coerced into a serialized string.\n ${e.message}`, ); } const { id } = eventInit; this.#id = id; }
get data(): string { return this.#data; }
get id(): number | undefined { return this.#id; }
toString(): string { const data = `data: ${this.#data.split("\n").join("\ndata: ")}\n`; return `${this.#type === "__message" ? "" : `event: ${this.#type}\n`}${ this.#id ? `id: ${String(this.#id)}\n` : "" }${data}\n`; }}
const response = `HTTP/1.1 200 OK\n`;
const responseHeaders = new Headers( [ ["Connection", "Keep-Alive"], ["Content-Type", "text/event-stream"], ["Cache-Control", "no-cache"], ["Keep-Alive", `timeout=${Number.MAX_SAFE_INTEGER}`], ],);
export interface ServerSentEventTarget extends EventTarget { readonly closed: boolean;
close(): Promise<void>;
dispatchComment(comment: string): boolean;
dispatchMessage(data: any): boolean;
dispatchEvent(event: ServerSentEvent): boolean;
dispatchEvent(event: CloseEvent | ErrorEvent): boolean;}
export class SSEStreamTarget extends EventTarget implements ServerSentEventTarget { #closed = false; #context: Context; #controller?: ReadableStreamDefaultController<Uint8Array>;
#error = (error: any) => { this.dispatchEvent(new CloseEvent({ cancelable: false })); const errorEvent = new ErrorEvent("error", { error }); this.dispatchEvent(errorEvent); this.#context.app.dispatchEvent(errorEvent); };
#push = (payload: string) => { if (!this.#controller) { this.#error(new Error("The controller has not been set.")); return; } if (this.#closed) { return; } this.#controller.enqueue(encoder.encode(payload)); };
get closed(): boolean { return this.#closed; }
constructor( context: Context, { headers }: ServerSentEventTargetOptions = {}, ) { super();
this.#context = context;
context.response.body = new ReadableStream<Uint8Array>({ start: (controller) => { this.#controller = controller; }, cancel: (error) => { this.#error(error); }, });
if (headers) { for (const [key, value] of headers) { context.response.headers.set(key, value); } } for (const [key, value] of responseHeaders) { context.response.headers.set(key, value); }
this.addEventListener("close", () => { this.#closed = true; if (this.#controller) { this.#controller.close(); } }); }
close(): Promise<void> { this.dispatchEvent(new CloseEvent({ cancelable: false })); return Promise.resolve(); }
dispatchComment(comment: string): boolean { this.#push(`: ${comment.split("\n").join("\n: ")}\n\n`); return true; }
dispatchMessage(data: any): boolean { const event = new ServerSentEvent("__message", data); return this.dispatchEvent(event); }
dispatchEvent(event: ServerSentEvent): boolean; dispatchEvent(event: CloseEvent | ErrorEvent): boolean; dispatchEvent(event: ServerSentEvent | CloseEvent | ErrorEvent): boolean { const dispatched = super.dispatchEvent(event); if (dispatched && event instanceof ServerSentEvent) { this.#push(String(event)); } return dispatched; }}
export class SSEStdLibTarget extends EventTarget implements ServerSentEventTarget { #app: Application; #closed = false; #prev = Promise.resolve(); #ready: Promise<void> | true; #serverRequest: ServerRequest; #writer: BufWriter;
#send = async (payload: string, prev: Promise<void>): Promise<void> => { if (this.#closed) { return; } if (this.#ready !== true) { await this.#ready; this.#ready = true; } try { await prev; await this.#writer.write(encoder.encode(payload)); await this.#writer.flush(); } catch (error) { this.dispatchEvent(new CloseEvent({ cancelable: false })); const errorEvent = new ErrorEvent("error", { error }); this.dispatchEvent(errorEvent); this.#app.dispatchEvent(errorEvent); } };
#setup = async (overrideHeaders?: Headers): Promise<void> => { const headers = new Headers(responseHeaders); if (overrideHeaders) { for (const [key, value] of overrideHeaders) { headers.set(key, value); } } let payload = response; for (const [key, value] of headers) { payload += `${key}: ${value}\n`; } payload += `\n`; try { await this.#writer.write(encoder.encode(payload)); await this.#writer.flush(); } catch (error) { this.dispatchEvent(new CloseEvent({ cancelable: false })); const errorEvent = new ErrorEvent("error", { error }); this.dispatchEvent(errorEvent); this.#app.dispatchEvent(errorEvent); throw error; } };
get closed(): boolean { return this.#closed; }
constructor( context: Context, { headers }: ServerSentEventTargetOptions = {}, ) { super(); this.#app = context.app; assert(!(context.request.originalRequest instanceof NativeRequest)); this.#serverRequest = context.request.originalRequest; this.#writer = this.#serverRequest.w; this.addEventListener("close", () => { this.#closed = true; try { this.#serverRequest.conn.close(); } catch (error) { if (!(error instanceof Deno.errors.BadResource)) { const errorEvent = new ErrorEvent("error", { error }); this.dispatchEvent(errorEvent); this.#app.dispatchEvent(errorEvent); } } }); this.#ready = this.#setup(headers); }
async close(): Promise<void> { if (this.#ready !== true) { await this.#ready; } await this.#prev; this.dispatchEvent(new CloseEvent({ cancelable: false })); }
dispatchComment(comment: string): boolean { this.#prev = this.#send( `: ${comment.split("\n").join("\n: ")}\n\n`, this.#prev, ); return true; }
dispatchMessage(data: any): boolean { const event = new ServerSentEvent("__message", data); return this.dispatchEvent(event); }
dispatchEvent(event: ServerSentEvent): boolean; dispatchEvent(event: CloseEvent | ErrorEvent): boolean; dispatchEvent(event: ServerSentEvent | CloseEvent | ErrorEvent): boolean { const dispatched = super.dispatchEvent(event); if (dispatched && event instanceof ServerSentEvent) { this.#prev = this.#send(String(event), this.#prev); } return dispatched; }}