import { Conn } from "deno";import { BufReader, BufWriter } from "https://deno.land/x/io/bufio.ts";import { PacketWriter } from "./packet_writer.ts";import { readUInt32BE } from "./utils.ts";import { PacketReader } from "./packet_reader.ts";import { QueryResult, Query } from "./query.ts";import { parseError } from "./error.ts";import { ConnectionParams } from "./connection_params.ts";
/**
 * Per-column / per-parameter format code.
 *
 * The numeric values are what is stored in `Column.format` and they match the
 * 0/1 codes this file writes for bind parameters.
 */
export enum Format {
  /** Value is carried in its textual representation. */
  TEXT = 0,
  /** Value is carried as raw bytes. */
  BINARY = 1,
}
/**
 * Single-character transaction state. `Connection` stores the status byte of
 * a "Z" (ReadyForQuery) message as one of these values.
 */
enum TransactionStatus {
  /** Status byte "I". */
  Idle = "I",
  /** Status byte "T". */
  IdleInTransaction = "T",
  /** Status byte "E". */
  InFailedTransaction = "E",
}
/**
 * One framed message read from the server: its one-character type tag, the
 * length of its body, the raw body bytes, and a `PacketReader` positioned
 * over that body for field-by-field decoding.
 */
export class Message {
  /** One-character message type tag. */
  public type: string;
  /** Number of bytes in `body`. */
  public byteCount: number;
  /** Raw message payload. */
  public body: Uint8Array;
  /** Reader constructed over `body`. */
  public reader: PacketReader;

  /**
   * @param type One-character message type tag.
   * @param byteCount Length of the message body in bytes.
   * @param body Raw message payload.
   */
  constructor(type: string, byteCount: number, body: Uint8Array) {
    this.type = type;
    this.byteCount = byteCount;
    this.body = body;
    this.reader = new PacketReader(body);
  }
}
/**
 * Metadata for a single result column, as decoded from a row description
 * message. Plain data holder with no behaviour.
 */
export class Column {
  public name: string;
  public tableOid: number;
  public index: number;
  public typeOid: number;
  public columnLength: number;
  public typeModifier: number;
  public format: Format;

  /**
   * @param name Column name.
   * @param tableOid OID of the table the column belongs to.
   * @param index Column index.
   * @param typeOid OID of the column's data type.
   * @param columnLength Column length.
   * @param typeModifier Type modifier.
   * @param format Format code of the column's values.
   */
  constructor(
    name: string,
    tableOid: number,
    index: number,
    typeOid: number,
    columnLength: number,
    typeModifier: number,
    format: Format,
  ) {
    this.name = name;
    this.tableOid = tableOid;
    this.index = index;
    this.typeOid = typeOid;
    this.columnLength = columnLength;
    this.typeModifier = typeModifier;
    this.format = format;
  }
}
/**
 * Description of a result set's shape: how many columns it has and the
 * metadata of each one.
 */
export class RowDescription {
  public columnCount: number;
  public columns: Column[];

  /**
   * @param columnCount Number of columns announced by the server.
   * @param columns Per-column metadata.
   */
  constructor(columnCount: number, columns: Column[]) {
    this.columnCount = columnCount;
    this.columns = columns;
  }
}
/**
 * A single client connection that speaks what appears to be the PostgreSQL
 * frontend/backend wire protocol (version 3.0) over a `Conn`.
 *
 * Outgoing messages are assembled with a shared `PacketWriter` and sent
 * through a buffered writer; incoming messages are framed by `readMessage()`.
 * The class performs the startup/authentication handshake (`startup`), runs
 * queries (`query`) and terminates the session (`end`).
 *
 * The shared `PacketWriter` is cleared and reused by every `_send*` method,
 * so calls on one instance must not be interleaved.
 */
export class Connection {
  private bufReader: BufReader;
  private bufWriter: BufWriter;
  private packetWriter: PacketWriter;
  private decoder: TextDecoder = new TextDecoder();
  private encoder: TextEncoder = new TextEncoder();

  /** Status byte of the most recent "Z" (ReadyForQuery) message. */
  private _transactionStatus?: TransactionStatus;
  /** First int32 of the "K" (backend key data) message. */
  private _pid?: number;
  /** Second int32 of the "K" (backend key data) message. */
  private _secretKey?: number;
  /** Key/value pairs collected from "S" (parameter status) messages. */
  private _parameters: { [key: string]: string } = {};

