I am developing a code to push sensor readings via MQTT. My code seems working properly, but after about 9 hours I have the following error in the console.
Code: Select all
I (42766620) mqtt: MQTT_EVENT_BEFORE_CONNECT
E (42766620) esp-tls: [sock=54] connect() error: Host is unreachable
E (42766620) transport_base: Failed to open a new connection: 32772
E (42766620) mqtt_client: Error transport connect
I (42766630) mqtt: MQTT_EVENT_ERROR
I (42766630) mqtt: MQTT_EVENT_DISCONNECTED
I am using MQTT 3, and I have my own mosquitto broker running in a docker. I can connect to the broker anytime with a MQTT Explorer and another ESP using a simpler code is able to keep connected to the broker. My local internet is stable aswell, and I tried shutting down and on my broker manually, and my ESP could reconnect and work normally.
Here is my mqtt header file:
Code: Select all
#ifndef MQTT_H
#define MQTT_H
#include "mqtt_client.h"
#include "cJSON.h"
#define MQTT_USER "username"
#define MQTT_PASS "password"
#define MQTT_BROKER "mqtt://local.ip:1883"
void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data);
void mqtt_app_start(void);
void mqtt_publish_json(const char *topic, cJSON *json_obj);
#endif // MQTT_H
Code: Select all
#include <stdio.h>
#include <stdint.h>
#include <string.h>
#include "esp_system.h"
#include "nvs_flash.h"
#include "esp_event.h"
#include "esp_netif.h"
#include "esp_log.h"
#include "mqtt.h"
#include "task_handler.h"
static const char *TAG_MQTT = "mqtt";
extern TaskHandles task_handles;
esp_mqtt_client_handle_t mqtt_client = NULL;
void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data)
{
esp_mqtt_event_handle_t event = event_data;
int msg_id;
switch ((esp_mqtt_event_id_t)event_id)
{
case MQTT_EVENT_CONNECTED:
ESP_LOGI(TAG_MQTT, "MQTT_EVENT_CONNECTED");
// Subscribe to topic
msg_id = esp_mqtt_client_subscribe(mqtt_client, "test/topic", 0);
ESP_LOGI(TAG_MQTT, "Subscribed to topic, msg_id=%d", msg_id);
task_halter(&task_handles, RESUME);
break;
case MQTT_EVENT_DISCONNECTED:
ESP_LOGI(TAG_MQTT, "MQTT_EVENT_DISCONNECTED");
task_halter(&task_handles, SUSPEND);
break;
case MQTT_EVENT_SUBSCRIBED:
ESP_LOGI(TAG_MQTT, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id);
break;
case MQTT_EVENT_UNSUBSCRIBED:
ESP_LOGI(TAG_MQTT, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id);
break;
case MQTT_EVENT_PUBLISHED:
// ESP_LOGI(TAG_MQTT, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id);
break;
case MQTT_EVENT_DATA:
ESP_LOGI(TAG_MQTT, "MQTT_EVENT_DATA");
printf("Received topic: %.*s\r\n", event->topic_len, event->topic);
printf("Received data: %.*s\r\n", event->data_len, event->data);
break;
case MQTT_EVENT_ERROR:
ESP_LOGI(TAG_MQTT, "MQTT_EVENT_ERROR");
break;
case MQTT_EVENT_BEFORE_CONNECT:
ESP_LOGI(TAG_MQTT, "MQTT_EVENT_BEFORE_CONNECT");
break;
default:
ESP_LOGI(TAG_MQTT, "Other event id:%d", event->event_id);
break;
}
}
void mqtt_app_start(void)
{
esp_mqtt_client_config_t mqtt_cfg = {
.broker.address.uri = MQTT_BROKER,
.credentials.username = MQTT_USER,
.credentials.authentication.password = MQTT_PASS,
.network.disable_auto_reconnect = false,
.network.reconnect_timeout_ms = 5000,
};
mqtt_client = esp_mqtt_client_init(&mqtt_cfg); // Assign to global variable
ESP_ERROR_CHECK(esp_mqtt_client_register_event(mqtt_client, ESP_EVENT_ANY_ID, mqtt_event_handler, NULL));
ESP_ERROR_CHECK(esp_mqtt_client_start(mqtt_client));
}
void mqtt_publish_json(const char *topic, cJSON *json_obj)
{
// Convert the JSON object to a string
char *json_str = cJSON_Print(json_obj);
if (json_str == NULL)
{
ESP_LOGE("MQTT", "Failed to print JSON");
return; // Exit if printing fails
}
if (mqtt_client != NULL)
{
int msg_id = esp_mqtt_client_publish(mqtt_client, topic, json_str, 0, 1, 0);
ESP_LOGI(TAG_MQTT, "JSON data published to topic '%s', msg_id=%d", topic, msg_id);
}
else
{
ESP_LOGE(TAG_MQTT, "MQTT client is not initialized.");
}
// Free the memory allocated for the JSON string
// free(json_str);
}
mqtt_publish_json()
Code: Select all
void data_sender(const uint8_t sensor_type, sensor_data_entry_t *entries, uint8_t entry_count)
{
// esp_task_wdt_reset_user(data_sender_twdt_user_hdl);
cJSON *json_obj = cJSON_CreateObject();
if (json_obj == NULL)
{
ESP_LOGE(TAG_MAIN, "Failed to create JSON object");
return;
}
for (int i = 0; i < entry_count; i++)
{
cJSON_AddNumberToObject(json_obj, entries[i].key, entries[i].value);
}
switch (sensor_type)
{
case VEML7700:
cJSON_AddStringToObject(json_obj, "sensor_type", "VEML7700");
mqtt_publish_json("comfortzone/veml7700", json_obj);
break;
case BME680:
cJSON_AddStringToObject(json_obj, "sensor_type", "BME680");
mqtt_publish_json("comfortzone/bme680", json_obj);
break;
default:
ESP_LOGE(TAG_MAIN, "No sensor found");
break;
}
cJSON_Delete(json_obj);
}
part in the mqtt source file which does the following:task_halter(&task_handles, SUSPEND);
Code: Select all
void task_halter(TaskHandles *handles, bool condition)
{
TaskHandle_t *taskArray[NUM_TASKS] = {
handles->veml7700,
handles->bme680,
handles->omron,
handles->etc};
ESP_LOGI("Task holter", "Task definied in TaskHandles_t: %d", NUM_TASKS);
switch (condition)
{
case RESUME:
if (is_it_suspended == true)
{
for (int i = 0; i < NUM_TASKS; i++)
{
if (taskArray[i] != NULL)
{
ESP_LOGI("Task holter", "Resumed task: %p", taskArray[i]);
vTaskResume(taskArray[i]);
esp_task_wdt_add(taskArray[i]);
}
}
is_it_suspended = false;
}
break;
case SUSPEND:
if (is_it_suspended == false)
{
for (int i = 0; i < NUM_TASKS; i++)
{
if (taskArray[i] != NULL)
{
ESP_LOGI("Task holter", "Suspended task: %p", taskArray[i]);
vTaskSuspend(taskArray[i]);
esp_task_wdt_delete(taskArray[i]);
}
}
xSemaphoreGive(dataSendSemaphore);
is_it_suspended = true;
}
break;
default:
ESP_LOGE("Task holer", "Error in task holter!");
break;
}
}
And here is one of my tasks:
Code: Select all
static void veml7700_task(void *pvParameters, void *arg)
{
// Subscribe this task to TWDT, then check if it is subscribed
ESP_ERROR_CHECK(esp_task_wdt_add(task_handles.veml7700));
ESP_ERROR_CHECK(esp_task_wdt_status(task_handles.veml7700));
// Subscribe data_sender as users of the the TWDT
// ESP_ERROR_CHECK(esp_task_wdt_add_user("data_sender", &data_sender_twdt_user_hdl));
ESP_LOGI(TAG_MAIN, "Subscribed to TWDT, veml7700_task");
veml7700_data_t raw_veml_values[VEML_SAMPLE_SIZE] = {0};
while (1)
{
esp_task_wdt_reset();
veml7700_data_t average_veml_values = {0};
for (int i = 0; i < VEML_SAMPLE_SIZE; i++)
{
ESP_LOGI(TAG_MAIN, "Sampling data from VEML7700...");
raw_veml_values[i] = query_veml7700_data();
vTaskDelay(pdMS_TO_TICKS(WAIT_BETWEEN_SAMPLES));
}
for (int i = 0; i < VEML_SAMPLE_SIZE; i++)
{
average_veml_values.als += raw_veml_values[i].als;
average_veml_values.white += raw_veml_values[i].white;
}
average_veml_values.als /= VEML_SAMPLE_SIZE;
average_veml_values.white /= VEML_SAMPLE_SIZE;
sensor_data_entry_t veml_data_entries[] = {
{"ambient_light", average_veml_values.als},
{"white_light", average_veml_values.white}};
if (xSemaphoreTake(dataSendSemaphore, portMAX_DELAY) == pdTRUE)
{
data_sender(VEML7700, veml_data_entries, 2);
xSemaphoreGive(dataSendSemaphore);
}
// get_task_info(task_handles.veml7700);
// task_halter(&task_handles, SUSPEND);
esp_task_wdt_reset();
vTaskDelay(pdMS_TO_TICKS(TASK_DELAY));
}
}
I use semaphore and watchdog to keep everything safe, but my guess it is not connected to the MQTT error.
I hope it is enough information to get started.