Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ module.exports.interceptors = {
dns: require('./lib/interceptor/dns'),
cache: require('./lib/interceptor/cache'),
decompress: require('./lib/interceptor/decompress'),
deduplicate: require('./lib/interceptor/deduplicate')
deduplicate: require('./lib/interceptor/deduplicate'),
priority: require('./lib/interceptor/priority')
}

module.exports.cacheStores = {
Expand Down
88 changes: 88 additions & 0 deletions lib/interceptor/priority.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
'use strict'

const DecoratorHandler = require('../handler/decorator-handler')

class PriorityQueue {
#queue = []
#concurrency
#running = 0

constructor (concurrency = 1) {
this.#concurrency = concurrency
}

acquire (callback, priority = 0) {
this.#queue.push({ callback, priority })
this.#queue.sort((a, b) => b.priority - a.priority)
this.#dispatch()
}

release () {
this.#running--
this.#dispatch()
}

#dispatch () {
while (this.#running < this.#concurrency && this.#queue.length > 0) {
const entry = this.#queue.shift()
this.#running++
entry.callback()
}
}
}

class PriorityHandler extends DecoratorHandler {
#priorityQueue

constructor (handler, priorityQueue) {
super(handler)
this.#priorityQueue = priorityQueue
}

onResponseEnd (controller, trailers) {
this.#release()
return super.onResponseEnd(controller, trailers)
}

onResponseError (controller, err) {
this.#release()
return super.onResponseError(controller, err)
}

#release () {
if (this.#priorityQueue) {
const priorityQueue = this.#priorityQueue
this.#priorityQueue = null
priorityQueue.release()
}
}
}

function createPriorityInterceptor ({ concurrency } = { concurrency: 1 }) {
return (dispatch) => {
const queues = new Map()

return function priorityInterceptor (opts, handler) {
if (opts.priority == null || !opts.origin) {
return dispatch(opts, handler)
}

let queue = queues.get(opts.origin)
if (!queue) {
queue = new PriorityQueue(concurrency)
queues.set(opts.origin, queue)
}

queue.acquire(() => {
const priorityHandler = new PriorityHandler(handler, queue)
try {
dispatch(opts, priorityHandler)
} catch (err) {
priorityHandler.onResponseError(null, err)
}
}, opts.priority)
}
}
}

module.exports = createPriorityInterceptor
165 changes: 165 additions & 0 deletions test/interceptors/priority.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
'use strict'

const { createServer } = require('node:http')
const { describe, test, after } = require('node:test')
const { once } = require('node:events')
const { strictEqual, deepStrictEqual } = require('node:assert')
const { setTimeout: sleep } = require('node:timers/promises')
const { Client, interceptors } = require('../../index')

describe('Priority Interceptor', () => {
test('dispatches requests without priority normally', async () => {
const server = createServer({ joinDuplicateHeaders: true }, (req, res) => {
res.end('ok')
}).listen(0)

await once(server, 'listening')

const client = new Client(`http://localhost:${server.address().port}`)
.compose(interceptors.priority())

after(async () => {
await client.close()
server.close()
await once(server, 'close')
})

const res = await client.request({
origin: `http://localhost:${server.address().port}`,
method: 'GET',
path: '/'
})

const body = await res.body.text()
strictEqual(res.statusCode, 200)
strictEqual(body, 'ok')
})

test('dispatches requests with priority', async () => {
const server = createServer({ joinDuplicateHeaders: true }, (req, res) => {
res.end('ok')
}).listen(0)

await once(server, 'listening')

const client = new Client(`http://localhost:${server.address().port}`)
.compose(interceptors.priority())

after(async () => {
await client.close()
server.close()
await once(server, 'close')
})

const res = await client.request({
origin: `http://localhost:${server.address().port}`,
method: 'GET',
path: '/',
priority: 1
})

const body = await res.body.text()
strictEqual(res.statusCode, 200)
strictEqual(body, 'ok')
})

test('higher priority requests are dispatched first', async () => {
const order = []
const server = createServer({ joinDuplicateHeaders: true }, async (req, res) => {
await sleep(50)
order.push(req.url)
res.end(req.url)
}).listen(0)

await once(server, 'listening')

const client = new Client(`http://localhost:${server.address().port}`)
.compose(interceptors.priority({ concurrency: 1 }))

after(async () => {
await client.close()
server.close()
await once(server, 'close')
})

const origin = `http://localhost:${server.address().port}`

// Send requests with different priorities
// With concurrency 1, the first request dispatches immediately.
// The remaining requests queue by priority (higher = first).
const results = await Promise.all([
client.request({ origin, method: 'GET', path: '/first', priority: 1 }),
client.request({ origin, method: 'GET', path: '/high', priority: 10 }),
client.request({ origin, method: 'GET', path: '/low', priority: 0 }),
client.request({ origin, method: 'GET', path: '/medium', priority: 5 })
])

// Read all bodies to ensure completion
for (const res of results) {
await res.body.text()
}

// The first request dispatched immediately, then high, medium, low
deepStrictEqual(order, ['/first', '/high', '/medium', '/low'])
})

test('requests without priority bypass the queue', async () => {
const server = createServer({ joinDuplicateHeaders: true }, (req, res) => {
res.end('ok')
}).listen(0)

await once(server, 'listening')

const client = new Client(`http://localhost:${server.address().port}`)
.compose(interceptors.priority())

after(async () => {
await client.close()
server.close()
await once(server, 'close')
})

const origin = `http://localhost:${server.address().port}`

// Request without priority should go through directly
const res = await client.request({
origin,
method: 'GET',
path: '/'
})

const body = await res.body.text()
strictEqual(res.statusCode, 200)
strictEqual(body, 'ok')
})

test('handles request errors gracefully', async () => {
const server = createServer({ joinDuplicateHeaders: true }, (req, res) => {
res.destroy()
}).listen(0)

await once(server, 'listening')

const client = new Client(`http://localhost:${server.address().port}`)
.compose(interceptors.priority())

after(async () => {
await client.close()
server.close()
await once(server, 'close')
})

const origin = `http://localhost:${server.address().port}`

await client.request({
origin,
method: 'GET',
path: '/',
priority: 1
}).then(() => {
throw new Error('should have thrown')
}).catch((err) => {
strictEqual(err.code, 'UND_ERR_SOCKET')
})
})
})
Loading