  /**
   * @param conn Underlying transport; wrapped in buffered reader and writer.
   * @param connParams Connection parameters; the password used for cleartext
   *   authentication is read from here.
   */
  constructor(private conn: Conn, private connParams: ConnectionParams) {
    this.bufReader = new BufReader(conn);
    this.bufWriter = new BufWriter(conn);
    this.packetWriter = new PacketWriter();
  }

  /**
   * Reads one framed message: a 5-byte header (1-byte type tag followed by a
   * big-endian uint32 length that includes the 4 length bytes themselves) and
   * then the body.
   *
   * NOTE(review): the result of `readFull` is ignored, so a short read / EOF
   * is not detected here — confirm how `BufReader.readFull` reports it.
   *
   * @returns The decoded message with a reader over its body.
   */
  async readMessage(): Promise<Message> {
    const header = new Uint8Array(5);
    await this.bufReader.readFull(header);
    const msgType = this.decoder.decode(header.slice(0, 1));
    // The length field counts itself, so the body is 4 bytes shorter.
    const msgLength = readUInt32BE(header, 1) - 4;
    const msgBody = new Uint8Array(msgLength);
    await this.bufReader.readFull(msgBody);

    return new Message(msgType, msgLength, msgBody);
  }

  /**
   * Builds and writes (without flushing) the startup packet: two int16s
   * (3, 0), the `user`, `database` and `application_name` parameters taken
   * from `connParams`, a `client_encoding` parameter and an empty terminating
   * C-string, all prefixed with the int32 total length.
   */
  private async _sendStartupMessage(connParams: ConnectionParams) {
    const writer = this.packetWriter;
    writer.clear();
    writer.addInt16(3).addInt16(0);

    for (const key of ["user", "database", "application_name"]) {
      const val = connParams[key];
      writer.addCString(key).addCString(val);
    }

    writer.addCString("client_encoding").addCString("'utf-8'");
    // Empty C-string terminates the parameter list.
    writer.addCString("");

    const bodyBuffer = writer.flush();
    // The startup packet has no type byte; its length prefix includes itself.
    const bodyLength = bodyBuffer.length + 4;

    writer.clear();
    const finalBuffer = writer.addInt32(bodyLength).add(bodyBuffer).join();

    await this.bufWriter.write(finalBuffer);
  }

  /**
   * Performs the startup handshake: sends the startup packet, handles the
   * authentication exchange, then consumes "K" (backend key data) and "S"
   * (parameter status) messages until "Z" (ReadyForQuery) arrives.
   *
   * @param connParams Parameters sent in the startup packet.
   * @throws The error produced by `parseError` when the server answers with
   *   an "E" message at any point of the handshake.
   * @throws Error on any other unexpected message type.
   */
  async startup(connParams: ConnectionParams) {
    await this._sendStartupMessage(connParams);
    await this.bufWriter.flush();

    let msg = await this.readMessage();
    // The server may reject the startup packet outright; surface its error
    // instead of misreading the body as an authentication code.
    if (msg.type === "E") {
      throw parseError(msg);
    }
    await this.handleAuth(msg);

    while (true) {
      msg = await this.readMessage();
      switch (msg.type) {
        case "E":
          throw parseError(msg);
        case "K":
          this._processBackendKeyData(msg);
          break;
        case "S":
          this._processParameterStatus(msg);
          break;
        case "Z":
          this._processReadyForQuery(msg);
          return;
        default:
          throw new Error(`Unknown response for startup: ${msg.type}`);
      }
    }
  }

  /**
   * Reacts to an authentication request by reading its int32 code:
   * 0 needs no further action, 3 triggers cleartext password authentication,
   * 5 triggers MD5 authentication (currently not implemented).
   *
   * @param msg Authentication message received from the server.
   * @throws Error for any other authentication code.
   */
  async handleAuth(msg: Message): Promise<void> {
    const code = msg.reader.readInt32();
    switch (code) {
      case 0:
        break;
      case 3:
        await this._authCleartext();
        await this._readAuthResponse();
        break;
      case 5: {
        const salt = msg.reader.readBytes(4);
        await this._authMd5(salt);
        await this._readAuthResponse();
        break;
      }
      default:
        throw new Error(`Unknown auth message code ${code}`);
    }
  }

