import type {LogContext} from '@rocicorp/logger';import {deepClone, JSONValue, ReadonlyJSONValue} from './json';import { isScanIndexOptions, KeyTypeForScanOptions, ScanIndexOptions, ScanOptions, toDbScanOptions,} from './scan-options';import {fromKeyForIndexScanInternal, ScanResultImpl} from './scan-iterator';import type {ScanResult} from './scan-iterator';import {throwIfClosed} from './transaction-closed-error';import * as db from './db/mod';import * as sync from './sync/mod';import type {Hash} from './hash';import type {ScanSubscriptionInfo} from './subscriptions';import type {ScanNoIndexOptions} from './mod.js';import {decodeIndexKey, IndexKey} from './db/index.js';
export interface ReadTransaction { readonly clientID: string;
get(key: string): Promise<ReadonlyJSONValue | undefined>;
has(key: string): Promise<boolean>;
isEmpty(): Promise<boolean>;
scan(): ScanResult<string, ReadonlyJSONValue>;
scan<Options extends ScanOptions>( options?: Options, ): ScanResult<KeyTypeForScanOptions<Options>, ReadonlyJSONValue>;}
let transactionIDCounter = 0;
export class ReadTransactionImpl< Value extends ReadonlyJSONValue = ReadonlyJSONValue,> implements ReadTransaction{ readonly clientID: string; protected readonly _dbtx: db.Read; protected readonly _lc: LogContext;
constructor( clientID: string, dbRead: db.Read, lc: LogContext, rpcName = 'openReadTransaction', ) { this.clientID = clientID; this._dbtx = dbRead; this._lc = lc .addContext(rpcName) .addContext('txid', transactionIDCounter++); }
async get(key: string): Promise<Value | undefined> { throwIfClosed(this._dbtx); const rv = await this._dbtx.get(key); if (this._dbtx instanceof db.Write) { return (rv && deepClone(rv)) as Value | undefined; } return rv as Value | undefined; }
async has(key: string): Promise<boolean> { throwIfClosed(this._dbtx); return this._dbtx.has(key); }
async isEmpty(): Promise<boolean> { throwIfClosed(this._dbtx); return this._dbtx.isEmpty(); }
scan(): ScanResult<string, Value>; scan<Options extends ScanOptions>( options?: Options, ): ScanResult<KeyTypeForScanOptions<Options>, Value>; scan<Options extends ScanOptions>( options?: Options, ): ScanResult<KeyTypeForScanOptions<Options>, Value> { return scan(options, this._dbtx, noop); }}
function noop(_: unknown): void { }
function scan<Options extends ScanOptions, Value>( options: Options | undefined, dbRead: db.Read, onLimitKey: (inclusiveLimitKey: string) => void,): ScanResult<KeyTypeForScanOptions<Options>, Value> { const iter = getScanIterator(dbRead, options); return makeScanResultFromScanIteratorInternal( iter, options ?? ({} as Options), dbRead, onLimitKey, ) as ScanResult<KeyTypeForScanOptions<Options>, Value>;}
export class SubscriptionTransactionWrapper extends ReadTransactionImpl<ReadonlyJSONValue> { private readonly _keys: Set<string> = new Set(); private readonly _scans: ScanSubscriptionInfo[] = [];
isEmpty(): Promise<boolean> { this._scans.push({options: {}}); return super.isEmpty(); }
get(key: string): Promise<ReadonlyJSONValue | undefined> { this._keys.add(key); return super.get(key); }
has(key: string): Promise<boolean> { this._keys.add(key); return super.has(key); }
scan(): ScanResult<string, ReadonlyJSONValue>; scan<Options extends ScanOptions>( options?: Options, ): ScanResult<KeyTypeForScanOptions<Options>, ReadonlyJSONValue>; scan<Options extends ScanOptions>( options?: Options, ): ScanResult<KeyTypeForScanOptions<Options>, ReadonlyJSONValue> { const scanInfo: ScanSubscriptionInfo = { options: toDbScanOptions(options), inclusiveLimitKey: undefined, }; this._scans.push(scanInfo); return scan(options, this._dbtx, inclusiveLimitKey => { scanInfo.inclusiveLimitKey = inclusiveLimitKey; }); }
get keys(): ReadonlySet<string> { return this._keys; }
get scans(): ScanSubscriptionInfo[] { return this._scans; }}
export interface WriteTransaction extends ReadTransaction { put(key: string, value: JSONValue): Promise<void>;
del(key: string): Promise<boolean>;
get(key: string): Promise<JSONValue | undefined>;
scan(): ScanResult<string, JSONValue>; scan<Options extends ScanOptions>( options?: Options, ): ScanResult<KeyTypeForScanOptions<Options>, JSONValue>;}
export class WriteTransactionImpl extends ReadTransactionImpl<JSONValue> implements WriteTransaction{ protected declare readonly _dbtx: db.Write;
constructor( clientID: string, dbWrite: db.Write, lc: LogContext, rpcName = 'openWriteTransaction', ) { super(clientID, dbWrite, lc, rpcName); }
async put(key: string, value: JSONValue): Promise<void> { throwIfClosed(this._dbtx); await this._dbtx.put(this._lc, key, deepClone(value)); }
async del(key: string): Promise<boolean> { throwIfClosed(this._dbtx); return await this._dbtx.del(this._lc, key); }
async commit( generateChangedKeys: boolean, ): Promise<[Hash, sync.ChangedKeysMap]> { const txn = this._dbtx; throwIfClosed(txn);
const headName = txn.isRebase() ? sync.SYNC_HEAD_NAME : db.DEFAULT_HEAD_NAME; return await txn.commitWithChangedKeys(headName, generateChangedKeys); }}
export interface IndexTransaction extends ReadTransaction { createIndex(def: CreateIndexDefinition): Promise<void>;
dropIndex(name: string): Promise<void>;}
export interface CreateIndexDefinition { name: string;
prefix?: string;
jsonPointer: string;}
export class IndexTransactionImpl extends WriteTransactionImpl implements IndexTransaction{ constructor(clientID: string, dbWrite: db.Write, lc: LogContext) { super(clientID, dbWrite, lc, 'openIndexTransaction'); }
async createIndex(options: CreateIndexDefinition): Promise<void> { throwIfClosed(this._dbtx); await this._dbtx.createIndex( this._lc, options.name, options.prefix ?? '', options.jsonPointer, ); }
async dropIndex(name: string): Promise<void> { throwIfClosed(this._dbtx); await this._dbtx.dropIndex(name); }
async commit(): Promise<[Hash, sync.ChangedKeysMap]> { return super.commit(false); }}
type Entry<K> = readonly [key: K, value: ReadonlyJSONValue];
export type IndexKeyEntry = Entry<IndexKey>;
export type StringKeyEntry = Entry<string>;
export type EntryForOptions<Options extends ScanOptions> = Options extends ScanIndexOptions ? IndexKeyEntry : StringKeyEntry;
function getScanIterator<Options extends ScanOptions>( dbRead: db.Read, options: Options | undefined,): AsyncIterable<EntryForOptions<Options>> { if (options && isScanIndexOptions(options)) { return getScanIteratorForIndexMap(dbRead, options) as AsyncIterable< EntryForOptions<Options> >; }
return dbRead.map.scan(fromKeyForNonIndexScan(options)) as AsyncIterable< EntryForOptions<Options> >;}
export function fromKeyForNonIndexScan( options: ScanNoIndexOptions | undefined,): string { if (!options) { return ''; }
const {prefix = '', start} = options; if (start && start.key > prefix) { return start.key; } return prefix;}
function makeScanResultFromScanIteratorInternal< Options extends ScanOptions, Value,>( iter: AsyncIterable<EntryForOptions<Options>>, options: Options, dbRead: db.Read, onLimitKey: (inclusiveLimitKey: string) => void,): ScanResult<KeyTypeForScanOptions<Options>, Value> { return new ScanResultImpl(iter, options, dbRead, onLimitKey);}
async function* getScanIteratorForIndexMap( dbRead: db.Read, options: ScanIndexOptions,): AsyncIterable<IndexKeyEntry> { const map = await dbRead.getMapForIndex(options.indexName); for await (const entry of map.scan(fromKeyForIndexScanInternal(options))) { yield [decodeIndexKey(entry[0]), entry[1]]; }}