import {consoleLogSink, LogContext, TeeLogSink} from '@rocicorp/logger';import {resolver} from '@rocicorp/resolver';import {Lock} from '@rocicorp/lock';import {deepClone, deepEqual, ReadonlyJSONValue} from './json';import type {JSONValue} from './json';import {Pusher, PushError} from './pusher';import { isClientStateNotFoundResponse, Puller, PullError, PullResponse, PullResponseOK,} from './puller';import { SubscriptionTransactionWrapper, IndexTransactionImpl, ReadTransactionImpl, WriteTransactionImpl,} from './transactions';import type { CreateIndexDefinition, ReadTransaction, WriteTransaction,} from './transactions';import {ConnectionLoop, MAX_DELAY_MS, MIN_DELAY_MS} from './connection-loop';import {defaultPuller} from './puller';import {defaultPusher} from './pusher';import type { ReplicacheInternalOptions, ReplicacheOptions,} from './replicache-options';import {PullDelegate, PushDelegate} from './connection-loop-delegates';import type {Subscription} from './subscriptions';import { subscriptionsForChangedKeys, subscriptionsForIndexDefinitionChanged,} from './subscriptions';import {IDBStore} from './kv/mod';import * as dag from './dag/mod';import * as db from './db/mod';import * as sync from './sync/mod';import { assertHash, assertNotTempHash, emptyHash, Hash, newTempHash,} from './hash';import * as persist from './persist/mod';import {requestIdle} from './request-idle';import type {HTTPRequestInfo} from './http-request-info';import {assertNotUndefined} from './asserts';import { getLicenseStatus, licenseActive, PROD_LICENSE_SERVER_URL, LicenseStatus, TEST_LICENSE_KEY,} from '@rocicorp/licensing/src/client';import {mustSimpleFetch} from './simple-fetch';import {initBgIntervalProcess} from './persist/bg-interval';import {setIntervalWithSignal} from './set-interval-with-signal.js';
export type BeginPullResult = { requestID: string; syncHead: Hash; ok: boolean;};
export type Poke = { baseCookie: ReadonlyJSONValue; pullResponse: PullResponse;};
export const httpStatusUnauthorized = 401;
export const REPLICACHE_FORMAT_VERSION = 4;const LAZY_STORE_SOURCE_CHUNK_CACHE_SIZE_LIMIT = 100 * 2 ** 20; const MUTATION_RECOVERY_LAZY_STORE_SOURCE_CHUNK_CACHE_SIZE_LIMIT = 10 * 2 ** 20; const RECOVER_MUTATIONS_INTERVAL_MS = 5 * 60 * 1000; const LICENSE_ACTIVE_INTERVAL_MS = 24 * 60 * 60 * 1000;
export type MaybePromise<T> = T | Promise<T>;
type ToPromise<P> = P extends Promise<unknown> ? P : Promise<P>;
export function makeIDBName(name: string, schemaVersion?: string): string { const n = `rep:${name}:${REPLICACHE_FORMAT_VERSION}`; return schemaVersion ? `${n}:${schemaVersion}` : n;}
const MAX_REAUTH_TRIES = 8;
const PERSIST_TIMEOUT = 1000;
const noop = () => { };
export type MutatorDefs = { [key: string]: ( tx: WriteTransaction, args?: any, ) => MaybePromise<JSONValue | void>;};
type MutatorReturn = MaybePromise<JSONValue | void>;
type MakeMutator< F extends (tx: WriteTransaction, ...args: [] | [JSONValue]) => MutatorReturn,> = F extends (tx: WriteTransaction, ...args: infer Args) => infer Ret ? (...args: Args) => ToPromise<Ret> : never;
type MakeMutators<T extends MutatorDefs> = { readonly [P in keyof T]: MakeMutator<T[P]>;};
export interface RequestOptions { minDelayMs?: number;
maxDelayMs?: number;}
const emptySet: ReadonlySet<string> = new Set();
type UnknownSubscription = Subscription<JSONValue | undefined, unknown>;type SubscriptionSet = Set<UnknownSubscription>;
export type ClientStateNotFoundReason = | {type: 'NotFoundOnServer'} | {type: 'NotFoundOnClient'};
const reasonServer = { type: 'NotFoundOnServer',} as const;
const reasonClient = { type: 'NotFoundOnClient',} as const;
export class Replicache<MD extends MutatorDefs = {}> { pullURL: string;
pushURL: string;
auth: string;
readonly name: string;
get idbName(): string { return makeIDBName(this.name, this.schemaVersion); }
readonly schemaVersion: string;
private get _idbDatabase(): persist.IndexedDBDatabase { return { name: this.idbName, replicacheName: this.name, replicacheFormatVersion: REPLICACHE_FORMAT_VERSION, schemaVersion: this.schemaVersion, }; } private _closed = false; private _online = true; private readonly _ready: Promise<void>; private readonly _profileIDPromise: Promise<string>; private readonly _clientIDPromise: Promise<string>; protected readonly _licenseCheckPromise: Promise<boolean>;
protected _licenseActivePromise: Promise<boolean>; private _root: Promise<Hash | undefined> = Promise.resolve(undefined); private readonly _mutatorRegistry = new Map< string, (tx: WriteTransaction, args?: JSONValue) => MutatorReturn >();
readonly mutate: MakeMutators<MD>;
private _pushCounter = 0; private _pullCounter = 0;
private _pullConnectionLoop: ConnectionLoop; private _pushConnectionLoop: ConnectionLoop;
private readonly _subscriptions: SubscriptionSet = new Set(); private readonly _pendingSubscriptions: SubscriptionSet = new Set();
pullInterval: number | null;
pushDelay: number;
private readonly _requestOptions: Required<RequestOptions>;
puller: Puller;
pusher: Pusher;
private readonly _licenseKey: string | undefined;
private readonly _memdag: dag.Store; private readonly _perdag: dag.Store; private readonly _idbDatabases: persist.IDBDatabasesStore = new persist.IDBDatabasesStore(); private _hasPendingSubscriptionRuns = false; private readonly _lc: LogContext;
private readonly _closeAbortController = new AbortController();
private readonly _persistLock = new Lock(); private _persistIsScheduled = false; private _recoveringMutations = false;
private readonly _enableLicensing: boolean; private readonly _enableMutationRecovery: boolean;
get requestOptions(): Required<RequestOptions> { return this._requestOptions; }
onSync: ((syncing: boolean) => void) | null = null;
onClientStateNotFound: ((reason: ClientStateNotFoundReason) => void) | null = reload;
getAuth: (() => MaybePromise<string | null | undefined>) | null | undefined = null;
constructor(options: ReplicacheOptions<MD>) { const { name, logLevel = 'info', logSinks = [consoleLogSink], pullURL = '', auth, pushDelay = 10, pushURL = '', schemaVersion = '', pullInterval = 60_000, mutators = {} as MD, requestOptions = {}, puller = defaultPuller, pusher = defaultPusher, licenseKey, experimentalKVStore, } = options; this.auth = auth ?? ''; this.pullURL = pullURL; this.pushURL = pushURL; if (name === undefined || name === '') { throw new Error('name is required and must be non-empty'); } this.name = name; this.schemaVersion = schemaVersion; this.pullInterval = pullInterval; this.pushDelay = pushDelay; this.puller = puller; this.pusher = pusher;
const internalOptions = options as ReplicacheInternalOptions; const {enableLicensing = true, enableMutationRecovery = true} = internalOptions; this._enableLicensing = enableLicensing; this._enableMutationRecovery = enableMutationRecovery;
if (internalOptions.exposeInternalAPI) { internalOptions.exposeInternalAPI({ persist: () => this._persist(), }); }
const logSink = logSinks.length === 1 ? logSinks[0] : new TeeLogSink(logSinks); this._lc = new LogContext(logLevel, logSink).addContext('name', name);
const perKvStore = experimentalKVStore || new IDBStore(this.idbName); this._perdag = new dag.StoreImpl( perKvStore, dag.throwChunkHasher, assertNotTempHash, ); this._memdag = new dag.LazyStore( this._perdag, LAZY_STORE_SOURCE_CHUNK_CACHE_SIZE_LIMIT, this._memdagHashFunction(), assertHash, );
const readyResolver = resolver<void>(); this._ready = readyResolver.promise;
this._licenseKey = licenseKey; const licenseCheckResolver = resolver<boolean>(); this._licenseCheckPromise = licenseCheckResolver.promise; const licenseActiveResolver = resolver<boolean>(); this._licenseActivePromise = licenseActiveResolver.promise;
const {minDelayMs = MIN_DELAY_MS, maxDelayMs = MAX_DELAY_MS} = requestOptions; this._requestOptions = {maxDelayMs, minDelayMs};
this._pullConnectionLoop = new ConnectionLoop( new PullDelegate( this, () => this._invokePull(), this._lc.addContext('PULL'), ), );
this._pushConnectionLoop = new ConnectionLoop( new PushDelegate( this, () => this._invokePush(), this._lc.addContext('PUSH'), ), );
this.mutate = this._registerMutators(mutators);
const profileIDResolver = resolver<string>(); this._profileIDPromise = profileIDResolver.promise; const clientIDResolver = resolver<string>(); this._clientIDPromise = clientIDResolver.promise;
void this._open( profileIDResolver.resolve, clientIDResolver.resolve, readyResolver.resolve, licenseCheckResolver.resolve, licenseActiveResolver.resolve, ); }
protected _memdagHashFunction(): <V extends ReadonlyJSONValue>( data: V, ) => Hash { return newTempHash; }
private async _open( profileIDResolver: (profileID: string) => void, resolveClientID: (clientID: string) => void, resolveReady: () => void, resolveLicenseCheck: (valid: boolean) => void, resolveLicenseActive: (active: boolean) => void, ): Promise<void> { await closingInstances.get(this.name); await this._idbDatabases.getProfileID().then(profileIDResolver); await this._idbDatabases.putDatabase(this._idbDatabase); const [clientID, client, clients] = await persist.initClient(this._perdag); resolveClientID(clientID); await this._memdag.withWrite(async write => { await write.setHead(db.DEFAULT_HEAD_NAME, client.headHash); await write.commit(); });
resolveReady();
this._root = this._getRoot(); await this._root;
await this._licenseCheck(resolveLicenseCheck);
this.pull(); this._push();
const {signal} = this._closeAbortController;
persist.startHeartbeats( clientID, this._perdag, () => { this._fireOnClientStateNotFound(clientID, reasonClient); }, this._lc, signal, ); persist.initClientGC(clientID, this._perdag, this._lc, signal);
persist.initCollectIDBDatabases(this._idbDatabases, this._lc, signal);
setIntervalWithSignal( () => this._recoverMutations(), RECOVER_MUTATIONS_INTERVAL_MS, signal, ); void this._recoverMutations(clients);
getDocument()?.addEventListener( 'visibilitychange', this._onVisibilityChange, );
await this._startLicenseActive(resolveLicenseActive, this._lc, signal); }
private _onVisibilityChange = async () => { if (this._closed) { return; }
if (getDocument()?.visibilityState !== 'visible') { return; }
await this._checkForClientStateNotFoundAndCallHandler(); };
private async _checkForClientStateNotFoundAndCallHandler(): Promise<boolean> { const clientID = await this._clientIDPromise; const hasClientState = await this._perdag.withRead(read => persist.hasClientState(clientID, read), ); if (!hasClientState) { this._fireOnClientStateNotFound(clientID, reasonClient); } return !hasClientState; }
private async _licenseCheck( resolveLicenseCheck: (valid: boolean) => void, ): Promise<void> { if (!this._enableLicensing) { resolveLicenseCheck(true); return; } if (!this._licenseKey) { await this._licenseInvalid( this._lc, `license key ReplicacheOptions.licenseKey is not set`, true , resolveLicenseCheck, ); return; } this._lc.info?.(`Replicache license key: ${this._licenseKey}`); if (this._licenseKey === TEST_LICENSE_KEY) { this._lc.info?.( `Skipping license check for TEST_LICENSE_KEY. ` + `You may ONLY use this key for automated (e.g., unit/CI) testing. ` + `See https://replicache.dev for more information.`, ); resolveLicenseCheck(true); return; } try { const resp = await getLicenseStatus( mustSimpleFetch, PROD_LICENSE_SERVER_URL, this._licenseKey, this._lc, ); if (resp.pleaseUpdate) { this._lc.error?.( `You are using an old version of Replicache that uses deprecated licensing features. ` + `Please update Replicache else it may stop working.`, ); } if (resp.status === LicenseStatus.Valid) { this._lc.info?.(`License is valid.`); } else { await this._licenseInvalid( this._lc, `status: ${resp.status}`, resp.disable, resolveLicenseCheck, ); return; } } catch (err) { this._lc.error?.(`Error checking license: ${err}`); } resolveLicenseCheck(true); }
private async _licenseInvalid( lc: LogContext, reason: string, disable: boolean, resolveLicenseCheck: (valid: boolean) => void, ): Promise<void> { lc.error?.( `** REPLICACHE LICENSE NOT VALID ** Replicache license key '${this._licenseKey}' is not valid (${reason}). ` + `Please run 'npx replicache get-license' to get a license key or contact hello@replicache.dev for help.`, ); if (disable) { await this.close(); lc.error?.(`** REPLICACHE DISABLED **`); } resolveLicenseCheck(false); return; }
private async _startLicenseActive( resolveLicenseActive: (valid: boolean) => void, lc: LogContext, signal: AbortSignal, ): Promise<void> { if ( !this._enableLicensing || !this._licenseKey || this._licenseKey === TEST_LICENSE_KEY ) { resolveLicenseActive(false); return; }
const markActive = async () => { try { await licenseActive( mustSimpleFetch, PROD_LICENSE_SERVER_URL, this._licenseKey as string, await this.profileID, lc, ); } catch (err) { this._lc.info?.(`Error sending license active ping: ${err}`); } }; await markActive(); resolveLicenseActive(true);
initBgIntervalProcess( 'LicenseActive', markActive, LICENSE_ACTIVE_INTERVAL_MS, lc, signal, ); }
get profileID(): Promise<string> { return this._profileIDPromise; }
get clientID(): Promise<string> { return this._clientIDPromise; }
onOnlineChange: ((online: boolean) => void) | null = null;
get online(): boolean { return this._online; }
get closed(): boolean { return this._closed; }
async close(): Promise<void> { this._closed = true; const {promise, resolve} = resolver(); closingInstances.set(this.name, promise);
this._closeAbortController.abort();
getDocument()?.removeEventListener( 'visibilitychange', this._onVisibilityChange, );
await this._ready; const closingPromises = [ this._memdag.close(), this._perdag.close(), this._idbDatabases.close(), ];
this._pullConnectionLoop.close(); this._pushConnectionLoop.close();
for (const subscription of this._subscriptions) { subscription.onDone?.(); } this._subscriptions.clear();
await Promise.all(closingPromises); closingInstances.delete(this.name); resolve(); }
private async _getRoot(): Promise<Hash | undefined> { if (this._closed) { return undefined; } await this._ready; return await db.getRoot(this._memdag, db.DEFAULT_HEAD_NAME); }
private async _checkChange( root: Hash | undefined, changedKeys: sync.ChangedKeysMap, ): Promise<void> { const currentRoot = await this._root; if (root !== undefined && root !== currentRoot) { this._root = Promise.resolve(root); await this._fireOnChange(changedKeys); } }
async createIndex(def: CreateIndexDefinition): Promise<void> { await this._indexOp(tx => tx.createIndex(def)); await this._indexDefinitionChanged(def.name); }
async dropIndex(name: string): Promise<void> { await this._indexOp(tx => tx.dropIndex(name)); await this._indexDefinitionChanged(name); }
private async _indexOp( f: (tx: IndexTransactionImpl) => Promise<void>, ): Promise<void> { await this._ready; const clientID = await this._clientIDPromise; await this._memdag.withWrite(async dagWrite => { const dbWrite = await db.Write.newIndexChange( db.whenceHead(db.DEFAULT_HEAD_NAME), dagWrite, ); const tx = new IndexTransactionImpl(clientID, dbWrite, this._lc); await f(tx); await tx.commit(); }); }
protected async _maybeEndPull( beginPullResult: BeginPullResult, ): Promise<void> { if (this._closed) { return; }
let {syncHead} = beginPullResult; const {requestID} = beginPullResult;
await this._ready; const lc = this._lc .addContext('maybeEndPull') .addContext('request_id', requestID); const {replayMutations, changedKeys} = await sync.maybeEndPull( this._memdag, lc, syncHead, );
if (!replayMutations || replayMutations.length === 0) { await this._checkChange(syncHead, changedKeys); this._schedulePersist(); return; }
for (const mutation of replayMutations) { syncHead = await this._replay( syncHead, mutation.original, mutation.name, mutation.args, mutation.timestamp, ); }
await this._maybeEndPull({...beginPullResult, syncHead}); }
private async _replay<A extends JSONValue>( basis: Hash, original: Hash, name: string, args: A, timestamp: number, ): Promise<Hash> { let mutatorImpl = this._mutatorRegistry.get(name); if (!mutatorImpl) { this._lc.error?.(`Unknown mutator ${name}`); mutatorImpl = async () => { }; } const res = await this._mutate( name, mutatorImpl, args, timestamp, {basis, original}, true, );
return res.ref; }
private async _invokePull(): Promise<boolean> { if (this._isPullDisabled()) { return true; } return await this._wrapInOnlineCheck(async () => { try { this._changeSyncCounters(0, 1); const beginPullResult = await this._beginPull(); if (!beginPullResult.ok) { return false; } if (beginPullResult.syncHead !== emptyHash) { await this._maybeEndPull(beginPullResult); } } finally { this._changeSyncCounters(0, -1); } return true; }, 'Pull'); }
private _isPullDisabled() { return this.pullURL === '' && this.puller === defaultPuller; }
private async _wrapInOnlineCheck( f: () => Promise<boolean>, name: string, ): Promise<boolean> { let online = true;
try { return await f(); } catch (e) {
if (e instanceof PushError || e instanceof PullError) { online = false; this._lc.info?.(`${name} threw:\n`, e, '\nwith cause:\n', e.causedBy); } else { this._lc.info?.(`${name} threw:\n`, e); } return false; } finally { if (this._online !== online) { this._online = online; this.onOnlineChange?.(online); if (online) { void this._recoverMutations(); } } } }
private async _wrapInReauthRetries<R>( f: () => Promise<{ httpRequestInfo: HTTPRequestInfo | undefined; result: R; }>, verb: string, serverURL: string, preAuth: () => MaybePromise<void> = noop, postAuth: () => MaybePromise<void> = noop, ): Promise<{ result: R; authFailure: boolean; }> { let reauthAttempts = 0; let lastResult; do { const {httpRequestInfo, result} = await f(); lastResult = result; if (!httpRequestInfo) { return { result, authFailure: false, }; } const {errorMessage, httpStatusCode} = httpRequestInfo;
if (errorMessage || httpStatusCode >= 400) { this._lc.error?.( `Got error response from server (${serverURL}) doing ${verb}: ${httpStatusCode}` + (errorMessage ? `: ${errorMessage}` : ''), ); } if (httpStatusCode !== httpStatusUnauthorized) { return { result, authFailure: false, }; } if (!this.getAuth) { return { result, authFailure: true, }; } let auth; try { await preAuth(); auth = await this.getAuth(); } finally { await postAuth(); } if (auth === null || auth === undefined) { return { result, authFailure: true, }; } this.auth = auth; reauthAttempts++; } while (reauthAttempts < MAX_REAUTH_TRIES); this._lc.info?.('Tried to reauthenticate too many times'); return { result: lastResult, authFailure: true, }; }
private _isPushDisabled() { return this.pushURL === '' && this.pusher === defaultPusher; }
protected async _invokePush(): Promise<boolean> { if (this._isPushDisabled()) { return true; }
return this._wrapInOnlineCheck(async () => { const {result: pushResponse} = await this._wrapInReauthRetries( async () => { await this._ready; const profileID = await this._profileIDPromise; const clientID = await this._clientIDPromise; const requestID = sync.newRequestID(clientID); const lc = this._lc .addContext('push') .addContext('request_id', requestID); try { this._changeSyncCounters(1, 0); const pushResponse = await sync.push( requestID, this._memdag, lc, profileID, clientID, this.pusher, this.pushURL, this.auth, this.schemaVersion, ); return {result: pushResponse, httpRequestInfo: pushResponse}; } finally { this._changeSyncCounters(-1, 0); } }, 'push', this.pushURL, ); return pushResponse === undefined || pushResponse.httpStatusCode === 200; }, 'Push'); }
private _push(): void { this._pushConnectionLoop.send(); }
pull(): void { this._pullConnectionLoop.send(); }
async poke(poke: Poke): Promise<void> { await this._ready; const clientID = await this._clientIDPromise; const requestID = sync.newRequestID(clientID); const lc = this._lc .addContext('handlePullResponse') .addContext('request_id', requestID);
if (isClientStateNotFoundResponse(poke.pullResponse)) { this._fireOnClientStateNotFound(clientID, reasonServer); return; }
const syncHead = await sync.handlePullResponse( lc, this._memdag, poke.baseCookie, poke.pullResponse, ); if (syncHead === null) { throw new Error( 'unexpected base cookie for poke: ' + JSON.stringify(poke), ); }
await this._maybeEndPull({ requestID, syncHead, ok: true, }); }
protected async _beginPull(): Promise<BeginPullResult> { const { result: {beginPullResponse, requestID}, } = await this._wrapInReauthRetries( async () => { await this._ready; const profileID = await this.profileID; const clientID = await this._clientIDPromise;
const requestID = sync.newRequestID(clientID); const lc = this._lc .addContext('beginPull') .addContext('request_id', requestID); const req = { pullAuth: this.auth, pullURL: this.pullURL, schemaVersion: this.schemaVersion, puller: this.puller, }; const beginPullResponse = await sync.beginPull( profileID, clientID, req, req.puller, requestID, this._memdag, lc, ); return { result: {beginPullResponse, requestID}, httpRequestInfo: beginPullResponse.httpRequestInfo, }; }, 'pull', this.pullURL, () => this._changeSyncCounters(0, -1), () => this._changeSyncCounters(0, 1), );
if (isClientStateNotFoundResponse(beginPullResponse.pullResponse)) { const clientID = await this._clientIDPromise; this._fireOnClientStateNotFound(clientID, reasonServer); }
const {syncHead, httpRequestInfo} = beginPullResponse; return {requestID, syncHead, ok: httpRequestInfo.httpStatusCode === 200}; }
private async _persist(): Promise<void> { if (this._closed) { return; } await this._ready; const clientID = await this.clientID; try { await this._persistLock.withLock(() => persist.persist(clientID, this._memdag, this._perdag), ); } catch (e) { if (e instanceof persist.ClientStateNotFoundError) { this._fireOnClientStateNotFound(clientID, reasonClient); } else { throw e; } } } private _fireOnClientStateNotFound( clientID: sync.ClientID, reason: ClientStateNotFoundReason, ) { this._lc.error?.(`Client state not found, clientID: ${clientID}`); this.onClientStateNotFound?.(reason); }
private _schedulePersist(): void { if (this._persistIsScheduled) { return; } this._persistIsScheduled = true; void (async () => { await requestIdle(PERSIST_TIMEOUT); await this._persist(); this._persistIsScheduled = false; })(); }
private _changeSyncCounters(pushDelta: 0, pullDelta: 1 | -1): void; private _changeSyncCounters(pushDelta: 1 | -1, pullDelta: 0): void; private _changeSyncCounters(pushDelta: number, pullDelta: number): void { this._pushCounter += pushDelta; this._pullCounter += pullDelta; const delta = pushDelta + pullDelta; const counter = this._pushCounter + this._pullCounter; if ((delta === 1 && counter === 1) || counter === 0) { const syncing = counter > 0; Promise.resolve().then(() => this.onSync?.(syncing)); } }
private async _fireOnChange(changedKeys: sync.ChangedKeysMap): Promise<void> { const subscriptions = subscriptionsForChangedKeys( this._subscriptions, changedKeys, ); await this._fireSubscriptions(subscriptions, false); }
private async _indexDefinitionChanged(name: string): Promise<void> { const subscriptions = subscriptionsForIndexDefinitionChanged( this._subscriptions, name, ); await this._fireSubscriptions(subscriptions, false); }
private async _fireSubscriptions( subscriptions: Iterable<Subscription<JSONValue | undefined, unknown>>, skipEqualsCheck: boolean, ) { const subs = [...subscriptions]; if (subs.length === 0) { return; }
type R = | {ok: true; value: JSONValue | undefined} | {ok: false; error: unknown}; const results = await this._queryInternal( SubscriptionTransactionWrapper, async tx => { const promises = subs.map(async s => { try { const value = await s.body(tx); return {ok: true, value} as R; } catch (error) { return {ok: false, error} as R; } finally { s.keys = tx.keys; s.scans = tx.scans; } }); return await Promise.all(promises); }, ); for (let i = 0; i < subs.length; i++) { const s = subs[i]; const result = results[i]; if (result.ok) { const {value} = result; if (skipEqualsCheck || !deepEqual(value, s.lastValue)) { s.lastValue = value; s.onData(value); } } else { if (s.onError) { s.onError(result.error); } else { this._lc.error?.(result.error); } } } }
subscribe<R extends ReadonlyJSONValue | undefined, E>( body: (tx: ReadTransaction) => Promise<R>, { onData, onError, onDone, }: { onData: (result: R) => void; onError?: (error: E) => void; onDone?: () => void; }, ): () => void { const s = { body, onData, onError, onDone, lastValue: undefined, keys: emptySet, scans: [], } as unknown as UnknownSubscription; this._subscriptions.add(s);
void this._scheduleInitialSubscriptionRun(s);
return (): void => { this._subscriptions.delete(s); }; } private async _scheduleInitialSubscriptionRun(s: UnknownSubscription) { this._pendingSubscriptions.add(s);
if (!this._hasPendingSubscriptionRuns) { this._hasPendingSubscriptionRuns = true; await Promise.resolve(); this._hasPendingSubscriptionRuns = false; const subscriptions = [...this._pendingSubscriptions]; this._pendingSubscriptions.clear(); await this._fireSubscriptions(subscriptions, true); } }
async query<R>(body: (tx: ReadTransaction) => Promise<R> | R): Promise<R> { return this._queryInternal(ReadTransactionImpl, body); }
private async _queryInternal< R, C extends ReadTransactionImpl<ReadonlyJSONValue>, >( ctor: new (clientID: string, dbRead: db.Read, lc: LogContext) => C, body: (tx: C) => Promise<R> | R, ): Promise<R> { await this._ready; const clientID = await this._clientIDPromise; return this._memdag.withRead(async dagRead => { const dbRead = await db.readFromDefaultHead(dagRead); const tx = new ctor(clientID, dbRead, this._lc); try { return await body(tx); } catch (ex) { throw await this._convertToClientStateNotFoundError(ex); } }); }
private _register<Return extends JSONValue | void, Args extends JSONValue>( name: string, mutatorImpl: (tx: WriteTransaction, args?: Args) => MaybePromise<Return>, ): (args?: Args) => Promise<Return> { this._mutatorRegistry.set( name, mutatorImpl as ( tx: WriteTransaction, args: JSONValue | undefined, ) => Promise<void | JSONValue>, );
return async (args?: Args): Promise<Return> => ( await this._mutate( name, mutatorImpl, args, performance.now(), undefined, false, ) ).result; }
private _registerMutators< M extends { [key: string]: (tx: WriteTransaction, args?: JSONValue) => MutatorReturn; }, >(regs: M): MakeMutators<M> { type Mut = MakeMutators<M>; const rv: Partial<Mut> = Object.create(null); for (const k in regs) { rv[k] = this._register(k, regs[k]) as MakeMutator<M[typeof k]>; } return rv as Mut; }
private async _mutate<R extends JSONValue | void, A extends JSONValue>( name: string, mutatorImpl: (tx: WriteTransaction, args?: A) => MaybePromise<R>, args: A | undefined, timestamp: number, rebaseOpts: sync.RebaseOpts | undefined, isReplay: boolean, ): Promise<{result: R; ref: Hash}> { if (this._hasPendingSubscriptionRuns) { await Promise.resolve(); }
await this._ready; const clientID = await this._clientIDPromise; return await this._memdag.withWrite(async dagWrite => { let whence: db.Whence | undefined; let originalHash: Hash | null = null; if (rebaseOpts === undefined) { whence = db.whenceHead(db.DEFAULT_HEAD_NAME); } else { await sync.validateRebase(rebaseOpts, dagWrite, name, args); whence = db.whenceHash(rebaseOpts.basis); originalHash = rebaseOpts.original; }
const dbWrite = await db.Write.newLocal( whence, name, deepClone(args ?? null), originalHash, dagWrite, timestamp, );
const tx = new WriteTransactionImpl(clientID, dbWrite, this._lc); try { const result: R = await mutatorImpl(tx, args);
const [ref, changedKeys] = await tx.commit(!isReplay); if (!isReplay) { this._pushConnectionLoop.send(); await this._checkChange(ref, changedKeys); this._schedulePersist(); }
return {result, ref}; } catch (ex) { throw await this._convertToClientStateNotFoundError(ex); } }); }
private async _convertToClientStateNotFoundError( ex: unknown, ): Promise<unknown> { if ( ex instanceof dag.ChunkNotFoundError && (await this._checkForClientStateNotFoundAndCallHandler()) ) { return new persist.ClientStateNotFoundError(await this._clientIDPromise); }
return ex; }
protected async _recoverMutations( preReadClientMap?: persist.ClientMap, ): Promise<boolean> { if ( !this._enableMutationRecovery || this._recoveringMutations || !this.online || this.closed || this._isPushDisabled() ) { return false; } const stepDescription = 'Recovering mutations.'; this._lc.debug?.('Start:', stepDescription); try { this._recoveringMutations = true; await this._ready; await this._recoverMutationsFromPerdag( this._idbDatabase, this._perdag, preReadClientMap, ); for (const database of Object.values( await this._idbDatabases.getDatabases(), )) { if (this.closed) { this._lc.debug?.('Exiting early due to close:', stepDescription); return true; } if ( database.name === this.idbName || database.replicacheName !== this.name || database.replicacheFormatVersion !== REPLICACHE_FORMAT_VERSION ) { continue; } await this._recoverMutationsFromPerdag(database); } } catch (e) { this._logMutationRecoveryError(e, stepDescription); } finally { this._lc.debug?.('End:', stepDescription); this._recoveringMutations = false; } return true; }
private async _recoverMutationsFromPerdag( database: persist.IndexedDBDatabase, perdag?: dag.Store, preReadClientMap?: persist.ClientMap, ): Promise<void> { const stepDescription = `Recovering mutations from db ${database.name}.`; this._lc.debug?.('Start:', stepDescription); let perDagToClose: dag.Store | undefined = undefined; try { if (!perdag) { const perKvStore = new IDBStore(database.name); perdag = perDagToClose = new dag.StoreImpl( perKvStore, dag.throwChunkHasher, assertNotTempHash, ); } let clientMap: persist.ClientMap | undefined = preReadClientMap || (await perdag.withRead(read => persist.getClients(read))); const clientIDsVisited = new Set<sync.ClientID>(); while (clientMap) { let newClientMap: persist.ClientMap | undefined; for (const [clientID, client] of clientMap) { if (this.closed) { this._lc.debug?.('Exiting early due to close:', stepDescription); return; } if (!clientIDsVisited.has(clientID)) { clientIDsVisited.add(clientID); newClientMap = await this._recoverMutationsOfClient( client, clientID, perdag, database, ); if (newClientMap) { break; } } } clientMap = newClientMap; } } catch (e) { this._logMutationRecoveryError(e, stepDescription); } finally { await perDagToClose?.close(); this._lc.debug?.('End:', stepDescription); } }
private async _recoverMutationsOfClient( client: persist.Client, clientID: sync.ClientID, perdag: dag.Store, database: persist.IndexedDBDatabase, ): Promise<persist.ClientMap | undefined> { const selfClientID = await this._clientIDPromise; if (selfClientID === clientID) { return undefined; } if (client.lastServerAckdMutationID >= client.mutationID) { return undefined; } const stepDescription = `Recovering mutations for ${clientID}.`; this._lc.debug?.('Start:', stepDescription); let dagForOtherClientToClose: dag.LazyStore | undefined; try { const dagForOtherClient = (dagForOtherClientToClose = new dag.LazyStore( perdag, MUTATION_RECOVERY_LAZY_STORE_SOURCE_CHUNK_CACHE_SIZE_LIMIT, dag.throwChunkHasher, assertHash, ));
await dagForOtherClient.withWrite(async write => { await write.setHead(db.DEFAULT_HEAD_NAME, client.headHash); await write.commit(); });
if (this._isPushDisabled()) { this._lc.debug?.( `Cannot recover mutations for client ${clientID} because push is disabled.`, ); return; } const {pusher, pushURL} = this;
const pushRequestID = sync.newRequestID(clientID); const pushDescription = 'recoveringMutationsPush'; const pushLC = this._lc .addContext(pushDescription) .addContext('request_id', pushRequestID); const pushSucceeded = await this._wrapInOnlineCheck(async () => { const {result: pushResponse} = await this._wrapInReauthRetries( async () => { assertNotUndefined(dagForOtherClient); const pushResponse = await sync.push( pushRequestID, dagForOtherClient, pushLC, await this.profileID, clientID, pusher, pushURL, this.auth, database.schemaVersion, ); return {result: pushResponse, httpRequestInfo: pushResponse}; }, pushDescription, this.pushURL, ); return !!pushResponse && pushResponse.httpStatusCode === 200; }, pushDescription); if (!pushSucceeded) { this._lc.debug?.( `Failed to recover mutations for client ${clientID} due to a push error.`, ); return; }
if (this._isPullDisabled()) { this._lc.debug?.( `Cannot confirm mutations were recovered for client ${clientID} ` + `because pull is disabled.`, ); return; } const {puller, pullURL} = this;
const requestID = sync.newRequestID(clientID); const pullDescription = 'recoveringMutationsPull'; const pullLC = this._lc .addContext(pullDescription) .addContext('request_id', requestID);
let pullResponse: PullResponse | undefined; const pullSucceeded = await this._wrapInOnlineCheck(async () => { const {result: beginPullResponse} = await this._wrapInReauthRetries( async () => { const beginPullRequest = { pullAuth: this.auth, pullURL, schemaVersion: database.schemaVersion, puller, }; const beginPullResponse = await sync.beginPull( await this.profileID, clientID, beginPullRequest, beginPullRequest.puller, requestID, dagForOtherClient, pullLC, false, ); return { result: beginPullResponse, httpRequestInfo: beginPullResponse.httpRequestInfo, }; }, pullDescription, this.pullURL, ); pullResponse = beginPullResponse.pullResponse; return ( !!pullResponse && beginPullResponse.httpRequestInfo.httpStatusCode === 200 ); }, pullDescription); if (!pullSucceeded) { this._lc.debug?.( `Failed to recover mutations for client ${clientID} due to a pull error.`, ); return; }
if (pullResponse && isClientStateNotFoundResponse(pullResponse)) { this._lc.debug?.( `Client ${selfClientID} cannot recover mutations for client ` + `${clientID}. The client no longer exists on the server.`, ); } else { this._lc.debug?.( `Client ${selfClientID} recovered mutations for client ` + `${clientID}. Details`, { mutationID: client.mutationID, lastServerAckdMutationID: client.lastServerAckdMutationID, lastMutationID: pullResponse?.lastMutationID, }, ); } const newClientMap = await persist.updateClients( (clients: persist.ClientMap) => { assertNotUndefined(pullResponse);
const clientToUpdate = clients.get(clientID); if (!clientToUpdate) { return persist.noClientUpdates; }
if (isClientStateNotFoundResponse(pullResponse)) { const newClients = new Map(clients); newClients.delete(clientID); return {clients: newClients}; }
if ( clientToUpdate.lastServerAckdMutationID >= (pullResponse as PullResponseOK).lastMutationID ) { return persist.noClientUpdates; } return { clients: new Map(clients).set(clientID, { ...clientToUpdate, lastServerAckdMutationID: (pullResponse as PullResponseOK) .lastMutationID, }), }; }, perdag, ); return newClientMap; } catch (e) { this._logMutationRecoveryError(e, stepDescription); return; } finally { await dagForOtherClientToClose?.close(); this._lc.debug?.('End:', stepDescription); } } private _logMutationRecoveryError(e: unknown, stepDescription: string) { if (this.closed) { this._lc.debug?.( `Mutation recovery error likely due to close during:\n${stepDescription}\nError:\n`, e, ); } else { this._lc.error?.( `Mutation recovery error during:\n${stepDescription}\nError:\n`, e, ); } }}
const closingInstances: Map<string, Promise<unknown>> = new Map();
function getDocument(): Document | undefined { return typeof document !== 'undefined' ? document : undefined;}
function reload(): void { if (typeof location !== 'undefined') { location.reload(); }}