Skip to content

Br 20211004 1031 #15

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
10 changes: 9 additions & 1 deletion iotconnect-sdk/include/iotconnect.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ typedef enum {
// TODO: Sync statuses etc.
} IotconnectConnectionStatus;

typedef enum {
MSG_SEND_SUCCESS, //Received PUBACK for the respective message.
MSG_SEND_TIMEOUT, //Message drop from queue due to maximum retry with PUBACK timeout.
MSG_SEND_FAILED, //Message drop from queue due to disconnection.
} IotconnectMsgSendStatus;

typedef void (*IotConnectMsgSendStatusCallback)(uint32_t msg_id, IotconnectMsgSendStatus status);

typedef void (*IotConnectStatusCallback)(IotconnectConnectionStatus data);

Expand All @@ -37,6 +44,7 @@ typedef struct {
IotclCommandCallback cmd_cb; // callback for command events.
IotclMessageCallback msg_cb; // callback for ALL messages, including the specific ones like cmd or ota callback.
IotConnectStatusCallback status_cb; // callback for connection status
IotConnectMsgSendStatusCallback msg_send_status_cb; // callback for MQTT message send status.
} IotconnectClientConfig;


Expand All @@ -48,7 +56,7 @@ bool iotconnect_sdk_is_connected();

IotclConfig *iotconnect_sdk_get_lib_config();

void iotconnect_sdk_send_packet(const char *data);
void iotconnect_sdk_send_packet(const char *data, uint32_t *p_msg_id);

void iotconnect_sdk_loop();

Expand Down
4 changes: 3 additions & 1 deletion iotconnect-sdk/nrf-layer-lib/include/iotconnect_mqtt.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ typedef struct {
const char *env; // Environment name. Contact your representative for details. Same as telemetry config.
IotconnectMqttOnDataCallback data_cb; // callback for OTA events.
IotConnectStatusCallback status_cb; // callback for command events.
IotConnectMsgSendStatusCallback msg_send_status_cb; // callback for message send status.
} IotconnectMqttConfig;

bool iotc_nrf_mqtt_init(IotconnectMqttConfig *config, IotclSyncResponse *sr);
Expand All @@ -31,7 +32,8 @@ void iotc_nrf_mqtt_loop();

bool iotc_nrf_mqtt_is_connected();

int iotc_nrf_mqtt_publish(struct mqtt_client *c, const char *topic, enum mqtt_qos qos, const uint8_t *data, size_t len);
int iotc_nrf_mqtt_publish(struct mqtt_client *c, const char *topic, enum mqtt_qos qos, const uint8_t *data,
size_t len, uint32_t *p_msg_id);

void iotc_nrf_mqtt_abort();

Expand Down
251 changes: 185 additions & 66 deletions iotconnect-sdk/nrf-layer-lib/src/iotconnect_mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,21 @@
#include "nrf_cert_store.h"

#define MQTT_PUBACK_TIMEOUT_S 5 //5 secs

SYS_MUTEX_DEFINE(mutex_mqtt_pub);
static unsigned int pending_ack_msg_id = 0;
static bool msg_ack_pending = false;
#define MAXIMUM_MSG_QUEUE 4
#define MAXIMUM_MSG_RESEND 1
typedef struct {
bool in_used;
uint32_t msg_id;
const char *p_pub_topic;
uint8_t *p_msg;
uint32_t msg_len;
uint32_t start_tick;
uint8_t resend_count;
} msg_q_object_t;

static msg_q_object_t msg_queue[MAXIMUM_MSG_QUEUE];
SYS_MUTEX_DEFINE(mutex_msg_q);
static bool msg_queue_initialized = false;

extern struct mqtt_client client;
static IotconnectMqttConfig *config;
Expand Down Expand Up @@ -52,6 +63,125 @@ static int fds_init(struct mqtt_client *c);

static void mqtt_evt_handler(struct mqtt_client *const c, const struct mqtt_evt *evt);

