From 4291c22ef445b8f94d2bcf1a7a3f531d3d9f6121 Mon Sep 17 00:00:00 2001
From: guowenxue <guowenxue@gmail.com>
Date: Tue, 18 Nov 2025 10:43:20 +0800
Subject: [PATCH] update thingsboard and lightd

---
 project/lightd/etc/lightd.service           |    1 
 project/thingsboard/thingsboard.c           |  193 ++++++++---------------
 project/lightd/config.c                     |    1 
 project/lightd/config.h                     |    5 
 project/thingsboard/config.c                |   84 +++++----
 project/lightd/lightd.c                     |  105 +++++--------
 project/thingsboard/config.h                |   72 +++++---
 project/thingsboard/etc/thingsboard.service |    1 
 project/booster/util_proc.h                 |   16 ++
 9 files changed, 222 insertions(+), 256 deletions(-)

diff --git a/project/booster/util_proc.h b/project/booster/util_proc.h
index 89856c6..22a5fd7 100644
--- a/project/booster/util_proc.h
+++ b/project/booster/util_proc.h
@@ -86,4 +86,20 @@
     return ;
 }
 
+static inline int check_timeout(time_t *last_time, int interval)
+{
+    int                  timeout = 0;
+    time_t               now;
+
+    time(&now);
+
+    if( difftime(now, *last_time)>interval )
+    {
+        timeout = 1;
+        *last_time = now;
+    }
+
+    return timeout;
+}
+
 #endif
diff --git a/project/lightd/config.c b/project/lightd/config.c
index 4f2b261..a763d2e 100644
--- a/project/lightd/config.c
+++ b/project/lightd/config.c
@@ -41,7 +41,6 @@
     logger = &ctx->logger;
     hwinfo = &ctx->hwinfo;
     mqtt = &ctx->mqtt;
-    mqtt->userdata = (void *)hwinfo;
 
     ini = iniparser_load(conf_file);
     if( !ini )
diff --git a/project/lightd/config.h b/project/lightd/config.h
index 85f47ff..0e9e131 100644
--- a/project/lightd/config.h
+++ b/project/lightd/config.h
@@ -13,6 +13,7 @@
 #ifndef  __CONF_H_
 #define  __CONF_H_
 
+#include <stddef.h>
 #include "gpio.h"
 
 enum
@@ -73,6 +74,10 @@
     mqtt_ctx_t      mqtt;
 } iotd_ctx_t;
 
+/* get iotd_ctx address by mqtt_ctx address */
+#define container_of(ptr, type, member) ((type *)((char *)(ptr) - offsetof(type, member)))
+#define to_iotd(ctx)	container_of(ctx, iotd_ctx_t, mqtt);
+
 extern int parser_conf(const char *conf_file, iotd_ctx_t *ctx, int debug);
 
 #endif   /* ----- #ifndef _CONF_H_  ----- */
diff --git a/project/lightd/etc/lightd.service b/project/lightd/etc/lightd.service
index 4129fd7..c37f65c 100644
--- a/project/lightd/etc/lightd.service
+++ b/project/lightd/etc/lightd.service
@@ -6,6 +6,7 @@
 Type=simple
 ExecStartPre=/bin/rm -f /tmp/.lightd.pid /var/log/lightd.log
 ExecStart=/usr/bin/lightd -c /etc/lightd.conf
+ExecStop=/bin/rm -f /tmp/.lightd.pid
 
 Restart=always
 RestartSec=2
diff --git a/project/lightd/lightd.c b/project/lightd/lightd.c
index 2b1a00a..22ce0cb 100644
--- a/project/lightd/lightd.c
+++ b/project/lightd/lightd.c
@@ -31,13 +31,12 @@
 #define PROG_VERSION               "v1.0.0"
 #define DAEMON_PIDFILE             "/tmp/.lightd.pid"
 
