-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
71 lines (59 loc) · 2.14 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
const fastifyPlugin = require('fastify-plugin');
const { Kafka } = require('kafkajs');
/**
 * Fallback plugin options, shallow-merged underneath the caller's options by
 * `fastifyKafkaJS`.
 *
 * - `clientConfig`: handed to the `Kafka` constructor.
 * - `consumers`: list of consumer definitions; empty means no consumers are created.
 * - `ignoreOnClose`: when truthy, the `onClose` hook leaves connections untouched.
 */
const defaultOptions = {
  clientConfig: { brokers: ['localhost:9092'], clientId: 'fastify-kafkajs' },
  consumers: [],
  ignoreOnClose: false,
};
/**
 * Fastify plugin body that attaches a KafkaJS client to a Fastify instance.
 *
 * Creates the client and a producer, connects the producer, creates and
 * starts every configured consumer, then decorates the instance with
 * `fastify.kafka` (`{ client, producer, consumers }`).
 *
 * Options are merged shallowly over `defaultOptions`, so a caller-supplied
 * `clientConfig` replaces the default one wholesale rather than being merged
 * key by key.
 *
 * @param {object} fastify - Fastify instance; `log.child`, `addHook` and
 *   `decorate` are used.
 * @param {object} [options] - Plugin options.
 * @param {object} [options.clientConfig] - Passed to `new Kafka(...)`.
 * @param {object} [options.producerConfig] - Passed to `kafka.producer(...)`.
 * @param {Array<object>} [options.consumers] - One entry per consumer; each
 *   entry's `consumerConfig`, `subscription` and `runConfig` are passed to
 *   `kafka.consumer`, `consumer.subscribe` and `consumer.run` respectively.
 * @param {boolean} [options.ignoreOnClose] - When truthy, the `onClose` hook
 *   disconnects nothing.
 * @returns {Promise<void>} Settles after the producer is connected and every
 *   consumer's `run` call has been awaited.
 */
async function fastifyKafkaJS(fastify, options) {
  const {
    clientConfig,
    producerConfig,
    consumers: consumerDefinitions,
    ignoreOnClose,
  } = Object.assign({}, defaultOptions, options);

  const kafka = new Kafka(clientConfig);
  const logger = fastify.log.child({ plugin: 'fastify-kafkajs' });
  const producer = kafka.producer(producerConfig);
  // Shared by the close hook and the decoration; filled in further down.
  const consumers = [];

  // Registered before any connection is attempted.
  fastify.addHook('onClose', async () => {
    if (ignoreOnClose) {
      return;
    }

    if (consumers.length === 0) {
      logger.info('disconnecting producer');
      await producer.disconnect();
      return;
    }

    logger.info('disconnecting producer and consumers');
    // Consumer disconnects are kicked off first, then the producer's; all of
    // them run concurrently.
    await Promise.all([
      ...consumers.map((consumer) => consumer.disconnect()),
      producer.disconnect(),
    ]);
  });

  // Connects, subscribes and starts one consumer; `position` indexes both
  // `consumers` and the matching entry of the configured definitions.
  const startConsumer = async (consumer, position) => {
    logger.debug(`consumer #${position} connecting`);
    await consumer.connect();
    logger.debug(`consumer #${position} subscribing`);
    await consumer.subscribe(consumerDefinitions[position].subscription);
    logger.debug(`consumer #${position} starting`);
    await consumer.run(consumerDefinitions[position].runConfig);
    logger.debug(`consumer #${position} started`);
  };

  logger.info('connecting producer');
  await producer.connect();

  if (consumerDefinitions.length > 0) {
    logger.info('creating consumers');
    // Every consumer object is created before any of them connects.
    for (const { consumerConfig } of consumerDefinitions) {
      consumers.push(kafka.consumer(consumerConfig));
    }
    await Promise.all(consumers.map(startConsumer));
  }

  fastify.decorate('kafka', { client: kafka, producer, consumers });
}
// Metadata handed to fastify-plugin alongside the plugin function: the
// Fastify version range and the plugin's name.
const pluginMetadata = {
  fastify: '>=2.0.0',
  name: 'fastify-kafkajs',
};

// The module's sole export is the plugin function wrapped by fastify-plugin.
module.exports = fastifyPlugin(fastifyKafkaJS, pluginMetadata);