/********************************************************************************* * Copyright: (C) 2019 LingYun IoT System Studio * All rights reserved. * * Filename: main.c * Description: This file * * Version: 1.0.0(29/01/19) * Author: Guo Wenxue * ChangeLog: 1, Release initial version on "29/01/19 15:34:41" * ********************************************************************************/ #include #include #include #include #include #include #include #include #include #include #include "logger.h" #include "util_proc.h" #include "config.h" #include "sht20.h" #include "leds.h" #define PROG_VERSION "v1.0.0" #define DAEMON_PIDFILE "/tmp/.thingsboard.pid" void *thingsboard_subsciber(void *args); void *thingsboard_publisher(void *args); static void program_usage(char *progname) { printf("Usage: %s [OPTION]...\n", progname); printf(" %s is LingYun studio thingsboard client running on RaspberryPi\n", progname); printf("\nMandatory arguments to long options are mandatory for short options too:\n"); printf(" -b[daemon ] Running in daemon mode\n"); printf(" -d[debug ] Running in debug mode\n"); printf(" -c[conf ] Specify configure file\n"); printf(" -h[help ] Display this help information\n"); printf(" -v[version ] Display the program version\n"); printf("\n%s version %s\n", progname, PROG_VERSION); return; } int main (int argc, char **argv) { int daemon = 0; pthread_t tid; mqtt_ctx_t ctx; char *conf_file="/etc/thingsboard.conf"; int debug = 0; int opt; char *progname=NULL; struct option long_options[] = { {"conf", required_argument, NULL, 'c'}, {"daemon", no_argument, NULL, 'b'}, {"debug", no_argument, NULL, 'd'}, {"version", no_argument, NULL, 'v'}, {"help", no_argument, NULL, 'h'}, {NULL, 0, NULL, 0} }; progname = (char *)basename(argv[0]); /* parser the command line parameters */ while ((opt = getopt_long(argc, argv, "c:bdvh", long_options, NULL)) != -1) { switch (opt) { case 'c': /* Set configure file */ conf_file = optarg; break; case 'b': /* Set daemon running */ daemon = 1; break; case 'd': /* Set debug running */ debug = 1; break; case 'v': /* Get software version */ printf("%s version %s\n", progname, PROG_VERSION); return 0; case 'h': /* Get help information */ program_usage(progname); return 0; default: break; } } /* parser configure file */ if( parser_conf(conf_file, &ctx, debug)<0 ) { fprintf(stderr, "Parser thingsboard configure file failure\n"); return -2; } /* install signal proc handler */ install_default_signal(); /* check program already running or not, if already running then exit, or set running as daemon */ if( check_set_program_running(daemon, DAEMON_PIDFILE) < 0 ) goto cleanup; /* initial mosquitto library */ mosquitto_lib_init(); /* * +--------------------------------+ * | MQTT Subscriber Thread | * +--------------------------------+ */ if( thread_start(&tid, thingsboard_subsciber, &ctx ) < 0 ) { log_error("Start MQTT subsciber worker thread failure\n"); goto cleanup; } log_info("Start MQTT subsciber worker thread ok\n"); /* * +--------------------------------+ * | MQTT publisher Thread | * +--------------------------------+ */ if( thread_start(&tid, thingsboard_publisher, &ctx) < 0 ) { log_error("Start MQTT publisher worker thread failure\n"); goto cleanup; } log_info("Start MQTT publisher worker thread ok\n"); while( ! g_signal.stop ) { msleep(1000); } cleanup: mosquitto_lib_cleanup(); log_close(); return 0; } void pub_connect_callback(struct mosquitto *mosq, void *userdata, int result) { mqtt_ctx_t *ctx = (mqtt_ctx_t *)userdata; int rv = 0; char msg[128]; float temp = 0.0; /* temperature */ float rh = 0.0; /* relative humidity */ int retain = 0; if( result ) { log_error("Publisher connect to broker server[%s:%d] failed, rv=%d\n", ctx->host, ctx->port, result); return ; } log_info("Publisher connect to broker server[%s:%d] successfully\n", ctx->host, ctx->port); log_debug("Publish topic '%s'\n", ctx->pubTopic); if( ctx->hwconf.sht2x ) { memset(msg, 0, sizeof(msg)); log_debug("SHT2X temperature and humidity sensor enabled, start broadcast it\n"); if( sht2x_get_temp_humidity(&temp, &rh)<0 ) return ; snprintf(msg, sizeof(msg), "{\"temperature\":%.2f, \"humidity\":%.2f}", temp, rh); rv = mosquitto_publish(mosq, NULL, ctx->pubTopic, strlen(msg), msg, ctx->pubQos, retain); if( rv ) { log_error("Publisher broadcast message '%s' failure: %d\n", msg, rv); } else { log_info("Publisher broadcast message '%s' ok\n", msg); } } msleep(500); log_info("Publisher broadcast over and disconnect broker now\n", ctx->host, ctx->port); mosquitto_disconnect(mosq); return ; } void *thingsboard_publisher(void *args) { mqtt_ctx_t *ctx = (mqtt_ctx_t *)args; struct mosquitto *mosq; bool session = true; mosq = mosquitto_new(ctx->devid, session, ctx); if( !mosq ) { log_error("mosquitto_new failure\n"); return NULL; } /* connnect to broker by token or uid/pwd */ if( strlen(ctx->token ) > 0) { log_info("Using token authentication\n"); mosquitto_username_pw_set(mosq, ctx->token, NULL); } else if( strlen(ctx->uid)> 0 && strlen(ctx->pwd)> 0 ) { log_info("Using username/password authentication\n"); mosquitto_username_pw_set(mosq, ctx->uid, ctx->pwd); } /* set callback functions */ mosquitto_connect_callback_set(mosq, pub_connect_callback); while( !g_signal.stop ) { /* connect to MQTT broker */ if( mosquitto_connect(mosq, ctx->host, ctx->port, ctx->keepalive) ) { log_error("Publisher connect to broker[%s:%d] failure: %s\n", ctx->host, ctx->port, strerror(errno)); msleep(1000); continue; } /* -1: use default timeout 1000ms 1: unused */ mosquitto_loop_forever(mosq, -1, 1); /* Publisher broadcast sensors data message interval time, unit seconds */ sleep( ctx->interval ); } mosquitto_destroy(mosq); return NULL; } void sub_connect_callback(struct mosquitto *mosq, void *userdata, int result) { mqtt_ctx_t *ctx = (mqtt_ctx_t *)userdata; if( result ) { log_error("Subscriber connect to broker server failed, rv=%d\n", result); return ; } log_info("Subscriber connect to broker server[%s:%d] successfully\n", ctx->host, ctx->port); mosquitto_subscribe(mosq, NULL, ctx->subTopic, ctx->subQos); } void sub_disconnect_callback(struct mosquitto *mosq, void *userdata, int result) { mqtt_ctx_t *ctx = (mqtt_ctx_t *)userdata; log_warn("Subscriber disconnect to broker server[%s:%d], reason=%d\n", ctx->host, ctx->port, result); } void proc_json_items(cJSON *root, mqtt_ctx_t *ctx) { cJSON *item_method; cJSON *item_params; cJSON *item_device; cJSON *item_status; char *method; char *device; int status; /* 获取 method 字段 */ item_method = cJSON_GetObjectItem(root, "method"); if (item_method == NULL || !cJSON_IsString(item_method)) { log_error("JSON missing method\n"); return; } method = item_method->valuestring; /* 只处理 setValue 命令 */ if (strcmp(method, "setValue") != 0) { log_error("Ignore method: %s\n", method); return; } /* 获取 params 对象 */ item_params = cJSON_GetObjectItem(root, "params"); if (item_params == NULL || !cJSON_IsObject(item_params)) { log_error("JSON missing params\n"); return; } /* 获取 params.device */ item_device = cJSON_GetObjectItem(item_params, "device"); if (item_device == NULL || !cJSON_IsString(item_device)) { log_error("JSON missing params.device\n"); return; } device = item_device->valuestring; /* 获取 params.status (布尔值) */ item_status = cJSON_GetObjectItem(item_params, "status"); if (item_status == NULL || !(cJSON_IsBool(item_status))) { log_error("JSON missing params.status\n"); return; } status = cJSON_IsTrue(item_status) ? ON : OFF; /* 映射 LED 名称并调用控制函数 */ if ( !strcmp(device, "RedLed") ) { turn_led(LED_R, status); } else if ( !strcmp(device, "GreenLed") ) { turn_led(LED_G, status); } else if ( !strcmp(device, "BlueLed") ) { turn_led(LED_B, status); } return; } void sub_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message) { mqtt_ctx_t *ctx = (mqtt_ctx_t *)userdata; cJSON *root = NULL; if ( !message->payloadlen ) { log_error("%s (null)\n", message->topic); return ; } log_info("Subscriber receive message: '%s'\n", message->payload); root = cJSON_Parse(message->payload); if( !root ) { log_error("cJSON_Parse parser failure: %s\n", cJSON_GetErrorPtr()); return ; } proc_json_items(root, ctx); /* must delete it, or it will result memory leak */ cJSON_Delete(root); return ; } void *thingsboard_subsciber(void *args) { mqtt_ctx_t *ctx = (mqtt_ctx_t *)args; struct mosquitto *mosq; bool session = true; mosq = mosquitto_new(ctx->devid, session, ctx); if( !mosq ) { log_error("mosquitto_new failure\n"); return NULL; } /* connnect to broker by token or uid/pwd */ if( strlen(ctx->token ) > 0) { log_info("Using token authentication\n"); mosquitto_username_pw_set(mosq, ctx->token, NULL); } else if( strlen(ctx->uid)> 0 && strlen(ctx->pwd)> 0 ) { log_info("Using username/password authentication\n"); mosquitto_username_pw_set(mosq, ctx->uid, ctx->pwd); } /* set callback functions */ mosquitto_connect_callback_set(mosq, sub_connect_callback); mosquitto_disconnect_callback_set(mosq, sub_disconnect_callback); mosquitto_message_callback_set(mosq, sub_message_callback); while( !g_signal.stop ) { /* connect to MQTT broker */ if( mosquitto_connect(mosq, ctx->host, ctx->port, ctx->keepalive) ) { log_error("Subscriber connect to broker[%s:%d] failure: %s\n", ctx->host, ctx->port, strerror(errno)); msleep(1000); continue; } /* -1: use default timeout 1000ms 1: unused */ mosquitto_loop_forever(mosq, -1, 1); } mosquitto_destroy(mosq); return NULL; }