static int nrf_mqtt_publish(struct mqtt_client *c, const char *topic, enum mqtt_qos qos,
const uint8_t *data, size_t len, uint32_t *p_msg_id);

static void msg_queue_clear(bool invoke_cb) {
if (msg_queue_initialized) {
//mutex lock
sys_mutex_lock(&mutex_msg_q, K_FOREVER);
for (int i=0; i<ARRAY_SIZE(msg_queue); i++) {
if (msg_queue[i].in_used) {
uint32_t msg_id = msg_queue[i].msg_id;
free(msg_queue[i].p_msg);
msg_queue[i].in_used = false;
if (invoke_cb && config->msg_send_status_cb) {
config->msg_send_status_cb(msg_id, MSG_SEND_FAILED);
}
}
}
//printk("msg queue cleared\n");
//mutex unlock
sys_mutex_unlock(&mutex_msg_q);
}
}

static void msg_queue_init(void) {
if (!msg_queue_initialized) {
for (int i=0; i<ARRAY_SIZE(msg_queue); i++) {
msg_queue[i].in_used = false;
}
msg_queue_initialized = true;
//printk("msg queue initialized\n");
}
else {
msg_queue_clear(false);
}
}

static bool msg_queue_add(uint32_t msg_id, const char *p_pub_topic,
uint8_t *p_msg, uint32_t msg_len) {
bool msg_added = false;
if (msg_queue_initialized) {
//mutex lock
sys_mutex_lock(&mutex_msg_q, K_FOREVER);
for (int i=0; i<ARRAY_SIZE(msg_queue); i++) {
if (!msg_queue[i].in_used) {
msg_queue[i].p_msg = (uint8_t *)malloc(msg_len);
if (msg_queue[i].p_msg) {
memcpy(msg_queue[i].p_msg, p_msg, msg_len);
msg_queue[i].msg_len = msg_len;
msg_queue[i].msg_id = msg_id;
msg_queue[i].p_pub_topic = p_pub_topic;
msg_queue[i].start_tick = k_uptime_get_32();
msg_queue[i].resend_count = MAXIMUM_MSG_RESEND;
msg_queue[i].in_used = true;
msg_added = true;
//printk("message added to queue. (msg id: %d)\n", msg_id);
}
else {
printk("ERR: Unable to add message to queue!\n");
}
break;
}
}
//mutex unlock
sys_mutex_unlock(&mutex_msg_q);
}
return msg_added;
}

static void msg_queue_delete(uint32_t msg_id) {
if (msg_queue_initialized) {
//mutex lock
sys_mutex_lock(&mutex_msg_q, K_FOREVER);
for (int i=0; i<ARRAY_SIZE(msg_queue); i++) {
if (msg_queue[i].in_used && (msg_queue[i].msg_id == msg_id)) {
free(msg_queue[i].p_msg);
msg_queue[i].in_used = false;
//printk("message deleted from queue. (msg id: %d)\n", msg_id);
break;
}
}
//mutex unlock
sys_mutex_unlock(&mutex_msg_q);
}
}

static void msg_queue_proc(void) {
if (msg_queue_initialized) {
//mutex lock
sys_mutex_lock(&mutex_msg_q, K_FOREVER);
for (int i=0; i<ARRAY_SIZE(msg_queue); i++) {
if (msg_queue[i].in_used) {
if ((k_uptime_get_32() - msg_queue[i].start_tick) >= (MQTT_PUBACK_TIMEOUT_S * 1000)) {
//printk("ERR: Send message timeout! (msg id: %d)\n", msg_queue[i].msg_id);
if (msg_queue[i].resend_count) {
printk("Re-send message again. (msg id: %d)\n", msg_queue[i].msg_id);
uint32_t msg_id = msg_queue[i].msg_id;
int err = nrf_mqtt_publish(&client, msg_queue[i].p_pub_topic, MQTT_QOS_1_AT_LEAST_ONCE,
msg_queue[i].p_msg, msg_queue[i].msg_len, &msg_id);
if (err) {
printk("message resend failed! %d\n", err);
}
msg_queue[i].start_tick = k_uptime_get_32();
msg_queue[i].resend_count--;
}
else {
//printk("Maximum re-send reached! Abort message! (msg id: %d)\n", msg_queue[i].msg_id);
uint32_t msg_id = msg_queue[i].msg_id;
msg_queue_delete(msg_id);
if (config->msg_send_status_cb) {
config->msg_send_status_cb(msg_id, MSG_SEND_TIMEOUT);
}
}
}
}
}
//mutex unlock
sys_mutex_unlock(&mutex_msg_q);
}
}