-int check_timeout(time_t *last_time, int interval);
 void *thingsboard_worker(void *args);
 
 static void program_usage(char *progname)
 {
     printf("Usage: %s [OPTION]...\n", progname);
-    printf(" %s is LingYun studio MQTT daemon program running on RaspberryPi\n", progname);
+    printf(" %s is LingYun studio lightd program running on RaspberryPi\n", progname);
 
     printf("\nMandatory arguments to long options are mandatory for short options too:\n");
     printf(" -b[daemon  ]  Running in daemon mode\n");
@@ -127,17 +126,13 @@
     /* initial gpio hardware */
     gpio_init(&hwinfo->gpio);
 
-    /*
-     * +--------------------------------+
-     * |    ThingsBoard MQTT Thread     |
-     * +--------------------------------+
-     */
+    /* start thingsboard MQTT thread */
     if( thread_start(&tid, thingsboard_worker, &ctx.mqtt ) < 0 )
     {
-        log_error("Start MQTT subsciber worker thread failure\n");
+        log_error("Start thingsboard MQTT worker thread failure\n");
         goto cleanup;
     }
-    log_info("Start MQTT subsciber worker thread ok\n");
+    log_info("Start thingsboard MQTT worker thread ok\n");
 
     /*
      * +--------------------------------+
@@ -154,7 +149,7 @@
             if( !hwinfo->tsl2561 )
             {
                 log_error("TSL2561 is not present, do not turn on the light\n");
-				lux = hwinfo->lux_threshold + 100.0;
+                lux = hwinfo->lux_threshold + 100.0;
                 goto nextloop;
             }
 
@@ -162,7 +157,7 @@
             if( tsl2561_get_lux(&lux) < 0 )
             {
                 log_error("TSL2561 sample failed, do not turn on the light\n");
-				lux = hwinfo->lux_threshold + 100.0;
+                lux = hwinfo->lux_threshold + 100.0;
                 goto nextloop;
             }
 
@@ -210,27 +205,11 @@
  * +--------------------------------+
  */
 
-int check_timeout(time_t *last_time, int interval)
-{
-    int                  timeout = 0;
-    time_t               now;
-
-    time(&now);
-
-    if( difftime(now, *last_time)>interval )
-    {
-        timeout = 1;
-        *last_time = now;
-    }
-
-    return timeout;
-}
-
-/* compatible with MQTT publish callback function */
+/* process publisher(uplink) data */
 void pub_connect_callback(struct mosquitto *mosq, void *userdata, int result)
 {
-    mqtt_ctx_t             *ctx = (mqtt_ctx_t *)userdata;
-    hwinfo_t               *hwinfo = (hwinfo_t *)ctx->userdata;
+    mqtt_ctx_t             *mqtt = (mqtt_ctx_t *)userdata;
+    iotd_ctx_t             *iotd = to_iotd(mqtt);
     int                     rv = 0;
     char                    msg[128];
     float                   temp = 0.0; /* temperature */
@@ -238,24 +217,19 @@
     static time_t           last_time = 0;
 
     /* publish time is not arrive */
-    if( !check_timeout(&last_time, ctx->interval) )
-    {
+    if( !check_timeout(&last_time, mqtt->interval) )
         return ;
-    }
 
-    log_debug("Publish topic '%s'\n", ctx->pubTopic);
-
-    if( hwinfo->ds18b20 )
+    log_debug("Publish topic '%s'\n", mqtt->pubTopic);
+    if( iotd->hwinfo.ds18b20 )
     {
-        memset(msg, 0, sizeof(msg));
-
         log_debug("DS18B20 temperature sensor enabled, start broadcast it\n");
-
         if( ds18b20_get_temperature(&temp)<0 )
             return ;
 
+        memset(msg, 0, sizeof(msg));
         snprintf(msg, sizeof(msg), "{\"temperature\":%.2f}", temp);
-        rv = mosquitto_publish(mosq, NULL, ctx->pubTopic, strlen(msg), msg, ctx->pubQos, retain);
+        rv = mosquitto_publish(mosq, NULL, mqtt->pubTopic, strlen(msg), msg, mqtt->pubQos, retain);
         if( rv )
         {
             log_error("Publisher broadcast message '%s' failure: %d\n", msg, rv);
@@ -269,27 +243,7 @@
     return ;
 }
 
-void sub_connect_callback(struct mosquitto *mosq, void *userdata, int result)
-{
-    mqtt_ctx_t             *ctx = (mqtt_ctx_t *)userdata;
-
-    if( result )
-    {
-        log_error("mosquitto connect to broker server failed, rv=%d\n", result);
-        return ;
-    }
-
-    log_info("mosquitto connect to broker server[%s:%d] successfully\n", ctx->host, ctx->port);
-    mosquitto_subscribe(mosq, NULL, ctx->subTopic, ctx->subQos);
-}
-
-void sub_disconnect_callback(struct mosquitto *mosq, void *userdata, int result)
-{
-    mqtt_ctx_t             *ctx = (mqtt_ctx_t *)userdata;
-
-    log_warn("mosquitto disconnect to broker server[%s:%d], reason=%d\n", ctx->host, ctx->port, result);
-}
-
+/* process subscriber(downlink) data */
 void proc_json_items(cJSON *root, mqtt_ctx_t *ctx)
 {
     cJSON *item_method;
@@ -377,12 +331,34 @@
         return ;
     }
 
+    /* process receive message data */
     proc_json_items(root, ctx);
 
     /* must delete it, or it will result memory leak */
     cJSON_Delete(root);
 
     return ;
+}
+
+void mqtt_connect_callback(struct mosquitto *mosq, void *userdata, int result)
+{
+    mqtt_ctx_t             *ctx = (mqtt_ctx_t *)userdata;
+
+    if( result )
+    {
+        log_error("mosquitto connect to broker server failed, rv=%d\n", result);
+        return ;
+    }
+
+    log_info("mosquitto connect to broker server[%s:%d] successfully\n", ctx->host, ctx->port);
+    mosquitto_subscribe(mosq, NULL, ctx->subTopic, ctx->subQos);
+}
+
+void mqtt_disconnect_callback(struct mosquitto *mosq, void *userdata, int result)
+{
+    mqtt_ctx_t             *ctx = (mqtt_ctx_t *)userdata;
+
+    log_warn("mosquitto disconnect to broker server[%s:%d], reason=%d\n", ctx->host, ctx->port, result);
 }
 
 void *thingsboard_worker(void *args)
@@ -412,8 +388,8 @@
     }
 
     /* set callback functions */
-    mosquitto_connect_callback_set(mosq, sub_connect_callback);
-    mosquitto_disconnect_callback_set(mosq, sub_disconnect_callback);
+    mosquitto_connect_callback_set(mosq, mqtt_connect_callback);
+    mosquitto_disconnect_callback_set(mosq, mqtt_disconnect_callback);
     mosquitto_message_callback_set(mosq, sub_message_callback);
 
     while( !g_signal.stop )
@@ -428,9 +404,10 @@
 
         while(!g_signal.stop)
         {
+            /* periodically publish and report data */
             pub_connect_callback(mosq, ctx, MOSQ_ERR_SUCCESS);
 
-            /* MQTT process in nonblock mode, timeout for 1s */
+            /* MQTT process in Non-blocking mode, timeout for 1s */
             if( MOSQ_ERR_SUCCESS != (rv = mosquitto_loop(mosq, 1000, 1)) )
             {
                 log_warn("MQTT loop error: %s, reconnecting...\n", mosquitto_strerror(rv));
diff --git a/project/thingsboard/config.c b/project/thingsboard/config.c
index 9fa08e0..621f372 100644
--- a/project/thingsboard/config.c
+++ b/project/thingsboard/config.c
@@ -14,12 +14,15 @@
 #include "logger.h"
 #include "iniparser.h"
 
-int parser_conf(const char *conf_file, mqtt_ctx_t *ctx, int debug)
+int parser_conf(const char *conf_file, iotd_ctx_t *ctx, int debug)
 {
     dictionary          *ini;
     const char          *str;
     int                  val;
     int                  rv = 0;
+    logger_t            *logger;
+    hwinfo_t            *hwinfo;
+    mqtt_ctx_t          *mqtt;
 
     if( !conf_file || !ctx )
     {
@@ -28,6 +31,9 @@
     }
 
     memset(ctx, 0, sizeof(*ctx));
+    logger = &ctx->logger;
+    hwinfo = &ctx->hwinfo;
+    mqtt = &ctx->mqtt;
 
     ini = iniparser_load(conf_file);
     if( !ini )
@@ -42,18 +48,18 @@
     if( !debug )
     {
         str = iniparser_getstring(ini, "logger:file", "/tmp/thingsboard.log");
-        strncpy(ctx->logfile, str, sizeof(ctx->logfile));
-        ctx->logsize = iniparser_getint(ini, "logger:size", 1024);
-        ctx->loglevel = iniparser_getint(ini, "logger:level", LOG_LEVEL_INFO);
+        strncpy(logger->logfile, str, sizeof(logger->logfile));
+        logger->logsize = iniparser_getint(ini, "logger:size", 1024);
+        logger->loglevel = iniparser_getint(ini, "logger:level", LOG_LEVEL_INFO);
     }
     else
     {
-        strncpy(ctx->logfile, "console", sizeof(ctx->logfile));
-        ctx->loglevel = LOG_LEVEL_DEBUG;
-        ctx->logsize = 0;
+        strncpy(logger->logfile, "console", sizeof(logger->logfile));
+        logger->loglevel = LOG_LEVEL_DEBUG;
+        logger->logsize = 0;
     }
 
-    if( log_open(ctx->logfile, ctx->loglevel, ctx->logsize, LOG_LOCK_DISABLE) < 0 )
+    if( log_open(logger->logfile, logger->loglevel, logger->logsize, LOG_LOCK_DISABLE) < 0 )
     {
         fprintf(stderr, "Logger system initialise failure\n");
         return -2;
@@ -73,8 +79,8 @@
         goto cleanup;
     }
     /* cJSON parser ID will get ""  */
-    snprintf(ctx->devid, sizeof(ctx->devid), "\"%s\"", str);
-    log_info("Parser device ID [%s]\n", ctx->devid);
+    snprintf(mqtt->devid, sizeof(mqtt->devid), "\"%s\"", str);
+    log_info("Parser device ID [%s]\n", mqtt->devid);
 
 
     /*+------------------------------------------------------+
@@ -82,43 +88,43 @@
      *+------------------------------------------------------+*/
 
     /* relay */
-    ctx->hwconf.relay=iniparser_getint(ini, "hardware:relay", 0);
-    if( !ctx->hwconf.relay )
+    hwinfo->relay=iniparser_getint(ini, "hardware:relay", 0);
+    if( !hwinfo->relay )
         log_warn("Parser relay module disabled\n");
     else
         log_info("Parser relay module enabled\n");
 
     /* RGB 3-colors LED */
-    ctx->hwconf.led=iniparser_getint(ini, "hardware:rgbled", 0);
-    if( !ctx->hwconf.led )
+    hwinfo->led=iniparser_getint(ini, "hardware:rgbled", 0);
+    if( !hwinfo->led )
         log_warn("Parser RGB 3-colors Led module disabled\n");
     else
         log_info("Parser RGB 3-colors Led module enabled\n");
 
     /* beeper */
-    ctx->hwconf.beeper=iniparser_getint(ini, "hardware:beep", 0);
-    if( !ctx->hwconf.beeper )
+    hwinfo->beeper=iniparser_getint(ini, "hardware:beep", 0);
+    if( !hwinfo->beeper )
         log_warn("Parser beeper module disabled\n");
     else
         log_info("Parser beeper module enabled\n");
 
     /* DS18B20 temperature module */
-    ctx->hwconf.ds18b20=iniparser_getint(ini, "hardware:ds18b20", 0);
-    if( !ctx->hwconf.ds18b20 )
+    hwinfo->ds18b20=iniparser_getint(ini, "hardware:ds18b20", 0);
+    if( !hwinfo->ds18b20 )
         log_warn("Parser DS18B20 temperature module disabled\n");
     else
         log_info("Parser DS18B20 temperature module enabled\n");
 
     /* SHT20 temperature and hummidity module */
-    ctx->hwconf.sht2x=iniparser_getint(ini, "hardware:sht2x", 0);
-    if( !ctx->hwconf.sht2x )
+    hwinfo->sht2x=iniparser_getint(ini, "hardware:sht2x", 0);
+    if( !hwinfo->sht2x )
         log_warn("Parser SHT2X temperature and hummidity module disabled\n");
     else
         log_info("Parser SHT2X temperature and hummidity module enabled\n");
 
     /* TSL2561 light intensity sensor module */
-    ctx->hwconf.tsl2561=iniparser_getint(ini, "hardware:tsl2561", 0);
-    if( !ctx->hwconf.tsl2561 )
+    hwinfo->tsl2561=iniparser_getint(ini, "hardware:tsl2561", 0);
+    if( !hwinfo->tsl2561 )
         log_warn("Parser TSL2561 light intensity sensor module disabled\n");
     else
         log_info("Parser TSL2561 light intensity sensor module enabled\n");
@@ -133,7 +139,7 @@
         rv = -4;
         goto cleanup;
     }
-    strncpy(ctx->host, str, sizeof(ctx->host) );
+    strncpy(mqtt->host, str, sizeof(mqtt->host) );
 
     if( (val=iniparser_getint(ini, "broker:port", -1)) < 0 )
     {
@@ -141,23 +147,23 @@
         rv = -5;
         goto cleanup;
     }
-    ctx->port = val;
-    log_info("Parser MQTT broker server [%s:%d]\n", ctx->host, ctx->port);
+    mqtt->port = val;
+    log_info("Parser MQTT broker server [%s:%d]\n", mqtt->host, mqtt->port);
 
     str=iniparser_getstring(ini, "broker:token", NULL);
-    strncpy(ctx->token, str, sizeof(ctx->uid) );
+    strncpy(mqtt->token, str, sizeof(mqtt->uid) );
 
     str=iniparser_getstring(ini, "broker:username", NULL);
-    strncpy(ctx->uid, str, sizeof(ctx->uid) );
+    strncpy(mqtt->uid, str, sizeof(mqtt->uid) );
 
     str=iniparser_getstring(ini, "broker:password", NULL);
-    strncpy(ctx->pwd, str, sizeof(ctx->pwd) );
+    strncpy(mqtt->pwd, str, sizeof(mqtt->pwd) );
 
-    if( ctx->uid && ctx->pwd )
-        log_info("Parser broker author by [%s:%s]\n", ctx->uid, ctx->pwd);
+    if( mqtt->uid && mqtt->pwd )
+        log_info("Parser broker author by [%s:%s]\n", mqtt->uid, mqtt->pwd);
 
-    ctx->keepalive = iniparser_getint(ini, "broker:keepalive", DEF_KEEPALIVE);
-    log_info("Parser broker keepalive timeout [%d] seconds\n", ctx->keepalive);
+    mqtt->keepalive = iniparser_getint(ini, "broker:keepalive", DEF_KEEPALIVE);
+    log_info("Parser broker keepalive timeout [%d] seconds\n", mqtt->keepalive);
 
     /*+------------------------------------------------------+
      *|             parser publisher settings                |
@@ -169,11 +175,11 @@
         rv = -6;
         goto cleanup;
     }
-    strncpy(ctx->pubTopic, str, sizeof(ctx->pubTopic) );
+    strncpy(mqtt->pubTopic, str, sizeof(mqtt->pubTopic) );
 
-    ctx->pubQos = iniparser_getint(ini, "publisher:pubQos", DEF_QOS);
-    ctx->interval = iniparser_getint(ini, "publisher:interval", DEF_PUBINTERVAL);
-    log_info("Parser publisher topic \"%s\" with Qos[%d] interval[%d]\n", ctx->pubTopic, ctx->pubQos, ctx->interval);
+    mqtt->pubQos = iniparser_getint(ini, "publisher:pubQos", DEF_QOS);
+    mqtt->interval = iniparser_getint(ini, "publisher:interval", DEF_PUBINTERVAL);
+    log_info("Parser publisher topic \"%s\" with Qos[%d] interval[%d]\n", mqtt->pubTopic, mqtt->pubQos, mqtt->interval);
 
     /*+------------------------------------------------------+
      *|             parser subscriber settings               |
@@ -185,10 +191,10 @@
         rv = -7;
         goto cleanup;
     }
-    strncpy(ctx->subTopic, str, sizeof(ctx->subTopic) );
+    strncpy(mqtt->subTopic, str, sizeof(mqtt->subTopic) );
 
-    ctx->subQos = iniparser_getint(ini, "subsciber:subQos", DEF_QOS);
-    log_info("Parser subscriber topic \"%s\" with Qos[%d]\n", ctx->subTopic, ctx->subQos);
+    mqtt->subQos = iniparser_getint(ini, "subsciber:subQos", DEF_QOS);
+    log_info("Parser subscriber topic \"%s\" with Qos[%d]\n", mqtt->subTopic, mqtt->subQos);
 
 cleanup:
     if( ini )
diff --git a/project/thingsboard/config.h b/project/thingsboard/config.h
index 07a6a39..acb008b 100644
--- a/project/thingsboard/config.h
+++ b/project/thingsboard/config.h
@@ -13,6 +13,8 @@
 #ifndef  __CONF_H_
 #define  __CONF_H_
 
+#include <stddef.h>
+
 enum
 {
     Qos0, /* 发送者只发送一次消息,不进行重试,Broker不会返回确认消息。在Qos0情况下,Broker可能没有接受到消息 */
@@ -24,49 +26,59 @@
 #define DEF_PUBINTERVAL     120
 #define DEF_QOS             Qos2
 
-typedef struct hwconf_s
+typedef struct hwinfo_s
 {
-    int            relay;   /* relay aviable or not.   0:Disable  1: Enable */
-    int            led;     /* RGB led aviable or not. 0:Disable  1: Enable */
-    int            beeper;  /* beeper aviable or not.  0:Disable  1: Enable */
-    int            ds18b20; /* DS1B820 aviable or not. 0:Disable  1: Enable */
-    int            sht2x;   /* SHT20  aviable or not.  0:Disable  1: Enable */
-    int            tsl2561; /* TSL2561 aviable or not.  0:Disable  1: Enable */
-} hwconf_t;
+    int             relay;   /* relay aviable or not.   0:Disable  1: Enable */
+    int             led;     /* RGB led aviable or not. 0:Disable  1: Enable */
+    int             beeper;  /* beeper aviable or not.  0:Disable  1: Enable */
+    int             ds18b20; /* DS1B820 aviable or not. 0:Disable  1: Enable */
+    int             sht2x;   /* SHT20  aviable or not.  0:Disable  1: Enable */
+    int             tsl2561; /* TSL2561 aviable or not. 0:Disable  1: Enable */
+} hwinfo_t;
 
+/* logger settings */
+typedef struct logger_s
+{
+    char            logfile[128]; /* logger record file */
+    int             loglevel;     /* logger level  */
+    int             logsize;      /* logger file maxsize, oversize will rollback */
+} logger_t;
 
 typedef struct mqtt_ctx_s
 {
-    char          devid[32];    /*  device ID */
-
-    /* hardware configuration */
-    hwconf_t      hwconf;
-
-    /* logger settings */
-    char          logfile[128]; /* logger record file */
-    int           loglevel;     /* logger level  */
-    int           logsize;      /* logger file maxsize, oversize will rollback */
+    char			devid[32];    /*  device ID */
+    void           *userdata;      /* user data pointer */
 
     /* Broker settings  */
-    char          host[128];  /* MQTT broker server name  */
-    int           port;       /* MQTT broker listen port  */
-    char          uid[64];    /* username */
-    char          pwd[64];    /* password */
-    char          token[64];  /* token */
-    int           keepalive;  /* MQTT broker send PING message to subsciber/publisher keepalive timeout<seconds> */
+    char            host[128];  /* MQTT broker server name  */
+    int             port;       /* MQTT broker listen port  */
+    char            uid[64];    /* username */
+    char            pwd[64];    /* password */
+    char            token[64];  /* token */
+    int             keepalive;  /* MQTT broker send PING message to subsciber/publisher keepalive timeout<seconds> */
 
     /* Publisher settings */
-    char          pubTopic[256]; /* Publisher topic  */
-    int           pubQos;        /* Publisher Qos */
-    int           interval ;     /* Publish sensor data interval time, unit seconds */
+    char            pubTopic[256]; /* Publisher topic  */
+    int             pubQos;        /* Publisher Qos */
+    int             interval ;     /* Publish sensor data interval time, unit seconds */
 
     /* Subscriber settings */
-    char          subTopic[256]; /* Subscriber topic */
-    int           subQos;        /* Subscriber Qos  */
+    char            subTopic[256]; /* Subscriber topic */
+    int             subQos;        /* Subscriber Qos  */
 } mqtt_ctx_t;
 
+typedef struct iotd_ctx_s
+{
+    logger_t        logger;
+    hwinfo_t        hwinfo;
+    mqtt_ctx_t      mqtt;
+} iotd_ctx_t;
 
-extern int parser_conf(const char *conf_file, mqtt_ctx_t *ctx, int debug);
+/* get iotd_ctx address by mqtt_ctx address */
+#define container_of(ptr, type, member) ((type *)((char *)(ptr) - offsetof(type, member)))
+#define to_iotd(ctx)	container_of(ctx, iotd_ctx_t, mqtt);
 
-#endif   /* ----- #ifndef _CONF_H_  ----- */
+extern int parser_conf(const char *conf_file, iotd_ctx_t *ctx, int debug);
+
+#endif
 
diff --git a/project/thingsboard/etc/thingsboard.service b/project/thingsboard/etc/thingsboard.service
index b02edce..848a459 100644
--- a/project/thingsboard/etc/thingsboard.service
+++ b/project/thingsboard/etc/thingsboard.service
@@ -6,6 +6,7 @@
 Type=simple
 ExecStartPre=/bin/rm -f /tmp/.thingsboard.pid /var/log/thingsboard.log
 ExecStart=/usr/bin/thingsboard -c /etc/thingsboard.conf
+ExecStop=/bin/rm -f /tmp/.thingsboard.pid
 
 Restart=always
 RestartSec=2
diff --git a/project/thingsboard/thingsboard.c b/project/thingsboard/thingsboard.c
index 70c872a..6611f22 100644
--- a/project/thingsboard/thingsboard.c
+++ b/project/thingsboard/thingsboard.c
@@ -31,8 +31,7 @@
 #define PROG_VERSION               "v1.0.0"
 #define DAEMON_PIDFILE             "/tmp/.thingsboard.pid"
 
-void *thingsboard_subsciber(void *args);
-void *thingsboard_publisher(void *args);
+void *thingsboard_worker(void *args);
 
 static void program_usage(char *progname)
 {
@@ -54,7 +53,7 @@
 {
     int                daemon = 0;
     pthread_t          tid;
-    mqtt_ctx_t         ctx;
+    iotd_ctx_t         ctx;
     char               *conf_file="/etc/thingsboard.conf";
     int                debug = 0;
     int                opt;
@@ -99,7 +98,6 @@
             default:
                 break;
         }
-
     }
 
     /* parser configure file */
@@ -119,34 +117,18 @@
     /* initial mosquitto library */
     mosquitto_lib_init();
 
-    /*
-     * +--------------------------------+
-     * |    MQTT Subscriber Thread      |
-     * +--------------------------------+
-     */
-    if( thread_start(&tid, thingsboard_subsciber, &ctx ) < 0 )
+    /* start thingsboard MQTT thread */
+    if( thread_start(&tid, thingsboard_worker, &ctx.mqtt ) < 0 )
     {
-        log_error("Start MQTT subsciber worker thread failure\n");
+        log_error("Start thingsboard MQTT worker thread failure\n");
         goto cleanup;
     }
-    log_info("Start MQTT subsciber worker thread ok\n");
+    log_info("Start thingsboard MQTT worker thread ok\n");
 
-    /*
-     * +--------------------------------+
-     * |     MQTT publisher Thread      |
-     * +--------------------------------+
-     */
-    if( thread_start(&tid, thingsboard_publisher, &ctx) < 0 )
+    /* control thread loop */
+    while( !g_signal.stop )
     {
-        log_error("Start MQTT publisher worker thread failure\n");
-        goto cleanup;
-    }
-    log_info("Start MQTT publisher worker thread ok\n");
-
-
-    while( ! g_signal.stop )
-    {
-        msleep(1000);
+        sleep(1);
     }
 
 cleanup:
@@ -156,35 +138,39 @@
     return 0;
 }
 
+
+/*
+ * +--------------------------------+
+ * |    ThingsBoard MQTT Thread     |
+ * +--------------------------------+
+ */
+
+/* process publisher(uplink) data */
 void pub_connect_callback(struct mosquitto *mosq, void *userdata, int result)
 {
-    mqtt_ctx_t             *ctx = (mqtt_ctx_t *)userdata;
+    mqtt_ctx_t             *mqtt = (mqtt_ctx_t *)userdata;
+    iotd_ctx_t             *iotd = to_iotd(mqtt);
     int                     rv = 0;
     char                    msg[128];
     float                   temp = 0.0; /* temperature */
     float                   rh = 0.0;   /* relative humidity */
     int                     retain = 0;
+    static time_t           last_time = 0;
 
-    if( result )
-    {
-        log_error("Publisher connect to broker server[%s:%d] failed, rv=%d\n", ctx->host, ctx->port, result);
+    /* publish time is not arrive */
+    if( !check_timeout(&last_time, mqtt->interval) )
         return ;
-    }
 
-    log_info("Publisher connect to broker server[%s:%d] successfully\n", ctx->host, ctx->port);
-    log_debug("Publish topic '%s'\n", ctx->pubTopic);
-
-    if( ctx->hwconf.sht2x )
+    log_debug("Publish topic '%s'\n", mqtt->pubTopic);
+    if( iotd->hwinfo.sht2x )
     {
-        memset(msg, 0, sizeof(msg));
-
         log_debug("SHT2X temperature and humidity sensor enabled, start broadcast it\n");
-
         if( sht2x_get_temp_humidity(&temp, &rh)<0 )
             return ;
 
+        memset(msg, 0, sizeof(msg));
         snprintf(msg, sizeof(msg), "{\"temperature\":%.2f, \"humidity\":%.2f}", temp, rh);
-        rv = mosquitto_publish(mosq, NULL, ctx->pubTopic, strlen(msg), msg, ctx->pubQos, retain);
+        rv = mosquitto_publish(mosq, NULL, mqtt->pubTopic, strlen(msg), msg, mqtt->pubQos, retain);
         if( rv )
         {
             log_error("Publisher broadcast message '%s' failure: %d\n", msg, rv);
@@ -195,83 +181,10 @@
         }
     }
 
-    msleep(500);
-    log_info("Publisher broadcast over and disconnect broker now\n", ctx->host, ctx->port);
-    mosquitto_disconnect(mosq);
-
     return ;
 }
 
-void *thingsboard_publisher(void *args)
-{
-    mqtt_ctx_t             *ctx = (mqtt_ctx_t *)args;
-    struct mosquitto       *mosq;
-    bool                    session = true;
-
-    mosq = mosquitto_new(ctx->devid, session, ctx);
-    if( !mosq )
-    {
-        log_error("mosquitto_new failure\n");
-        return NULL;
-    }
-
-    /* connnect to broker by token or uid/pwd */
-    if( strlen(ctx->token ) > 0)
-    {
-        log_info("Using token authentication\n");
-        mosquitto_username_pw_set(mosq, ctx->token, NULL);
-    }
-    else if( strlen(ctx->uid)> 0  && strlen(ctx->pwd)> 0 )
-    {
-        log_info("Using username/password authentication\n");
-        mosquitto_username_pw_set(mosq, ctx->uid, ctx->pwd);
-    }
-
-    /* set callback functions */
-    mosquitto_connect_callback_set(mosq, pub_connect_callback);
-
-    while( !g_signal.stop )
-    {
-        /* connect to MQTT broker  */
-        if( mosquitto_connect(mosq, ctx->host, ctx->port, ctx->keepalive) )
-        {
-            log_error("Publisher connect to broker[%s:%d] failure: %s\n", ctx->host, ctx->port, strerror(errno));
-            msleep(1000);
-            continue;
-        }
-
-        /* -1: use default timeout 1000ms  1: unused */
-        mosquitto_loop_forever(mosq, -1, 1);
-
-        /* Publisher broadcast sensors data message interval time, unit seconds */
-        sleep( ctx->interval );
-    }
-
-    mosquitto_destroy(mosq);
-    return NULL;
-}
-
-void sub_connect_callback(struct mosquitto *mosq, void *userdata, int result)
-{
-    mqtt_ctx_t             *ctx = (mqtt_ctx_t *)userdata;
-
-    if( result )
-    {
-        log_error("Subscriber connect to broker server failed, rv=%d\n", result);
-        return ;
-    }
-
-    log_info("Subscriber connect to broker server[%s:%d] successfully\n", ctx->host, ctx->port);
-    mosquitto_subscribe(mosq, NULL, ctx->subTopic, ctx->subQos);
-}
-
-void sub_disconnect_callback(struct mosquitto *mosq, void *userdata, int result)
-{
-    mqtt_ctx_t             *ctx = (mqtt_ctx_t *)userdata;
-
-    log_warn("Subscriber disconnect to broker server[%s:%d], reason=%d\n", ctx->host, ctx->port, result);
-}
-
+/* process subscriber(downlink) data */
 void proc_json_items(cJSON *root, mqtt_ctx_t *ctx)
 {
     cJSON *item_method;
@@ -363,6 +276,7 @@
         return ;
     }
 
+    /* process receive message data */
     proc_json_items(root, ctx);
 
     /* must delete it, or it will result memory leak */
@@ -371,11 +285,33 @@
     return ;
 }
 
