From 68826376ee5f47783c644c6604f4411ec747cd7e Mon Sep 17 00:00:00 2001
From: guowenxue <guowenxue@gmail.com>
Date: Fri, 14 Nov 2025 23:52:16 +0800
Subject: [PATCH] Add UDP DNS client source code

---
 project/4.mqttd/mqttd.c |  434 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 434 insertions(+), 0 deletions(-)

diff --git a/project/4.mqttd/mqttd.c b/project/4.mqttd/mqttd.c
new file mode 100644
index 0000000..a22a6b9
--- /dev/null
+++ b/project/4.mqttd/mqttd.c
@@ -0,0 +1,434 @@
+/*********************************************************************************
+ *      Copyright:  (C) 2019 LingYun IoT System Studio
+ *                  All rights reserved.
+ *
+ *       Filename:  main.c
+ *    Description:  This file
+ *
+ *        Version:  1.0.0(29/01/19)
+ *         Author:  Guo Wenxue <guowenxue@gmail.com>
+ *      ChangeLog:  1, Release initial version on "29/01/19 15:34:41"
+ *
+ ********************************************************************************/
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <time.h>
+#include <getopt.h>
+#include <libgen.h>
+#include <string.h>
+#include <errno.h>
+
+#include <mosquitto.h>
+#include <cjson/cJSON.h>
+
+#include "logger.h"
+#include "proc.h"
+#include "config.h"
+#include "ds18b20.h"
+#include "sht20.h"
+#include "leds.h"
+
+#define PROG_VERSION               "v1.0.0"
+#define DAEMON_PIDFILE             "/tmp/.mqtt.pid"
+
+void *mqtt_sub_worker(void *args);
+void *mqtt_pub_worker(void *args);
+
+static void program_usage(char *progname)
+{
+
+	printf("Usage: %s [OPTION]...\n", progname);
+	printf(" %s is LingYun studio MQTT daemon program running on RaspberryPi\n", progname);
+
+	printf("\nMandatory arguments to long options are mandatory for short options too:\n");
+	printf(" -d[debug   ]  Running in debug mode\n");
+	printf(" -c[conf    ]  Specify configure file\n");
+	printf(" -h[help    ]  Display this help information\n");
+	printf(" -v[version ]  Display the program version\n");
+
+	printf("\n%s version %s\n", progname, PROG_VERSION);
+	return;
+}
+
+int main (int argc, char **argv)
+{
+	int                daemon = 1;
+	pthread_t          tid;
+	mqtt_ctx_t         ctx;
+	char               *conf_file="/etc/mqttd.conf";
+	int                debug = 0;
+	int                opt;
+	char              *progname=NULL;
+
+	struct option long_options[] = {
+		{"conf", required_argument, NULL, 'c'},
+		{"debug", no_argument, NULL, 'd'},
+		{"version", no_argument, NULL, 'v'},
+		{"help", no_argument, NULL, 'h'},
+		{NULL, 0, NULL, 0}
+	};
+
+	progname = (char *)basename(argv[0]);
+
+	/* parser the command line parameters */
+	while ((opt = getopt_long(argc, argv, "c:dvh", long_options, NULL)) != -1)
+	{
+		switch (opt)
+		{
+			case 'c': /* Set configure file */
+				conf_file = optarg;
+				break;
+
+			case 'd': /* Set debug running */
+				daemon = 0;
+				debug = 1;
+				break;
+
+			case 'v':  /* Get software version */
+				printf("%s version %s\n", progname, PROG_VERSION);
+				return 0;
+
+			case 'h':  /* Get help information */
+				program_usage(progname);
+				return 0;
+
+			default:
+				break;
+		}
+
+	}
+
+	if( !conf_file )
+		debug = 1;
+
+	/* parser configure file */
+	if( mqttd_parser_conf(conf_file, &ctx, debug)<0 )
+	{
+		fprintf(stderr, "Parser mqtted configure file failure\n");
+		return -2;
+	}
+
+	/* install signal proc handler */
+	install_default_signal();
+
+	/* check program already running or not, if already running then exit, or set running as daemon */
+	if( check_set_program_running(daemon, DAEMON_PIDFILE) < 0 )
+		goto cleanup;
+
+	/* initial mosquitto library */
+	mosquitto_lib_init();
+
+	/* create MQTT subsciber thread  */
+	if( thread_start(&tid, mqtt_sub_worker, &ctx ) < 0 )
+	{
+		log_error("Start MQTT subsciber worker thread failure\n");
+		goto cleanup;
+	}
+	log_info("Start MQTT subsciber worker thread ok\n");
+
+	/* create MQTT publisher thread  */
+	if( thread_start(&tid, mqtt_pub_worker, &ctx) < 0 )
+	{
+		log_error("Start MQTT publisher worker thread failure\n");
+		goto cleanup;
+	}
+	log_info("Start MQTT publisher worker thread ok\n");
+
+	while( ! g_signal.stop )
+	{
+		msleep(1000);
+	}
+
+cleanup:
+	mosquitto_lib_cleanup();
+	log_close();
+
+	return 0;
+} /* ----- End of main() ----- */
+
+void pub_connect_callback(struct mosquitto *mosq, void *userdata, int result)
+{
+	mqtt_ctx_t             *ctx = (mqtt_ctx_t *)userdata;
+	int                     rv = 0;
+	char                    msg[128];
+	float                   temp = 0.0; /* temperature */
+	float                   rh = 0.0;   /* relative humidity */
+	int                     retain = 0;
+
+	if( result )
+	{
+		log_error("Publisher connect to broker server[%s:%d] failed, rv=%d\n", ctx->host, ctx->port, result);
+		return ;
+	}
+
+	log_info("Publisher connect to broker server[%s:%d] successfully\n", ctx->host, ctx->port);
+	log_debug("Publish topic '%s'\n", ctx->pubTopic);
+
+	if( ctx->hwconf.ds18b20 )
+	{
+		memset(msg, 0, sizeof(msg));
+
+		log_debug("DS18B20 temperature sensor enabled, start broadcast it\n");
+
+		if( ds18b20_get_temperature(&temp) < 0 )
+			snprintf(msg, sizeof(msg), "{ \"id\":%s, \"temp\":\"error\" }", ctx->devid);
+		else
+			snprintf(msg, sizeof(msg), "{ \"id\":%s, \"temp\":\"%.2f\" }", ctx->devid, temp);
+
+		rv = mosquitto_publish(mosq, NULL, ctx->pubTopic, strlen(msg), msg, ctx->pubQos, retain);
+		if( rv )
+		{
+			log_error("Publisher broadcast message '%s' failure: %d\n", msg, rv);
+		}
+		else
+		{
+			log_info("Publisher broadcast message '%s' ok\n", msg);
+		}
+	}
+
+	if( ctx->hwconf.sht2x )
+	{
+		memset(msg, 0, sizeof(msg));
+
+		log_debug("SHT2X temperature and humidity sensor enabled, start broadcast it\n");
+
+		if( sht2x_get_temp_humidity(&temp, &rh) < 0 )
+			snprintf(msg, sizeof(msg), "{ \"id\":%s, \"temp\":\"error\", \"RH\":\"error\" }", ctx->devid);
+		else
+			snprintf(msg, sizeof(msg), "{ \"id\":%s, \"temp\":\"%.2f\", \"RH\":\"%.2f\" }", ctx->devid, temp, rh);
+
+		rv = mosquitto_publish(mosq, NULL, ctx->pubTopic, strlen(msg), msg, ctx->pubQos, retain);
+		if( rv )
+		{
+			log_error("Publisher broadcast message '%s' failure: %d\n", msg, rv);
+		}
+		else
+		{
+			log_info("Publisher broadcast message '%s' ok\n", msg);
+		}
+	}
+
+	log_info("Publisher broadcast over and disconnect broker now\n", ctx->host, ctx->port);
+	mosquitto_disconnect(mosq);
+
+	return ;
+}
+
+
+void *mqtt_pub_worker(void *args)
+{
+	mqtt_ctx_t             *ctx = (mqtt_ctx_t *)args;
+	struct mosquitto       *mosq;
+	bool                    session = true;
+
+
+	mosq = mosquitto_new(NULL, session, ctx);
+	if( !mosq )
+	{
+		log_error("mosquitto_new failure\n");
+		return NULL;
+	}
+
+	/* set connnect to broker username and password  */
+	if( strlen(ctx->uid)> 0  && strlen(ctx->pwd)> 0 )
+		mosquitto_username_pw_set(mosq, ctx->uid, ctx->pwd);
+
+	/* set callback functions */
+	mosquitto_connect_callback_set(mosq, pub_connect_callback);
+
+	while( !g_signal.stop )
+	{
+		/* connect to MQTT broker  */
+		if( mosquitto_connect(mosq, ctx->host, ctx->port, ctx->keepalive) )
+		{
+			log_error("Publisher connect to broker[%s:%d] failure: %s\n", ctx->host, ctx->port, strerror(errno));
+			msleep(1000);
+			continue;
+		}
+
+		/* -1: use default timeout 1000ms  1: unused */
+		mosquitto_loop_forever(mosq, -1, 1);
+
+		/* Publisher broadcast sensors data message interval time, unit seconds */
+		sleep( ctx->interval );
+	}
+
+	mosquitto_destroy(mosq);
+	return NULL;
+}
+
+void sub_connect_callback(struct mosquitto *mosq, void *userdata, int result)
+{
+	mqtt_ctx_t             *ctx = (mqtt_ctx_t *)userdata;
+
+	if( result )
+	{
+		log_error("Subscriber connect to broker server failed, rv=%d\n", result);
+		return ;
+	}
+
+	log_info("Subscriber connect to broker server[%s:%d] successfully\n", ctx->host, ctx->port);
+	mosquitto_subscribe(mosq, NULL, ctx->subTopic, ctx->subQos);
+}
+
+void sub_disconnect_callback(struct mosquitto *mosq, void *userdata, int result)
+{
+	mqtt_ctx_t             *ctx = (mqtt_ctx_t *)userdata;
+
+	log_warn("Subscriber disconnect to broker server[%s:%d], reason=%d\n", ctx->host, ctx->port, result);
+}
+
+static inline void mqtt_turn_led(int which, char *cmd)
+{
+	if( strcasestr(cmd, "on") )
+		turn_led(which, ON);
+	else if( strcasestr(cmd, "off") )
+		turn_led(which, OFF);
+}
+
+void proc_json_items(cJSON *root, mqtt_ctx_t *ctx)
+{
+	int                    i;
+	cJSON                 *item;
+	cJSON                 *array;
+	hwconf_t              *hwconf = &ctx->hwconf;
+
+	if( !root )
+	{
+		log_error("Invalid input arguments $root\n");
+		return ;
+	}
+
+	for( i=0; i<cJSON_GetArraySize(root); i++ )
+	{
+		item = cJSON_GetArrayItem(root, i);
+		if( !item )
+			break;
+
+		/* if item is cJSON_Object, then recursive call proc_json */
+		if( cJSON_Object == item->type )
+		{
+			proc_json_items(item, ctx);
+		}
+		else if( cJSON_Array == item->type )
+		{
+			/* RGB colors led control: {"id":"RPi3B#01", "leds":[{"red":"on","green":"off","blue":"on"}]} */
+			if( hwconf->led && !strcasecmp(item->string, "leds") )
+			{
+				array = cJSON_GetArrayItem(item, 0);
+				if( NULL != array )
+				{
+					cJSON        *led_item;
+
+					if( NULL != (led_item=cJSON_GetObjectItem(array , "red")) )
+					{
+						log_info("turn red led '%s'\n", led_item->valuestring);
+						mqtt_turn_led(LED_R, led_item->valuestring);
+					}
+
+					if( NULL != (led_item=cJSON_GetObjectItem(array , "green")) )
+					{
+						log_info("turn green led '%s'\n", led_item->valuestring);
+						mqtt_turn_led(LED_G, led_item->valuestring);
+					}
+
+					if( NULL != (led_item=cJSON_GetObjectItem(array , "blue")) )
+					{
+						log_info("turn blue led '%s'\n", led_item->valuestring);
+						mqtt_turn_led(LED_B, led_item->valuestring);
+					}
+				}
+			}
+		}
+	}
+}
+
+void sub_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message)
+{
+	mqtt_ctx_t             *ctx = (mqtt_ctx_t *)userdata;
+
+	cJSON                  *root = NULL;
+	cJSON                  *item;
+	char                   *value;
+
+	if ( !message->payloadlen )
+	{
+		log_error("%s (null)\n", message->topic);
+		return ;
+	}
+
+	log_debug("Subscriber receive message: '%s'\n", message->payload);
+
+	root = cJSON_Parse(message->payload);
+	if( !root )
+	{
+		log_error("cJSON_Parse parser failure: %s\n", cJSON_GetErrorPtr());
+		return ;
+	}
+
+	item = cJSON_GetObjectItem(root, "id");
+	if( !item )
+	{
+		log_error("cJSON_Parse get ID failure: %s\n", cJSON_GetErrorPtr());
+		goto cleanup;
+	}
+
+	value = cJSON_PrintUnformatted(item);
+	if( strcasecmp(value, ctx->devid) )
+	{
+		free(value);
+		goto cleanup;
+	}
+
+	free(value);
+	log_info("Subscriber receive message: '%s'\n", message->payload);
+
+	proc_json_items(root, ctx);
+
+cleanup:
+	cJSON_Delete(root); /* must delete it, or it will result memory leak */
+	return ;
+}
+
+
+void *mqtt_sub_worker(void *args)
+{
+	mqtt_ctx_t             *ctx = (mqtt_ctx_t *)args;
+	struct mosquitto       *mosq;
+	bool                    session = true;
+
+	mosq = mosquitto_new(NULL, session, ctx);
+	if( !mosq )
+	{
+		log_error("mosquitto_new failure\n");
+		return NULL;
+	}
+
+	/* set connnect to broker username and password  */
+	if( strlen(ctx->uid)> 0  && strlen(ctx->pwd)> 0 )
+		mosquitto_username_pw_set(mosq, ctx->uid, ctx->pwd);
+
+	/* set callback functions */
+	mosquitto_connect_callback_set(mosq, sub_connect_callback);
+	mosquitto_disconnect_callback_set(mosq, sub_disconnect_callback);
+	mosquitto_message_callback_set(mosq, sub_message_callback);
+
+	while( !g_signal.stop )
+	{
+		/* connect to MQTT broker  */
+		if( mosquitto_connect(mosq, ctx->host, ctx->port, ctx->keepalive) )
+		{
+			log_error("Subscriber connect to broker[%s:%d] failure: %s\n", ctx->host, ctx->port, strerror(errno));
+			msleep(1000);
+			continue;
+		}
+
+		/* -1: use default timeout 1000ms  1: unused */
+		mosquitto_loop_forever(mosq, -1, 1);
+	}
+
+	mosquitto_destroy(mosq);
+	return NULL;
+}
+

--
Gitblit v1.9.1