5 files modified
3 files added
| | |
| | | #define __CONF_H_ |
| | | |
| | | #include "hal/hal.h" |
| | | #include "mqtt/mqtt.h" |
| | | |
| | | enum |
| | | { |
| | |
| | | int loglevel; /* logger level */ |
| | | int logsize; /* logger file maxsize, oversize will rollback */ |
| | | } log_ctx_t; |
| | | |
| | | typedef struct mqtt_ctx_s |
| | | { |
| | | char id[32]; /* production ID */ |
| | | |
| | | /* Broker settings */ |
| | | char host[128]; /* MQTT broker server name */ |
| | | int port; /* MQTT broker listen port */ |
| | | char uid[64]; /* username */ |
| | | char pwd[64]; /* password */ |
| | | int keepalive; /* MQTT broker send PING message to subsciber/publisher keepalive timeout<seconds> */ |
| | | |
| | | /* Subscriber settings */ |
| | | int sub_enable; /* Subscriber enable or not */ |
| | | char subTopic[256]; /* Subscriber topic */ |
| | | int subQos; /* Subscriber Qos */ |
| | | |
| | | /* Publisher settings */ |
| | | int pub_enable; /* Publisher enable or not */ |
| | | char pubTopic[256]; /* Publisher topic */ |
| | | int pubQos; /* Publisher Qos */ |
| | | int interval; /* Publish interval */ |
| | | } mqtt_ctx_t; |
| | | |
| | | |
| | | typedef struct iotd_ctx_s |
| | | { |
| | |
| | | * *****************************************************************************/ |
| | | void daemonize(int nochdir, int noclose) |
| | | { |
| | | int retval, fd; |
| | | int rv, fd; |
| | | int i; |
| | | |
| | | /* already a daemon */ |
| | |
| | | return; |
| | | |
| | | /* fork error */ |
| | | retval = fork(); |
| | | if (retval < 0) exit(1); |
| | | rv = fork(); |
| | | if (rv < 0) exit(1); |
| | | |
| | | /* parent process exit */ |
| | | if (retval > 0) |
| | | if (rv > 0) |
| | | exit(0); |
| | | |
| | | /* obtain a new process session group */ |
| | |
| | | |
| | | return; |
| | | } |
| | | |
| | | /* **************************************************************************** |
| | | * FunctionName: check_set_program_running |
| | | * Description : check program already running or not, if not then run it and |
| | | * record pid into $pidfile |
| | | * Inputs : daemon: set program running in daemon or not |
| | | * pid_file:The record PID file path |
| | | * Output : NONE |
| | | * Return : 0: Record successfully Else: Failure |
| | | * *****************************************************************************/ |
| | | |
| | | int check_set_program_running(int daemon, char *pidfile) |
| | | { |
| | | if( !pidfile ) |
| | | return 0; |
| | | |
| | | if( check_daemon_running(pidfile) ) |
| | | { |
| | | log_err("Program already running, process exit now"); |
| | | return -1; |
| | | } |
| | | |
| | | if( daemon ) |
| | | { |
| | | if( set_daemon_running(pidfile) < 0 ) |
| | | { |
| | | log_err("set program running as daemon failure\n"); |
| | | return -2; |
| | | } |
| | | } |
| | | else |
| | | { |
| | | if( record_daemon_pid(pidfile) < 0 ) |
| | | { |
| | | log_err("record program running PID failure\n"); |
| | | return -3; |
| | | } |
| | | } |
| | | |
| | | return 0; |
| | | } |
| | | |
| | | |
| | | |
| | | /* **************************************************************************** |
| | | * FunctionName: record_daemon_pid |
| | |
| | | * *****************************************************************************/ |
| | | int check_daemon_running(const char *pid_file) |
| | | { |
| | | int retVal = -1; |
| | | int rv = -1; |
| | | struct stat fStatBuf; |
| | | |
| | | retVal = stat(pid_file, &fStatBuf); |
| | | if (0 == retVal) |
| | | rv = stat(pid_file, &fStatBuf); |
| | | if (0 == rv) |
| | | { |
| | | pid_t pid = -1; |
| | | printf("PID record file \"%s\" exist.\n", pid_file); |
| | |
| | | pid = get_daemon_pid(pid_file); |
| | | if (pid > 0) /* Process pid exist */ |
| | | { |
| | | if ((retVal = kill(pid, 0)) == 0) |
| | | if ((rv = kill(pid, 0)) == 0) |
| | | { |
| | | printf("Program with PID[%d] seems running.\n", pid); |
| | | return 1; |
| | |
| | | /* start a new thread to run $thread_workbody point function */ |
| | | int thread_start(pthread_t *thread_id, thread_body_t thread_workbody, void *thread_arg) |
| | | { |
| | | int retval = 0; |
| | | int rv = 0; |
| | | pthread_t tid; |
| | | |
| | | pthread_attr_t thread_attr; |
| | | pthread_attr_t thread_attr; |
| | | |
| | | /* Initialize the thread attribute */ |
| | | retval = pthread_attr_init(&thread_attr); |
| | | if(retval) |
| | | rv = pthread_attr_init(&thread_attr); |
| | | if(rv) |
| | | return -1; |
| | | |
| | | /* Set the stack size of the thread */ |
| | | retval = pthread_attr_setstacksize(&thread_attr, 120 * 1024); |
| | | if(retval) |
| | | rv = pthread_attr_setstacksize(&thread_attr, 120 * 1024); |
| | | if(rv) |
| | | goto CleanUp; |
| | | |
| | | /* Set thread to detached state:Don`t need pthread_join */ |
| | | retval = pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED); |
| | | if(retval) |
| | | rv = pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED); |
| | | if(rv) |
| | | goto CleanUp; |
| | | |
| | | /* Create the thread */ |
| | | retval = pthread_create(thread_id, &thread_attr, thread_workbody, thread_arg); |
| | | if(retval) |
| | | rv = pthread_create(&tid, &thread_attr, thread_workbody, thread_arg); |
| | | if(rv) |
| | | goto CleanUp; |
| | | |
| | | |
| | | CleanUp: |
| | | |
| | | |
| | | if( thread_id ) |
| | | { |
| | | if( rv ) |
| | | *thread_id = 0; |
| | | else |
| | | *thread_id = tid; |
| | | } |
| | | |
| | | /* Destroy the attributes of thread */ |
| | | pthread_attr_destroy(&thread_attr); |
| | | return retval; |
| | | return rv; |
| | | } |
| | | |
| | | |
| | |
| | | /* excute a linux command by system() */ |
| | | extern void exec_system_cmd(const char *format, ...); |
| | | |
| | | /* check program already running or not, if not then run it and record pid into $pidfile */ |
| | | extern int check_set_program_running(int daemon, char *pidfile); |
| | | |
| | | /* check program already running or not from $pid_file */ |
| | | extern int check_daemon_running(const char *pid_file); |
| | | |
| | |
| | | #include "lylib/util_proc.h" |
| | | #include "hal/hal.h" |
| | | #include "conf/conf.h" |
| | | #include "mqtt/mqtt.h" |
| | | |
| | | #define PROG_VERSION "v1.0.0" |
| | | #define DAEMON_PIDFILE "/tmp/.iotd.pid" |
| | | |
| | | |
| | | int check_set_program_running(int daemon); |
| | | void *mqtt_sub_worker(void *args); |
| | | |
| | | static void program_usage(char *progname) |
| | | { |
| | | |
| | | printf("Usage: %s [OPTION]...\n", progname); |
| | | printf(" %s is LingYun studio MQTT daemon program running on RaspberryPi\n", progname); |
| | | |
| | |
| | | |
| | | printf("\n%s version %s\n", progname, PROG_VERSION); |
| | | return; |
| | | } |
| | | |
| | | void auto_light_off(int signum) |
| | | { |
| | | log_nrml("turn hallway auto light off now\n"); |
| | | //turn_light(LIGHT_HALLWAY, OFF); |
| | | } |
| | | |
| | | int main (int argc, char **argv) |
| | |
| | | } |
| | | |
| | | install_default_signal(); |
| | | signal(SIGALRM, auto_light_off); |
| | | |
| | | if( check_set_program_running(daemon) < 0 ) |
| | | if( check_set_program_running(daemon, DAEMON_PIDFILE) < 0 ) |
| | | goto OUT; |
| | | |
| | | mosquitto_lib_init(); |
| | | |
| | | if( thread_start(&tid, mqtt_sub_worker, &ctx ) < 0 ) |
| | | |
| | | /*+--------------------------------------------+ |
| | | *| Start MQTT subsciber thread if enable | |
| | | *+--------------------------------------------+*/ |
| | | if( ctx.mqtt_ctx.sub_enable ) |
| | | { |
| | | log_fatal("Start MQTT subsciber worker thread failure\n"); |
| | | goto OUT; |
| | | if( thread_start(&tid, mqtt_sub_worker, &ctx ) < 0 ) |
| | | { |
| | | log_fatal("Start MQTT subsciber worker thread failure\n"); |
| | | goto OUT; |
| | | } |
| | | else |
| | | { |
| | | log_nrml("Start MQTT subsciber worker thread ok\n"); |
| | | } |
| | | } |
| | | log_nrml("Start MQTT subsciber worker thread ok\n"); |
| | | |
| | | |
| | | /*+--------------------------------------------+ |
| | | *| Start MQTT publisher thread if enable | |
| | | *+--------------------------------------------+*/ |
| | | if( ctx.mqtt_ctx.pub_enable ) |
| | | { |
| | | if( thread_start(&tid, mqtt_pub_worker, &ctx ) < 0 ) |
| | | { |
| | | log_fatal("Start MQTT publisher worker thread failure\n"); |
| | | goto OUT; |
| | | } |
| | | else |
| | | { |
| | | log_nrml("Start MQTT publisher worker thread ok\n"); |
| | | } |
| | | } |
| | | |
| | | |
| | | log_nrml("Start infrared monitor thread working...\n"); |
| | | while( ! g_signal.stop ) |
| | |
| | | return 0; |
| | | } /* ----- End of main() ----- */ |
| | | |
| | | int check_set_program_running(int daemon) |
| | | { |
| | | if( check_daemon_running(DAEMON_PIDFILE) ) |
| | | { |
| | | log_err("Program already running, process exit now"); |
| | | return -1; |
| | | } |
| | | |
| | | if( daemon ) |
| | | { |
| | | if( set_daemon_running(DAEMON_PIDFILE) < 0 ) |
| | | { |
| | | log_err("set program running as daemon failure\n"); |
| | | return -2; |
| | | } |
| | | } |
| | | else |
| | | { |
| | | if( record_daemon_pid(DAEMON_PIDFILE) < 0 ) |
| | | { |
| | | log_err("record program running PID failure\n"); |
| | | return -3; |
| | | } |
| | | } |
| | | |
| | | return 0; |
| | | } |
| | | |
| | | |
| | | void sub_connect_callback(struct mosquitto *mosq, void *userdata, int result) |
| | | { |
| | | iotd_ctx_t *ctx = (iotd_ctx_t *)userdata; |
| | | |
| | | if( result ) |
| | | { |
| | | log_err("Subscriber connect to broker server failed, rv=%d\n", result); |
| | | return ; |
| | | } |
| | | |
| | | log_nrml("Subscriber connect to broker server[%s:%d] successfully\n", ctx->mqtt_ctx.host, ctx->mqtt_ctx.port); |
| | | mosquitto_subscribe(mosq, NULL, ctx->mqtt_ctx.subTopic, ctx->mqtt_ctx.subQos); |
| | | } |
| | | |
| | | void sub_disconnect_callback(struct mosquitto *mosq, void *userdata, int result) |
| | | { |
| | | iotd_ctx_t *ctx = (iotd_ctx_t *)userdata; |
| | | |
| | | log_warn("Subscriber disconnect to broker server[%s:%d], reason=%d\n", |
| | | ctx->mqtt_ctx.host, ctx->mqtt_ctx.port, result); |
| | | } |
| | | |
| | | void proc_json_items(cJSON *root) |
| | | { |
| | | int i; |
| | | char *value; |
| | | cJSON *item; |
| | | cJSON *array; |
| | | |
| | | if( !root ) |
| | | { |
| | | log_err("Invalid input arguments $root\n"); |
| | | return ; |
| | | } |
| | | |
| | | for( i=0; i<cJSON_GetArraySize(root); i++ ) |
| | | { |
| | | item = cJSON_GetArrayItem(root, i); |
| | | if( !item ) |
| | | break; |
| | | |
| | | /* if item is cJSON_Object, then recursive call proc_json */ |
| | | if( cJSON_Object == item->type ) |
| | | { |
| | | proc_json_items(item); |
| | | } |
| | | else if( cJSON_Array != item->type ) |
| | | { |
| | | value = cJSON_Print(item); |
| | | |
| | | /* light controled by relay */ |
| | | if( !strcasecmp(item->string, "hallway") ) |
| | | { |
| | | if( strcasestr(value, "on") ) |
| | | { |
| | | //turn_light(LIGHT_HALLWAY, ON); |
| | | log_nrml("Turn on hallway light\n"); |
| | | } |
| | | else if( strcasestr(value, "off") ) |
| | | { |
| | | //turn_light(LIGHT_HALLWAY, OFF); |
| | | log_nrml("Turn off hallway light\n"); |
| | | } |
| | | } |
| | | else if( !strcasecmp(item->string, "livroom_left") ) |
| | | { |
| | | if( strcasestr(value, "on") ) |
| | | { |
| | | //turn_light(LIGHT_LVROOM_LEFT, ON); |
| | | log_nrml("Turn on livingroom left light\n"); |
| | | } |
| | | else if( strcasestr(value, "off") ) |
| | | { |
| | | //turn_light(LIGHT_LVROOM_LEFT, OFF); |
| | | log_nrml("Turn off livingroom left light\n"); |
| | | } |
| | | } |
| | | if( !strcasecmp(item->string, "livroom_right") ) |
| | | { |
| | | if( strcasestr(value, "on") ) |
| | | { |
| | | //turn_light(LIGHT_LVROOM_RIGHT, ON); |
| | | log_nrml("Turn on livingroom right light\n"); |
| | | } |
| | | else if( strcasestr(value, "off") ) |
| | | { |
| | | //turn_light(LIGHT_LVROOM_RIGHT, OFF); |
| | | log_nrml("Turn off livingroom right light\n"); |
| | | } |
| | | } |
| | | |
| | | free(value); /* must free it, or it will result memory leak */ |
| | | } |
| | | } |
| | | |
| | | } |
| | | |
| | | void sub_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message) |
| | | { |
| | | iotd_ctx_t *ctx = (iotd_ctx_t *)userdata; |
| | | |
| | | cJSON *root = NULL; |
| | | cJSON *item; |
| | | char *value; |
| | | |
| | | |
| | | if ( !message->payloadlen ) |
| | | { |
| | | log_err("%s (null)\n", message->topic); |
| | | return ; |
| | | } |
| | | |
| | | log_dbg("Subscriber receive message: '%s'\n", message->payload); |
| | | |
| | | root = cJSON_Parse(message->payload); |
| | | if( !root ) |
| | | { |
| | | log_err("cJSON_Parse parser failure: %s\n", cJSON_GetErrorPtr()); |
| | | return ; |
| | | } |
| | | |
| | | /* check ID matched or not */ |
| | | item = cJSON_GetObjectItem(root, "id"); |
| | | if( !item ) |
| | | { |
| | | log_err("cJSON_Parse get ID failure: %s\n", cJSON_GetErrorPtr()); |
| | | goto OUT; |
| | | } |
| | | |
| | | value = cJSON_PrintUnformatted(item); |
| | | if( strcasecmp(value, ctx->mqtt_ctx.id) ) |
| | | { |
| | | log_warn("cJSON_Parse get ID not matchs [%s<->%s], drop this message!\n", value, ctx->mqtt_ctx.id); |
| | | free(value); |
| | | goto OUT; |
| | | } |
| | | |
| | | free(value); |
| | | |
| | | /* proc JSON mesage */ |
| | | proc_json_items(root); |
| | | |
| | | OUT: |
| | | cJSON_Delete(root); /* must delete it, or it will result memory leak */ |
| | | return ; |
| | | } |
| | | |
| | | |
| | | void *mqtt_sub_worker(void *args) |
| | | { |
| | | struct mosquitto *mosq; |
| | | bool session = true; |
| | | |
| | | iotd_ctx_t *ctx = (iotd_ctx_t *)args; |
| | | mqtt_ctx_t *mqtt_ctx; |
| | | |
| | | if( !ctx ) |
| | | { |
| | | log_err("Invalid input arguments\n"); |
| | | return NULL; |
| | | } |
| | | |
| | | mqtt_ctx = &ctx->mqtt_ctx; |
| | | |
| | | |
| | | mosq = mosquitto_new(NULL, session, ctx); |
| | | if( !mosq ) |
| | | { |
| | | log_err("mosquitto_new failure\n"); |
| | | return NULL; |
| | | } |
| | | |
| | | /* set connnect to broker username and password */ |
| | | if( strlen(mqtt_ctx->uid)> 0 && strlen(mqtt_ctx->pwd)> 0 ) |
| | | mosquitto_username_pw_set(mosq, mqtt_ctx->uid, mqtt_ctx->pwd); |
| | | |
| | | /* set callback functions */ |
| | | mosquitto_connect_callback_set(mosq, sub_connect_callback); |
| | | mosquitto_disconnect_callback_set(mosq, sub_disconnect_callback); |
| | | mosquitto_message_callback_set(mosq, sub_message_callback); |
| | | |
| | | while( !g_signal.stop ) |
| | | { |
| | | /* connect to MQTT broker */ |
| | | if( mosquitto_connect(mosq, mqtt_ctx->host, mqtt_ctx->port, mqtt_ctx->keepalive) ) |
| | | { |
| | | log_err("Subscriber connect to broker[%s:%d] failure: %s\n", mqtt_ctx->host, mqtt_ctx->port, strerror(errno)); |
| | | msleep(1000); |
| | | continue; |
| | | } |
| | | |
| | | /* -1: use default timeout 1000ms 1: unused */ |
| | | mosquitto_loop_forever(mosq, -1, 1); |
| | | } |
| | | |
| | | mosquitto_destroy(mosq); |
| | | return NULL; |
| | | } |
| | | |
| | |
| | | export STRIP=${CROSSTOOL}strip |
| | | export LDFLAGS |
| | | |
| | | DIRS= conf hal lylib |
| | | DIRS= conf hal lylib mqtt |
| | | |
| | | DIRS_PATH=$(patsubst %,${PRJ_PATH}/%,$(DIRS)) |
| | | CFLAGS=$(patsubst %,-I%,$(DIRS_PATH)) |
New file |
| | |
| | | |
| | | PWD=$(shell pwd ) |
| | | |
| | | LIBNAME=$(shell basename ${PWD} ) |
| | | TOPDIR=$(shell dirname ${PWD} ) |
| | | |
| | | all: clean |
| | | @rm -f *.o |
| | | @${CROSSTOOL}gcc ${CFLAGS} -I${TOPDIR} -c *.c |
| | | ${CROSSTOOL}ar -rcs lib${LIBNAME}.a *.o |
| | | |
| | | clean: |
| | | @rm -f *.o |
| | | @rm -f *.a |
| | | |
New file |
| | |
| | | /******************************************************************************** |
| | | * Copyright: (C) 2021 LingYun IoT System Studio |
| | | * All rights reserved. |
| | | * |
| | | * Filename: mqtt.h |
| | | * Description: This head file is MQTT subscriber and publisher thread code |
| | | * |
| | | * Version: 1.0.0(20/04/21) |
| | | * Author: Guo Wenxue <guowenxue@gmail.com> |
| | | * ChangeLog: 1, Release initial version on "20/04/21 15:46:42" |
| | | * |
| | | ********************************************************************************/ |
| | | |
| | | #include <string.h> |
| | | #include <math.h> |
| | | #include <cjson/cJSON.h> |
| | | #include <mosquitto.h> |
| | | |
| | | #include "lylib/util_proc.h" |
| | | #include "lylib/util_time.h" |
| | | #include "lylib/logger.h" |
| | | #include "hal/hal.h" |
| | | #include "conf/conf.h" |
| | | |
| | | |
| | | /*+-------------------------------------------+ |
| | | *| | |
| | | *| MQTT publisher thread worker code | |
| | | *| | |
| | | *+-------------------------------------------+*/ |
| | | |
| | | |
| | | void pub_connect_callback(struct mosquitto *mosq, void *userdata, int result) |
| | | { |
| | | iotd_ctx_t *ctx = (iotd_ctx_t *)userdata; |
| | | mqtt_ctx_t *mqtt_ctx; |
| | | hal_ctx_t *hal_ctx; |
| | | int rv = 0; |
| | | char msg[128]; |
| | | float temp = 0.0; /* temperature */ |
| | | float rh = 0.0; /* relative humidity */ |
| | | int retain = 0; |
| | | |
| | | mqtt_ctx = &ctx->mqtt_ctx; |
| | | hal_ctx = &ctx->hal_ctx; |
| | | |
| | | |
| | | if( result ) |
| | | { |
| | | log_err("Publisher connect to broker server[%s:%d] failed, rv=%d\n", mqtt_ctx->host, mqtt_ctx->port, result); |
| | | return ; |
| | | } |
| | | log_nrml("Publisher connect to broker server[%s:%d] successfully\n", mqtt_ctx->host, mqtt_ctx->port); |
| | | |
| | | |
| | | if( hal_ctx->ds18b20_enable ) |
| | | { |
| | | memset(msg, 0, sizeof(msg)); |
| | | |
| | | if( ds18b20_get_temperature(&temp) < 0 ) |
| | | snprintf(msg, sizeof(msg), "{ \"id\":%s, \"temp\":\"error\" }", mqtt_ctx->id); |
| | | else |
| | | snprintf(msg, sizeof(msg), "{ \"id\":%s, \"temp\":\"%.2f\" }", mqtt_ctx->id, temp); |
| | | |
| | | rv = mosquitto_publish(mosq, NULL, mqtt_ctx->pubTopic, strlen(msg), msg, mqtt_ctx->pubQos, retain); |
| | | if( rv ) |
| | | { |
| | | log_err("Publisher broadcast message '%s' failure: %d\n", msg, rv); |
| | | } |
| | | else |
| | | { |
| | | log_nrml("Publisher broadcast message '%s' ok\n", msg); |
| | | } |
| | | } |
| | | |
| | | if( hal_ctx->sht2x_enable ) |
| | | { |
| | | memset(msg, 0, sizeof(msg)); |
| | | |
| | | if( sht2x_get_temp_humidity(&temp, &rh) < 0 ) |
| | | snprintf(msg, sizeof(msg), "{ \"id\":%s, \"temp\":\"error\", \"RH\":\"error\" }", mqtt_ctx->id); |
| | | else |
| | | snprintf(msg, sizeof(msg), "{ \"id\":%s, \"temp\":\"%.2f\", \"RH\":\"%.2f\" }", mqtt_ctx->id, temp, rh); |
| | | |
| | | rv = mosquitto_publish(mosq, NULL, mqtt_ctx->pubTopic, strlen(msg), msg, mqtt_ctx->pubQos, retain); |
| | | if( rv ) |
| | | { |
| | | log_err("Publisher broadcast message '%s' failure: %d\n", msg, rv); |
| | | } |
| | | else |
| | | { |
| | | log_nrml("Publisher broadcast message '%s' ok\n", msg); |
| | | } |
| | | } |
| | | |
| | | |
| | | log_nrml("Publisher broadcast over and disconnect broker now\n", mqtt_ctx->host, mqtt_ctx->port); |
| | | mosquitto_disconnect(mosq); |
| | | |
| | | return ; |
| | | } |
| | | |
| | | |
| | | void *mqtt_pub_worker(void *args) |
| | | { |
| | | struct mosquitto *mosq; |
| | | bool session = true; |
| | | |
| | | iotd_ctx_t *ctx = (iotd_ctx_t *)args; |
| | | mqtt_ctx_t *mqtt_ctx; |
| | | |
| | | if( !ctx ) |
| | | { |
| | | log_err("Invalid input arguments\n"); |
| | | return NULL; |
| | | } |
| | | |
| | | mqtt_ctx = &ctx->mqtt_ctx; |
| | | |
| | | |
| | | mosq = mosquitto_new(NULL, session, ctx); |
| | | if( !mosq ) |
| | | { |
| | | log_err("mosquitto_new failure\n"); |
| | | return NULL; |
| | | } |
| | | |
| | | /* set connnect to broker username and password */ |
| | | if( strlen(mqtt_ctx->uid)> 0 && strlen(mqtt_ctx->pwd)> 0 ) |
| | | mosquitto_username_pw_set(mosq, mqtt_ctx->uid, mqtt_ctx->pwd); |
| | | |
| | | /* set callback functions */ |
| | | mosquitto_connect_callback_set(mosq, pub_connect_callback); |
| | | |
| | | while( !g_signal.stop ) |
| | | { |
| | | /* connect to MQTT broker */ |
| | | if( mosquitto_connect(mosq, mqtt_ctx->host, mqtt_ctx->port, mqtt_ctx->keepalive) ) |
| | | { |
| | | log_err("Publisher connect to broker[%s:%d] failure: %s\n", mqtt_ctx->host, mqtt_ctx->port, strerror(errno)); |
| | | msleep(1000); |
| | | continue; |
| | | } |
| | | |
| | | /* -1: use default timeout 1000ms 1: unused */ |
| | | mosquitto_loop_forever(mosq, -1, 1); |
| | | |
| | | /* Publisher broadcast sensors data message interval time, unit seconds */ |
| | | sleep( mqtt_ctx->interval ); |
| | | } |
| | | |
| | | mosquitto_destroy(mosq); |
| | | return NULL; |
| | | } |
| | | |
| | | /*+-------------------------------------------+ |
| | | *| | |
| | | *| MQTT Subscriber thread worker code | |
| | | *| | |
| | | *+-------------------------------------------+*/ |
| | | |
| | | void sub_connect_callback(struct mosquitto *mosq, void *userdata, int result) |
| | | { |
| | | iotd_ctx_t *ctx = (iotd_ctx_t *)userdata; |
| | | |
| | | if( result ) |
| | | { |
| | | log_err("Subscriber connect to broker server failed, rv=%d\n", result); |
| | | return ; |
| | | } |
| | | |
| | | log_nrml("Subscriber connect to broker server[%s:%d] successfully\n", ctx->mqtt_ctx.host, ctx->mqtt_ctx.port); |
| | | mosquitto_subscribe(mosq, NULL, ctx->mqtt_ctx.subTopic, ctx->mqtt_ctx.subQos); |
| | | } |
| | | |
| | | void sub_disconnect_callback(struct mosquitto *mosq, void *userdata, int result) |
| | | { |
| | | iotd_ctx_t *ctx = (iotd_ctx_t *)userdata; |
| | | |
| | | log_warn("Subscriber disconnect to broker server[%s:%d], reason=%d\n", |
| | | ctx->mqtt_ctx.host, ctx->mqtt_ctx.port, result); |
| | | } |
| | | |
| | | void proc_json_items(cJSON *root) |
| | | { |
| | | int i; |
| | | char *value; |
| | | cJSON *item; |
| | | cJSON *array; |
| | | |
| | | if( !root ) |
| | | { |
| | | log_err("Invalid input arguments $root\n"); |
| | | return ; |
| | | } |
| | | |
| | | for( i=0; i<cJSON_GetArraySize(root); i++ ) |
| | | { |
| | | item = cJSON_GetArrayItem(root, i); |
| | | if( !item ) |
| | | break; |
| | | |
| | | /* if item is cJSON_Object, then recursive call proc_json */ |
| | | if( cJSON_Object == item->type ) |
| | | { |
| | | proc_json_items(item); |
| | | } |
| | | else if( cJSON_Array != item->type ) |
| | | { |
| | | value = cJSON_Print(item); |
| | | |
| | | /* light controled by relay */ |
| | | if( !strcasecmp(item->string, "hallway") ) |
| | | { |
| | | if( strcasestr(value, "on") ) |
| | | { |
| | | //turn_light(LIGHT_HALLWAY, ON); |
| | | log_nrml("Turn on hallway light\n"); |
| | | } |
| | | else if( strcasestr(value, "off") ) |
| | | { |
| | | //turn_light(LIGHT_HALLWAY, OFF); |
| | | log_nrml("Turn off hallway light\n"); |
| | | } |
| | | } |
| | | else if( !strcasecmp(item->string, "livroom_left") ) |
| | | { |
| | | if( strcasestr(value, "on") ) |
| | | { |
| | | //turn_light(LIGHT_LVROOM_LEFT, ON); |
| | | log_nrml("Turn on livingroom left light\n"); |
| | | } |
| | | else if( strcasestr(value, "off") ) |
| | | { |
| | | //turn_light(LIGHT_LVROOM_LEFT, OFF); |
| | | log_nrml("Turn off livingroom left light\n"); |
| | | } |
| | | } |
| | | if( !strcasecmp(item->string, "livroom_right") ) |
| | | { |
| | | if( strcasestr(value, "on") ) |
| | | { |
| | | //turn_light(LIGHT_LVROOM_RIGHT, ON); |
| | | log_nrml("Turn on livingroom right light\n"); |
| | | } |
| | | else if( strcasestr(value, "off") ) |
| | | { |
| | | //turn_light(LIGHT_LVROOM_RIGHT, OFF); |
| | | log_nrml("Turn off livingroom right light\n"); |
| | | } |
| | | } |
| | | |
| | | free(value); /* must free it, or it will result memory leak */ |
| | | } |
| | | } |
| | | |
| | | } |
| | | |
| | | void sub_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message) |
| | | { |
| | | iotd_ctx_t *ctx = (iotd_ctx_t *)userdata; |
| | | |
| | | cJSON *root = NULL; |
| | | cJSON *item; |
| | | char *value; |
| | | |
| | | |
| | | if ( !message->payloadlen ) |
| | | { |
| | | log_err("%s (null)\n", message->topic); |
| | | return ; |
| | | } |
| | | |
| | | log_dbg("Subscriber receive message: '%s'\n", message->payload); |
| | | |
| | | root = cJSON_Parse(message->payload); |
| | | if( !root ) |
| | | { |
| | | log_err("cJSON_Parse parser failure: %s\n", cJSON_GetErrorPtr()); |
| | | return ; |
| | | } |
| | | |
| | | /* check ID matched or not */ |
| | | item = cJSON_GetObjectItem(root, "id"); |
| | | if( !item ) |
| | | { |
| | | log_err("cJSON_Parse get ID failure: %s\n", cJSON_GetErrorPtr()); |
| | | goto OUT; |
| | | } |
| | | |
| | | value = cJSON_PrintUnformatted(item); |
| | | if( strcasecmp(value, ctx->mqtt_ctx.id) ) |
| | | { |
| | | log_warn("cJSON_Parse get ID not matchs [%s<->%s], drop this message!\n", value, ctx->mqtt_ctx.id); |
| | | free(value); |
| | | goto OUT; |
| | | } |
| | | |
| | | free(value); |
| | | |
| | | /* proc JSON mesage */ |
| | | proc_json_items(root); |
| | | |
| | | OUT: |
| | | cJSON_Delete(root); /* must delete it, or it will result memory leak */ |
| | | return ; |
| | | } |
| | | |
| | | |
| | | void *mqtt_sub_worker(void *args) |
| | | { |
| | | struct mosquitto *mosq; |
| | | bool session = true; |
| | | |
| | | iotd_ctx_t *ctx = (iotd_ctx_t *)args; |
| | | mqtt_ctx_t *mqtt_ctx; |
| | | |
| | | if( !ctx ) |
| | | { |
| | | log_err("Invalid input arguments\n"); |
| | | return NULL; |
| | | } |
| | | |
| | | mqtt_ctx = &ctx->mqtt_ctx; |
| | | |
| | | |
| | | mosq = mosquitto_new(NULL, session, ctx); |
| | | if( !mosq ) |
| | | { |
| | | log_err("mosquitto_new failure\n"); |
| | | return NULL; |
| | | } |
| | | |
| | | /* set connnect to broker username and password */ |
| | | if( strlen(mqtt_ctx->uid)> 0 && strlen(mqtt_ctx->pwd)> 0 ) |
| | | mosquitto_username_pw_set(mosq, mqtt_ctx->uid, mqtt_ctx->pwd); |
| | | |
| | | /* set callback functions */ |
| | | mosquitto_connect_callback_set(mosq, sub_connect_callback); |
| | | mosquitto_disconnect_callback_set(mosq, sub_disconnect_callback); |
| | | mosquitto_message_callback_set(mosq, sub_message_callback); |
| | | |
| | | while( !g_signal.stop ) |
| | | { |
| | | /* connect to MQTT broker */ |
| | | if( mosquitto_connect(mosq, mqtt_ctx->host, mqtt_ctx->port, mqtt_ctx->keepalive) ) |
| | | { |
| | | log_err("Subscriber connect to broker[%s:%d] failure: %s\n", mqtt_ctx->host, mqtt_ctx->port, strerror(errno)); |
| | | msleep(1000); |
| | | continue; |
| | | } |
| | | |
| | | /* -1: use default timeout 1000ms 1: unused */ |
| | | mosquitto_loop_forever(mosq, -1, 1); |
| | | } |
| | | |
| | | mosquitto_destroy(mosq); |
| | | return NULL; |
| | | } |
| | | |
New file |
| | |
| | | /******************************************************************************** |
| | | * Copyright: (C) 2021 LingYun IoT System Studio |
| | | * All rights reserved. |
| | | * |
| | | * Filename: mqtt.h |
| | | * Description: This head file is MQTT subscriber and publisher thread code |
| | | * |
| | | * Version: 1.0.0(20/04/21) |
| | | * Author: Guo Wenxue <guowenxue@gmail.com> |
| | | * ChangeLog: 1, Release initial version on "20/04/21 15:46:42" |
| | | * |
| | | ********************************************************************************/ |
| | | #ifndef _MQTT_H_ |
| | | #define _MQTT_H_ |
| | | |
| | | |
| | | typedef struct mqtt_ctx_s |
| | | { |
| | | char id[32]; /* production ID */ |
| | | |
| | | /* Broker settings */ |
| | | char host[128]; /* MQTT broker server name */ |
| | | int port; /* MQTT broker listen port */ |
| | | char uid[64]; /* username */ |
| | | char pwd[64]; /* password */ |
| | | int keepalive; /* MQTT broker send PING message to subsciber/publisher keepalive timeout<seconds> */ |
| | | |
| | | /* Subscriber settings */ |
| | | int sub_enable; /* Subscriber enable or not */ |
| | | char subTopic[256]; /* Subscriber topic */ |
| | | int subQos; /* Subscriber Qos */ |
| | | |
| | | /* Publisher settings */ |
| | | int pub_enable; /* Publisher enable or not */ |
| | | char pubTopic[256]; /* Publisher topic */ |
| | | int pubQos; /* Publisher Qos */ |
| | | int interval; /* Publish interval */ |
| | | } mqtt_ctx_t; |
| | | |
| | | |
| | | extern void *mqtt_pub_worker(void *args); |
| | | extern void *mqtt_sub_worker(void *args); |
| | | |
| | | #endif /* ----- #ifndef _MQTT_H_ ----- */ |
| | | |