-void *thingsboard_subsciber(void *args)
+void mqtt_connect_callback(struct mosquitto *mosq, void *userdata, int result)
+{
+    mqtt_ctx_t             *ctx = (mqtt_ctx_t *)userdata;
+
+    if( result )
+    {
+        log_error("mosquitto connect to broker server failed, rv=%d\n", result);
+        return ;
+    }
+
+    log_info("mosquitto connect to broker server[%s:%d] successfully\n", ctx->host, ctx->port);
+    mosquitto_subscribe(mosq, NULL, ctx->subTopic, ctx->subQos);
+}
+
+void mqtt_disconnect_callback(struct mosquitto *mosq, void *userdata, int result)
+{
+    mqtt_ctx_t             *ctx = (mqtt_ctx_t *)userdata;
+
+    log_warn("mosquitto disconnect to broker server[%s:%d], reason=%d\n", ctx->host, ctx->port, result);
+}
+
+void *thingsboard_worker(void *args)
 {
     mqtt_ctx_t             *ctx = (mqtt_ctx_t *)args;
     struct mosquitto       *mosq;
     bool                    session = true;
+    int                     rv = 0;
 
     mosq = mosquitto_new(ctx->devid, session, ctx);
     if( !mosq )
@@ -387,7 +323,7 @@
     /* connnect to broker by token or uid/pwd */
     if( strlen(ctx->token ) > 0)
     {
-        log_info("Using token authentication\n");
+        log_info("Using token authentication: '%s'\n", ctx->token);
         mosquitto_username_pw_set(mosq, ctx->token, NULL);
     }
     else if( strlen(ctx->uid)> 0  && strlen(ctx->pwd)> 0 )
@@ -397,8 +333,8 @@
     }
 
     /* set callback functions */