  /**
   * Reads the server's reply to a password message and requires it to be an
   * "R" message carrying code 0.
   *
   * @throws The error produced by `parseError` when the server replies "E".
   * @throws Error on any other message type or a non-zero response code.
   */
  private async _readAuthResponse() {
    const msg = await this.readMessage();

    // A rejected password comes back as an error message; report it as such.
    if (msg.type === "E") {
      throw parseError(msg);
    }

    if (msg.type !== "R") {
      throw new Error(`Unexpected auth response: ${msg.type}.`);
    }

    const responseCode = msg.reader.readInt32();
    if (responseCode !== 0) {
      throw new Error(`Unexpected auth response code: ${responseCode}.`);
    }
  }

  /**
   * Sends the configured password (empty string when none is set) as a
   * C-string in a message of type 0x70 ('p') and flushes it.
   */
  private async _authCleartext() {
    this.packetWriter.clear();

    const password = this.connParams.password || "";
    const buffer = this.packetWriter.addCString(password).flush(0x70);

    await this.bufWriter.write(buffer);
    await this.bufWriter.flush();
  }

  /**
   * MD5 password authentication; not implemented.
   *
   * @param salt 4-byte salt taken from the authentication request (unused).
   * @throws Error always.
   */
  private async _authMd5(salt: Uint8Array): Promise<void> {
    throw new Error("MD5 password auth not implemented.");
  }

  /** Stores the two int32 values of a "K" message as pid and secret key. */
  private _processBackendKeyData(msg: Message) {
    this._pid = msg.reader.readInt32();
    this._secretKey = msg.reader.readInt32();
  }

  /** Stores the key/value C-string pair of an "S" message. */
  private _processParameterStatus(msg: Message) {
    const key = msg.reader.readCString();
    const value = msg.reader.readCString();
    this._parameters[key] = value;
  }

  /** Records the status byte of a "Z" message as the transaction status. */
  private _processReadyForQuery(msg: Message) {
    const txStatus = msg.reader.readByte();
    this._transactionStatus = String.fromCharCode(txStatus) as TransactionStatus;
  }

  /**
   * Reads the next message, requires it to be "Z" and records its status.
   *
   * @throws Error when the next message is of any other type.
   */
  private async _readReadyForQuery() {
    const msg = await this.readMessage();

    if (msg.type !== "Z") {
      throw new Error(`Unexpected message type: ${msg.type}, expected "Z" (ReadyForQuery)`);
    }

    this._processReadyForQuery(msg);
  }

  /**
   * Runs a query without arguments: sends the query text in a single message
   * of type 0x51 ('Q') and feeds the response into `query.result`.
   *
   * @returns `query.result` once "Z" (ReadyForQuery) has been received.
   * @throws The parsed server error on an "E" message (after the following
   *   ReadyForQuery has been consumed), or Error on an unexpected frame.
   */
  private async _simpleQuery(query: Query): Promise<QueryResult> {
    this.packetWriter.clear();

    const buffer = this.packetWriter.addCString(query.text).flush(0x51);

    await this.bufWriter.write(buffer);
    await this.bufWriter.flush();

    const result = query.result;

    let msg = await this.readMessage();

    // The first response frame decides the shape of the rest of the exchange.
    switch (msg.type) {
      case "T":
        result.handleRowDescription(this._processRowDescription(msg));
        break;
      case "n":
        // NOTE(review): returns without waiting for ReadyForQuery; confirm
        // whether this frame can actually occur in this flow.
        return result;
      case "E":
        await this._processError(msg);
        break;
      case "N":
        console.log("TODO: handle notice");
        break;
      case "C":
        result.done();
        break;
      default:
        throw new Error(`Unexpected frame: ${msg.type}`);
    }

    while (true) {
      msg = await this.readMessage();
      switch (msg.type) {
        case "D": {
          const dataRow = this._readDataRow(msg);
          result.handleDataRow(dataRow);
          break;
        }
        case "C":
          result.done();
          break;
        case "Z":
          this._processReadyForQuery(msg);
          return result;
        case "E":
          await this._processError(msg);
          break;
        default:
          throw new Error(`Unexpected frame: ${msg.type}`);
      }
    }
  }