static unsigned int get_next_message_id() {
if (rolling_message_id + 1 >= UINT_MAX) {
Expand Down Expand Up @@ -221,71 +351,55 @@ static bool client_init() {
return true;
}


/**@brief Function to publish data on the configured topic
*/
int iotc_nrf_mqtt_publish(struct mqtt_client *c, const char *topic, enum mqtt_qos qos,
const uint8_t *data, size_t len) {
static int nrf_mqtt_publish(struct mqtt_client *c, const char *topic, enum mqtt_qos qos,
const uint8_t *data, size_t len, uint32_t *p_msg_id) {
struct mqtt_publish_param param;
int retry_count = 2;
int code;

//mutex lock
sys_mutex_lock(&mutex_mqtt_pub, K_FOREVER);

do {

param.message.topic.qos = qos;
param.message.topic.topic.utf8 = (uint8_t *) topic;
param.message.topic.topic.size = strlen(param.message.topic.topic.utf8);
param.message.payload.data = (uint8_t *) data;
param.message.payload.len = len;
param.message.topic.qos = qos;
param.message.topic.topic.utf8 = (uint8_t *) topic;
param.message.topic.topic.size = strlen(param.message.topic.topic.utf8);
param.message.payload.data = (uint8_t *) data;
param.message.payload.len = len;
// if p_msg_id contains non-zero value, the message is re-send from msg_queue_proc().
// Assign this message with assigned messsage id in p_msg_id.
// else assign current running message id to this message.
if (p_msg_id && *p_msg_id) {
param.message_id = *p_msg_id;
}
else {
param.message_id = get_next_message_id();
param.dup_flag = 0;
param.retain_flag = 0;

code = mqtt_publish(c, &param);
if (code != 0) {
goto publish_done;
}

if (param.message.topic.qos == MQTT_QOS_1_AT_LEAST_ONCE) {

pending_ack_msg_id = param.message_id;
msg_ack_pending = true;

uint32_t start_ticks = k_uptime_get_32();

do {

iotc_nrf_mqtt_loop();
k_msleep(1);

} while (msg_ack_pending &&
(k_uptime_get_32() - start_ticks) < (MQTT_PUBACK_TIMEOUT_S * 1000));

if (!msg_ack_pending) {
code = 0;
break;
}

if (retry_count > 0) {
retry_count--;
if (retry_count == 0) {
msg_ack_pending = false;
code = -ETIMEDOUT;
}
}
}
param.dup_flag = 0;
param.retain_flag = 0;

code = mqtt_publish(c, &param);

// Only add the message to msg queue when:
// mqtt_publish() is successful.
// p_msg_id is NULL or contains zero. (not re-send from msg_queue_proc())
// QOS is 1.
if (!code && !(p_msg_id && *p_msg_id) && (qos == MQTT_QOS_1_AT_LEAST_ONCE)) {
msg_queue_add(param.message_id, topic, (uint8_t *)data, len);
// Return back the assigned message id if p_msg_id is not NULL.
if (p_msg_id) {
*p_msg_id = param.message_id;
}
}

} while ((param.message.topic.qos == MQTT_QOS_1_AT_LEAST_ONCE) && (retry_count > 0));

publish_done:
return code;
}

//mutex unlock
sys_mutex_unlock(&mutex_mqtt_pub);
/**@brief Function to publish data on the configured topic
*/
int iotc_nrf_mqtt_publish(struct mqtt_client *c, const char *topic, enum mqtt_qos qos,
const uint8_t *data, size_t len, uint32_t *p_msg_id) {

return code;
if (p_msg_id ) {
//New message. Force p_msg_id to zero.
*p_msg_id = 0;
}
return nrf_mqtt_publish(c, topic, qos, data, len, p_msg_id);
}

static void mqtt_evt_handler(struct mqtt_client *const c,
Expand Down Expand Up @@ -317,6 +431,7 @@ static void mqtt_evt_handler(struct mqtt_client *const c,
if (config->status_cb) {
config->status_cb(MQTT_DISCONNECTED);
}
msg_queue_clear(true);
break;

case MQTT_EVT_PUBLISH: {
Expand Down Expand Up @@ -351,11 +466,9 @@ static void mqtt_evt_handler(struct mqtt_client *const c,
}

printk("Packet id: %u acknowledged\n", evt->param.puback.message_id);

if (msg_ack_pending) {
if (pending_ack_msg_id == evt->param.puback.message_id) {
msg_ack_pending = false;
}
msg_queue_delete(evt->param.puback.message_id);
if (config->msg_send_status_cb) {
config->msg_send_status_cb(evt->param.puback.message_id, MSG_SEND_SUCCESS);
}
break;

Expand Down Expand Up @@ -429,6 +542,9 @@ bool iotc_nrf_mqtt_init(IotconnectMqttConfig *c, IotclSyncResponse *sr) {
return false;
}

//init msg queue
msg_queue_init();

fds.fd = -1;
sync_response = sr;
config = c;
Expand Down Expand Up @@ -458,6 +574,9 @@ bool iotc_nrf_mqtt_init(IotconnectMqttConfig *c, IotclSyncResponse *sr) {
void iotc_nrf_mqtt_loop(void) {
int err;

// process msg queue procedure
msg_queue_proc();

//if no FD return.
if (fds.fd == -1) {
return;
Expand Down
15 changes: 12 additions & 3 deletions iotconnect-sdk/src/iotconnect.c
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,11 @@ static void on_iotconnect_status(IotconnectConnectionStatus status) {
}
}


static void on_msg_send_status(uint32_t msg_id, IotconnectMsgSendStatus status) {
if (config.msg_send_status_cb) {
config.msg_send_status_cb(msg_id, status);
}
}
///////////////////////////////////////////////////////////////////////////////////
// Get All twin property from C2D
void iotconnect_sdk_disconnect() {
Expand All @@ -202,8 +206,12 @@ void iotconnect_sdk_disconnect() {
k_msleep(100);
}

void iotconnect_sdk_send_packet(const char *data) {
if (0 != iotc_nrf_mqtt_publish(&client, sync_response->broker.pub_topic, MQTT_QOS_0_AT_MOST_ONCE, data, strlen(data))) {
// Send MQTT message.
// p_msg_id points to a 32-bit variable that store the assigned message id of the message
// being sent.
void iotconnect_sdk_send_packet(const char *data, uint32_t *p_msg_id) {
if (0 != iotc_nrf_mqtt_publish(&client, sync_response->broker.pub_topic, MQTT_QOS_1_AT_LEAST_ONCE, data, strlen(data),
....p_msg_id)) {
printk("\n\t Device_Attributes_Data Publish failure");
}
}
Expand Down Expand Up @@ -302,6 +310,7 @@ int iotconnect_sdk_init() {
mqtt_config.tls_verify = CONFIG_PEER_VERIFY;
mqtt_config.data_cb = iotc_on_mqtt_data;
mqtt_config.status_cb = on_iotconnect_status;
mqtt_config.msg_send_status_cb = on_msg_send_status;

if (!iotc_nrf_mqtt_init(&mqtt_config, sync_response)) {
return -8;
Expand Down
Loading