From 0d0520b310cd5076f014a50a1068fcc2fa50c4b0 Mon Sep 17 00:00:00 2001 From: Amol Yadav Date: Tue, 10 Mar 2026 19:57:05 +0530 Subject: [PATCH] added priotization intercepter --- index.js | 3 +- lib/interceptor/priority.js | 88 ++++++++++++++++++ test/interceptors/priority.js | 165 ++++++++++++++++++++++++++++++++++ 3 files changed, 255 insertions(+), 1 deletion(-) create mode 100644 lib/interceptor/priority.js create mode 100644 test/interceptors/priority.js diff --git a/index.js b/index.js index 708a8ee80c5..18b21c9e297 100644 --- a/index.js +++ b/index.js @@ -52,7 +52,8 @@ module.exports.interceptors = { dns: require('./lib/interceptor/dns'), cache: require('./lib/interceptor/cache'), decompress: require('./lib/interceptor/decompress'), - deduplicate: require('./lib/interceptor/deduplicate') + deduplicate: require('./lib/interceptor/deduplicate'), + priority: require('./lib/interceptor/priority') } module.exports.cacheStores = { diff --git a/lib/interceptor/priority.js b/lib/interceptor/priority.js new file mode 100644 index 00000000000..3812cf56b03 --- /dev/null +++ b/lib/interceptor/priority.js @@ -0,0 +1,88 @@ +'use strict' + +const DecoratorHandler = require('../handler/decorator-handler') + +class PriorityQueue { + #queue = [] + #concurrency + #running = 0 + + constructor (concurrency = 1) { + this.#concurrency = concurrency + } + + acquire (callback, priority = 0) { + this.#queue.push({ callback, priority }) + this.#queue.sort((a, b) => b.priority - a.priority) + this.#dispatch() + } + + release () { + this.#running-- + this.#dispatch() + } + + #dispatch () { + while (this.#running < this.#concurrency && this.#queue.length > 0) { + const entry = this.#queue.shift() + this.#running++ + entry.callback() + } + } +} + +class PriorityHandler extends DecoratorHandler { + #priorityQueue + + constructor (handler, priorityQueue) { + super(handler) + this.#priorityQueue = priorityQueue + } + + onResponseEnd (controller, trailers) { + this.#release() + return super.onResponseEnd(controller, trailers) + } + + onResponseError (controller, err) { + this.#release() + return super.onResponseError(controller, err) + } + + #release () { + if (this.#priorityQueue) { + const priorityQueue = this.#priorityQueue + this.#priorityQueue = null + priorityQueue.release() + } + } +} + +function createPriorityInterceptor ({ concurrency } = { concurrency: 1 }) { + return (dispatch) => { + const queues = new Map() + + return function priorityInterceptor (opts, handler) { + if (opts.priority == null || !opts.origin) { + return dispatch(opts, handler) + } + + let queue = queues.get(opts.origin) + if (!queue) { + queue = new PriorityQueue(concurrency) + queues.set(opts.origin, queue) + } + + queue.acquire(() => { + const priorityHandler = new PriorityHandler(handler, queue) + try { + dispatch(opts, priorityHandler) + } catch (err) { + priorityHandler.onResponseError(null, err) + } + }, opts.priority) + } + } +} + +module.exports = createPriorityInterceptor diff --git a/test/interceptors/priority.js b/test/interceptors/priority.js new file mode 100644 index 00000000000..dba9d19a88a --- /dev/null +++ b/test/interceptors/priority.js @@ -0,0 +1,165 @@ +'use strict' + +const { createServer } = require('node:http') +const { describe, test, after } = require('node:test') +const { once } = require('node:events') +const { strictEqual, deepStrictEqual } = require('node:assert') +const { setTimeout: sleep } = require('node:timers/promises') +const { Client, interceptors } = require('../../index') + +describe('Priority Interceptor', () => { + test('dispatches requests without priority normally', async () => { + const server = createServer({ joinDuplicateHeaders: true }, (req, res) => { + res.end('ok') + }).listen(0) + + await once(server, 'listening') + + const client = new Client(`http://localhost:${server.address().port}`) + .compose(interceptors.priority()) + + after(async () => { + await client.close() + server.close() + await once(server, 'close') + }) + + const res = await client.request({ + origin: `http://localhost:${server.address().port}`, + method: 'GET', + path: '/' + }) + + const body = await res.body.text() + strictEqual(res.statusCode, 200) + strictEqual(body, 'ok') + }) + + test('dispatches requests with priority', async () => { + const server = createServer({ joinDuplicateHeaders: true }, (req, res) => { + res.end('ok') + }).listen(0) + + await once(server, 'listening') + + const client = new Client(`http://localhost:${server.address().port}`) + .compose(interceptors.priority()) + + after(async () => { + await client.close() + server.close() + await once(server, 'close') + }) + + const res = await client.request({ + origin: `http://localhost:${server.address().port}`, + method: 'GET', + path: '/', + priority: 1 + }) + + const body = await res.body.text() + strictEqual(res.statusCode, 200) + strictEqual(body, 'ok') + }) + + test('higher priority requests are dispatched first', async () => { + const order = [] + const server = createServer({ joinDuplicateHeaders: true }, async (req, res) => { + await sleep(50) + order.push(req.url) + res.end(req.url) + }).listen(0) + + await once(server, 'listening') + + const client = new Client(`http://localhost:${server.address().port}`) + .compose(interceptors.priority({ concurrency: 1 })) + + after(async () => { + await client.close() + server.close() + await once(server, 'close') + }) + + const origin = `http://localhost:${server.address().port}` + + // Send requests with different priorities + // With concurrency 1, the first request dispatches immediately. + // The remaining requests queue by priority (higher = first). + const results = await Promise.all([ + client.request({ origin, method: 'GET', path: '/first', priority: 1 }), + client.request({ origin, method: 'GET', path: '/high', priority: 10 }), + client.request({ origin, method: 'GET', path: '/low', priority: 0 }), + client.request({ origin, method: 'GET', path: '/medium', priority: 5 }) + ]) + + // Read all bodies to ensure completion + for (const res of results) { + await res.body.text() + } + + // The first request dispatched immediately, then high, medium, low + deepStrictEqual(order, ['/first', '/high', '/medium', '/low']) + }) + + test('requests without priority bypass the queue', async () => { + const server = createServer({ joinDuplicateHeaders: true }, (req, res) => { + res.end('ok') + }).listen(0) + + await once(server, 'listening') + + const client = new Client(`http://localhost:${server.address().port}`) + .compose(interceptors.priority()) + + after(async () => { + await client.close() + server.close() + await once(server, 'close') + }) + + const origin = `http://localhost:${server.address().port}` + + // Request without priority should go through directly + const res = await client.request({ + origin, + method: 'GET', + path: '/' + }) + + const body = await res.body.text() + strictEqual(res.statusCode, 200) + strictEqual(body, 'ok') + }) + + test('handles request errors gracefully', async () => { + const server = createServer({ joinDuplicateHeaders: true }, (req, res) => { + res.destroy() + }).listen(0) + + await once(server, 'listening') + + const client = new Client(`http://localhost:${server.address().port}`) + .compose(interceptors.priority()) + + after(async () => { + await client.close() + server.close() + await once(server, 'close') + }) + + const origin = `http://localhost:${server.address().port}` + + await client.request({ + origin, + method: 'GET', + path: '/', + priority: 1 + }).then(() => { + throw new Error('should have thrown') + }).catch((err) => { + strictEqual(err.code, 'UND_ERR_SOCKET') + }) + }) +})