-    mosquitto_connect_callback_set(mosq, sub_connect_callback);
-    mosquitto_disconnect_callback_set(mosq, sub_disconnect_callback);
+    mosquitto_connect_callback_set(mosq, mqtt_connect_callback);
+    mosquitto_disconnect_callback_set(mosq, mqtt_disconnect_callback);
     mosquitto_message_callback_set(mosq, sub_message_callback);
 
     while( !g_signal.stop )
@@ -406,13 +342,26 @@
         /* connect to MQTT broker  */
         if( mosquitto_connect(mosq, ctx->host, ctx->port, ctx->keepalive) )
         {
-            log_error("Subscriber connect to broker[%s:%d] failure: %s\n", ctx->host, ctx->port, strerror(errno));
-            msleep(1000);
+            log_error("mosquitto connect to broker[%s:%d] failure: %s\n", ctx->host, ctx->port, strerror(errno));
+            sleep(1);
             continue;
         }
 
-        /* -1: use default timeout 1000ms  1: unused */
-        mosquitto_loop_forever(mosq, -1, 1);
+        while(!g_signal.stop)
+        {
+            /* periodically publish and report data */
+            pub_connect_callback(mosq, ctx, MOSQ_ERR_SUCCESS);
+
+            /* MQTT process in Non-blocking mode, timeout for 1s */
+            if( MOSQ_ERR_SUCCESS != (rv = mosquitto_loop(mosq, 1000, 1)) )
+            {
+                log_warn("MQTT loop error: %s, reconnecting...\n", mosquitto_strerror(rv));
+                break;
+            }
+        }
+
+        mosquitto_disconnect(mosq);
+        sleep(1);
     }
 
     mosquitto_destroy(mosq);

--
Gitblit v1.9.1