| | |
| | | #define PROG_VERSION "v1.0.0" |
| | | #define DAEMON_PIDFILE "/tmp/.thingsboard.pid" |
| | | |
| | | void *mqtt_sub_worker(void *args); |
| | | void *mqtt_pub_worker(void *args); |
| | | void *thingsboard_worker(void *args); |
| | | |
| | | static void program_usage(char *progname) |
| | | { |
| | | |
| | | printf("Usage: %s [OPTION]...\n", progname); |
| | | printf(" %s is LingYun studio MQTT daemon program running on RaspberryPi\n", progname); |
| | | printf(" %s is LingYun studio thingsboard client running on RaspberryPi\n", progname); |
| | | |
| | | printf("\nMandatory arguments to long options are mandatory for short options too:\n"); |
| | | printf(" -b[daemon ] Running in daemon mode\n"); |
| | |
| | | { |
| | | int daemon = 0; |
| | | pthread_t tid; |
| | | mqtt_ctx_t ctx; |
| | | iotd_ctx_t ctx; |
| | | char *conf_file="/etc/thingsboard.conf"; |
| | | int debug = 0; |
| | | int opt; |
| | |
| | | default: |
| | | break; |
| | | } |
| | | |
| | | } |
| | | |
| | | /* parser configure file */ |
| | |
| | | /* initial mosquitto library */ |
| | | mosquitto_lib_init(); |
| | | |
| | | /* create MQTT subsciber thread */ |
| | | if( thread_start(&tid, mqtt_sub_worker, &ctx ) < 0 ) |
| | | /* start thingsboard MQTT thread */ |
| | | if( thread_start(&tid, thingsboard_worker, &ctx.mqtt ) < 0 ) |
| | | { |
| | | log_error("Start MQTT subsciber worker thread failure\n"); |
| | | log_error("Start thingsboard MQTT worker thread failure\n"); |
| | | goto cleanup; |
| | | } |
| | | log_info("Start MQTT subsciber worker thread ok\n"); |
| | | log_info("Start thingsboard MQTT worker thread ok\n"); |
| | | |
| | | /* create MQTT publisher thread */ |
| | | if( thread_start(&tid, mqtt_pub_worker, &ctx) < 0 ) |
| | | /* control thread loop */ |
| | | while( !g_signal.stop ) |
| | | { |
| | | log_error("Start MQTT publisher worker thread failure\n"); |
| | | goto cleanup; |
| | | } |
| | | log_info("Start MQTT publisher worker thread ok\n"); |
| | | |
| | | while( ! g_signal.stop ) |
| | | { |
| | | msleep(1000); |
| | | sleep(1); |
| | | } |
| | | |
| | | cleanup: |
| | |
| | | log_close(); |
| | | |
| | | return 0; |
| | | } /* ----- End of main() ----- */ |
| | | } |
| | | |
| | | |
| | | /* |
| | | * +--------------------------------+ |
| | | * | ThingsBoard MQTT Thread | |
| | | * +--------------------------------+ |
| | | */ |
| | | |
| | | /* process publisher(uplink) data */ |
| | | void pub_connect_callback(struct mosquitto *mosq, void *userdata, int result) |
| | | { |
| | | mqtt_ctx_t *ctx = (mqtt_ctx_t *)userdata; |
| | | mqtt_ctx_t *mqtt = (mqtt_ctx_t *)userdata; |
| | | iotd_ctx_t *iotd = to_iotd(mqtt); |
| | | int rv = 0; |
| | | char msg[128]; |
| | | float temp = 0.0; /* temperature */ |
| | | float rh = 0.0; /* relative humidity */ |
| | | int retain = 0; |
| | | static time_t last_time = 0; |
| | | |
| | | if( result ) |
| | | { |
| | | log_error("Publisher connect to broker server[%s:%d] failed, rv=%d\n", ctx->host, ctx->port, result); |
| | | /* publish time is not arrive */ |
| | | if( !check_timeout(&last_time, mqtt->interval) ) |
| | | return ; |
| | | } |
| | | |
| | | log_info("Publisher connect to broker server[%s:%d] successfully\n", ctx->host, ctx->port); |
| | | log_debug("Publish topic '%s'\n", ctx->pubTopic); |
| | | |
| | | if( ctx->hwconf.sht2x ) |
| | | log_debug("Publish topic '%s'\n", mqtt->pubTopic); |
| | | if( iotd->hwinfo.sht2x ) |
| | | { |
| | | memset(msg, 0, sizeof(msg)); |
| | | |
| | | log_debug("SHT2X temperature and humidity sensor enabled, start broadcast it\n"); |
| | | if( sht2x_get_temp_humidity(&temp, &rh)<0 ) |
| | | return ; |
| | | |
| | | if( 0 == sht2x_get_temp_humidity(&temp, &rh) ) |
| | | snprintf(msg, sizeof(msg), "{\"temperature\":%.2f, \"humidity\":%.2f}", temp, rh); |
| | | |
| | | rv = mosquitto_publish(mosq, NULL, ctx->pubTopic, strlen(msg), msg, ctx->pubQos, retain); |
| | | memset(msg, 0, sizeof(msg)); |
| | | snprintf(msg, sizeof(msg), "{\"temperature\":%.2f, \"humidity\":%.2f}", temp, rh); |
| | | rv = mosquitto_publish(mosq, NULL, mqtt->pubTopic, strlen(msg), msg, mqtt->pubQos, retain); |
| | | if( rv ) |
| | | { |
| | | log_error("Publisher broadcast message '%s' failure: %d\n", msg, rv); |
| | |
| | | } |
| | | } |
| | | |
| | | msleep(500); |
| | | log_info("Publisher broadcast over and disconnect broker now\n", ctx->host, ctx->port); |
| | | mosquitto_disconnect(mosq); |
| | | |
| | | return ; |
| | | } |
| | | |
| | | |
| | | void *mqtt_pub_worker(void *args) |
| | | { |
| | | mqtt_ctx_t *ctx = (mqtt_ctx_t *)args; |
| | | struct mosquitto *mosq; |
| | | bool session = true; |
| | | |
| | | mosq = mosquitto_new(ctx->devid, session, ctx); |
| | | if( !mosq ) |
| | | { |
| | | log_error("mosquitto_new failure\n"); |
| | | return NULL; |
| | | } |
| | | |
| | | /* connnect to broker by token or uid/pwd */ |
| | | if( strlen(ctx->token ) > 0) |
| | | { |
| | | log_info("Using token authentication\n"); |
| | | mosquitto_username_pw_set(mosq, ctx->token, NULL); |
| | | } |
| | | else if( strlen(ctx->uid)> 0 && strlen(ctx->pwd)> 0 ) |
| | | { |
| | | log_info("Using username/password authentication\n"); |
| | | mosquitto_username_pw_set(mosq, ctx->uid, ctx->pwd); |
| | | } |
| | | |
| | | /* set callback functions */ |
| | | mosquitto_connect_callback_set(mosq, pub_connect_callback); |
| | | |
| | | while( !g_signal.stop ) |
| | | { |
| | | /* connect to MQTT broker */ |
| | | if( mosquitto_connect(mosq, ctx->host, ctx->port, ctx->keepalive) ) |
| | | { |
| | | log_error("Publisher connect to broker[%s:%d] failure: %s\n", ctx->host, ctx->port, strerror(errno)); |
| | | msleep(1000); |
| | | continue; |
| | | } |
| | | |
| | | /* -1: use default timeout 1000ms 1: unused */ |
| | | mosquitto_loop_forever(mosq, -1, 1); |
| | | |
| | | /* Publisher broadcast sensors data message interval time, unit seconds */ |
| | | sleep( ctx->interval ); |
| | | } |
| | | |
| | | mosquitto_destroy(mosq); |
| | | return NULL; |
| | | } |
| | | |
| | | void sub_connect_callback(struct mosquitto *mosq, void *userdata, int result) |
| | | { |
| | | mqtt_ctx_t *ctx = (mqtt_ctx_t *)userdata; |
| | | |
| | | if( result ) |
| | | { |
| | | log_error("Subscriber connect to broker server failed, rv=%d\n", result); |
| | | return ; |
| | | } |
| | | |
| | | log_info("Subscriber connect to broker server[%s:%d] successfully\n", ctx->host, ctx->port); |
| | | mosquitto_subscribe(mosq, NULL, ctx->subTopic, ctx->subQos); |
| | | } |
| | | |
| | | void sub_disconnect_callback(struct mosquitto *mosq, void *userdata, int result) |
| | | { |
| | | mqtt_ctx_t *ctx = (mqtt_ctx_t *)userdata; |
| | | |
| | | log_warn("Subscriber disconnect to broker server[%s:%d], reason=%d\n", ctx->host, ctx->port, result); |
| | | } |
| | | |
| | | /* process subscriber(downlink) data */ |
| | | void proc_json_items(cJSON *root, mqtt_ctx_t *ctx) |
| | | { |
| | | cJSON *item_method; |
| | |
| | | item_method = cJSON_GetObjectItem(root, "method"); |
| | | if (item_method == NULL || !cJSON_IsString(item_method)) |
| | | { |
| | | printf("JSON missing method\n"); |
| | | log_error("JSON missing method\n"); |
| | | return; |
| | | } |
| | | method = item_method->valuestring; |
| | |
| | | /* 只处理 setValue 命令 */ |
| | | if (strcmp(method, "setValue") != 0) |
| | | { |
| | | printf("Ignore method: %s\n", method); |
| | | log_error("Ignore method: %s\n", method); |
| | | return; |
| | | } |
| | | |
| | |
| | | item_params = cJSON_GetObjectItem(root, "params"); |
| | | if (item_params == NULL || !cJSON_IsObject(item_params)) |
| | | { |
| | | printf("JSON missing params\n"); |
| | | log_error("JSON missing params\n"); |
| | | return; |
| | | } |
| | | |
| | |
| | | item_device = cJSON_GetObjectItem(item_params, "device"); |
| | | if (item_device == NULL || !cJSON_IsString(item_device)) |
| | | { |
| | | printf("JSON missing params.device\n"); |
| | | log_error("JSON missing params.device\n"); |
| | | return; |
| | | } |
| | | device = item_device->valuestring; |
| | |
| | | item_status = cJSON_GetObjectItem(item_params, "status"); |
| | | if (item_status == NULL || !(cJSON_IsBool(item_status))) |
| | | { |
| | | printf("JSON missing params.status\n"); |
| | | log_error("JSON missing params.status\n"); |
| | | return; |
| | | } |
| | | status = cJSON_IsTrue(item_status) ? ON : OFF; |
| | | |
| | | /* 映射 LED 名称并调用控制函数 */ |
| | | if (strcmp(device, "RedLed") == 0) |
| | | if ( !strcmp(device, "RedLed") ) |
| | | { |
| | | turn_led(LED_R, status); |
| | | } |
| | | else if (strcmp(device, "GreenLed") == 0) |
| | | else if ( !strcmp(device, "GreenLed") ) |
| | | { |
| | | turn_led(LED_G, status); |
| | | } |
| | | else if (strcmp(device, "BlueLed") == 0) |
| | | else if ( !strcmp(device, "BlueLed") ) |
| | | { |
| | | turn_led(LED_B, status); |
| | | } |
| | |
| | | return ; |
| | | } |
| | | |
| | | log_debug("Subscriber receive message: '%s'\n", message->payload); |
| | | log_info("Subscriber receive message: '%s'\n", message->payload); |
| | | |
| | | root = cJSON_Parse(message->payload); |
| | | if( !root ) |
| | |
| | | return ; |
| | | } |
| | | |
| | | /* process receive message data */ |
| | | proc_json_items(root, ctx); |
| | | |
| | | /* must delete it, or it will result memory leak */ |
| | |
| | | return ; |
| | | } |
| | | |
| | | void mqtt_connect_callback(struct mosquitto *mosq, void *userdata, int result) |
| | | { |
| | | mqtt_ctx_t *ctx = (mqtt_ctx_t *)userdata; |
| | | |
| | | void *mqtt_sub_worker(void *args) |
| | | if( result ) |
| | | { |
| | | log_error("mosquitto connect to broker server failed, rv=%d\n", result); |
| | | return ; |
| | | } |
| | | |
| | | log_info("mosquitto connect to broker server[%s:%d] successfully\n", ctx->host, ctx->port); |
| | | mosquitto_subscribe(mosq, NULL, ctx->subTopic, ctx->subQos); |
| | | } |
| | | |
| | | void mqtt_disconnect_callback(struct mosquitto *mosq, void *userdata, int result) |
| | | { |
| | | mqtt_ctx_t *ctx = (mqtt_ctx_t *)userdata; |
| | | |
| | | log_warn("mosquitto disconnect to broker server[%s:%d], reason=%d\n", ctx->host, ctx->port, result); |
| | | } |
| | | |
| | | void *thingsboard_worker(void *args) |
| | | { |
| | | mqtt_ctx_t *ctx = (mqtt_ctx_t *)args; |
| | | struct mosquitto *mosq; |
| | | bool session = true; |
| | | int rv = 0; |
| | | |
| | | mosq = mosquitto_new(ctx->devid, session, ctx); |
| | | if( !mosq ) |
| | |
| | | /* connnect to broker by token or uid/pwd */ |
| | | if( strlen(ctx->token ) > 0) |
| | | { |
| | | log_info("Using token authentication\n"); |
| | | log_info("Using token authentication: '%s'\n", ctx->token); |
| | | mosquitto_username_pw_set(mosq, ctx->token, NULL); |
| | | } |
| | | else if( strlen(ctx->uid)> 0 && strlen(ctx->pwd)> 0 ) |
| | |
| | | } |
| | | |
| | | /* set callback functions */ |
| | | mosquitto_connect_callback_set(mosq, sub_connect_callback); |
| | | mosquitto_disconnect_callback_set(mosq, sub_disconnect_callback); |
| | | mosquitto_connect_callback_set(mosq, mqtt_connect_callback); |
| | | mosquitto_disconnect_callback_set(mosq, mqtt_disconnect_callback); |
| | | mosquitto_message_callback_set(mosq, sub_message_callback); |
| | | |
| | | while( !g_signal.stop ) |
| | |
| | | /* connect to MQTT broker */ |
| | | if( mosquitto_connect(mosq, ctx->host, ctx->port, ctx->keepalive) ) |
| | | { |
| | | log_error("Subscriber connect to broker[%s:%d] failure: %s\n", ctx->host, ctx->port, strerror(errno)); |
| | | msleep(1000); |
| | | log_error("mosquitto connect to broker[%s:%d] failure: %s\n", ctx->host, ctx->port, strerror(errno)); |
| | | sleep(1); |
| | | continue; |
| | | } |
| | | |
| | | /* -1: use default timeout 1000ms 1: unused */ |
| | | mosquitto_loop_forever(mosq, -1, 1); |
| | | while(!g_signal.stop) |
| | | { |
| | | /* periodically publish and report data */ |
| | | pub_connect_callback(mosq, ctx, MOSQ_ERR_SUCCESS); |
| | | |
| | | /* MQTT process in Non-blocking mode, timeout for 1s */ |
| | | if( MOSQ_ERR_SUCCESS != (rv = mosquitto_loop(mosq, 1000, 1)) ) |
| | | { |
| | | log_warn("MQTT loop error: %s, reconnecting...\n", mosquitto_strerror(rv)); |
| | | break; |
| | | } |
| | | } |
| | | |
| | | mosquitto_disconnect(mosq); |
| | | sleep(1); |
| | | } |
| | | |
| | | mosquitto_destroy(mosq); |