  /**
   * Writes (without flushing) a message of type 0x50 ('P'): an empty
   * C-string, the query text and an int16 0. In the protocol's Parse layout
   * these are presumably the unnamed statement and a zero count of
   * pre-declared parameter types.
   */
  async _sendPrepareMessage(query: Query) {
    this.packetWriter.clear();

    const buffer = this.packetWriter
      .addCString("")
      .addCString(query.text)
      .addInt16(0)
      .flush(0x50);
    await this.bufWriter.write(buffer);
  }

  /**
   * Writes (without flushing) a message of type 0x42 ('B') carrying
   * `query.args`.
   *
   * Layout written: two empty C-strings (portal and statement name in the
   * Bind layout), the parameter format codes, the parameter values, and a
   * trailing int16 0. Format codes are only listed per parameter when at
   * least one argument is a `Uint8Array`; otherwise a single int16 0 is sent.
   * `null`/`undefined` arguments are encoded with length -1, `Uint8Array`s
   * verbatim, anything else as its UTF-8 encoded string.
   */
  async _sendBindMessage(query: Query) {
    this.packetWriter.clear();

    const hasBinaryArgs = query.args.some((arg) => arg instanceof Uint8Array);

    this.packetWriter.addCString("").addCString("");

    if (hasBinaryArgs) {
      this.packetWriter.addInt16(query.args.length);
      for (const arg of query.args) {
        this.packetWriter.addInt16(arg instanceof Uint8Array ? 1 : 0);
      }
    } else {
      this.packetWriter.addInt16(0);
    }

    this.packetWriter.addInt16(query.args.length);

    for (const arg of query.args) {
      if (arg === null || typeof arg === "undefined") {
        // Length -1 marks a NULL value; no bytes follow.
        this.packetWriter.addInt32(-1);
      } else if (arg instanceof Uint8Array) {
        this.packetWriter.addInt32(arg.length);
        this.packetWriter.add(arg);
      } else {
        // The length prefix must be the encoded byte length, not the number
        // of UTF-16 code units.
        const byteLength = this.encoder.encode(arg).length;
        this.packetWriter.addInt32(byteLength);
        this.packetWriter.addString(arg);
      }
    }

    this.packetWriter.addInt16(0);
    const buffer = this.packetWriter.flush(0x42);
    await this.bufWriter.write(buffer);
  }

  /**
   * Writes (without flushing) a message of type 0x44 ('D') whose body is the
   * C-string "P". Presumably the 'P' byte selects a portal and the string
   * terminator doubles as the empty portal name.
   */
  async _sendDescribeMessage() {
    this.packetWriter.clear();

    const buffer = this.packetWriter.addCString("P").flush(0x44);
    await this.bufWriter.write(buffer);
  }

  /**
   * Writes (without flushing) a message of type 0x45 ('E'): an empty C-string
   * followed by int32 0.
   */
  async _sendExecuteMessage() {
    this.packetWriter.clear();

    const buffer = this.packetWriter.addCString("").addInt32(0).flush(0x45);
    await this.bufWriter.write(buffer);
  }

  /** Writes (without flushing) an empty message of type 0x48 ('H'). */
  async _sendFlushMessage() {
    this.packetWriter.clear();

    const buffer = this.packetWriter.flush(0x48);
    await this.bufWriter.write(buffer);
  }

  /** Writes (without flushing) an empty message of type 0x53 ('S'). */
  async _sendSyncMessage() {
    this.packetWriter.clear();

    const buffer = this.packetWriter.flush(0x53);
    await this.bufWriter.write(buffer);
  }

  /**
   * Converts an "E" message into an error, consumes the ReadyForQuery that is
   * expected to follow it, and then throws the error.
   *
   * @throws Always: the value returned by `parseError(msg)`.
   */
  async _processError(msg: Message) {
    const error = parseError(msg);
    await this._readReadyForQuery();
    throw error;
  }

  /**
   * Reads the next message and requires it to be "1" (parse complete).
   *
   * @throws The parsed server error on "E", Error on any other frame.
   */
  private async _readParseComplete() {
    const msg = await this.readMessage();

    switch (msg.type) {
      case "1":
        break;
      case "E":
        await this._processError(msg);
        break;
      default:
        throw new Error(`Unexpected frame: ${msg.type}`);
    }
  }

