import { bold, yellow } from "./deps.ts";import { BufReader, BufWriter } from "./deps.ts";import { DeferredStack } from "./deferred.ts";import { hashMd5Password, readUInt32BE } from "./utils.ts";import { PacketReader } from "./packet_reader.ts";import { PacketWriter } from "./packet_writer.ts";import { parseError, parseNotice } from "./warning.ts";import { Query, QueryArrayResult, QueryConfig, QueryObjectResult, QueryResult,} from "./query.ts";import type { ConnectionParams } from "./connection_params.ts";
export enum ResultType { ARRAY, OBJECT,}
export enum Format { TEXT = 0, BINARY = 1,}
enum TransactionStatus { Idle = "I", IdleInTransaction = "T", InFailedTransaction = "E",}
export class Message { public reader: PacketReader;
constructor( public type: string, public byteCount: number, public body: Uint8Array, ) { this.reader = new PacketReader(body); }}
export class Column { constructor( public name: string, public tableOid: number, public index: number, public typeOid: number, public columnLength: number, public typeModifier: number, public format: Format, ) {}}
export class RowDescription { constructor(public columnCount: number, public columns: Column[]) {}}
export class Connection { private conn!: Deno.Conn;
private bufReader!: BufReader; private bufWriter!: BufWriter; private packetWriter!: PacketWriter; private decoder: TextDecoder = new TextDecoder(); private encoder: TextEncoder = new TextEncoder();
private _transactionStatus?: TransactionStatus; private _pid?: number; private _secretKey?: number; private _parameters: { [key: string]: string } = {}; private _queryLock: DeferredStack<undefined> = new DeferredStack( 1, [undefined], );
constructor(private connParams: ConnectionParams) {}
async readMessage(): Promise<Message> { const header = new Uint8Array(5); await this.bufReader.readFull(header); const msgType = this.decoder.decode(header.slice(0, 1)); const msgLength = readUInt32BE(header, 1) - 4; const msgBody = new Uint8Array(msgLength); await this.bufReader.readFull(msgBody);
return new Message(msgType, msgLength, msgBody); }
private async _sendStartupMessage() { const writer = this.packetWriter; writer.clear(); writer.addInt16(3).addInt16(0); const connParams = this.connParams; writer.addCString("user").addCString(connParams.user); writer.addCString("database").addCString(connParams.database); writer.addCString("application_name").addCString( connParams.applicationName, );
writer.addCString("client_encoding").addCString("'utf-8'"); writer.addCString("");
const bodyBuffer = writer.flush(); const bodyLength = bodyBuffer.length + 4;
writer.clear();
const finalBuffer = writer .addInt32(bodyLength) .add(bodyBuffer) .join();
await this.bufWriter.write(finalBuffer); }
async startup() { const { port, hostname } = this.connParams; this.conn = await Deno.connect({ port, hostname });
this.bufReader = new BufReader(this.conn); this.bufWriter = new BufWriter(this.conn); this.packetWriter = new PacketWriter();
await this._sendStartupMessage(); await this.bufWriter.flush();
let msg: Message;
msg = await this.readMessage(); await this.handleAuth(msg);
while (true) { msg = await this.readMessage(); switch (msg.type) { case "E": await this._processError(msg, false); break; case "K": this._processBackendKeyData(msg); break; case "S": this._processParameterStatus(msg); break; case "Z": this._processReadyForQuery(msg); return; default: throw new Error(`Unknown response for startup: ${msg.type}`); } } }
async handleAuth(msg: Message) { const code = msg.reader.readInt32(); switch (code) { case 0: break; case 3: await this._authCleartext(); await this._readAuthResponse(); break; case 5: { const salt = msg.reader.readBytes(4); await this._authMd5(salt); await this._readAuthResponse(); break; } default: throw new Error(`Unknown auth message code ${code}`); } }
private async _readAuthResponse() { const msg = await this.readMessage();
if (msg.type === "E") { throw parseError(msg); } else if (msg.type !== "R") { throw new Error(`Unexpected auth response: ${msg.type}.`); }
const responseCode = msg.reader.readInt32(); if (responseCode !== 0) { throw new Error(`Unexpected auth response code: ${responseCode}.`); } }
private async _authCleartext() { this.packetWriter.clear(); const password = this.connParams.password || ""; const buffer = this.packetWriter.addCString(password).flush(0x70);
await this.bufWriter.write(buffer); await this.bufWriter.flush(); }
private async _authMd5(salt: Uint8Array) { this.packetWriter.clear();
if (!this.connParams.password) { throw new Error("Auth Error: attempting MD5 auth with password unset"); }
const password = hashMd5Password( this.connParams.password, this.connParams.user, salt, ); const buffer = this.packetWriter.addCString(password).flush(0x70);
await this.bufWriter.write(buffer); await this.bufWriter.flush(); }
private _processBackendKeyData(msg: Message) { this._pid = msg.reader.readInt32(); this._secretKey = msg.reader.readInt32(); }
private _processParameterStatus(msg: Message) { const key = msg.reader.readCString(); const value = msg.reader.readCString(); this._parameters[key] = value; }
private _processReadyForQuery(msg: Message) { const txStatus = msg.reader.readByte(); this._transactionStatus = String.fromCharCode( txStatus, ) as TransactionStatus; }
private async _readReadyForQuery() { const msg = await this.readMessage();
if (msg.type !== "Z") { throw new Error( `Unexpected message type: ${msg.type}, expected "Z" (ReadyForQuery)`, ); }
this._processReadyForQuery(msg); }
private async _simpleQuery( query: Query, type: ResultType, ): Promise<QueryResult> { this.packetWriter.clear();
const buffer = this.packetWriter.addCString(query.text).flush(0x51);
await this.bufWriter.write(buffer); await this.bufWriter.flush();
let result; if (type === ResultType.ARRAY) { result = new QueryArrayResult(query); } else { result = new QueryObjectResult(query); }
let msg: Message;
msg = await this.readMessage();
switch (msg.type) { case "T": result.loadColumnDescriptions(this._processRowDescription(msg)); break; case "n": break; case "E": await this._processError(msg); break; case "N": result.warnings.push(await this._processNotice(msg)); break; case "C": { const commandTag = this._readCommandTag(msg); result.handleCommandComplete(commandTag); result.done(); break; } default: throw new Error(`Unexpected frame: ${msg.type}`); }
while (true) { msg = await this.readMessage(); switch (msg.type) { case "D": { const foo = this._readDataRow(msg); result.insertRow(foo); break; } case "C": { const commandTag = this._readCommandTag(msg); result.handleCommandComplete(commandTag); result.done(); break; } case "Z": this._processReadyForQuery(msg); return result; case "E": await this._processError(msg); break; case "N": result.warnings.push(await this._processNotice(msg)); break; default: throw new Error(`Unexpected frame: ${msg.type}`); } } }
async _sendPrepareMessage(query: Query) { this.packetWriter.clear();
const buffer = this.packetWriter .addCString("") .addCString(query.text) .addInt16(0) .flush(0x50); await this.bufWriter.write(buffer); }
async _sendBindMessage(query: Query) { this.packetWriter.clear();
const hasBinaryArgs = query.args.some((arg) => arg instanceof Uint8Array);
this.packetWriter.clear(); this.packetWriter .addCString("") .addCString("");
if (hasBinaryArgs) { this.packetWriter.addInt16(query.args.length);
query.args.forEach((arg) => { this.packetWriter.addInt16(arg instanceof Uint8Array ? 1 : 0); }); } else { this.packetWriter.addInt16(0); }
this.packetWriter.addInt16(query.args.length);
query.args.forEach((arg) => { if (arg === null || typeof arg === "undefined") { this.packetWriter.addInt32(-1); } else if (arg instanceof Uint8Array) { this.packetWriter.addInt32(arg.length); this.packetWriter.add(arg); } else { const byteLength = this.encoder.encode(arg).length; this.packetWriter.addInt32(byteLength); this.packetWriter.addString(arg); } });
this.packetWriter.addInt16(0); const buffer = this.packetWriter.flush(0x42); await this.bufWriter.write(buffer); }
async _sendDescribeMessage() { this.packetWriter.clear();
const buffer = this.packetWriter.addCString("P").flush(0x44); await this.bufWriter.write(buffer); }
async _sendExecuteMessage() { this.packetWriter.clear();
const buffer = this.packetWriter .addCString("") .addInt32(0) .flush(0x45); await this.bufWriter.write(buffer); }
async _sendFlushMessage() { this.packetWriter.clear();
const buffer = this.packetWriter.flush(0x48); await this.bufWriter.write(buffer); }
async _sendSyncMessage() { this.packetWriter.clear();
const buffer = this.packetWriter.flush(0x53); await this.bufWriter.write(buffer); }
async _processError(msg: Message, recoverable = true) { const error = parseError(msg); if (recoverable) { await this._readReadyForQuery(); } throw error; }
_processNotice(msg: Message) { const warning = parseNotice(msg); console.error(`${bold(yellow(warning.severity))}: ${warning.message}`); return warning; }
private async _readParseComplete() { const msg = await this.readMessage();
switch (msg.type) { case "1": break; case "E": await this._processError(msg); break; default: throw new Error(`Unexpected frame: ${msg.type}`); } }
private async _readBindComplete() { const msg = await this.readMessage();
switch (msg.type) { case "2": break; case "E": await this._processError(msg); break; default: throw new Error(`Unexpected frame: ${msg.type}`); } }
async _preparedQuery(query: Query, type: ResultType): Promise<QueryResult> { await this._sendPrepareMessage(query); await this._sendBindMessage(query); await this._sendDescribeMessage(); await this._sendExecuteMessage(); await this._sendSyncMessage(); await this.bufWriter.flush();
await this._readParseComplete(); await this._readBindComplete();
let result; if (type === ResultType.ARRAY) { result = new QueryArrayResult(query); } else { result = new QueryObjectResult(query); } let msg: Message; msg = await this.readMessage();
switch (msg.type) { case "T": { const rowDescription = this._processRowDescription(msg); result.loadColumnDescriptions(rowDescription); break; } case "n": break; case "E": await this._processError(msg); break; default: throw new Error(`Unexpected frame: ${msg.type}`); }
outerLoop: while (true) { msg = await this.readMessage(); switch (msg.type) { case "D": { const rawDataRow = this._readDataRow(msg); result.insertRow(rawDataRow); break; } case "C": { const commandTag = this._readCommandTag(msg); result.handleCommandComplete(commandTag); result.done(); break outerLoop; } case "E": await this._processError(msg); break; default: throw new Error(`Unexpected frame: ${msg.type}`); } }
await this._readReadyForQuery();
return result; }
async query(query: Query, type: ResultType): Promise<QueryResult> { await this._queryLock.pop(); try { if (query.args.length === 0) { return await this._simpleQuery(query, type); } else { return await this._preparedQuery(query, type); } } finally { this._queryLock.push(undefined); } }
private _processRowDescription(msg: Message): RowDescription { const columnCount = msg.reader.readInt16(); const columns = [];
for (let i = 0; i < columnCount; i++) { const column = new Column( msg.reader.readCString(), msg.reader.readInt32(), msg.reader.readInt16(), msg.reader.readInt32(), msg.reader.readInt16(), msg.reader.readInt32(), msg.reader.readInt16(), ); columns.push(column); }
return new RowDescription(columnCount, columns); }
_readDataRow(msg: Message): any[] { const fieldCount = msg.reader.readInt16(); const row = [];
for (let i = 0; i < fieldCount; i++) { const colLength = msg.reader.readInt32();
if (colLength == -1) { row.push(null); continue; }
row.push(msg.reader.readBytes(colLength)); }
return row; }
_readCommandTag(msg: Message) { return msg.reader.readString(msg.byteCount); }
async initSQL(): Promise<void> { const config: QueryConfig = { text: "select 1;", args: [] }; const query = new Query(config); await this.query(query, ResultType.ARRAY); }
async end(): Promise<void> { const terminationMessage = new Uint8Array([0x58, 0x00, 0x00, 0x00, 0x04]); await this.bufWriter.write(terminationMessage); await this.bufWriter.flush(); this.conn.close(); }}