Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Use built-in async for mqtt #24786

Merged
merged 14 commits into from
Nov 19, 2024
13 changes: 7 additions & 6 deletions lib/controller.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import type {IClientPublishOptions} from 'mqtt';
import type * as SdNotify from 'sd-notify';

import assert from 'assert';
Expand Down Expand Up @@ -265,14 +266,14 @@ export class Controller {
message = newState;
}

const options: MQTTOptions = {
retain: utils.getObjectProperty(entity.options, 'retain', false) as boolean,
qos: utils.getObjectProperty(entity.options, 'qos', 0) as 0 | 1 | 2,
const options: IClientPublishOptions = {
retain: utils.getObjectProperty(entity.options, 'retain', false),
qos: utils.getObjectProperty(entity.options, 'qos', 0),
};
const retention = utils.getObjectProperty<number | false>(entity.options, 'retention', false);

const retention = utils.getObjectProperty(entity.options, 'retention', false);
if (retention !== false) {
options.properties = {messageExpiryInterval: retention as number};
options.properties = {messageExpiryInterval: retention};
}

if (entity.isDevice() && settings.get().mqtt.include_device_information) {
Expand Down Expand Up @@ -328,7 +329,7 @@ export class Controller {
this.eventBus.emitPublishEntityState({entity, message, stateChangeReason, payload});
}

async iteratePayloadAttributeOutput(topicRoot: string, payload: KeyValue, options: MQTTOptions): Promise<void> {
async iteratePayloadAttributeOutput(topicRoot: string, payload: KeyValue, options: IClientPublishOptions): Promise<void> {
for (const [key, value] of Object.entries(payload)) {
let subPayload = value;
let message = null;
Expand Down
8 changes: 4 additions & 4 deletions lib/extension/homeassistant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -439,8 +439,8 @@ export default class HomeAssistant extends Extension {
this.eventBus.onEntityOptionsChanged(this, async (data) => await this.discover(data.entity));
this.eventBus.onExposesChanged(this, async (data) => await this.discover(data.device));

this.mqtt.subscribe(this.statusTopic);
this.mqtt.subscribe(DEFAULT_STATUS_TOPIC);
await this.mqtt.subscribe(this.statusTopic);
await this.mqtt.subscribe(DEFAULT_STATUS_TOPIC);

/**
* Prevent unnecessary re-discovery of entities by waiting 5 seconds for retained discovery messages to come in.
Expand All @@ -457,9 +457,9 @@ export default class HomeAssistant extends Extension {
}

logger.debug(`Discovering entities to Home Assistant in ${discoverWait}s`);
this.mqtt.subscribe(`${this.discoveryTopic}/#`);
await this.mqtt.subscribe(`${this.discoveryTopic}/#`);
setTimeout(async () => {
this.mqtt.unsubscribe(`${this.discoveryTopic}/#`);
await this.mqtt.unsubscribe(`${this.discoveryTopic}/#`);
logger.debug(`Discovering entities to Home Assistant`);

await this.discover(this.bridge);
Expand Down
119 changes: 62 additions & 57 deletions lib/mqtt.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import type {IClientOptions, IClientPublishOptions, MqttClient} from 'mqtt';

import fs from 'fs';

import bind from 'bind-decorator';
import * as mqtt from 'mqtt';
import {connectAsync} from 'mqtt';

import logger from './util/logger';
import * as settings from './util/settings';
Expand All @@ -12,13 +14,11 @@ const NS = 'z2m:mqtt';
export default class MQTT {
private publishedTopics: Set<string> = new Set();
private connectionTimer?: NodeJS.Timeout;
// @ts-expect-error initialized in `connect`
private client: mqtt.MqttClient;
private client!: MqttClient;
private eventBus: EventBus;
private initialConnect = true;
private republishRetainedTimer?: NodeJS.Timeout;
public retainedMessages: {
[s: string]: {payload: string; options: MQTTOptions; skipLog: boolean; skipReceive: boolean; topic: string; base: string};
[s: string]: {payload: string; options: IClientPublishOptions; skipLog: boolean; skipReceive: boolean; topic: string; base: string};
} = {};

constructor(eventBus: EventBus) {
Expand All @@ -30,13 +30,14 @@ export default class MQTT {

logger.info(`Connecting to MQTT server at ${mqttSettings.server}`);

const options: mqtt.IClientOptions = {
const options: IClientOptions = {
will: {
topic: `${settings.get().mqtt.base_topic}/bridge/state`,
payload: Buffer.from(JSON.stringify({state: 'offline'})),
retain: settings.get().mqtt.force_disable_retain ? false : true,
qos: 1,
},
properties: {maximumPacketSize: mqttSettings.maximum_packet_size},
};

if (mqttSettings.version) {
Expand Down Expand Up @@ -78,49 +79,41 @@ export default class MQTT {
options.rejectUnauthorized = false;
}

return await new Promise((resolve, reject) => {
this.client = mqtt.connect(mqttSettings.server, options);
// https://github.com/Koenkk/zigbee2mqtt/issues/9822
this.client.stream.setMaxListeners(0);

this.client.on('connect', async () => {
// Set timer at interval to check if connected to MQTT server.
clearTimeout(this.connectionTimer);
this.connectionTimer = setInterval(() => {
if (this.client.reconnecting) {
logger.error('Not connected to MQTT server!');
}
}, utils.seconds(10));

logger.info('Connected to MQTT server');
await this.publishStateOnline();

if (!this.initialConnect) {
this.republishRetainedTimer = setTimeout(async () => {
// Republish retained messages in case MQTT broker does not persist them.
// https://github.com/Koenkk/zigbee2mqtt/issues/9629
for (const msg of Object.values(this.retainedMessages)) {
await this.publish(msg.topic, msg.payload, msg.options, msg.base, msg.skipLog, msg.skipReceive);
}
}, 2000);
}

this.initialConnect = false;
this.subscribe(`${settings.get().mqtt.base_topic}/#`);
resolve();
});
this.client = await connectAsync(mqttSettings.server, options);

this.client.on('error', (err) => {
logger.error(`MQTT error: ${err.message}`);
reject(err);
});
// https://github.com/Koenkk/zigbee2mqtt/issues/9822
this.client.stream.setMaxListeners(0);

this.client.on('message', this.onMessage);
this.client.on('error', (err) => {
logger.error(`MQTT error: ${err.message}`);
});
}

@bind async publishStateOnline(): Promise<void> {
await this.publish('bridge/state', JSON.stringify({state: 'online'}), {retain: true, qos: 0});
if (mqttSettings.version != undefined && mqttSettings.version >= 5) {
this.client.on('disconnect', (packet) => {
logger.error(`MQTT disconnect: reason ${packet.reasonCode} (${packet.properties?.reasonString})`);
});
}

this.client.on('message', this.onMessage);

await this.onConnect();
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method logs logger.info('Connected to MQTT server'); while it may not be the case yet, I'm also not sure about the subscribe before it gets connected?


this.client.on('connect', this.onConnect);

this.republishRetainedTimer = setTimeout(async () => {
// Republish retained messages in case MQTT broker does not persist them.
// https://github.com/Koenkk/zigbee2mqtt/issues/9629
for (const msg of Object.values(this.retainedMessages)) {
await this.publish(msg.topic, msg.payload, msg.options, msg.base, msg.skipLog, msg.skipReceive);
}
}, 2000);

// Set timer at interval to check if connected to MQTT server.
this.connectionTimer = setInterval(() => {
if (!this.isConnected()) {
logger.error('Not connected to MQTT server!');
}
}, utils.seconds(10));
}

async disconnect(): Promise<void> {
Expand All @@ -129,15 +122,21 @@ export default class MQTT {
await this.publish('bridge/state', JSON.stringify({state: 'offline'}), {retain: true, qos: 0});
this.eventBus.removeListeners(this);
logger.info('Disconnecting from MQTT server');
this.client?.end();
await this.client?.endAsync();
}

subscribe(topic: string): void {
this.client.subscribe(topic);
async subscribe(topic: string): Promise<void> {
await this.client.subscribeAsync(topic);
}

unsubscribe(topic: string): void {
this.client.unsubscribe(topic);
async unsubscribe(topic: string): Promise<void> {
await this.client.unsubscribeAsync(topic);
}

@bind private async onConnect(): Promise<void> {
logger.info('Connected to MQTT server');
await this.publish('bridge/state', JSON.stringify({state: 'online'}), {retain: true, qos: 0});
await this.subscribe(`${settings.get().mqtt.base_topic}/#`);
}

@bind public onMessage(topic: string, message: Buffer): void {
Expand All @@ -155,18 +154,18 @@ export default class MQTT {
}

isConnected(): boolean {
return this.client && !this.client.reconnecting;
return this.client && !this.client.reconnecting && !this.client.disconnecting && !this.client.disconnected;
}

async publish(
topic: string,
payload: string,
options: MQTTOptions = {},
options: IClientPublishOptions = {},
base = settings.get().mqtt.base_topic,
skipLog = false,
skipReceive = true,
): Promise<void> {
const defaultOptions: {qos: QoS; retain: boolean} = {qos: 0, retain: false};
const defaultOptions = {qos: 0 as const, retain: false};
topic = `${base}/${topic}`;

if (skipReceive) {
Expand Down Expand Up @@ -197,14 +196,20 @@ export default class MQTT {
logger.info(() => `MQTT publish: topic '${topic}', payload '${payload}'`, NS);
}

const actualOptions: mqtt.IClientPublishOptions = {...defaultOptions, ...options};
const actualOptions: IClientPublishOptions = {...defaultOptions, ...options};

if (settings.get().mqtt.force_disable_retain) {
actualOptions.retain = false;
}

return await new Promise<void>((resolve) => {
this.client.publish(topic, payload, actualOptions, () => resolve());
});
try {
await this.client.publishAsync(topic, payload, actualOptions);
} catch (error) {
/* istanbul ignore else */
if (!skipLog) {
logger.error(`MQTT server error: ${(error as Error).message}`);
logger.error(`Could not send message: topic: '${topic}', payload: '${payload}`);
}
}
}
}
7 changes: 1 addition & 6 deletions lib/types/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ declare global {
type Device = TypeDevice;
type State = TypeState;
type Extension = TypeExtension;
type QoS = 0 | 1 | 2;

// Types
type ExternalDefinition = zhc.Definition & {homeassistant: unknown};
Expand All @@ -40,11 +39,6 @@ declare global {
error?: string;
transaction?: string;
}
interface MQTTOptions {
qos?: QoS;
retain?: boolean;
properties?: {messageExpiryInterval: number};
}
type Scene = {id: number; name: string};
type StateChangeReason = 'publishDebounce' | 'groupOptimistic' | 'lastSeenChanged' | 'publishCached' | 'publishThrottle';
type PublishEntityState = (entity: Device | Group, payload: KeyValue, stateChangeReason?: StateChangeReason) => Promise<void>;
Expand Down Expand Up @@ -137,6 +131,7 @@ declare global {
cert?: string;
client_id?: string;
reject_unauthorized?: boolean;
maximum_packet_size: number;
};
serial: {
disable_led: boolean;
Expand Down
11 changes: 10 additions & 1 deletion lib/util/settings.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,17 @@
"type": "boolean",
"title": "Force disable retain",
"requiresRestart": true,
"description": "Disable retain for all send messages. ONLY enable if you MQTT broker doesn't support retained message (e.g. AWS IoT core, Azure IoT Hub, Google Cloud IoT core, IBM Watson IoT Platform). Enabling will break the Home Assistant integration",
"description": "Disable retain for all send messages. ONLY enable if your MQTT broker doesn't support retained message (e.g. AWS IoT core, Azure IoT Hub, Google Cloud IoT core, IBM Watson IoT Platform). Enabling will break the Home Assistant integration",
"default": false
},
"maximum_packet_size": {
"type": "number",
"title": "Maximum packet size",
"requiresRestart": true,
"description": "Specifies the maximum allowed packet length (in bytes) that the server can send to Zigbee2MQTT. NOTE: The same value exists in your MQTT broker but for the length the client can send to it instead.",
"default": 1048576,
"minimum": 20,
"maximum": 268435456
}
},
"required": ["server"]
Expand Down
2 changes: 2 additions & 0 deletions lib/util/settings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ const defaults: RecursivePartial<Settings> = {
base_topic: 'zigbee2mqtt',
include_device_information: false,
force_disable_retain: false,
// 1MB = roughly 3.5KB per device * 300 devices for `/bridge/devices`
maximum_packet_size: 1048576,
},
serial: {
disable_led: false,
Expand Down
2 changes: 1 addition & 1 deletion lib/util/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ function equalsPartial(object: KeyValue, expected: KeyValue): boolean {
return true;
}

function getObjectProperty(object: KeyValue, key: string, defaultValue: unknown): unknown {
function getObjectProperty<T>(object: KeyValue, key: string, defaultValue: NoInfer<T>): T {
return object && object[key] !== undefined ? object[key] : defaultValue;
}

Expand Down
Loading