Skip to content

Commit e37fba4

Browse files
committed
mqtt
1 parent b377e71 commit e37fba4

File tree

2 files changed

+121
-124
lines changed

2 files changed

+121
-124
lines changed

src/eez/mqtt.cpp

+120-123
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ static const char *CLIENT_ID = "BB3_STM32";
6161
static const char *CLIENT_ID = "BB3_Simulator";
6262
#endif
6363

64+
static const uint32_t RECONNECT_AFTER_ERROR_MS = 1000;
65+
6466
static const size_t MAX_PUB_TOPIC_LENGTH = 50;
6567
static const char *PUB_TOPIC_OE = "%s/ch/%d/oe";
6668
static const char *PUB_TOPIC_U_SET = "%s/ch/%d/uset";
@@ -74,6 +76,7 @@ static const char *SUB_TOPIC = "%s/ch/+/+"; // for example: ch/1/set/oe, ch/1/se
7476
static const size_t MAX_PAYLOAD_LENGTH = 100;
7577

7678
ConnectionState g_connectionState = CONNECTION_STATE_IDLE;
79+
uint32_t g_connectionStateChangedTickCount;
7780

7881
static struct {
7982
int oe;
@@ -94,6 +97,8 @@ static struct {
9497
static uint8_t g_lastChannelIndex = 0;
9598
static uint8_t g_lastValueIndex = 0;
9699

100+
void setState(ConnectionState connectionState);
101+
97102
void onIncomingPublish(const char *topic, const char *payload) {
98103
const char *p = topic + 3;
99104

@@ -186,20 +191,26 @@ static char g_topic[MAX_TOPIC_LEN + 1];
186191
static const size_t MAX_PAYLOAD_LEN = 128;
187192
static char g_payload[MAX_PAYLOAD_LEN + 1];
188193
static size_t g_payloadLen;
189-
static mqtt_connection_status_t m_mqttConnectionStatus;
190194

191195
static void dnsFoundCallback(const char* hostname, const ip_addr_t *ipaddr, void *arg) {
192196
if (ipaddr != NULL) {
193197
g_ipaddr = *ipaddr;
194-
g_connectionState = CONNECTION_STATE_DNS_FOUND;
198+
setState(CONNECTION_STATE_DNS_FOUND);
195199
} else {
196-
g_connectionState = CONNECTION_STATE_ERROR;
200+
setState(CONNECTION_STATE_ERROR);
197201
DebugTrace("mqtt dns error: server not found\n");
198202
}
199203
}
200204

201205
static void connectCallback(mqtt_client_t *client, void *arg, mqtt_connection_status_t status) {
202-
m_mqttConnectionStatus = status;
206+
if (g_connectionState == CONNECTION_STATE_CONNECTING) {
207+
if (status == MQTT_CONNECT_ACCEPTED) {
208+
setState(CONNECTION_STATE_CONNECTED);
209+
} else {
210+
setState(CONNECTION_STATE_ERROR);
211+
DebugTrace("mqtt connect error: %d\n", (int)status);
212+
}
213+
}
203214
}
204215

205216
static void requestCallback(void *arg, err_t err) {
@@ -246,6 +257,9 @@ bool publish(char *topic, char *payload, bool retain) {
246257
if (result != ERR_OK) {
247258
if (result != ERR_MEM) {
248259
DebugTrace("mqtt publish error: %d\n", (int)result);
260+
if (result == ERR_CONN) {
261+
reconnect();
262+
}
249263
}
250264
return false;
251265
}
@@ -290,28 +304,31 @@ const char *getSubTopic() {
290304
return g_subTopic;
291305
}
292306

293-
void setConnected(uint32_t tickCount) {
294-
g_connectionState = CONNECTION_STATE_CONNECTED;
295-
307+
void setState(ConnectionState connectionState) {
308+
if (connectionState == CONNECTION_STATE_CONNECTED) {
296309
#if defined(EEZ_PLATFORM_STM32)
297-
mqtt_set_inpub_callback(g_client, incomingPublishCallback, incomingDataCallback, nullptr);
298-
mqtt_subscribe(g_client, getSubTopic(), 0, requestCallback, nullptr);
310+
mqtt_set_inpub_callback(g_client, incomingPublishCallback, incomingDataCallback, nullptr);
311+
mqtt_subscribe(g_client, getSubTopic(), 0, requestCallback, nullptr);
299312
#endif
300313

301314
#if defined(EEZ_PLATFORM_SIMULATOR)
302-
mqtt_subscribe(&g_client, getSubTopic(), 0);
315+
mqtt_subscribe(&g_client, getSubTopic(), 0);
303316
#endif
304317

305-
for(int i = 0; i < CH_NUM; i++) {
306-
g_channelStates[i].oe = -1;
307-
g_channelStates[i].uSet = NAN;
308-
g_channelStates[i].iSet = NAN;
309-
g_channelStates[i].uMon = NAN;
310-
g_channelStates[i].iMon = NAN;
318+
for(int i = 0; i < CH_NUM; i++) {
319+
g_channelStates[i].oe = -1;
320+
g_channelStates[i].uSet = NAN;
321+
g_channelStates[i].iSet = NAN;
322+
g_channelStates[i].uMon = NAN;
323+
g_channelStates[i].iMon = NAN;
324+
}
325+
326+
g_lastChannelIndex = 0;
327+
g_lastValueIndex = 0;
311328
}
312329

313-
g_lastChannelIndex = 0;
314-
g_lastValueIndex = 0;
330+
g_connectionState = connectionState;
331+
g_connectionStateChangedTickCount = millis();
315332
}
316333

317334
void tick(uint32_t tickCount) {
@@ -321,7 +338,7 @@ void tick(uint32_t tickCount) {
321338

322339
else if (g_connectionState == CONNECTION_STATE_IDLE) {
323340
if (persist_conf::devConf.mqttEnabled) {
324-
g_connectionState = CONNECTION_STATE_CONNECT;
341+
setState(CONNECTION_STATE_CONNECT);
325342
}
326343
}
327344

@@ -331,12 +348,12 @@ void tick(uint32_t tickCount) {
331348
ip_addr_t ipaddr;
332349
err_t err = dns_gethostbyname(persist_conf::devConf.mqttHost, &ipaddr, dnsFoundCallback, NULL);
333350
if (err == ERR_OK) {
334-
g_connectionState = CONNECTION_STATE_DNS_FOUND;
351+
setState(CONNECTION_STATE_DNS_FOUND);
335352
g_ipaddr = ipaddr;
336353
} else if (err == ERR_INPROGRESS) {
337-
g_connectionState = CONNECTION_STATE_DNS_IN_PROGRESS;
354+
setState(CONNECTION_STATE_DNS_IN_PROGRESS);
338355
} else {
339-
g_connectionState = CONNECTION_STATE_ERROR;
356+
setState(CONNECTION_STATE_ERROR);
340357
DebugTrace("mqtt dns error: %d\n", (int)err);
341358
}
342359
#endif
@@ -357,13 +374,13 @@ void tick(uint32_t tickCount) {
357374

358375
/* check that we don't have any errors */
359376
if (g_client.error == MQTT_OK) {
360-
setConnected(tickCount);
377+
setState(CONNECTION_STATE_CONNECTED);
361378
} else {
362-
g_connectionState = CONNECTION_STATE_ERROR;
379+
setState(CONNECTION_STATE_ERROR);
363380
DebugTrace("mqtt error: %s\n", mqtt_error_str(g_client.error));
364381
}
365382
} else {
366-
g_connectionState = CONNECTION_STATE_ERROR;
383+
setState(CONNECTION_STATE_ERROR);
367384
DebugTrace("mqtt error: failed to open socket\n");
368385
}
369386
#endif
@@ -380,15 +397,19 @@ void tick(uint32_t tickCount) {
380397
mqtt_disconnect(&g_client);
381398
#endif
382399

383-
g_connectionState = CONNECTION_STATE_IDLE;
400+
setState(CONNECTION_STATE_IDLE);
384401

385402
if (g_connectionState == CONNECTION_STATE_RECONNECT) {
386-
g_connectionState = CONNECTION_STATE_CONNECT;
403+
setState(CONNECTION_STATE_CONNECT);
387404
}
388405
}
389406

390407
else if (g_connectionState == CONNECTION_STATE_ERROR) {
391-
// pass
408+
if (persist_conf::devConf.mqttEnabled) {
409+
if (millis() - g_connectionStateChangedTickCount > RECONNECT_AFTER_ERROR_MS) {
410+
setState(CONNECTION_STATE_CONNECT);
411+
}
412+
}
392413
}
393414

394415
#if defined(EEZ_PLATFORM_STM32)
@@ -404,125 +425,101 @@ void tick(uint32_t tickCount) {
404425
clientInfo.keep_alive = 60; // seconds
405426
clientInfo.will_topic = nullptr; // not used
406427

407-
m_mqttConnectionStatus = MQTT_CONNECT_ACCEPTED;
408-
409428
err_t result = mqtt_client_connect(g_client, &g_ipaddr, persist_conf::devConf.mqttPort, connectCallback, nullptr, &clientInfo);
410429
if (result == ERR_OK) {
411-
g_connectionState = CONNECTION_STATE_CONNECTING;
430+
setState(CONNECTION_STATE_CONNECTING);
412431
} else {
413-
g_connectionState = CONNECTION_STATE_ERROR;
432+
setState(CONNECTION_STATE_ERROR);
414433
DebugTrace("mqtt connect error: %d\n", (int)result);
415434
}
416435
} else {
417-
g_connectionState = CONNECTION_STATE_ERROR;
436+
setState(CONNECTION_STATE_ERROR);
418437
DebugTrace("mqtt error: failed to create a client\n");
419438
}
420439
return;
421440
}
422441
#endif
423442

424-
else if (g_connectionState == CONNECTION_STATE_CONNECTING) {
425-
#if defined(EEZ_PLATFORM_STM32)
426-
if (mqtt_client_is_connected(g_client)) {
427-
setConnected(tickCount);
428-
} else {
429-
if (m_mqttConnectionStatus != MQTT_CONNECT_ACCEPTED) {
430-
g_connectionState = CONNECTION_STATE_ERROR;
431-
DebugTrace("mqtt connect error: %d\n", (int)m_mqttConnectionStatus);
432-
}
433-
}
434-
#endif
435-
}
436-
437443
else if (g_connectionState == CONNECTION_STATE_CONNECTED) {
438-
#if defined(EEZ_PLATFORM_STM32)
439-
if (mqtt_client_is_connected(g_client)) {
440-
#endif
441-
uint8_t lastChannelIndexAtStart = g_lastChannelIndex;
442-
uint8_t lastValueIndexAtStart = g_lastValueIndex;
443-
444-
do {
445-
uint8_t channelIndex = g_lastChannelIndex;
446-
Channel &channel = Channel::get(channelIndex);
447-
448-
uint32_t period = (uint32_t)roundf(persist_conf::devConf.mqttPeriod * 1000000);
449-
450-
if (g_lastValueIndex == 0) {
451-
int oe = channel.isOutputEnabled() ? 1 : 0;
452-
if (oe != g_channelStates[channelIndex].oe) {
453-
if (!publish(channelIndex, PUB_TOPIC_OE, oe)) {
454-
break;
455-
}
456-
g_channelStates[channelIndex].oe = oe;
457-
}
458-
} else if (g_lastValueIndex == 1) {
459-
float uSet = channel_dispatcher::getUSet(channel);
460-
if ((isNaN(g_channelStates[channelIndex].uSet) || uSet != g_channelStates[channelIndex].uSet) && (tickCount - g_channelStates[channelIndex].g_uSetTick) >= period) {
461-
if (!publish(channelIndex, PUB_TOPIC_U_SET, uSet)) {
462-
break;
463-
}
464-
g_channelStates[channelIndex].uSet = uSet;
465-
g_channelStates[channelIndex].g_uSetTick = tickCount;
466-
}
467-
} else if (g_lastValueIndex == 2) {
468-
float iSet = channel_dispatcher::getISet(channel);
469-
if ((isNaN(g_channelStates[channelIndex].iSet) || iSet != g_channelStates[channelIndex].iSet) && (tickCount - g_channelStates[channelIndex].g_iSetTick) >= period) {
470-
if (!publish(channelIndex, PUB_TOPIC_I_SET, iSet)) {
471-
break;
472-
}
473-
g_channelStates[channelIndex].iSet = iSet;
474-
g_channelStates[channelIndex].g_iSetTick = tickCount;
475-
}
476-
} else if (g_lastValueIndex == 3) {
477-
float uMon = channel_dispatcher::getUMonLast(channel);
478-
if ((isNaN(g_channelStates[channelIndex].uMon) || uMon != g_channelStates[channelIndex].uMon) && (tickCount - g_channelStates[channelIndex].g_uMonTick) >= period) {
479-
if (!publish(channelIndex, PUB_TOPIC_U_MON, uMon)) {
480-
break;
481-
}
482-
g_channelStates[channelIndex].uMon = uMon;
483-
g_channelStates[channelIndex].g_uMonTick = tickCount;
484-
}
485-
} else {
486-
float iMon = channel_dispatcher::getIMonLast(channel);
487-
if ((isNaN(g_channelStates[channelIndex].iMon) || iMon != g_channelStates[channelIndex].iMon) && (tickCount - g_channelStates[channelIndex].g_iMonTick) >= period) {
488-
if (!publish(channelIndex, PUB_TOPIC_I_MON, iMon)) {
489-
break;
490-
}
491-
g_channelStates[channelIndex].iMon = iMon;
492-
g_channelStates[channelIndex].g_iMonTick = tickCount;
493-
}
494-
}
495-
496-
if (++g_lastValueIndex == 5) {
497-
g_lastValueIndex = 0;
498-
if (++g_lastChannelIndex == CH_NUM) {
499-
g_lastChannelIndex = 0;
500-
}
501-
}
502-
} while (g_lastChannelIndex != lastChannelIndexAtStart || g_lastValueIndex != lastValueIndexAtStart);
444+
uint8_t lastChannelIndexAtStart = g_lastChannelIndex;
445+
uint8_t lastValueIndexAtStart = g_lastValueIndex;
446+
447+
do {
448+
uint8_t channelIndex = g_lastChannelIndex;
449+
Channel &channel = Channel::get(channelIndex);
450+
451+
uint32_t period = (uint32_t)roundf(persist_conf::devConf.mqttPeriod * 1000000);
452+
453+
if (g_lastValueIndex == 0) {
454+
int oe = channel.isOutputEnabled() ? 1 : 0;
455+
if (oe != g_channelStates[channelIndex].oe) {
456+
if (!publish(channelIndex, PUB_TOPIC_OE, oe)) {
457+
break;
458+
}
459+
g_channelStates[channelIndex].oe = oe;
460+
}
461+
} else if (g_lastValueIndex == 1) {
462+
float uSet = channel_dispatcher::getUSet(channel);
463+
if ((isNaN(g_channelStates[channelIndex].uSet) || uSet != g_channelStates[channelIndex].uSet) && (tickCount - g_channelStates[channelIndex].g_uSetTick) >= period) {
464+
if (!publish(channelIndex, PUB_TOPIC_U_SET, uSet)) {
465+
break;
466+
}
467+
g_channelStates[channelIndex].uSet = uSet;
468+
g_channelStates[channelIndex].g_uSetTick = tickCount;
469+
}
470+
} else if (g_lastValueIndex == 2) {
471+
float iSet = channel_dispatcher::getISet(channel);
472+
if ((isNaN(g_channelStates[channelIndex].iSet) || iSet != g_channelStates[channelIndex].iSet) && (tickCount - g_channelStates[channelIndex].g_iSetTick) >= period) {
473+
if (!publish(channelIndex, PUB_TOPIC_I_SET, iSet)) {
474+
break;
475+
}
476+
g_channelStates[channelIndex].iSet = iSet;
477+
g_channelStates[channelIndex].g_iSetTick = tickCount;
478+
}
479+
} else if (g_lastValueIndex == 3) {
480+
float uMon = channel_dispatcher::getUMonLast(channel);
481+
if ((isNaN(g_channelStates[channelIndex].uMon) || uMon != g_channelStates[channelIndex].uMon) && (tickCount - g_channelStates[channelIndex].g_uMonTick) >= period) {
482+
if (!publish(channelIndex, PUB_TOPIC_U_MON, uMon)) {
483+
break;
484+
}
485+
g_channelStates[channelIndex].uMon = uMon;
486+
g_channelStates[channelIndex].g_uMonTick = tickCount;
487+
}
488+
} else {
489+
float iMon = channel_dispatcher::getIMonLast(channel);
490+
if ((isNaN(g_channelStates[channelIndex].iMon) || iMon != g_channelStates[channelIndex].iMon) && (tickCount - g_channelStates[channelIndex].g_iMonTick) >= period) {
491+
if (!publish(channelIndex, PUB_TOPIC_I_MON, iMon)) {
492+
break;
493+
}
494+
g_channelStates[channelIndex].iMon = iMon;
495+
g_channelStates[channelIndex].g_iMonTick = tickCount;
496+
}
497+
}
498+
499+
if (++g_lastValueIndex == 5) {
500+
g_lastValueIndex = 0;
501+
if (++g_lastChannelIndex == CH_NUM) {
502+
g_lastChannelIndex = 0;
503+
}
504+
}
505+
} while (g_lastChannelIndex != lastChannelIndexAtStart || g_lastValueIndex != lastValueIndexAtStart);
503506

504507
#if defined(EEZ_PLATFORM_SIMULATOR)
505-
mqtt_sync(&g_client);
506-
#endif
507-
508-
#if defined(EEZ_PLATFORM_STM32)
509-
} else {
510-
g_connectionState = CONNECTION_STATE_RECONNECT;
511-
}
508+
mqtt_sync(&g_client);
512509
#endif
513510
}
514511
}
515512

516513
void reconnect() {
517514
if (persist_conf::devConf.mqttEnabled) {
518-
if (g_connectionState == CONNECTION_STATE_IDLE || g_connectionState != CONNECTION_STATE_ERROR) {
519-
g_connectionState = CONNECTION_STATE_CONNECT;
515+
if (g_connectionState == CONNECTION_STATE_IDLE || g_connectionState == CONNECTION_STATE_ERROR) {
516+
setState(CONNECTION_STATE_CONNECT);
520517
} else {
521-
g_connectionState = CONNECTION_STATE_RECONNECT;
518+
setState(CONNECTION_STATE_RECONNECT);
522519
}
523520
} else {
524-
if (g_connectionState == CONNECTION_STATE_IDLE || g_connectionState != CONNECTION_STATE_ERROR) {
525-
g_connectionState = CONNECTION_STATE_DISCONNECT;
521+
if (g_connectionState != CONNECTION_STATE_IDLE && g_connectionState != CONNECTION_STATE_ERROR) {
522+
setState(CONNECTION_STATE_DISCONNECT);
526523
}
527524
}
528525
}

src/eez/mqtt.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,4 +45,4 @@ void tick(uint32_t tickCount);
4545
void reconnect();
4646

4747
} // mqtt
48-
} // eez
48+
} // eez

0 commit comments

Comments
 (0)