From b5bf869dfb976503ec146d79b588eeff61f13e50 Mon Sep 17 00:00:00 2001 From: guowenxue <guowenxue@gmail.com> Date: Tue, 20 Apr 2021 23:55:07 +0800 Subject: [PATCH] Add mqtt folder in iotd --- iotd/lylib/util_proc.c | 94 +++++- iotd/mqtt/mqtt.c | 360 +++++++++++++++++++++++++ iotd/lylib/util_proc.h | 3 iotd/makefile | 2 iotd/mqtt/mqtt.h | 45 +++ iotd/main.c | 277 ++----------------- iotd/mqtt/makefile | 15 + iotd/conf/conf.h | 25 - 8 files changed, 533 insertions(+), 288 deletions(-) diff --git a/iotd/conf/conf.h b/iotd/conf/conf.h index b905f39..68f0c50 100644 --- a/iotd/conf/conf.h +++ b/iotd/conf/conf.h @@ -14,6 +14,7 @@ #define __CONF_H_ #include "hal/hal.h" +#include "mqtt/mqtt.h" enum { @@ -30,30 +31,6 @@ int loglevel; /* logger level */ int logsize; /* logger file maxsize, oversize will rollback */ } log_ctx_t; - -typedef struct mqtt_ctx_s -{ - char id[32]; /* production ID */ - - /* Broker settings */ - char host[128]; /* MQTT broker server name */ - int port; /* MQTT broker listen port */ - char uid[64]; /* username */ - char pwd[64]; /* password */ - int keepalive; /* MQTT broker send PING message to subsciber/publisher keepalive timeout<seconds> */ - - /* Subscriber settings */ - int sub_enable; /* Subscriber enable or not */ - char subTopic[256]; /* Subscriber topic */ - int subQos; /* Subscriber Qos */ - - /* Publisher settings */ - int pub_enable; /* Publisher enable or not */ - char pubTopic[256]; /* Publisher topic */ - int pubQos; /* Publisher Qos */ - int interval; /* Publish interval */ -} mqtt_ctx_t; - typedef struct iotd_ctx_s { diff --git a/iotd/lylib/util_proc.c b/iotd/lylib/util_proc.c index a1af244..700bb65 100644 --- a/iotd/lylib/util_proc.c +++ b/iotd/lylib/util_proc.c @@ -97,7 +97,7 @@ * *****************************************************************************/ void daemonize(int nochdir, int noclose) { - int retval, fd; + int rv, fd; int i; /* already a daemon */ @@ -105,11 +105,11 @@ return; /* fork error */ - retval = fork(); - if (retval < 0) exit(1); + rv = fork(); + if (rv < 0) exit(1); /* parent process exit */ - if (retval > 0) + if (rv > 0) exit(0); /* obtain a new process session group */ @@ -141,6 +141,49 @@ return; } + +/* **************************************************************************** + * FunctionName: check_set_program_running + * Description : check program already running or not, if not then run it and + * record pid into $pidfile + * Inputs : daemon: set program running in daemon or not + * pid_file:The record PID file path + * Output : NONE + * Return : 0: Record successfully Else: Failure + * *****************************************************************************/ + +int check_set_program_running(int daemon, char *pidfile) +{ + if( !pidfile ) + return 0; + + if( check_daemon_running(pidfile) ) + { + log_err("Program already running, process exit now"); + return -1; + } + + if( daemon ) + { + if( set_daemon_running(pidfile) < 0 ) + { + log_err("set program running as daemon failure\n"); + return -2; + } + } + else + { + if( record_daemon_pid(pidfile) < 0 ) + { + log_err("record program running PID failure\n"); + return -3; + } + } + + return 0; +} + + /* **************************************************************************** * FunctionName: record_daemon_pid @@ -229,11 +272,11 @@ * *****************************************************************************/ int check_daemon_running(const char *pid_file) { - int retVal = -1; + int rv = -1; struct stat fStatBuf; - retVal = stat(pid_file, &fStatBuf); - if (0 == retVal) + rv = stat(pid_file, &fStatBuf); + if (0 == rv) { pid_t pid = -1; printf("PID record file \"%s\" exist.\n", pid_file); @@ -241,7 +284,7 @@ pid = get_daemon_pid(pid_file); if (pid > 0) /* Process pid exist */ { - if ((retVal = kill(pid, 0)) == 0) + if ((rv = kill(pid, 0)) == 0) { printf("Program with PID[%d] seems running.\n", pid); return 1; @@ -327,34 +370,45 @@ /* start a new thread to run $thread_workbody point function */ int thread_start(pthread_t *thread_id, thread_body_t thread_workbody, void *thread_arg) { - int retval = 0; + int rv = 0; + pthread_t tid; - pthread_attr_t thread_attr; + pthread_attr_t thread_attr; /* Initialize the thread attribute */ - retval = pthread_attr_init(&thread_attr); - if(retval) + rv = pthread_attr_init(&thread_attr); + if(rv) return -1; /* Set the stack size of the thread */ - retval = pthread_attr_setstacksize(&thread_attr, 120 * 1024); - if(retval) + rv = pthread_attr_setstacksize(&thread_attr, 120 * 1024); + if(rv) goto CleanUp; /* Set thread to detached state:Don`t need pthread_join */ - retval = pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED); - if(retval) + rv = pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED); + if(rv) goto CleanUp; /* Create the thread */ - retval = pthread_create(thread_id, &thread_attr, thread_workbody, thread_arg); - if(retval) + rv = pthread_create(&tid, &thread_attr, thread_workbody, thread_arg); + if(rv) goto CleanUp; - + CleanUp: + + + if( thread_id ) + { + if( rv ) + *thread_id = 0; + else + *thread_id = tid; + } + /* Destroy the attributes of thread */ pthread_attr_destroy(&thread_attr); - return retval; + return rv; } diff --git a/iotd/lylib/util_proc.h b/iotd/lylib/util_proc.h index 24745de..009cf70 100644 --- a/iotd/lylib/util_proc.h +++ b/iotd/lylib/util_proc.h @@ -34,6 +34,9 @@ /* excute a linux command by system() */ extern void exec_system_cmd(const char *format, ...); +/* check program already running or not, if not then run it and record pid into $pidfile */ +extern int check_set_program_running(int daemon, char *pidfile); + /* check program already running or not from $pid_file */ extern int check_daemon_running(const char *pid_file); diff --git a/iotd/main.c b/iotd/main.c index f504b8f..e7833ff 100644 --- a/iotd/main.c +++ b/iotd/main.c @@ -26,17 +26,14 @@ #include "lylib/util_proc.h" #include "hal/hal.h" #include "conf/conf.h" +#include "mqtt/mqtt.h" #define PROG_VERSION "v1.0.0" #define DAEMON_PIDFILE "/tmp/.iotd.pid" -int check_set_program_running(int daemon); -void *mqtt_sub_worker(void *args); - static void program_usage(char *progname) { - printf("Usage: %s [OPTION]...\n", progname); printf(" %s is LingYun studio MQTT daemon program running on RaspberryPi\n", progname); @@ -48,12 +45,6 @@ printf("\n%s version %s\n", progname, PROG_VERSION); return; -} - -void auto_light_off(int signum) -{ - log_nrml("turn hallway auto light off now\n"); - //turn_light(LIGHT_HALLWAY, OFF); } int main (int argc, char **argv) @@ -133,19 +124,46 @@ } install_default_signal(); - signal(SIGALRM, auto_light_off); - if( check_set_program_running(daemon) < 0 ) + if( check_set_program_running(daemon, DAEMON_PIDFILE) < 0 ) goto OUT; mosquitto_lib_init(); - if( thread_start(&tid, mqtt_sub_worker, &ctx ) < 0 ) + + /*+--------------------------------------------+ + *| Start MQTT subsciber thread if enable | + *+--------------------------------------------+*/ + if( ctx.mqtt_ctx.sub_enable ) { - log_fatal("Start MQTT subsciber worker thread failure\n"); - goto OUT; + if( thread_start(&tid, mqtt_sub_worker, &ctx ) < 0 ) + { + log_fatal("Start MQTT subsciber worker thread failure\n"); + goto OUT; + } + else + { + log_nrml("Start MQTT subsciber worker thread ok\n"); + } } - log_nrml("Start MQTT subsciber worker thread ok\n"); + + + /*+--------------------------------------------+ + *| Start MQTT publisher thread if enable | + *+--------------------------------------------+*/ + if( ctx.mqtt_ctx.pub_enable ) + { + if( thread_start(&tid, mqtt_pub_worker, &ctx ) < 0 ) + { + log_fatal("Start MQTT publisher worker thread failure\n"); + goto OUT; + } + else + { + log_nrml("Start MQTT publisher worker thread ok\n"); + } + } + log_nrml("Start infrared monitor thread working...\n"); while( ! g_signal.stop ) @@ -180,231 +198,4 @@ return 0; } /* ----- End of main() ----- */ -int check_set_program_running(int daemon) -{ - if( check_daemon_running(DAEMON_PIDFILE) ) - { - log_err("Program already running, process exit now"); - return -1; - } - - if( daemon ) - { - if( set_daemon_running(DAEMON_PIDFILE) < 0 ) - { - log_err("set program running as daemon failure\n"); - return -2; - } - } - else - { - if( record_daemon_pid(DAEMON_PIDFILE) < 0 ) - { - log_err("record program running PID failure\n"); - return -3; - } - } - - return 0; -} - - -void sub_connect_callback(struct mosquitto *mosq, void *userdata, int result) -{ - iotd_ctx_t *ctx = (iotd_ctx_t *)userdata; - - if( result ) - { - log_err("Subscriber connect to broker server failed, rv=%d\n", result); - return ; - } - - log_nrml("Subscriber connect to broker server[%s:%d] successfully\n", ctx->mqtt_ctx.host, ctx->mqtt_ctx.port); - mosquitto_subscribe(mosq, NULL, ctx->mqtt_ctx.subTopic, ctx->mqtt_ctx.subQos); -} - -void sub_disconnect_callback(struct mosquitto *mosq, void *userdata, int result) -{ - iotd_ctx_t *ctx = (iotd_ctx_t *)userdata; - - log_warn("Subscriber disconnect to broker server[%s:%d], reason=%d\n", - ctx->mqtt_ctx.host, ctx->mqtt_ctx.port, result); -} - -void proc_json_items(cJSON *root) -{ - int i; - char *value; - cJSON *item; - cJSON *array; - - if( !root ) - { - log_err("Invalid input arguments $root\n"); - return ; - } - - for( i=0; i<cJSON_GetArraySize(root); i++ ) - { - item = cJSON_GetArrayItem(root, i); - if( !item ) - break; - - /* if item is cJSON_Object, then recursive call proc_json */ - if( cJSON_Object == item->type ) - { - proc_json_items(item); - } - else if( cJSON_Array != item->type ) - { - value = cJSON_Print(item); - - /* light controled by relay */ - if( !strcasecmp(item->string, "hallway") ) - { - if( strcasestr(value, "on") ) - { - //turn_light(LIGHT_HALLWAY, ON); - log_nrml("Turn on hallway light\n"); - } - else if( strcasestr(value, "off") ) - { - //turn_light(LIGHT_HALLWAY, OFF); - log_nrml("Turn off hallway light\n"); - } - } - else if( !strcasecmp(item->string, "livroom_left") ) - { - if( strcasestr(value, "on") ) - { - //turn_light(LIGHT_LVROOM_LEFT, ON); - log_nrml("Turn on livingroom left light\n"); - } - else if( strcasestr(value, "off") ) - { - //turn_light(LIGHT_LVROOM_LEFT, OFF); - log_nrml("Turn off livingroom left light\n"); - } - } - if( !strcasecmp(item->string, "livroom_right") ) - { - if( strcasestr(value, "on") ) - { - //turn_light(LIGHT_LVROOM_RIGHT, ON); - log_nrml("Turn on livingroom right light\n"); - } - else if( strcasestr(value, "off") ) - { - //turn_light(LIGHT_LVROOM_RIGHT, OFF); - log_nrml("Turn off livingroom right light\n"); - } - } - - free(value); /* must free it, or it will result memory leak */ - } - } - -} - -void sub_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message) -{ - iotd_ctx_t *ctx = (iotd_ctx_t *)userdata; - - cJSON *root = NULL; - cJSON *item; - char *value; - - - if ( !message->payloadlen ) - { - log_err("%s (null)\n", message->topic); - return ; - } - - log_dbg("Subscriber receive message: '%s'\n", message->payload); - - root = cJSON_Parse(message->payload); - if( !root ) - { - log_err("cJSON_Parse parser failure: %s\n", cJSON_GetErrorPtr()); - return ; - } - - /* check ID matched or not */ - item = cJSON_GetObjectItem(root, "id"); - if( !item ) - { - log_err("cJSON_Parse get ID failure: %s\n", cJSON_GetErrorPtr()); - goto OUT; - } - - value = cJSON_PrintUnformatted(item); - if( strcasecmp(value, ctx->mqtt_ctx.id) ) - { - log_warn("cJSON_Parse get ID not matchs [%s<->%s], drop this message!\n", value, ctx->mqtt_ctx.id); - free(value); - goto OUT; - } - - free(value); - - /* proc JSON mesage */ - proc_json_items(root); - -OUT: - cJSON_Delete(root); /* must delete it, or it will result memory leak */ - return ; -} - - -void *mqtt_sub_worker(void *args) -{ - struct mosquitto *mosq; - bool session = true; - - iotd_ctx_t *ctx = (iotd_ctx_t *)args; - mqtt_ctx_t *mqtt_ctx; - - if( !ctx ) - { - log_err("Invalid input arguments\n"); - return NULL; - } - - mqtt_ctx = &ctx->mqtt_ctx; - - - mosq = mosquitto_new(NULL, session, ctx); - if( !mosq ) - { - log_err("mosquitto_new failure\n"); - return NULL; - } - - /* set connnect to broker username and password */ - if( strlen(mqtt_ctx->uid)> 0 && strlen(mqtt_ctx->pwd)> 0 ) - mosquitto_username_pw_set(mosq, mqtt_ctx->uid, mqtt_ctx->pwd); - - /* set callback functions */ - mosquitto_connect_callback_set(mosq, sub_connect_callback); - mosquitto_disconnect_callback_set(mosq, sub_disconnect_callback); - mosquitto_message_callback_set(mosq, sub_message_callback); - - while( !g_signal.stop ) - { - /* connect to MQTT broker */ - if( mosquitto_connect(mosq, mqtt_ctx->host, mqtt_ctx->port, mqtt_ctx->keepalive) ) - { - log_err("Subscriber connect to broker[%s:%d] failure: %s\n", mqtt_ctx->host, mqtt_ctx->port, strerror(errno)); - msleep(1000); - continue; - } - - /* -1: use default timeout 1000ms 1: unused */ - mosquitto_loop_forever(mosq, -1, 1); - } - - mosquitto_destroy(mosq); - return NULL; -} diff --git a/iotd/makefile b/iotd/makefile index 55796ef..8386ab1 100644 --- a/iotd/makefile +++ b/iotd/makefile @@ -26,7 +26,7 @@ export STRIP=${CROSSTOOL}strip export LDFLAGS -DIRS= conf hal lylib +DIRS= conf hal lylib mqtt DIRS_PATH=$(patsubst %,${PRJ_PATH}/%,$(DIRS)) CFLAGS=$(patsubst %,-I%,$(DIRS_PATH)) diff --git a/iotd/mqtt/makefile b/iotd/mqtt/makefile new file mode 100644 index 0000000..608137a --- /dev/null +++ b/iotd/mqtt/makefile @@ -0,0 +1,15 @@ + +PWD=$(shell pwd ) + +LIBNAME=$(shell basename ${PWD} ) +TOPDIR=$(shell dirname ${PWD} ) + +all: clean + @rm -f *.o + @${CROSSTOOL}gcc ${CFLAGS} -I${TOPDIR} -c *.c + ${CROSSTOOL}ar -rcs lib${LIBNAME}.a *.o + +clean: + @rm -f *.o + @rm -f *.a + diff --git a/iotd/mqtt/mqtt.c b/iotd/mqtt/mqtt.c new file mode 100644 index 0000000..9b70fbc --- /dev/null +++ b/iotd/mqtt/mqtt.c @@ -0,0 +1,360 @@ +/******************************************************************************** + * Copyright: (C) 2021 LingYun IoT System Studio + * All rights reserved. + * + * Filename: mqtt.h + * Description: This head file is MQTT subscriber and publisher thread code + * + * Version: 1.0.0(20/04/21) + * Author: Guo Wenxue <guowenxue@gmail.com> + * ChangeLog: 1, Release initial version on "20/04/21 15:46:42" + * + ********************************************************************************/ + +#include <string.h> +#include <math.h> +#include <cjson/cJSON.h> +#include <mosquitto.h> + +#include "lylib/util_proc.h" +#include "lylib/util_time.h" +#include "lylib/logger.h" +#include "hal/hal.h" +#include "conf/conf.h" + + +/*+-------------------------------------------+ + *| | + *| MQTT publisher thread worker code | + *| | + *+-------------------------------------------+*/ + + +void pub_connect_callback(struct mosquitto *mosq, void *userdata, int result) +{ + iotd_ctx_t *ctx = (iotd_ctx_t *)userdata; + mqtt_ctx_t *mqtt_ctx; + hal_ctx_t *hal_ctx; + int rv = 0; + char msg[128]; + float temp = 0.0; /* temperature */ + float rh = 0.0; /* relative humidity */ + int retain = 0; + + mqtt_ctx = &ctx->mqtt_ctx; + hal_ctx = &ctx->hal_ctx; + + + if( result ) + { + log_err("Publisher connect to broker server[%s:%d] failed, rv=%d\n", mqtt_ctx->host, mqtt_ctx->port, result); + return ; + } + log_nrml("Publisher connect to broker server[%s:%d] successfully\n", mqtt_ctx->host, mqtt_ctx->port); + + + if( hal_ctx->ds18b20_enable ) + { + memset(msg, 0, sizeof(msg)); + + if( ds18b20_get_temperature(&temp) < 0 ) + snprintf(msg, sizeof(msg), "{ \"id\":%s, \"temp\":\"error\" }", mqtt_ctx->id); + else + snprintf(msg, sizeof(msg), "{ \"id\":%s, \"temp\":\"%.2f\" }", mqtt_ctx->id, temp); + + rv = mosquitto_publish(mosq, NULL, mqtt_ctx->pubTopic, strlen(msg), msg, mqtt_ctx->pubQos, retain); + if( rv ) + { + log_err("Publisher broadcast message '%s' failure: %d\n", msg, rv); + } + else + { + log_nrml("Publisher broadcast message '%s' ok\n", msg); + } + } + + if( hal_ctx->sht2x_enable ) + { + memset(msg, 0, sizeof(msg)); + + if( sht2x_get_temp_humidity(&temp, &rh) < 0 ) + snprintf(msg, sizeof(msg), "{ \"id\":%s, \"temp\":\"error\", \"RH\":\"error\" }", mqtt_ctx->id); + else + snprintf(msg, sizeof(msg), "{ \"id\":%s, \"temp\":\"%.2f\", \"RH\":\"%.2f\" }", mqtt_ctx->id, temp, rh); + + rv = mosquitto_publish(mosq, NULL, mqtt_ctx->pubTopic, strlen(msg), msg, mqtt_ctx->pubQos, retain); + if( rv ) + { + log_err("Publisher broadcast message '%s' failure: %d\n", msg, rv); + } + else + { + log_nrml("Publisher broadcast message '%s' ok\n", msg); + } + } + + + log_nrml("Publisher broadcast over and disconnect broker now\n", mqtt_ctx->host, mqtt_ctx->port); + mosquitto_disconnect(mosq); + + return ; +} + + +void *mqtt_pub_worker(void *args) +{ + struct mosquitto *mosq; + bool session = true; + + iotd_ctx_t *ctx = (iotd_ctx_t *)args; + mqtt_ctx_t *mqtt_ctx; + + if( !ctx ) + { + log_err("Invalid input arguments\n"); + return NULL; + } + + mqtt_ctx = &ctx->mqtt_ctx; + + + mosq = mosquitto_new(NULL, session, ctx); + if( !mosq ) + { + log_err("mosquitto_new failure\n"); + return NULL; + } + + /* set connnect to broker username and password */ + if( strlen(mqtt_ctx->uid)> 0 && strlen(mqtt_ctx->pwd)> 0 ) + mosquitto_username_pw_set(mosq, mqtt_ctx->uid, mqtt_ctx->pwd); + + /* set callback functions */ + mosquitto_connect_callback_set(mosq, pub_connect_callback); + + while( !g_signal.stop ) + { + /* connect to MQTT broker */ + if( mosquitto_connect(mosq, mqtt_ctx->host, mqtt_ctx->port, mqtt_ctx->keepalive) ) + { + log_err("Publisher connect to broker[%s:%d] failure: %s\n", mqtt_ctx->host, mqtt_ctx->port, strerror(errno)); + msleep(1000); + continue; + } + + /* -1: use default timeout 1000ms 1: unused */ + mosquitto_loop_forever(mosq, -1, 1); + + /* Publisher broadcast sensors data message interval time, unit seconds */ + sleep( mqtt_ctx->interval ); + } + + mosquitto_destroy(mosq); + return NULL; +} + +/*+-------------------------------------------+ + *| | + *| MQTT Subscriber thread worker code | + *| | + *+-------------------------------------------+*/ + +void sub_connect_callback(struct mosquitto *mosq, void *userdata, int result) +{ + iotd_ctx_t *ctx = (iotd_ctx_t *)userdata; + + if( result ) + { + log_err("Subscriber connect to broker server failed, rv=%d\n", result); + return ; + } + + log_nrml("Subscriber connect to broker server[%s:%d] successfully\n", ctx->mqtt_ctx.host, ctx->mqtt_ctx.port); + mosquitto_subscribe(mosq, NULL, ctx->mqtt_ctx.subTopic, ctx->mqtt_ctx.subQos); +} + +void sub_disconnect_callback(struct mosquitto *mosq, void *userdata, int result) +{ + iotd_ctx_t *ctx = (iotd_ctx_t *)userdata; + + log_warn("Subscriber disconnect to broker server[%s:%d], reason=%d\n", + ctx->mqtt_ctx.host, ctx->mqtt_ctx.port, result); +} + +void proc_json_items(cJSON *root) +{ + int i; + char *value; + cJSON *item; + cJSON *array; + + if( !root ) + { + log_err("Invalid input arguments $root\n"); + return ; + } + + for( i=0; i<cJSON_GetArraySize(root); i++ ) + { + item = cJSON_GetArrayItem(root, i); + if( !item ) + break; + + /* if item is cJSON_Object, then recursive call proc_json */ + if( cJSON_Object == item->type ) + { + proc_json_items(item); + } + else if( cJSON_Array != item->type ) + { + value = cJSON_Print(item); + + /* light controled by relay */ + if( !strcasecmp(item->string, "hallway") ) + { + if( strcasestr(value, "on") ) + { + //turn_light(LIGHT_HALLWAY, ON); + log_nrml("Turn on hallway light\n"); + } + else if( strcasestr(value, "off") ) + { + //turn_light(LIGHT_HALLWAY, OFF); + log_nrml("Turn off hallway light\n"); + } + } + else if( !strcasecmp(item->string, "livroom_left") ) + { + if( strcasestr(value, "on") ) + { + //turn_light(LIGHT_LVROOM_LEFT, ON); + log_nrml("Turn on livingroom left light\n"); + } + else if( strcasestr(value, "off") ) + { + //turn_light(LIGHT_LVROOM_LEFT, OFF); + log_nrml("Turn off livingroom left light\n"); + } + } + if( !strcasecmp(item->string, "livroom_right") ) + { + if( strcasestr(value, "on") ) + { + //turn_light(LIGHT_LVROOM_RIGHT, ON); + log_nrml("Turn on livingroom right light\n"); + } + else if( strcasestr(value, "off") ) + { + //turn_light(LIGHT_LVROOM_RIGHT, OFF); + log_nrml("Turn off livingroom right light\n"); + } + } + + free(value); /* must free it, or it will result memory leak */ + } + } + +} + +void sub_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message) +{ + iotd_ctx_t *ctx = (iotd_ctx_t *)userdata; + + cJSON *root = NULL; + cJSON *item; + char *value; + + + if ( !message->payloadlen ) + { + log_err("%s (null)\n", message->topic); + return ; + } + + log_dbg("Subscriber receive message: '%s'\n", message->payload); + + root = cJSON_Parse(message->payload); + if( !root ) + { + log_err("cJSON_Parse parser failure: %s\n", cJSON_GetErrorPtr()); + return ; + } + + /* check ID matched or not */ + item = cJSON_GetObjectItem(root, "id"); + if( !item ) + { + log_err("cJSON_Parse get ID failure: %s\n", cJSON_GetErrorPtr()); + goto OUT; + } + + value = cJSON_PrintUnformatted(item); + if( strcasecmp(value, ctx->mqtt_ctx.id) ) + { + log_warn("cJSON_Parse get ID not matchs [%s<->%s], drop this message!\n", value, ctx->mqtt_ctx.id); + free(value); + goto OUT; + } + + free(value); + + /* proc JSON mesage */ + proc_json_items(root); + +OUT: + cJSON_Delete(root); /* must delete it, or it will result memory leak */ + return ; +} + + +void *mqtt_sub_worker(void *args) +{ + struct mosquitto *mosq; + bool session = true; + + iotd_ctx_t *ctx = (iotd_ctx_t *)args; + mqtt_ctx_t *mqtt_ctx; + + if( !ctx ) + { + log_err("Invalid input arguments\n"); + return NULL; + } + + mqtt_ctx = &ctx->mqtt_ctx; + + + mosq = mosquitto_new(NULL, session, ctx); + if( !mosq ) + { + log_err("mosquitto_new failure\n"); + return NULL; + } + + /* set connnect to broker username and password */ + if( strlen(mqtt_ctx->uid)> 0 && strlen(mqtt_ctx->pwd)> 0 ) + mosquitto_username_pw_set(mosq, mqtt_ctx->uid, mqtt_ctx->pwd); + + /* set callback functions */ + mosquitto_connect_callback_set(mosq, sub_connect_callback); + mosquitto_disconnect_callback_set(mosq, sub_disconnect_callback); + mosquitto_message_callback_set(mosq, sub_message_callback); + + while( !g_signal.stop ) + { + /* connect to MQTT broker */ + if( mosquitto_connect(mosq, mqtt_ctx->host, mqtt_ctx->port, mqtt_ctx->keepalive) ) + { + log_err("Subscriber connect to broker[%s:%d] failure: %s\n", mqtt_ctx->host, mqtt_ctx->port, strerror(errno)); + msleep(1000); + continue; + } + + /* -1: use default timeout 1000ms 1: unused */ + mosquitto_loop_forever(mosq, -1, 1); + } + + mosquitto_destroy(mosq); + return NULL; +} + diff --git a/iotd/mqtt/mqtt.h b/iotd/mqtt/mqtt.h new file mode 100644 index 0000000..b14935f --- /dev/null +++ b/iotd/mqtt/mqtt.h @@ -0,0 +1,45 @@ +/******************************************************************************** + * Copyright: (C) 2021 LingYun IoT System Studio + * All rights reserved. + * + * Filename: mqtt.h + * Description: This head file is MQTT subscriber and publisher thread code + * + * Version: 1.0.0(20/04/21) + * Author: Guo Wenxue <guowenxue@gmail.com> + * ChangeLog: 1, Release initial version on "20/04/21 15:46:42" + * + ********************************************************************************/ +#ifndef _MQTT_H_ +#define _MQTT_H_ + + +typedef struct mqtt_ctx_s +{ + char id[32]; /* production ID */ + + /* Broker settings */ + char host[128]; /* MQTT broker server name */ + int port; /* MQTT broker listen port */ + char uid[64]; /* username */ + char pwd[64]; /* password */ + int keepalive; /* MQTT broker send PING message to subsciber/publisher keepalive timeout<seconds> */ + + /* Subscriber settings */ + int sub_enable; /* Subscriber enable or not */ + char subTopic[256]; /* Subscriber topic */ + int subQos; /* Subscriber Qos */ + + /* Publisher settings */ + int pub_enable; /* Publisher enable or not */ + char pubTopic[256]; /* Publisher topic */ + int pubQos; /* Publisher Qos */ + int interval; /* Publish interval */ +} mqtt_ctx_t; + + +extern void *mqtt_pub_worker(void *args); +extern void *mqtt_sub_worker(void *args); + +#endif /* ----- #ifndef _MQTT_H_ ----- */ + -- Gitblit v1.9.1