From b5bf869dfb976503ec146d79b588eeff61f13e50 Mon Sep 17 00:00:00 2001 From: guowenxue <guowenxue@gmail.com> Date: Tue, 20 Apr 2021 23:55:07 +0800 Subject: [PATCH] Add mqtt folder in iotd --- iotd/main.c | 277 ++++++------------------------------------------------ 1 files changed, 34 insertions(+), 243 deletions(-) diff --git a/iotd/main.c b/iotd/main.c index f504b8f..e7833ff 100644 --- a/iotd/main.c +++ b/iotd/main.c @@ -26,17 +26,14 @@ #include "lylib/util_proc.h" #include "hal/hal.h" #include "conf/conf.h" +#include "mqtt/mqtt.h" #define PROG_VERSION "v1.0.0" #define DAEMON_PIDFILE "/tmp/.iotd.pid" -int check_set_program_running(int daemon); -void *mqtt_sub_worker(void *args); - static void program_usage(char *progname) { - printf("Usage: %s [OPTION]...\n", progname); printf(" %s is LingYun studio MQTT daemon program running on RaspberryPi\n", progname); @@ -48,12 +45,6 @@ printf("\n%s version %s\n", progname, PROG_VERSION); return; -} - -void auto_light_off(int signum) -{ - log_nrml("turn hallway auto light off now\n"); - //turn_light(LIGHT_HALLWAY, OFF); } int main (int argc, char **argv) @@ -133,19 +124,46 @@ } install_default_signal(); - signal(SIGALRM, auto_light_off); - if( check_set_program_running(daemon) < 0 ) + if( check_set_program_running(daemon, DAEMON_PIDFILE) < 0 ) goto OUT; mosquitto_lib_init(); - if( thread_start(&tid, mqtt_sub_worker, &ctx ) < 0 ) + + /*+--------------------------------------------+ + *| Start MQTT subsciber thread if enable | + *+--------------------------------------------+*/ + if( ctx.mqtt_ctx.sub_enable ) { - log_fatal("Start MQTT subsciber worker thread failure\n"); - goto OUT; + if( thread_start(&tid, mqtt_sub_worker, &ctx ) < 0 ) + { + log_fatal("Start MQTT subsciber worker thread failure\n"); + goto OUT; + } + else + { + log_nrml("Start MQTT subsciber worker thread ok\n"); + } } - log_nrml("Start MQTT subsciber worker thread ok\n"); + + + /*+--------------------------------------------+ + *| Start MQTT publisher thread if enable | + *+--------------------------------------------+*/ + if( ctx.mqtt_ctx.pub_enable ) + { + if( thread_start(&tid, mqtt_pub_worker, &ctx ) < 0 ) + { + log_fatal("Start MQTT publisher worker thread failure\n"); + goto OUT; + } + else + { + log_nrml("Start MQTT publisher worker thread ok\n"); + } + } + log_nrml("Start infrared monitor thread working...\n"); while( ! g_signal.stop ) @@ -180,231 +198,4 @@ return 0; } /* ----- End of main() ----- */ -int check_set_program_running(int daemon) -{ - if( check_daemon_running(DAEMON_PIDFILE) ) - { - log_err("Program already running, process exit now"); - return -1; - } - - if( daemon ) - { - if( set_daemon_running(DAEMON_PIDFILE) < 0 ) - { - log_err("set program running as daemon failure\n"); - return -2; - } - } - else - { - if( record_daemon_pid(DAEMON_PIDFILE) < 0 ) - { - log_err("record program running PID failure\n"); - return -3; - } - } - - return 0; -} - - -void sub_connect_callback(struct mosquitto *mosq, void *userdata, int result) -{ - iotd_ctx_t *ctx = (iotd_ctx_t *)userdata; - - if( result ) - { - log_err("Subscriber connect to broker server failed, rv=%d\n", result); - return ; - } - - log_nrml("Subscriber connect to broker server[%s:%d] successfully\n", ctx->mqtt_ctx.host, ctx->mqtt_ctx.port); - mosquitto_subscribe(mosq, NULL, ctx->mqtt_ctx.subTopic, ctx->mqtt_ctx.subQos); -} - -void sub_disconnect_callback(struct mosquitto *mosq, void *userdata, int result) -{ - iotd_ctx_t *ctx = (iotd_ctx_t *)userdata; - - log_warn("Subscriber disconnect to broker server[%s:%d], reason=%d\n", - ctx->mqtt_ctx.host, ctx->mqtt_ctx.port, result); -} - -void proc_json_items(cJSON *root) -{ - int i; - char *value; - cJSON *item; - cJSON *array; - - if( !root ) - { - log_err("Invalid input arguments $root\n"); - return ; - } - - for( i=0; i<cJSON_GetArraySize(root); i++ ) - { - item = cJSON_GetArrayItem(root, i); - if( !item ) - break; - - /* if item is cJSON_Object, then recursive call proc_json */ - if( cJSON_Object == item->type ) - { - proc_json_items(item); - } - else if( cJSON_Array != item->type ) - { - value = cJSON_Print(item); - - /* light controled by relay */ - if( !strcasecmp(item->string, "hallway") ) - { - if( strcasestr(value, "on") ) - { - //turn_light(LIGHT_HALLWAY, ON); - log_nrml("Turn on hallway light\n"); - } - else if( strcasestr(value, "off") ) - { - //turn_light(LIGHT_HALLWAY, OFF); - log_nrml("Turn off hallway light\n"); - } - } - else if( !strcasecmp(item->string, "livroom_left") ) - { - if( strcasestr(value, "on") ) - { - //turn_light(LIGHT_LVROOM_LEFT, ON); - log_nrml("Turn on livingroom left light\n"); - } - else if( strcasestr(value, "off") ) - { - //turn_light(LIGHT_LVROOM_LEFT, OFF); - log_nrml("Turn off livingroom left light\n"); - } - } - if( !strcasecmp(item->string, "livroom_right") ) - { - if( strcasestr(value, "on") ) - { - //turn_light(LIGHT_LVROOM_RIGHT, ON); - log_nrml("Turn on livingroom right light\n"); - } - else if( strcasestr(value, "off") ) - { - //turn_light(LIGHT_LVROOM_RIGHT, OFF); - log_nrml("Turn off livingroom right light\n"); - } - } - - free(value); /* must free it, or it will result memory leak */ - } - } - -} - -void sub_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message) -{ - iotd_ctx_t *ctx = (iotd_ctx_t *)userdata; - - cJSON *root = NULL; - cJSON *item; - char *value; - - - if ( !message->payloadlen ) - { - log_err("%s (null)\n", message->topic); - return ; - } - - log_dbg("Subscriber receive message: '%s'\n", message->payload); - - root = cJSON_Parse(message->payload); - if( !root ) - { - log_err("cJSON_Parse parser failure: %s\n", cJSON_GetErrorPtr()); - return ; - } - - /* check ID matched or not */ - item = cJSON_GetObjectItem(root, "id"); - if( !item ) - { - log_err("cJSON_Parse get ID failure: %s\n", cJSON_GetErrorPtr()); - goto OUT; - } - - value = cJSON_PrintUnformatted(item); - if( strcasecmp(value, ctx->mqtt_ctx.id) ) - { - log_warn("cJSON_Parse get ID not matchs [%s<->%s], drop this message!\n", value, ctx->mqtt_ctx.id); - free(value); - goto OUT; - } - - free(value); - - /* proc JSON mesage */ - proc_json_items(root); - -OUT: - cJSON_Delete(root); /* must delete it, or it will result memory leak */ - return ; -} - - -void *mqtt_sub_worker(void *args) -{ - struct mosquitto *mosq; - bool session = true; - - iotd_ctx_t *ctx = (iotd_ctx_t *)args; - mqtt_ctx_t *mqtt_ctx; - - if( !ctx ) - { - log_err("Invalid input arguments\n"); - return NULL; - } - - mqtt_ctx = &ctx->mqtt_ctx; - - - mosq = mosquitto_new(NULL, session, ctx); - if( !mosq ) - { - log_err("mosquitto_new failure\n"); - return NULL; - } - - /* set connnect to broker username and password */ - if( strlen(mqtt_ctx->uid)> 0 && strlen(mqtt_ctx->pwd)> 0 ) - mosquitto_username_pw_set(mosq, mqtt_ctx->uid, mqtt_ctx->pwd); - - /* set callback functions */ - mosquitto_connect_callback_set(mosq, sub_connect_callback); - mosquitto_disconnect_callback_set(mosq, sub_disconnect_callback); - mosquitto_message_callback_set(mosq, sub_message_callback); - - while( !g_signal.stop ) - { - /* connect to MQTT broker */ - if( mosquitto_connect(mosq, mqtt_ctx->host, mqtt_ctx->port, mqtt_ctx->keepalive) ) - { - log_err("Subscriber connect to broker[%s:%d] failure: %s\n", mqtt_ctx->host, mqtt_ctx->port, strerror(errno)); - msleep(1000); - continue; - } - - /* -1: use default timeout 1000ms 1: unused */ - mosquitto_loop_forever(mosq, -1, 1); - } - - mosquitto_destroy(mosq); - return NULL; -} -- Gitblit v1.9.1