Advanced Distributed Systems module at HSLU
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

228 lines
7.8 KiB

/*
* myMqtt.c
*
* Created on: 25.11.2022
* Author: jonas
*/
#include "myMqtt.h"
#include <stdio.h>
#include <stdbool.h>
#include <stdint.h>
#include "esp_log.h"
#include "mqtt_client.h"
#include "challenge_com.h"
#include "challenge_nvs.h"
#include "McuUtility.h"
// tag for logging with ESP_LOG
static const char *TAG = "MY_MQTT";
// local vars
// broker IP as string; the initializer sizes the buffer for the longest dotted IPv4 address (15 chars + '\0')
char _brokerIp[] = "255.255.255.255"; // max length of ip
// mutex taken around publish/subscribe; created as recursive mutex in MyMqtt_Init()
SemaphoreHandle_t semaphoreMqtt = NULL;
// declarations
// logs the given error code together with a message, but only if the code is not zero
static void log_error_if_nonzero(const char *message, int error_code);
// event handler registered for all MQTT events in MyMqtt_Init(); only logs the events
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data);
// variables
// handle of the MQTT client; assigned in MyMqtt_Init() and destroyed in MyMqtt_Deinit()
esp_mqtt_client_handle_t client;
/*
 * @brief Initializes the MQTT module and starts the MQTT client.
 *
 * Creates the module mutex, reads the broker IP from NVS, builds the broker URI
 * ("mqtt://<ip>"), creates the client, registers the event handlers and starts
 * the client.
 *
 * @return true if the client was created and started, false otherwise.
 */
bool MyMqtt_Init(void){
  // initialize semaphore (only once, so a repeated init does not leak the previous mutex)
  if(semaphoreMqtt == NULL){
    semaphoreMqtt = xSemaphoreCreateRecursiveMutex();
    if(semaphoreMqtt == NULL){
      ESP_LOGE(TAG, "Failed to initialize semaphore for MyMqtt!");
      return false; // publish/subscribe depend on the mutex, so do not continue without it
    }
    vQueueAddToRegistry(semaphoreMqtt, "mymqtt_semphr");
  }

  // get mqtt broker IP from NVS and build the broker URI from it
  char brokerUri[50];
  Challenge_Nvs_GetBrokerIpFromNVS(_brokerIp, sizeof(_brokerIp));
  McuUtility_strcpy((unsigned char*)brokerUri, sizeof(brokerUri), (unsigned char*)"mqtt://");
  McuUtility_strcat((unsigned char*)brokerUri, sizeof(brokerUri), (unsigned char*)_brokerIp);
  ESP_LOGI(TAG, "Connecting to Mqtt Broker URI '%s'", brokerUri);

  esp_mqtt_client_config_t mqtt_cfg = {
    .uri = brokerUri,
  };
  client = esp_mqtt_client_init(&mqtt_cfg);
  if(client == NULL){
    ESP_LOGE(TAG, "Failed to create MQTT client!");
    return false;
  }

  /* The last argument may be used to pass data to the event handler, in this example mqtt_event_handler */
  esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, NULL);
  esp_mqtt_client_register_event(client, MQTT_EVENT_DATA, Challenge_Com_ParseMqtt, NULL); // ToDo: maybe make this configurable via parameter

  esp_err_t mqtt_error = esp_mqtt_client_start(client);
  if(mqtt_error != ESP_OK){
    ESP_LOGE(TAG, "Failed to start MQTT client, error 0x%x", (unsigned int)mqtt_error);
    return false;
  }

  // allow 1s to finish mqtt client to start; vTaskDelay() expects ticks, so convert from milliseconds
  vTaskDelay(pdMS_TO_TICKS(1000));
  return true;
}
/*
 * @brief Destroys the MQTT client created by MyMqtt_Init().
 *
 * The handle is cleared afterwards, so a second call is a no-op instead of
 * destroying an already freed client.
 */
void MyMqtt_Deinit(void){
  if(client == NULL){
    return; // nothing to destroy
  }
  esp_mqtt_client_destroy(client);
  client = NULL; // do not keep a dangling handle around
}
/* HELPERS FOR SEMAPHORE */
/*
 * @brief Takes the module mutex, waiting at most one second for it.
 *
 * @return true if the mutex was taken, false on timeout or if the mutex does not exist.
 */
static bool MyMqtt_AquireMutex(void){
  if(semaphoreMqtt == NULL){
    /* mutex was never created (MyMqtt_Init() not called or failed) */
    ESP_LOGW(TAG, "myMqtt semaphore is not initialized.");
    return false;
  }
  if(xSemaphoreTakeRecursive(semaphoreMqtt, pdMS_TO_TICKS(1000)) != pdTRUE){
    /* timeout */
    ESP_LOGW(TAG, "Timemout while aquiring myMqtt semaphore.");
    return false;
  }
  return true;
}
/*
 * @brief Releases the module mutex previously taken with MyMqtt_AquireMutex().
 *
 * The mutex is created with xSemaphoreCreateRecursiveMutex() and taken with
 * xSemaphoreTakeRecursive(), therefore it has to be given back with the
 * matching recursive API.
 */
