Skip to content

Commit 21c68d3

Browse files
committed
messages handling inside queue
1 parent 0ee8f67 commit 21c68d3

File tree

4 files changed

+114
-59
lines changed

4 files changed

+114
-59
lines changed

package-lock.json

+35
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

+1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
"license": "MIT",
3333
"dependencies": {
3434
"debug": "^4.4.0",
35+
"p-queue": "^8.1.0",
3536
"pg": "^8.13.3",
3637
"pg-cursor": "^2.12.3",
3738
"robot3": "^1.1.1",

src/index.ts

+2
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ export class PgTrxOutbox implements StartStop {
4141
}
4242

4343
async start() {
44+
await this.transfer.start()
4445
await this.adapter.start()
4546
await this.pg.start()
4647
await this.es.start()
@@ -52,6 +53,7 @@ export class PgTrxOutbox implements StartStop {
5253
await this.es.stop()
5354
await this.notifier?.stop()
5455
await this.poller?.stop()
56+
await this.transfer.stop()
5557
await this.pg.stop()
5658
await this.adapter.stop()
5759
}

src/transfer.ts

+76-59
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
11
import type { PoolClient } from 'pg'
22
import { Pg } from './pg.ts'
3-
import type { Adapter, Options, OutboxMessage } from './types.ts'
3+
import type { Adapter, Options, OutboxMessage, StartStop } from './types.ts'
44
import thr from 'throw'
55
import type { Es } from './es.ts'
66
import { match, P } from 'ts-pattern'
77
import { inspect } from 'node:util'
88
import debug from 'debug'
99
import { appName } from './app-name.ts'
10+
import type PQueue from 'p-queue'
1011

11-
export class Transfer {
12+
export class Transfer implements StartStop {
1213
private logger = debug(`pg-trx-outbox:${appName}`)
14+
private queue!: PQueue
1315

1416
private readonly options: Options
1517
private readonly pg: Pg
@@ -23,69 +25,84 @@ export class Transfer {
2325
this.pg = pg
2426
this.adapter = adapter
2527
this.es = es
28+
29+
import('p-queue').then(({ default: PQueue }) => (this.queue = new PQueue({ concurrency: 1 })))
30+
}
31+
32+
async start() {}
33+
34+
async stop() {
35+
this.queue.clear()
36+
await this.queue.onIdle()
2637
}
2738

2839
async transferMessages() {
29-
let messages: readonly OutboxMessage[] = []
30-
const client = await this.pg.getClient()
31-
try {
32-
await client.query('begin')
33-
messages = await this.fetchPgMessages(client)
34-
if (messages.length) {
35-
const results = await this.adapter.send(messages)
36-
const ids = []
37-
const responses = []
38-
const errors = []
39-
const metas = []
40-
const processed = []
41-
const attempts = []
42-
const sinceAt = []
43-
for (const [i, resp] of results.entries()) {
44-
const message = messages[i] ?? thr(new Error('Message not exists for result'))
45-
ids.push(message.id)
46-
metas.push(resp.meta ?? null)
47-
responses.push(
48-
resp.status === 'fulfilled'
49-
? typeof resp.value === 'string' || Array.isArray(resp.value)
50-
? { r: resp.value }
51-
: resp.value
52-
: null
53-
)
54-
errors.push(resp.status === 'rejected' ? this.normalizeError(resp.reason) : message.error)
55-
const needRetry =
56-
resp.status === 'rejected' &&
57-
this.options.outboxOptions?.retryError?.(resp.reason) &&
58-
message.attempts < (this.options.outboxOptions?.retryMaxAttempts ?? 5)
59-
processed.push(!needRetry)
60-
attempts.push(message.attempts + (needRetry ? 1 : 0))
61-
sinceAt.push(
62-
needRetry ? new Date(Date.now() + (this.options.outboxOptions?.retryDelay ?? 5) * 1000) : message.since_at
63-
)
64-
}
65-
await this.updateToProcessed(client, ids, responses, errors, metas, processed, attempts, sinceAt)
66-
this.es.setLastEventId(messages.at(-1)?.id ?? '0')
67-
}
68-
} catch (e) {
69-
if ((e as { code: string }).code !== '55P03') {
40+
if (this.queue.pending + this.queue.size >= 2) {
41+
return
42+
}
43+
44+
await this.queue.add(async () => {
45+
let messages: readonly OutboxMessage[] = []
46+
const client = await this.pg.getClient()
47+
try {
48+
await client.query('begin')
49+
messages = await this.fetchPgMessages(client)
7050
if (messages.length) {
71-
await this.updateToProcessed(
72-
client,
73-
messages.map(r => r.id),
74-
messages.map(() => null),
75-
messages.map(() => this.normalizeError(e)),
76-
messages.map(() => null),
77-
messages.map(() => true),
78-
messages.map(m => m.attempts),
79-
messages.map(m => m.since_at)
80-
)
51+
const results = await this.adapter.send(messages)
52+
const ids = []
53+
const responses = []
54+
const errors = []
55+
const metas = []
56+
const processed = []
57+
const attempts = []
58+
const sinceAt = []
59+
for (const [i, resp] of results.entries()) {
60+
const message = messages[i] ?? thr(new Error('Message not exists for result'))
61+
ids.push(message.id)
62+
metas.push(resp.meta ?? null)
63+
responses.push(
64+
resp.status === 'fulfilled'
65+
? typeof resp.value === 'string' || Array.isArray(resp.value)
66+
? { r: resp.value }
67+
: resp.value
68+
: null
69+
)
70+
errors.push(resp.status === 'rejected' ? this.normalizeError(resp.reason) : message.error)
71+
const needRetry =
72+
resp.status === 'rejected' &&
73+
this.options.outboxOptions?.retryError?.(resp.reason) &&
74+
message.attempts < (this.options.outboxOptions?.retryMaxAttempts ?? 5)
75+
processed.push(!needRetry)
76+
attempts.push(message.attempts + (needRetry ? 1 : 0))
77+
sinceAt.push(
78+
needRetry ? new Date(Date.now() + (this.options.outboxOptions?.retryDelay ?? 5) * 1000) : message.since_at
79+
)
80+
}
81+
await this.updateToProcessed(client, ids, responses, errors, metas, processed, attempts, sinceAt)
82+
this.es.setLastEventId(messages.at(-1)?.id ?? '0')
8183
}
82-
throw e
84+
} catch (e) {
85+
if ((e as { code: string }).code !== '55P03') {
86+
if (messages.length) {
87+
await this.updateToProcessed(
88+
client,
89+
messages.map(r => r.id),
90+
messages.map(() => null),
91+
messages.map(() => this.normalizeError(e)),
92+
messages.map(() => null),
93+
messages.map(() => true),
94+
messages.map(m => m.attempts),
95+
messages.map(m => m.since_at)
96+
)
97+
}
98+
throw e
99+
}
100+
} finally {
101+
await client.query('commit')
102+
client.release()
83103
}
84-
} finally {
85-
await client.query('commit')
86-
client.release()
87-
}
88-
await this.adapter.onHandled(messages)
104+
await this.adapter.onHandled(messages)
105+
})
89106
}
90107

91108
private normalizeError(error: unknown) {

0 commit comments

Comments
 (0)