Module

std/node/stream.ts>pipeline

Deno standard library
Go to Latest
namespace pipeline
import { pipeline } from "https://deno.land/std@0.177.0/node/stream.ts";
function pipeline
import { pipeline } from "https://deno.land/std@0.177.0/node/stream.ts";

A module method to pipe between streams and generators forwarding errors and properly cleaning up and provide a callback when the pipeline is complete.

const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

// Use the pipeline API to easily pipe a series of streams
// together and get notified when the pipeline is fully done.

// A pipeline to gzip a potentially huge tar file efficiently:

pipeline(
  fs.createReadStream('archive.tar'),
  zlib.createGzip(),
  fs.createWriteStream('archive.tar.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed.', err);
    } else {
      console.log('Pipeline succeeded.');
    }
  }
);

The pipeline API provides a promise version, which can also receive an options argument as the last parameter with a `signal` AbortSignal property. When the signal is aborted, `destroy` will be called on the underlying pipeline, with an `AbortError`.

const { pipeline } = require('stream/promises');

async function run() {
  await pipeline(
    fs.createReadStream('archive.tar'),
    zlib.createGzip(),
    fs.createWriteStream('archive.tar.gz')
  );
  console.log('Pipeline succeeded.');
}

run().catch(console.error);

To use an AbortSignal, pass it inside an options object, as the last argument:

const { pipeline } = require('stream/promises');

async function run() {
  const ac = new AbortController();
  const signal = ac.signal;

  setTimeout(() => ac.abort(), 1);
  await pipeline(
    fs.createReadStream('archive.tar'),
    zlib.createGzip(),
    fs.createWriteStream('archive.tar.gz'),
    { signal },
  );
}

run().catch(console.error); // AbortError

The pipeline API also supports async generators:

const { pipeline } = require('stream/promises');
const fs = require('fs');

async function run() {
  await pipeline(
    fs.createReadStream('lowercase.txt'),
    async function* (source, signal) {
      source.setEncoding('utf8');  // Work with strings rather than `Buffer`s.
      for await (const chunk of source) {
        yield await processChunk(chunk, { signal });
      }
    },
    fs.createWriteStream('uppercase.txt')
  );
  console.log('Pipeline succeeded.');
}

run().catch(console.error);

Remember to handle the signal argument passed into the async generator. Especially in the case where the async generator is the source for the pipeline (i.e. first argument) or the pipeline will never complete.

const { pipeline } = require('stream/promises');
const fs = require('fs');

async function run() {
  await pipeline(
    async function * (signal) {
      await someLongRunningfn({ signal });
      yield 'asd';
    },
    fs.createWriteStream('uppercase.txt')
  );
  console.log('Pipeline succeeded.');
}

run().catch(console.error);

stream.pipeline() will call stream.destroy(err) on all streams except:

  • Readable streams which have emitted 'end' or 'close'.
  • Writable streams which have emitted 'finish' or 'close'.

stream.pipeline() leaves dangling event listeners on the streams after the callback has been invoked. In the case of reuse of streams after failure, this can cause event listener leaks and swallowed errors.

Type Parameters

A extends PipelineSource<any>
B extends PipelineDestination<A, any>

Parameters

source: A
destination: B
optional
callback: PipelineCallback<B>

Called when the pipeline is fully done.

Returns

B extends WritableStream ? B : WritableStream

Type Parameters

A extends PipelineSource<any>
T1 extends PipelineTransform<A, any>
B extends PipelineDestination<T1, any>

Parameters

source: A
transform1: T1
destination: B
optional
callback: PipelineCallback<B>

Returns

B extends WritableStream ? B : WritableStream

Type Parameters

A extends PipelineSource<any>
T1 extends PipelineTransform<A, any>
T2 extends PipelineTransform<T1, any>
B extends PipelineDestination<T2, any>

Parameters

source: A
transform1: T1
transform2: T2
destination: B
optional
callback: PipelineCallback<B>

Returns

B extends WritableStream ? B : WritableStream

Type Parameters

A extends PipelineSource<any>
T1 extends PipelineTransform<A, any>
T2 extends PipelineTransform<T1, any>
T3 extends PipelineTransform<T2, any>
B extends PipelineDestination<T3, any>

Parameters

source: A
transform1: T1
transform2: T2
transform3: T3
destination: B
optional
callback: PipelineCallback<B>

Returns

B extends WritableStream ? B : WritableStream

Type Parameters

A extends PipelineSource<any>
T1 extends PipelineTransform<A, any>
T2 extends PipelineTransform<T1, any>
T3 extends PipelineTransform<T2, any>
T4 extends PipelineTransform<T3, any>
B extends PipelineDestination<T4, any>

Parameters

source: A
transform1: T1
transform2: T2
transform3: T3
transform4: T4
destination: B
optional
callback: PipelineCallback<B>

Returns

B extends WritableStream ? B : WritableStream

Parameters

streams: ReadonlyArray<ReadableStream | WritableStream | ReadWriteStream>
optional
callback: (err: ErrnoException | null) => void

Returns

WritableStream

Parameters

stream1: ReadableStream
stream2: ReadWriteStream | WritableStream
...streams: Array<ReadWriteStream | WritableStream | ((err: ErrnoException | null) => void)>

Returns

WritableStream
import pipeline
import { pipeline } from "https://deno.land/std@0.177.0/node/stream.ts";