static void MyMqtt_ReleaseMutex(void){
  xSemaphoreGiveRecursive(semaphoreMqtt);
}
/*
 * @brief Publishes a message to the given topic (QoS 2, not retained).
 *
 * Nothing is published if the topic is NULL, the client does not exist or the
 * module mutex cannot be taken.
 *
 * @param topic Topic to publish to (zero terminated string).
 * @param data  Payload as zero terminated string.
 */
void MyMqtt_Publish(const char *topic, const char *data){
  if(topic == NULL || client == NULL){
    ESP_LOGE(TAG, "Cannot publish: %s", topic == NULL ? "no topic given" : "MQTT client not initialized");
    return;
  }
  if(!MyMqtt_AquireMutex()){
    return;
  }
  /* length 0: the client determines the payload length from the data string */
  int msg_id = esp_mqtt_client_publish(client, topic, data, 0, 2, 0);
  if(msg_id < 0){
    /* a negative id signals a failure, so do not report success */
    ESP_LOGE(TAG, "Failed to publish to topic '%s', msg_id=%d", topic, msg_id);
  }else{
    ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id);
  }
  MyMqtt_ReleaseMutex();
}
/*
 * @brief Subscribes to the given topic with QoS 0.
 *
 * Nothing is subscribed if the topic is NULL, the client does not exist or the
 * module mutex cannot be taken.
 *
 * @param topic Topic filter to subscribe to (zero terminated string).
 */
void MyMqtt_Subscribe(const char *topic){
  if(topic == NULL || client == NULL){
    ESP_LOGE(TAG, "Cannot subscribe: %s", topic == NULL ? "no topic given" : "MQTT client not initialized");
    return;
  }
  if(!MyMqtt_AquireMutex()){
    return;
  }
  int msg_id = esp_mqtt_client_subscribe(client, topic, 0);
  if(msg_id < 0){
    /* a negative id signals a failure, so do not report it as "successful" */
    ESP_LOGE(TAG, "Failed to subscribe to topic '%s', msg_id=%d", topic, msg_id);
  }else{
    ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d, success=true", msg_id);
  }
  MyMqtt_ReleaseMutex();
}
char* MyMqtt_GetBrokerIP(void){
return _brokerIp;
}
/*
 * @brief Sets the MQTT broker IP and stores it to NVS.
 *
 * The string is copied into the module's broker IP buffer, bounded by the size
 * of that buffer. A NULL pointer is rejected and leaves the stored IP untouched.
 *
 * @param ip Broker IP as zero terminated string.
 */
void MyMqtt_SetBrokerIP(char* ip){
  if(ip == NULL){
    ESP_LOGE(TAG, "Set Broker IP failed: no IP given");
    return;
  }
  ESP_LOGI(TAG, "Set Broker IP to '%s'", ip);
  McuUtility_strcpy((unsigned char*)_brokerIp, sizeof(_brokerIp), (unsigned char*)ip);
  // store to NVS
  Challenge_Nvs_StoreBrokerIpToNVS(_brokerIp);
}
/***********/
/* HELPERS */
/***********/
/*
 * @brief Logs an error code together with a describing message.
 *
 * Nothing is logged if the error code is zero.
 *
 * @param message    Text describing where the error code comes from.
 * @param error_code Error code to log (printed in hex).
 */
static void log_error_if_nonzero(const char *message, int error_code)
{
    if (error_code == 0) {
        return; /* no error, nothing to report */
    }
    ESP_LOGE(TAG, "Last error %s: 0x%x", message, error_code);
}
/*
 * @brief Event handler registered to receive MQTT events
 *
 * This function is called by the MQTT client event loop. It only logs the
 * received events; for data events the topic and the payload are printed, for
 * transport errors the available error codes are logged.
 *
 * @param handler_args user data registered to the event (unused).
 * @param base Event base for the handler(always MQTT Base in this example).
 * @param event_id The id for the received event.
 * @param event_data The data for the event, esp_mqtt_event_handle_t.
 */
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data)
{
    (void)handler_args;
    /* event_id is an int32_t; cast it so the argument matches the %d conversion */
    ESP_LOGD(TAG, "Event dispatched from event loop base=%s, event_id=%d", base, (int)event_id);
    esp_mqtt_event_handle_t event = event_data;
    // unused: esp_mqtt_client_handle_t client = event->client;
    switch ((esp_mqtt_event_id_t)event_id) {
    case MQTT_EVENT_CONNECTED:
        ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED");
        break;
    case MQTT_EVENT_DISCONNECTED:
        ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED");
        break;
    case MQTT_EVENT_SUBSCRIBED:
        ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id);
        break;
    case MQTT_EVENT_UNSUBSCRIBED:
        ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id);
        break;
    case MQTT_EVENT_PUBLISHED:
        ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id);
        break;
    case MQTT_EVENT_DATA:
        ESP_LOGI(TAG, "MQTT_EVENT_DATA");
        /* topic and data are printed with an explicit length (%.*s) */
        printf("TOPIC=%.*s\r\n", event->topic_len, event->topic);
        printf("DATA=%.*s\r\n", event->data_len, event->data);
        break;
    case MQTT_EVENT_ERROR:
        ESP_LOGI(TAG, "MQTT_EVENT_ERROR");
        if (event->error_handle != NULL && event->error_handle->error_type == MQTT_ERROR_TYPE_TCP_TRANSPORT) {
            log_error_if_nonzero("reported from esp-tls", event->error_handle->esp_tls_last_esp_err);
            log_error_if_nonzero("reported from tls stack", event->error_handle->esp_tls_stack_err);
            log_error_if_nonzero("captured as transport's socket errno", event->error_handle->esp_transport_sock_errno);
            ESP_LOGI(TAG, "Last errno string (%s)", strerror(event->error_handle->esp_transport_sock_errno));
        }
        break;
    default:
        ESP_LOGI(TAG, "Other event id:%d", (int)event->event_id);
        break;
    }
}
/*********/
/* SHELL */
/*********/
#if PL_CONFIG_USE_SHELL
/*
 * @brief Prints the status of the MQTT module (broker IP) to the shell.
 *
 * @param io Shell standard I/O; the status is written to io->stdOut.
 * @return ERR_OK
 */