  /**
   * Reads the next message and requires it to be "2" (bind complete).
   *
   * @throws The parsed server error on "E", Error on any other frame.
   */
  private async _readBindComplete() {
    const msg = await this.readMessage();

    switch (msg.type) {
      case "2":
        break;
      case "E":
        await this._processError(msg);
        break;
      default:
        throw new Error(`Unexpected frame: ${msg.type}`);
    }
  }

  /**
   * Runs a query with arguments: sends prepare, bind, describe, execute and
   * sync messages in one flush, then feeds the response into `query.result`.
   *
   * @returns `query.result` after the final ReadyForQuery has been consumed.
   * @throws The parsed server error on an "E" message, or Error on an
   *   unexpected frame.
   */
  async _preparedQuery(query: Query): Promise<QueryResult> {
    await this._sendPrepareMessage(query);
    await this._sendBindMessage(query);
    await this._sendDescribeMessage();
    await this._sendExecuteMessage();
    await this._sendSyncMessage();
    await this.bufWriter.flush();

    await this._readParseComplete();
    await this._readBindComplete();

    const result = query.result;
    let msg = await this.readMessage();

    // Response to the describe message.
    switch (msg.type) {
      case "T": {
        const rowDescription = this._processRowDescription(msg);
        result.handleRowDescription(rowDescription);
        break;
      }
      case "n":
        // The statement returns no rows. The replies to the execute and sync
        // messages still follow and must be consumed below; returning here
        // would leave them buffered and corrupt the next query's response.
        break;
      case "E":
        await this._processError(msg);
        break;
      default:
        throw new Error(`Unexpected frame: ${msg.type}`);
    }

    // Response to the execute message: data rows until "C".
    let isDone = false;
    while (!isDone) {
      msg = await this.readMessage();
      switch (msg.type) {
        case "D": {
          const rawDataRow = this._readDataRow(msg);
          result.handleDataRow(rawDataRow);
          break;
        }
        case "C":
          result.done();
          isDone = true;
          break;
        case "E":
          await this._processError(msg);
          break;
        default:
          throw new Error(`Unexpected frame: ${msg.type}`);
      }
    }

    // Response to the sync message.
    await this._readReadyForQuery();

    return result;
  }

  /**
   * Executes `query`, using the simple flow when it has no arguments and the
   * prepared flow otherwise.
   *
   * @returns The populated `query.result`.
   */
  async query(query: Query): Promise<QueryResult> {
    if (query.args.length === 0) {
      return await this._simpleQuery(query);
    }
    return await this._preparedQuery(query);
  }

  /**
   * Decodes a "T" message: an int16 column count followed, per column, by a
   * C-string and int32/int16/int32/int16/int32/int16 fields that are passed
   * to the `Column` constructor in that order.
   */
  private _processRowDescription(msg: Message): RowDescription {
    const columnCount = msg.reader.readInt16();
    const columns = [];

    for (let i = 0; i < columnCount; i++) {
      // Argument evaluation order matters: each call advances the reader.
      const column = new Column(
        msg.reader.readCString(),
        msg.reader.readInt32(),
        msg.reader.readInt16(),
        msg.reader.readInt32(),
        msg.reader.readInt16(),
        msg.reader.readInt32(),
        msg.reader.readInt16(),
      );
      columns.push(column);
    }

    return new RowDescription(columnCount, columns);
  }

  /**
   * Decodes a "D" message into one entry per field: `null` when the field
   * length is -1, otherwise the field's raw bytes.
   *
   * @returns Array with one element per field, in wire order.
   */
  _readDataRow(msg: Message): any[] {
    const fieldCount = msg.reader.readInt16();
    const row = [];

    for (let i = 0; i < fieldCount; i++) {
      const colLength = msg.reader.readInt32();

      if (colLength === -1) {
        row.push(null);
        continue;
      }

      row.push(msg.reader.readBytes(colLength));
    }

    return row;
  }

  /**
   * Sends the termination message (type 0x58 'X' with length 4) and closes
   * the underlying connection. The connection is closed even when sending
   * fails, so a broken socket is not leaked; the send error still propagates.
   */
  async end(): Promise<void> {
    const terminationMessage = new Uint8Array([0x58, 0x00, 0x00, 0x00, 0x04]);
    try {
      await this.bufWriter.write(terminationMessage);
      await this.bufWriter.flush();
    } finally {
      this.conn.close();
    }
  }
}