P(romise) Queue
Promise queue with concurrency control
Useful for rate-limiting async (or sync) operations. For example, when interacting with a REST API or when doing CPU/memory intensive tasks.
Usage
Here we run only one promise at the time. For example, set concurrency
to 4 to run four promises at the same time.
import PQueue from 'https://deno.land/x/p_queue@1.0.0/mod.ts'
const queue = new PQueue({
concurrency: 1
})
async function one () {
await queue.add(() => fetch('https://sindresorhus.com'))
console.log('Done: sindresorhus.com')
}
async function two () {
await queue.add(() => fetch('https://avajs.dev'))
console.log('Done: avajs.dev')
}
async function three () {
const task = await getUnicornTask()
await queue.add(task)
console.log('Done: Unicorn task')
}
one()
two()
three()
API
See https://doc.deno.land/https/deno.land/x/p_queue@1.0.0/mod.ts
Events
active
Emitted as each item is processed in the queue for the purpose of tracking progress.
import PQueue from 'https://deno.land/x/p_queue@1.0.0/mod.ts'
const delay = (ms: number) => new Promise(r => setTimeout(r, ms))
const queue = new PQueue({
concurrency: 2
})
let count = 0
queue.on('active', () => {
console.log(`Working on item #${++count}. Size: <span class="katex"><span class="katex-mathml"><math xmlns="http://www.w3.org/1998/Math/MathML"><semantics><mrow><mrow><mi>q</mi><mi>u</mi><mi>e</mi><mi>u</mi><mi>e</mi><mi mathvariant="normal">.</mi><mi>s</mi><mi>i</mi><mi>z</mi><mi>e</mi></mrow><mi>P</mi><mi>e</mi><mi>n</mi><mi>d</mi><mi>i</mi><mi>n</mi><mi>g</mi><mo>:</mo></mrow><annotation encoding="application/x-tex">{queue.size} Pending: </annotation></semantics></math></span><span class="katex-html" aria-hidden="true"><span class="base"><span class="strut" style="height:0.8889em;vertical-align:-0.1944em;"></span><span class="mord"><span class="mord mathnormal" style="margin-right:0.03588em;">q</span><span class="mord mathnormal">u</span><span class="mord mathnormal">e</span><span class="mord mathnormal">u</span><span class="mord mathnormal">e</span><span class="mord">.</span><span class="mord mathnormal">s</span><span class="mord mathnormal">i</span><span class="mord mathnormal">ze</span></span><span class="mord mathnormal" style="margin-right:0.13889em;">P</span><span class="mord mathnormal">e</span><span class="mord mathnormal">n</span><span class="mord mathnormal">d</span><span class="mord mathnormal">in</span><span class="mord mathnormal" style="margin-right:0.03588em;">g</span><span class="mspace" style="margin-right:0.2778em;"></span><span class="mrel">:</span></span></span></span>{queue.pending}`)
})
queue.add(() => Promise.resolve())
queue.add(() => delay(2000))
queue.add(() => Promise.resolve())
queue.add(() => Promise.resolve())
queue.add(() => delay(500))
idle
Emitted every time the queue becomes empty and all promises have completed queue.size === 0 && queue.pending === 0
.
import PQueue from 'https://deno.land/x/p_queue@1.0.0/mod.ts'
const delay = (ms: number) => new Promise(r => setTimeout(r, ms))
const queue = new PQueue()
queue.on('idle', () => {
console.log(`Queue is idle. Size: <span class="katex"><span class="katex-mathml"><math xmlns="http://www.w3.org/1998/Math/MathML"><semantics><mrow><mrow><mi>q</mi><mi>u</mi><mi>e</mi><mi>u</mi><mi>e</mi><mi mathvariant="normal">.</mi><mi>s</mi><mi>i</mi><mi>z</mi><mi>e</mi></mrow><mi>P</mi><mi>e</mi><mi>n</mi><mi>d</mi><mi>i</mi><mi>n</mi><mi>g</mi><mo>:</mo></mrow><annotation encoding="application/x-tex">{queue.size} Pending: </annotation></semantics></math></span><span class="katex-html" aria-hidden="true"><span class="base"><span class="strut" style="height:0.8889em;vertical-align:-0.1944em;"></span><span class="mord"><span class="mord mathnormal" style="margin-right:0.03588em;">q</span><span class="mord mathnormal">u</span><span class="mord mathnormal">e</span><span class="mord mathnormal">u</span><span class="mord mathnormal">e</span><span class="mord">.</span><span class="mord mathnormal">s</span><span class="mord mathnormal">i</span><span class="mord mathnormal">ze</span></span><span class="mord mathnormal" style="margin-right:0.13889em;">P</span><span class="mord mathnormal">e</span><span class="mord mathnormal">n</span><span class="mord mathnormal">d</span><span class="mord mathnormal">in</span><span class="mord mathnormal" style="margin-right:0.03588em;">g</span><span class="mspace" style="margin-right:0.2778em;"></span><span class="mrel">:</span></span></span></span>{queue.pending}`)
})
const job1 = queue.add(() => delay(2000))
const job2 = queue.add(() => delay(500))
await job1
await job2
// => 'Queue is idle. Size: 0 Pending: 0'
await queue.add(() => delay(600))
// => 'Queue is idle. Size: 0 Pending: 0'
The idle
event is emitted every time the queue reaches an idle state. On the other hand, the promise the onIdle()
function returns resolves once the queue becomes idle instead of every time the queue is idle.
add
Emitted every time the add method is called and the number of pending or queued tasks is increased.
next
Emitted every time a task is completed and the number of pending or queued tasks is decreased.
import PQueue from 'https://deno.land/x/p_queue@1.0.0/mod.ts'
const delay = (ms: number) => new Promise(r => setTimeout(r, ms))
const queue = new PQueue()
queue.on('add', () => {
console.log(`Task is added. Size: <span class="katex"><span class="katex-mathml"><math xmlns="http://www.w3.org/1998/Math/MathML"><semantics><mrow><mrow><mi>q</mi><mi>u</mi><mi>e</mi><mi>u</mi><mi>e</mi><mi mathvariant="normal">.</mi><mi>s</mi><mi>i</mi><mi>z</mi><mi>e</mi></mrow><mi>P</mi><mi>e</mi><mi>n</mi><mi>d</mi><mi>i</mi><mi>n</mi><mi>g</mi><mo>:</mo></mrow><annotation encoding="application/x-tex">{queue.size} Pending: </annotation></semantics></math></span><span class="katex-html" aria-hidden="true"><span class="base"><span class="strut" style="height:0.8889em;vertical-align:-0.1944em;"></span><span class="mord"><span class="mord mathnormal" style="margin-right:0.03588em;">q</span><span class="mord mathnormal">u</span><span class="mord mathnormal">e</span><span class="mord mathnormal">u</span><span class="mord mathnormal">e</span><span class="mord">.</span><span class="mord mathnormal">s</span><span class="mord mathnormal">i</span><span class="mord mathnormal">ze</span></span><span class="mord mathnormal" style="margin-right:0.13889em;">P</span><span class="mord mathnormal">e</span><span class="mord mathnormal">n</span><span class="mord mathnormal">d</span><span class="mord mathnormal">in</span><span class="mord mathnormal" style="margin-right:0.03588em;">g</span><span class="mspace" style="margin-right:0.2778em;"></span><span class="mrel">:</span></span></span></span>{queue.pending}`)
})
queue.on('next', () => {
console.log(`Task is completed. Size: <span class="katex"><span class="katex-mathml"><math xmlns="http://www.w3.org/1998/Math/MathML"><semantics><mrow><mrow><mi>q</mi><mi>u</mi><mi>e</mi><mi>u</mi><mi>e</mi><mi mathvariant="normal">.</mi><mi>s</mi><mi>i</mi><mi>z</mi><mi>e</mi></mrow><mi>P</mi><mi>e</mi><mi>n</mi><mi>d</mi><mi>i</mi><mi>n</mi><mi>g</mi><mo>:</mo></mrow><annotation encoding="application/x-tex">{queue.size} Pending: </annotation></semantics></math></span><span class="katex-html" aria-hidden="true"><span class="base"><span class="strut" style="height:0.8889em;vertical-align:-0.1944em;"></span><span class="mord"><span class="mord mathnormal" style="margin-right:0.03588em;">q</span><span class="mord mathnormal">u</span><span class="mord mathnormal">e</span><span class="mord mathnormal">u</span><span class="mord mathnormal">e</span><span class="mord">.</span><span class="mord mathnormal">s</span><span class="mord mathnormal">i</span><span class="mord mathnormal">ze</span></span><span class="mord mathnormal" style="margin-right:0.13889em;">P</span><span class="mord mathnormal">e</span><span class="mord mathnormal">n</span><span class="mord mathnormal">d</span><span class="mord mathnormal">in</span><span class="mord mathnormal" style="margin-right:0.03588em;">g</span><span class="mspace" style="margin-right:0.2778em;"></span><span class="mrel">:</span></span></span></span>{queue.pending}`)
})
const job1 = queue.add(() => delay(2000))
const job2 = queue.add(() => delay(500))
await job1
await job2
//=> 'Task is added. Size: 0 Pending: 1'
//=> 'Task is added. Size: 0 Pending: 2'
await queue.add(() => delay(600))
//=> 'Task is completed. Size: 0 Pending: 1'
//=> 'Task is completed. Size: 0 Pending: 0'
Advanced example
A more advanced example to help you understand the flow.
import PQueue from 'https://deno.land/x/p_queue@1.0.0/mod.ts'
const delay = (ms: number) => new Promise(r => setTimeout(r, ms))
const queue = new PQueue({
concurrency: 1
})
async function taskOne () {
await delay(200)
console.log(`8. Pending promises: ${queue.pending}`)
//=> '8. Pending promises: 0'
(async () => {
await queue.add(async () => 'π')
console.log('11. Resolved')
})()
console.log('9. Added π')
console.log(`10. Pending promises: ${queue.pending}`)
//=> '10. Pending promises: 1'
await queue.onIdle()
console.log('12. All work is done')
}
async function taskTwo () {
await queue.add(async () => 'π¦')
console.log('5. Resolved')
}
console.log('1. Added π¦')
;(async () => {
await queue.add(async () => 'π΄')
console.log('6. Resolved')
})()
console.log('2. Added π΄')
;(async () => {
await queue.onEmpty()
console.log('7. Queue is empty')
})()
console.log(`3. Queue size: ${queue.size}`)
//=> '3. Queue size: 1`
console.log(`4. Pending promises: ${queue.pending}`)
//=> '4. Pending promises: 1'
$ node example.js
01. Added π¦
02. Added π΄
03. Queue size: 1
04. Pending promises: 1
05. Resolved π¦
06. Resolved π΄
07. Queue is empty
08. Pending promises: 0
09. Added π
10. Pending promises: 1
11. Resolved π
12. All work is done
Custom QueueClass
For implementing more complex scheduling policies, you can provide a QueueClass in the options:
class QueueClass {
constructor() {
this._queue = []
}
enqueue(run, options) {
this._queue.push(run)
}
dequeue() {
return this._queue.shift()
}
get size() {
return this._queue.length
}
filter(options) {
return this._queue
}
}
const queue = new PQueue({
queueClass: QueueClass
})
p-queue
will call corresponding methods to put and get operations from this queue.
FAQ
concurrency
and intervalCap
options affect each other?
How do the They are just different constraints. The concurrency
option limits how many things run at the same time. The intervalCap
option limits how many things run in total during the interval (over time).
License/Credits
- P(romise) Queue is licensed under the MIT license.
- Code is adapted from Sindreβs p-queue for node (also under the MIT license)