static uint8_t PrintStatus(const McuShell_StdIOType *io) {
  unsigned char ipLine[32]; /* broker IP plus line ending */

  McuShell_SendStatusStr((unsigned char*)"mqtt", (unsigned char*)"ESP32 MQTT status\r\n", io->stdOut);
  /* status strings carry their own line ending, so append one to the IP as well */
  McuUtility_strcpy(ipLine, sizeof(ipLine), (unsigned char*)MyMqtt_GetBrokerIP());
  McuUtility_strcat(ipLine, sizeof(ipLine), (unsigned char*)"\r\n");
  McuShell_SendStatusStr((unsigned char*)" ip", ipLine, io->stdOut);
  return ERR_OK;
}
/*
 * @brief Prints the help text of the mqtt shell commands.
 *
 * @param io Shell standard I/O; the help is written to io->stdOut.
 * @return ERR_OK
 */
static uint8_t PrintHelp(const McuShell_StdIOType *io) {
  /* one entry per help line: command text and its description */
  static const struct {
    const char *command;
    const char *description;
  } helpLines[] = {
    {"mqtt", "Group of ESP32 MQTT commands\r\n"},
    {" help|status", "Shows MQTT help or status\r\n"},
    {" setIp <ipv4>", "Sets the IP address of the MQTT broker\r\n"},
    {" publish <topic> <msg>", "Publishes a message to the given topic.\r\n"},
  };
  unsigned int line;

  for (line = 0; line < sizeof(helpLines)/sizeof(helpLines[0]); line++) {
    McuShell_SendHelpStr((unsigned char*)helpLines[line].command, (unsigned char*)helpLines[line].description, io->stdOut);
  }
  return ERR_OK;
}
uint8_t MyMqtt_ParseShellCommand(const unsigned char* cmd, bool *handled, const McuShell_StdIOType *io) {
if (McuUtility_strcmp((char*)cmd, (char*)McuShell_CMD_HELP)==0 || McuUtility_strcmp((char*)cmd, (char*)"mqtt help")==0) {
*handled = TRUE;
return PrintHelp(io);
} else if (McuUtility_strcmp((char*)cmd, (char*)McuShell_CMD_STATUS)==0 || McuUtility_strcmp((char*)cmd, (char*)"mqtt status")==0) {
*handled = TRUE;
return PrintStatus(io);
} else if (McuUtility_strncmp((char*)cmd,"mqtt setIp",sizeof("mqtt setIp")-1) == 0) { // contains
*handled = TRUE;
char ip[] = "255.255.255.255"; // maximum size of IP
McuUtility_strcpy((unsigned char*)ip, sizeof(ip), (unsigned char *)cmd + strlen((char*)"mqtt setIp ")); // cut front
McuUtility_strCutTail((unsigned char*)ip, (unsigned char*)"\0\0\0"); // cut tail
MyMqtt_SetBrokerIP(ip); // set ip
}else if (McuUtility_strncmp((char*)cmd,"mqtt publish ",sizeof("mqtt publish ")-1) == 0){
*handled = TRUE;
cmd += sizeof("mqtt publish")-1;
// skip until double quotes (topic)
while(*cmd!='\"'){
cmd++;
}
uint8_t topic[128] = "";
uint8_t data[128] = "";
McuUtility_ScanDoubleQuotedString(&cmd, topic, sizeof(topic));
ESP_LOGI(TAG, "Parsed topic=%s", topic);
// skip to data
cmd++;
McuUtility_ScanDoubleQuotedString(&cmd, data, sizeof(data));
ESP_LOGI(TAG, "Parsed data=%s", data);
MyMqtt_Publish((char*)topic, (char*)data);
}
return ERR_OK;
}
#endif /* PL_CONFIG_USE_SHELL */