/********************************************************************************* * Copyright: (C) 2019 LingYun IoT System Studio * All rights reserved. * * Filename: main.c * Description: This file is MQTT publisher/subscriber example program. * * Version: 1.0.0(29/01/19) * Author: Guo Wenxue * ChangeLog: 1, Release initial version on "29/01/19 15:34:41" * ********************************************************************************/ #include #include #include #include #include #include #include #include #include #include #include "logger.h" #include "proc.h" #include "config.h" #include "ds18b20.h" #include "sht20.h" #include "leds.h" #include "relay.h" #define PROG_VERSION "v1.0.0" #define DAEMON_PIDFILE "/tmp/.iotd.pid" void *mqtt_worker(void *args); static void program_usage(char *progname) { printf("Usage: %s [OPTION]...\n", progname); printf(" %s is LingYun studio iotd program running on RaspberryPi\n", progname); printf("\nMandatory arguments to long options are mandatory for short options too:\n"); printf(" -b[daemon ] Running in daemon mode\n"); printf(" -d[debug ] Running in debug mode\n"); printf(" -c[conf ] Specify configure file\n"); printf(" -h[help ] Display this help information\n"); printf(" -v[version ] Display the program version\n"); printf("\n%s version %s\n", progname, PROG_VERSION); return; } int main (int argc, char **argv) { int daemon = 0; pthread_t tid; iotd_ctx_t ctx; char *conf_file="/etc/iotd.conf"; int debug = 0; int opt; char *progname=NULL; struct option long_options[] = { {"conf", required_argument, NULL, 'c'}, {"daemon", no_argument, NULL, 'b'}, {"debug", no_argument, NULL, 'd'}, {"version", no_argument, NULL, 'v'}, {"help", no_argument, NULL, 'h'}, {NULL, 0, NULL, 0} }; progname = (char *)basename(argv[0]); /* parser the command line parameters */ while ((opt = getopt_long(argc, argv, "c:bdvh", long_options, NULL)) != -1) { switch (opt) { case 'c': /* Set configure file */ conf_file = optarg; break; case 'b': /* Set daemon running */ daemon = 1; break; case 'd': /* Set debug running */ debug = 1; break; case 'v': /* Get software version */ printf("%s version %s\n", progname, PROG_VERSION); return 0; case 'h': /* Get help information */ program_usage(progname); return 0; default: break; } } /* parser configure file */ if( parser_conf(conf_file, &ctx, debug)<0 ) { fprintf(stderr, "Parser iotd configure file failure\n"); return -2; } /* install signal proc handler */ install_default_signal(); /* check program already running or not, if already running then exit, or set running as daemon */ if( check_set_program_running(daemon, DAEMON_PIDFILE) < 0 ) goto cleanup; /* initial mosquitto library */ mosquitto_lib_init(); /* start MQTT thread */ if( thread_start(&tid, mqtt_worker, &ctx.mqtt ) < 0 ) { log_error("Start MQTT worker thread failure\n"); goto cleanup; } log_info("Start MQTT worker thread ok\n"); /* control thread loop */ while( !g_signal.stop ) { sleep(1); } cleanup: mosquitto_lib_cleanup(); log_close(); return 0; } /* * +--------------------------------+ * | MQTT Thread worker | * +--------------------------------+ */ /* process publisher(uplink) data */ void pub_connect_callback(struct mosquitto *mosq, void *userdata, int result) { mqtt_ctx_t *mqtt = (mqtt_ctx_t *)userdata; iotd_ctx_t *iotd = to_iotd(mqtt); int rv = 0; char msg[128]; float temp = 0.0; /* temperature */ float rh = 0.0; /* relative humidity */ int retain = 0; static time_t last_time = 0; /* publish time is not arrive */ if( !check_timeout(&last_time, mqtt->interval) ) return ; log_debug("Publish topic '%s'\n", mqtt->pubTopic); if( iotd->hwinfo.ds18b20 ) { log_debug("DS18B20 temperature sensor enabled, start broadcast it\n"); if( ds18b20_get_temperature(&temp)<0 ) return ; memset(msg, 0, sizeof(msg)); snprintf(msg, sizeof(msg), "{\"temperature\":%.2f}", temp); rv = mosquitto_publish(mosq, NULL, mqtt->pubTopic, strlen(msg), msg, mqtt->pubQos, retain); if( rv ) { log_error("Publisher broadcast message '%s' failure: %d\n", msg, rv); } else { log_info("Publisher broadcast message '%s' ok\n", msg); } } if( iotd->hwinfo.sht2x ) { log_debug("SHT2X temperature and humidity sensor enabled, start broadcast it\n"); if( sht2x_get_temp_humidity(&temp, &rh)<0 ) return ; memset(msg, 0, sizeof(msg)); snprintf(msg, sizeof(msg), "{\"temperature\":%.2f, \"humidity\":%.2f}", temp, rh); rv = mosquitto_publish(mosq, NULL, mqtt->pubTopic, strlen(msg), msg, mqtt->pubQos, retain); if( rv ) { log_error("Publisher broadcast message '%s' failure: %d\n", msg, rv); } else { log_info("Publisher broadcast message '%s' ok\n", msg); } } return ; } /* process subscriber(downlink) data */ void proc_json_items(cJSON *root, mqtt_ctx_t *mqtt) { iotd_ctx_t *iotd = to_iotd(mqtt); cJSON *item; char leds[LED_CNT][16] = {"RedLed", "GreenLed", "BlueLed"}; char relays[RELAY_CNT][16] = {"Relay"}; int which; if( !root ) { log_error("Invalid input arguments $root\n"); return ; } /* parser JSON command for relays */ if( iotd->hwinfo.relay ) { for(which=0; whichvaluestring, "on") ) turn_relay(which, ON); else if( strcasecmp(item->valuestring, "off") ) turn_relay(which, OFF); } } /* parser JSON command for RGB leds */ if( iotd->hwinfo.led ) { for(which=0; whichvaluestring, "on") ) turn_led(which, ON); else if( strcasecmp(item->valuestring, "off") ) turn_led(which, OFF); } } return ; } void sub_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message) { mqtt_ctx_t *ctx = (mqtt_ctx_t *)userdata; cJSON *root = NULL; if ( !message->payloadlen ) { log_error("%s (null)\n", message->topic); return ; } log_info("Subscriber receive message: '%s'\n", message->payload); root = cJSON_Parse(message->payload); if( !root ) { log_error("cJSON_Parse parser failure: %s\n", cJSON_GetErrorPtr()); return ; } /* process receive message data */ proc_json_items(root, ctx); /* must delete it, or it will result memory leak */ cJSON_Delete(root); return ; } void mqtt_connect_callback(struct mosquitto *mosq, void *userdata, int result) { mqtt_ctx_t *ctx = (mqtt_ctx_t *)userdata; if( result ) { log_error("mosquitto connect to broker server failed, rv=%d\n", result); return ; } log_info("mosquitto connect to broker server[%s:%d] successfully\n", ctx->host, ctx->port); mosquitto_subscribe(mosq, NULL, ctx->subTopic, ctx->subQos); } void mqtt_disconnect_callback(struct mosquitto *mosq, void *userdata, int result) { mqtt_ctx_t *ctx = (mqtt_ctx_t *)userdata; log_warn("mosquitto disconnect to broker server[%s:%d], reason=%d\n", ctx->host, ctx->port, result); } void *mqtt_worker(void *args) { mqtt_ctx_t *ctx = (mqtt_ctx_t *)args; struct mosquitto *mosq; bool session = true; int rv = 0; mosq = mosquitto_new(ctx->devid, session, ctx); if( !mosq ) { log_error("mosquitto_new failure\n"); return NULL; } /* connnect to broker by token or uid/pwd */ if( strlen(ctx->token ) > 0) { log_info("Using token authentication: '%s'\n", ctx->token); mosquitto_username_pw_set(mosq, ctx->token, NULL); } else if( strlen(ctx->uid)> 0 && strlen(ctx->pwd)> 0 ) { log_info("Using username/password authentication\n"); mosquitto_username_pw_set(mosq, ctx->uid, ctx->pwd); } /* set callback functions */ mosquitto_connect_callback_set(mosq, mqtt_connect_callback); mosquitto_disconnect_callback_set(mosq, mqtt_disconnect_callback); mosquitto_message_callback_set(mosq, sub_message_callback); while( !g_signal.stop ) { /* connect to MQTT broker */ if( mosquitto_connect(mosq, ctx->host, ctx->port, ctx->keepalive) ) { log_error("mosquitto connect to broker[%s:%d] failure: %s\n", ctx->host, ctx->port, strerror(errno)); sleep(1); continue; } while(!g_signal.stop) { /* periodically publish and report data */ pub_connect_callback(mosq, ctx, MOSQ_ERR_SUCCESS); /* MQTT process in Non-blocking mode, timeout for 1s */ if( MOSQ_ERR_SUCCESS != (rv = mosquitto_loop(mosq, 1000, 1)) ) { log_warn("MQTT loop error: %s, reconnecting...\n", mosquitto_strerror(rv)); break; } } mosquitto_disconnect(mosq); sleep(1); } mosquitto_destroy(mosq); return NULL; }