ESP IDF MQTT connection lost after few hours


Postby Cimby1 » Wed Oct 16, 2024 8:26 am

Dear forumers!

I am developing code to push sensor readings via MQTT. It seems to work properly, but after about 9 hours I get the following error in the console:

Code:

I (42766620) mqtt: MQTT_EVENT_BEFORE_CONNECT
E (42766620) esp-tls: [sock=54] connect() error: Host is unreachable
E (42766620) transport_base: Failed to open a new connection: 32772
E (42766620) mqtt_client: Error transport connect
I (42766630) mqtt: MQTT_EVENT_ERROR
I (42766630) mqtt: MQTT_EVENT_DISCONNECTED

I have two running tasks. One of them keeps running, so the ESP itself neither crashes nor freezes. I suspend the other task when the MQTT connection fails (that's the intended behavior). But my MQTT client can't reconnect to its broker.
I am using MQTT 3, and I run my own Mosquitto broker in Docker. I can connect to the broker at any time with MQTT Explorer, and another ESP running simpler code stays connected to it. My local network is stable as well, and when I manually shut the broker down and started it again, my ESP reconnected and worked normally.
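The decimal code in the `transport_base` line can be decoded by hand. Below is a minimal host-side sketch, assuming the ESP-TLS error codes are offsets from `ESP_ERR_ESP_TLS_BASE` (0x8000) as in ESP-IDF's `esp_tls_errors.h`; on the device itself, `esp_err_to_name()` should return the same name. `esp_tls_err_name()` and `explain_esp_tls_err()` are hypothetical helpers, and only the first few codes are listed:

```c
#include <stdio.h>

/* Assumption: mirrors ESP-IDF's esp_tls_errors.h, where ESP-TLS error
 * codes are defined as ESP_ERR_ESP_TLS_BASE (0x8000) + offset. */
#define ESP_TLS_ERR_BASE 0x8000

/* Return the ESP-TLS name for a decimal code seen in the log, or NULL. */
const char *esp_tls_err_name(int code)
{
    switch (code - ESP_TLS_ERR_BASE) {
    case 0x01: return "ESP_ERR_ESP_TLS_CANNOT_RESOLVE_HOSTNAME";
    case 0x02: return "ESP_ERR_ESP_TLS_CANNOT_CREATE_SOCKET";
    case 0x03: return "ESP_ERR_ESP_TLS_UNSUPPORTED_PROTOCOL_FAMILY";
    case 0x04: return "ESP_ERR_ESP_TLS_FAILED_CONNECT_TO_HOST";
    case 0x05: return "ESP_ERR_ESP_TLS_SOCKET_SETOPT_FAILED";
    case 0x06: return "ESP_ERR_ESP_TLS_CONNECTION_TIMEOUT";
    default:   return NULL;
    }
}

/* Print a log code in decimal, in hex, and by name. */
void explain_esp_tls_err(int code)
{
    const char *name = esp_tls_err_name(code);
    printf("%d = 0x%X = %s\n", code, (unsigned)code,
           name != NULL ? name : "(not an ESP-TLS code listed here)");
}
```

`explain_esp_tls_err(32772)` prints `32772 = 0x8004 = ESP_ERR_ESP_TLS_FAILED_CONNECT_TO_HOST`. Read together with `connect() error: Host is unreachable`, this says the failure happens while opening the TCP socket, before any MQTT packet is exchanged.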


Here is my MQTT header file:

Code:

#ifndef MQTT_H
#define MQTT_H

#include "mqtt_client.h"
#include "cJSON.h"

#define MQTT_USER "username"
#define MQTT_PASS "password"
#define MQTT_BROKER "mqtt://local.ip:1883"

void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data);
void mqtt_app_start(void);
void mqtt_publish_json(const char *topic, cJSON *json_obj);
#endif // MQTT_H

Here is my MQTT source file:

Code:

#include <stdio.h>
#include <stdint.h>
#include <string.h>
#include "esp_system.h"
#include "nvs_flash.h"
#include "esp_event.h"
#include "esp_netif.h"
#include "esp_log.h"

#include "mqtt.h"
#include "task_handler.h"

static const char *TAG_MQTT = "mqtt";

extern TaskHandles task_handles;
esp_mqtt_client_handle_t mqtt_client = NULL;

void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data)
{
    esp_mqtt_event_handle_t event = event_data;
    int msg_id;

    switch ((esp_mqtt_event_id_t)event_id)
    {
    case MQTT_EVENT_CONNECTED:
        ESP_LOGI(TAG_MQTT, "MQTT_EVENT_CONNECTED");

        // Subscribe to topic
        msg_id = esp_mqtt_client_subscribe(mqtt_client, "test/topic", 0);
        ESP_LOGI(TAG_MQTT, "Subscribed to topic, msg_id=%d", msg_id);
        task_halter(&task_handles, RESUME);
        break;
    case MQTT_EVENT_DISCONNECTED:
        ESP_LOGI(TAG_MQTT, "MQTT_EVENT_DISCONNECTED");
        task_halter(&task_handles, SUSPEND);
        break;
    case MQTT_EVENT_SUBSCRIBED:
        ESP_LOGI(TAG_MQTT, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id);
        break;
    case MQTT_EVENT_UNSUBSCRIBED:
        ESP_LOGI(TAG_MQTT, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id);
        break;
    case MQTT_EVENT_PUBLISHED:
        // ESP_LOGI(TAG_MQTT, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id);
        break;
    case MQTT_EVENT_DATA:
        ESP_LOGI(TAG_MQTT, "MQTT_EVENT_DATA");
        printf("Received topic: %.*s\r\n", event->topic_len, event->topic);
        printf("Received data: %.*s\r\n", event->data_len, event->data);
        break;
    case MQTT_EVENT_ERROR:
        ESP_LOGI(TAG_MQTT, "MQTT_EVENT_ERROR");
        break;
    case MQTT_EVENT_BEFORE_CONNECT:
        ESP_LOGI(TAG_MQTT, "MQTT_EVENT_BEFORE_CONNECT");
        break;
    default:
        ESP_LOGI(TAG_MQTT, "Other event id:%d", event->event_id);
        break;
    }
}

void mqtt_app_start(void)
{
    esp_mqtt_client_config_t mqtt_cfg = {
        .broker.address.uri = MQTT_BROKER,
        .credentials.username = MQTT_USER,
        .credentials.authentication.password = MQTT_PASS,
        .network.disable_auto_reconnect = false,
        .network.reconnect_timeout_ms = 5000,
    };

    mqtt_client = esp_mqtt_client_init(&mqtt_cfg); // Assign to global variable

    ESP_ERROR_CHECK(esp_mqtt_client_register_event(mqtt_client, ESP_EVENT_ANY_ID, mqtt_event_handler, NULL));
    ESP_ERROR_CHECK(esp_mqtt_client_start(mqtt_client));
}

void mqtt_publish_json(const char *topic, cJSON *json_obj)
{
    // Convert the JSON object to a string
    char *json_str = cJSON_Print(json_obj);
    if (json_str == NULL)
    {
        ESP_LOGE("MQTT", "Failed to print JSON");
        return; // Exit if printing fails
    }

    if (mqtt_client != NULL)
    {
        int msg_id = esp_mqtt_client_publish(mqtt_client, topic, json_str, 0, 1, 0);
        ESP_LOGI(TAG_MQTT, "JSON data published to topic '%s', msg_id=%d", topic, msg_id);
    }
    else
    {
        ESP_LOGE(TAG_MQTT, "MQTT client is not initialized.");
    }

    // Free the memory allocated for the JSON string
    // free(json_str);
}

My function that calls mqtt_publish_json():

Code:

void data_sender(const uint8_t sensor_type, sensor_data_entry_t *entries, uint8_t entry_count)
{
    // esp_task_wdt_reset_user(data_sender_twdt_user_hdl);
    cJSON *json_obj = cJSON_CreateObject();
    if (json_obj == NULL)
    {
        ESP_LOGE(TAG_MAIN, "Failed to create JSON object");
        return;
    }

    for (int i = 0; i < entry_count; i++)
    {
        cJSON_AddNumberToObject(json_obj, entries[i].key, entries[i].value);
    }

    switch (sensor_type)
    {
    case VEML7700:
        cJSON_AddStringToObject(json_obj, "sensor_type", "VEML7700");
        mqtt_publish_json("comfortzone/veml7700", json_obj);
        break;
    case BME680:
        cJSON_AddStringToObject(json_obj, "sensor_type", "BME680");
        mqtt_publish_json("comfortzone/bme680", json_obj);
        break;
    default:
        ESP_LOGE(TAG_MAIN, "No sensor found");
        break;
    }

    cJSON_Delete(json_obj);
}

The task_halter() function, called from the MQTT source file as task_halter(&task_handles, SUSPEND);, does the following:

Code:

void task_halter(TaskHandles *handles, bool condition)
{

    // TaskHandle_t is already a pointer type, so store the handles directly
    TaskHandle_t taskArray[NUM_TASKS] = {
        handles->veml7700,
        handles->bme680,
        handles->omron,
        handles->etc};

    ESP_LOGI("Task halter", "Tasks defined in TaskHandles: %d", NUM_TASKS);

    switch (condition)
    {
    case RESUME:
        if (is_it_suspended == true)
        {
            for (int i = 0; i < NUM_TASKS; i++)
            {
                if (taskArray[i] != NULL)
                {
                    ESP_LOGI("Task halter", "Resumed task: %p", (void *)taskArray[i]);
                    vTaskResume(taskArray[i]);
                    esp_task_wdt_add(taskArray[i]);
                }
            }
            is_it_suspended = false;
        }
        break;
    case SUSPEND:
        if (is_it_suspended == false)
        {
            for (int i = 0; i < NUM_TASKS; i++)
            {
                if (taskArray[i] != NULL)
                {
                    ESP_LOGI("Task halter", "Suspended task: %p", (void *)taskArray[i]);
                    vTaskSuspend(taskArray[i]);
                    esp_task_wdt_delete(taskArray[i]);
                }
            }

            xSemaphoreGive(dataSendSemaphore);
            is_it_suspended = true;
        }
        break;
    default:
        ESP_LOGE("Task halter", "Error in task halter!");
        break;
    }
}

And here is one of my tasks:

Code:

// FreeRTOS task functions take a single void * parameter (TaskFunction_t)
static void veml7700_task(void *pvParameters)
{
    // Subscribe this task to TWDT, then check if it is subscribed
    ESP_ERROR_CHECK(esp_task_wdt_add(task_handles.veml7700));
    ESP_ERROR_CHECK(esp_task_wdt_status(task_handles.veml7700));

    // Subscribe data_sender as a user of the TWDT
    // ESP_ERROR_CHECK(esp_task_wdt_add_user("data_sender", &data_sender_twdt_user_hdl));

    ESP_LOGI(TAG_MAIN, "Subscribed to TWDT, veml7700_task");

    veml7700_data_t raw_veml_values[VEML_SAMPLE_SIZE] = {0};

    while (1)
    {
        esp_task_wdt_reset();

        veml7700_data_t average_veml_values = {0};

        for (int i = 0; i < VEML_SAMPLE_SIZE; i++)
        {
            ESP_LOGI(TAG_MAIN, "Sampling data from VEML7700...");
            raw_veml_values[i] = query_veml7700_data();
            vTaskDelay(pdMS_TO_TICKS(WAIT_BETWEEN_SAMPLES));
        }

        for (int i = 0; i < VEML_SAMPLE_SIZE; i++)
        {
            average_veml_values.als += raw_veml_values[i].als;
            average_veml_values.white += raw_veml_values[i].white;
        }
        average_veml_values.als /= VEML_SAMPLE_SIZE;
        average_veml_values.white /= VEML_SAMPLE_SIZE;

        sensor_data_entry_t veml_data_entries[] = {
            {"ambient_light", average_veml_values.als},
            {"white_light", average_veml_values.white}};

        if (xSemaphoreTake(dataSendSemaphore, portMAX_DELAY) == pdTRUE)
        {
            data_sender(VEML7700, veml_data_entries, 2);

            xSemaphoreGive(dataSendSemaphore);
        }

        // get_task_info(task_handles.veml7700);
        // task_halter(&task_handles, SUSPEND);
        esp_task_wdt_reset();
        vTaskDelay(pdMS_TO_TICKS(TASK_DELAY));
    }
}

I use a semaphore and the task watchdog to keep everything safe, but my guess is that they are not related to the MQTT error.

I hope this is enough information to get started.
Last edited by Cimby1 on Wed Oct 16, 2024 12:33 pm, edited 1 time in total.


Postby nopnop2002 » Wed Oct 16, 2024 10:41 am

Looking at the print_value() function, the returned pointer is allocated by cJSON_strdup() and handed back to the caller.
Buffers returned by cJSON_Print() must be freed by the caller.
Please use the proper API (cJSON_free()) rather than calling the stdlib free() directly.

Code:

char *my_json_string = cJSON_Print(root);
ESP_LOGI(TAG, "my_json_string\n%s",my_json_string);
cJSON_Delete(root);
cJSON_free(my_json_string);
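A host-side sketch of the ownership rule above, in plain C so it runs without ESP-IDF or cJSON. `render_reading()`, `publish_stub()` and the counting allocator are hypothetical stand-ins for `cJSON_Print()`, `esp_mqtt_client_publish()` and the heap with `cJSON_free()`. It only shows that the pattern from the first post (free commented out) loses one buffer per publish, while freeing once after publishing keeps the count flat:

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical counting allocator, so outstanding buffers are visible. */
static long outstanding_allocs = 0;

static void *counted_malloc(size_t n)
{
    void *p = malloc(n);
    if (p != NULL)
        outstanding_allocs++;
    return p;
}

static void counted_free(void *p)
{
    if (p != NULL) {
        outstanding_allocs--;
        free(p);
    }
}

long outstanding(void) { return outstanding_allocs; }

/* Stand-in for cJSON_Print(): returns a heap buffer the caller must free. */
char *render_reading(const char *key, double value)
{
    int len = snprintf(NULL, 0, "{\"%s\":%g}", key, value);
    if (len < 0)
        return NULL;
    char *buf = counted_malloc((size_t)len + 1);
    if (buf != NULL)
        snprintf(buf, (size_t)len + 1, "{\"%s\":%g}", key, value);
    return buf;
}

/* Stand-in for esp_mqtt_client_publish(): only reads the payload. */
int publish_stub(const char *topic, const char *payload)
{
    return (int)(strlen(topic) + strlen(payload));
}

/* Pattern from the first post: the rendered string is never freed. */
void publish_leaky(const char *topic, double value)
{
    char *json = render_reading("ambient_light", value);
    if (json == NULL)
        return;
    publish_stub(topic, json);
    /* missing: counted_free(json); */
}

/* Fixed pattern: free exactly once, after publishing. */
void publish_fixed(const char *topic, double value)
{
    char *json = render_reading("ambient_light", value);
    if (json == NULL)
        return;
    publish_stub(topic, json);
    counted_free(json); /* cJSON_free(json_str) in the real code */
}
```

With one publish per TASK_DELAY, the leaky version loses one JSON string per reading, so free heap shrinks gradually instead of failing right away. As far as I know, esp_mqtt_client_publish() copies the payload into its own buffer or outbox, so freeing the string right after the call is safe.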


Postby Cimby1 » Wed Oct 16, 2024 10:49 am

Thanks for your reply!

Should I do this in one function or in two separate ones?

Like this:

Code:

void data_sender(const uint8_t sensor_type, sensor_data_entry_t *entries, uint8_t entry_count)
{
    cJSON *json_obj = cJSON_CreateObject();
    if (json_obj == NULL)
    {
        ESP_LOGE(TAG_CJSON, "Failed to create JSON object");
        return;
    }

    for (int i = 0; i < entry_count; i++)
    {
        cJSON_AddNumberToObject(json_obj, entries[i].key, entries[i].value);
    }

    switch (sensor_type)
    {
    case VEML7700:
        cJSON_AddStringToObject(json_obj, "sensor_type", "VEML7700");
        mqtt_publish_json("comfortzone/veml7700", json_obj);
        break;
    case BME680:
        cJSON_AddStringToObject(json_obj, "sensor_type", "BME680");
        mqtt_publish_json("comfortzone/bme680", json_obj);
        break;
    default:
        ESP_LOGE(TAG_CJSON, "No sensor found");
        break;
    }

    cJSON_Delete(json_obj);
}

Code:

void mqtt_publish_json(const char *topic, cJSON *json_obj)
{
    // Convert the JSON object to a string
    char *json_str = cJSON_Print(json_obj);
    if (json_str == NULL)
    {
        ESP_LOGE("MQTT", "Failed to print JSON");
        return; // Exit if printing fails
    }

    if (mqtt_client != NULL)
    {
        int msg_id = esp_mqtt_client_publish(mqtt_client, topic, json_str, 0, 1, 0);
        ESP_LOGI(TAG_CJSON, "JSON data published to topic '%s', msg_id=%d", topic, msg_id);
    }
    else
    {
        ESP_LOGE(TAG_CJSON, "MQTT client is not initialized.");
    }

    // Free the memory allocated for the JSON string
    // free(json_str);
    cJSON_free(json_str);
}

This way, I free both the memory allocated for the string and the JSON object.


Postby MicroController » Wed Oct 16, 2024 12:21 pm

Do not use vTaskSuspend(...) on any task other than the calling task, if you use it at all.


Postby Cimby1 » Wed Oct 16, 2024 12:32 pm

Is it that bad?
My thinking was that when there is no broker to send the data to, there is no point in querying any sensor, so I suspend and resume the tasks via the mqtt_handler. But I'm happy to change this if you can give me an explanation, or another idea for pausing tasks.

I still have one question: what is causing my MQTT connection to collapse? A stack overflow or something?

Postby MicroController » Wed Oct 16, 2024 2:12 pm

Cimby1 wrote:
Wed Oct 16, 2024 12:32 pm
I still have one question: what is causing my MQTT connection to collapse? A stack overflow or something?
Not sure, but it may be the task suspending/resuming that breaks things.
Suspending (or deleting) a task (T1) from another task (T2) is very unsafe, because T2 doesn't know what exactly T1 is currently executing, and T1 doesn't get a chance to do any 'clean up' before ceasing execution. If, for example, T1 acquires a lock/mutex and it gets stopped before it can release that lock again, this may 'deadlock' part or all of the system sooner or later.
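To make the lock example concrete, here is a host-side sketch using POSIX threads instead of FreeRTOS (a substitution: pthreads has no vTaskSuspend() equivalent, so the "suspension" is modelled as T1 blocking inside its critical section). All names are hypothetical. In the firmware, the analogous situation would be a sensor task being suspended between xSemaphoreTake(dataSendSemaphore, ...) and xSemaphoreGive(...), or, presumably, inside a library call that holds an internal lock:

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>   /* ETIMEDOUT, returned by t2_take_resource() */
#include <pthread.h>
#include <stdbool.h>
#include <time.h>

/* The shared resource, e.g. dataSendSemaphore or a library's internal lock. */
static pthread_mutex_t resource_lock = PTHREAD_MUTEX_INITIALIZER;

/* "Suspension" is modelled as T1 waiting on a gate inside its critical
 * section; unlike vTaskSuspend(), this at least lets us resume it later. */
static pthread_mutex_t gate_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t gate_cond = PTHREAD_COND_INITIALIZER;
static bool t1_in_critical_section = false;
static bool t1_resumed = false;

static void *t1_body(void *arg)
{
    (void)arg;
    pthread_mutex_lock(&resource_lock);   /* T1 takes the lock... */
    pthread_mutex_lock(&gate_lock);
    t1_in_critical_section = true;
    pthread_cond_broadcast(&gate_cond);
    while (!t1_resumed)                    /* ...and is stopped here */
        pthread_cond_wait(&gate_cond, &gate_lock);
    pthread_mutex_unlock(&gate_lock);
    pthread_mutex_unlock(&resource_lock); /* released only after resume */
    return NULL;
}

/* Start T1 and return once it is stopped while holding resource_lock. */
int start_t1_and_stop_it(pthread_t *t1)
{
    int rc = pthread_create(t1, NULL, t1_body, NULL);
    if (rc != 0)
        return rc;
    pthread_mutex_lock(&gate_lock);
    while (!t1_in_critical_section)
        pthread_cond_wait(&gate_cond, &gate_lock);
    pthread_mutex_unlock(&gate_lock);
    return 0;
}

/* T2: try to take the resource for up to timeout_ms.
 * Returns 0 on success, or ETIMEDOUT while T1 still holds it. */
int t2_take_resource(long timeout_ms)
{
    struct timespec deadline;
    clock_gettime(CLOCK_REALTIME, &deadline);
    deadline.tv_sec += timeout_ms / 1000;
    deadline.tv_nsec += (timeout_ms % 1000) * 1000000L;
    if (deadline.tv_nsec >= 1000000000L) {
        deadline.tv_sec += 1;
        deadline.tv_nsec -= 1000000000L;
    }
    int rc = pthread_mutex_timedlock(&resource_lock, &deadline);
    if (rc == 0)
        pthread_mutex_unlock(&resource_lock);
    return rc;
}

/* Let T1 continue, which finally releases resource_lock. */
void resume_t1(pthread_t t1)
{
    pthread_mutex_lock(&gate_lock);
    t1_resumed = true;
    pthread_cond_broadcast(&gate_cond);
    pthread_mutex_unlock(&gate_lock);
    int rc = pthread_join(t1, NULL);
    assert(rc == 0);
    (void)rc;
}
```

While T1 is stopped, every attempt by T2 times out; only resuming T1 frees the lock. With a real vTaskSuspend() from another task, nothing guarantees that the resume ever comes, or that it comes before other tasks give up.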

The common way to control other tasks is via some kind of 'messaging', where one task sends a message to another task, e.g. asking it to delete itself when ready to.
Messages can be sent e.g. via a FreeRTOS queue, but can also be realized via direct-to-task notifications or semaphores.
There's not really any need to suspend/resume a task because the task can also just block waiting for a message/notification before continuing execution.
This way, a task itself is in control over exactly when it handles the message and when e.g. it pauses/stops, so that it can make sure it's in a safe state to do so.


Postby Cimby1 » Wed Oct 16, 2024 3:29 pm

Now I understand. But if this breaks my MQTT connection, why does my other task, which simply prints log info to the console, keep running? Weird.
I implemented the cJSON_free(json_str); fix and will give it a run (I have to wait about 8 hours for the error to happen...). If it keeps happening, I'll try removing the part that pauses and resumes tasks (which, by the way, only runs when the MQTT connection is lost).

Still, the idea of letting the task pause itself is great! The mqtt_handler could send it a message to halt. But how should I restart the task afterwards?

In any case, I really appreciate your replies, and thanks for your help.


Postby MicroController » Thu Oct 17, 2024 7:34 am

A notification-based approach could look like this:

Code:

#include <stdint.h>
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"

static const uint32_t MSG_PAUSE = 1;
static const uint32_t MSG_CONTINUE = 2;

static void pausableTask(void *arg) {
  (void)arg;
  while(1) {
    uint32_t msg;
    // Wait for up to 250ms for any notification:
    if( xTaskNotifyWait( 0, 0, &msg, pdMS_TO_TICKS(250) ) == pdFALSE ) {
      // No "message" received. Do regular task stuff every 250ms.
      // ...
    } else {
      if( msg == MSG_PAUSE ) {
        // We're asked to pause.
        // Block/wait until we receive a CONTINUE message
        do {
          xTaskNotifyWait( 0, 0, &msg, portMAX_DELAY );
        } while( msg != MSG_CONTINUE );
      }
    }
  }
}

// The calls below belong in the controlling task (e.g. the MQTT event
// handler); taskHandle is the TaskHandle_t of pausableTask.

// Ask task to pause:
xTaskNotify( taskHandle, MSG_PAUSE, eSetValueWithOverwrite );

// Ask task to continue:
xTaskNotify( taskHandle, MSG_CONTINUE, eSetValueWithOverwrite );
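One property of eSetValueWithOverwrite worth keeping in mind: the notification slot holds only the latest value, so messages are not queued. Below is a deterministic host-side model of the pausable task above (plain C, no FreeRTOS). `notify_slot_t`, `slot_notify()`, `slot_take()` and `task_wakeup()` are hypothetical names, and a wake-up without a message stands in for the 250 ms timeout:

```c
#include <stdbool.h>
#include <stdint.h>

/* Same values as MSG_PAUSE / MSG_CONTINUE in the post above. */
enum { MSG_PAUSE = 1, MSG_CONTINUE = 2 };

/* Model of one notification slot written with eSetValueWithOverwrite:
 * a new message replaces any unread one. */
typedef struct {
    uint32_t value;
    bool pending;
} notify_slot_t;

void slot_notify(notify_slot_t *slot, uint32_t msg)
{
    slot->value = msg;   /* overwrite, never queue */
    slot->pending = true;
}

/* Returns true and clears the pending flag if a message was waiting. */
bool slot_take(notify_slot_t *slot, uint32_t *msg)
{
    if (!slot->pending)
        return false;
    *msg = slot->value;
    slot->pending = false;
    return true;
}

typedef struct {
    bool paused;
    unsigned samples_taken;
} sensor_task_t;

/* One wake-up of the pausable task. With no message (the timeout case),
 * take a sample unless paused; a paused task would simply stay blocked. */
void task_wakeup(sensor_task_t *task, notify_slot_t *slot)
{
    uint32_t msg;
    if (!slot_take(slot, &msg)) {
        if (!task->paused)
            task->samples_taken++;
        return;
    }
    if (msg == MSG_PAUSE)
        task->paused = true;
    else if (msg == MSG_CONTINUE)
        task->paused = false;
}
```

If a PAUSE and a CONTINUE both arrive before the task wakes up (for example, a short disconnect followed by a reconnect), the task only sees CONTINUE and keeps sampling, which matches the latest connection state.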


Postby Cimby1 » Thu Oct 17, 2024 6:55 pm

I am grateful for your explanation. I will try to implement your solution in my code.
But first, here is the outcome of the cJSON_free(json_str); fix. With this line added, my ESP ran for almost a full day and then produced the same error:

Code:

I (95888556) mqtt: MQTT_EVENT_ERROR
I (95888556) mqtt: MQTT_EVENT_DISCONNECTED
I (95893566) mqtt: MQTT_EVENT_BEFORE_CONNECT
E (95893566) esp-tls: [sock=54] connect() error: Host is unreachable
E (95893566) transport_base: Failed to open a new connection: 32772
E (95893566) mqtt_client: Error transport connect

The ESP's free RAM in the meantime (I published it via MQTT as well):
[Image: free RAM over time]

The ESP's free IRAM in the meantime:
[Image: free IRAM over time]

All values are in bytes for simplicity.

For the next day, I will remove the SUSPEND/RESUME logic and test again.


Postby nopnop2002 » Thu Oct 17, 2024 11:51 pm

Code:

E (42766620) esp-tls: [sock=54] connect() error: Host is unreachable

Is it possible to switch from the SSL connection to a non-SSL connection?
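For reference, the broker URI in the first post, MQTT_BROKER "mqtt://local.ip:1883", should already select a non-SSL connection: ESP-MQTT picks its transport from the URI scheme (mqtt:// for TCP, mqtts:// for SSL, ws:// and wss:// for WebSocket). To my understanding, ESP-IDF 5.x implements the plain TCP transport on top of esp-tls as well, which would explain the `esp-tls` tag in the log even without SSL. The helper below is hypothetical and only mirrors that scheme-to-transport mapping:

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical helper: report which transport an ESP-MQTT broker URI
 * selects, based only on its scheme prefix. */
const char *transport_for_uri(const char *uri)
{
    static const struct {
        const char *scheme;
        const char *transport;
    } schemes[] = {
        {"mqtt://", "TCP (no SSL)"},
        {"mqtts://", "SSL"},
        {"ws://", "WebSocket (no SSL)"},
        {"wss://", "WebSocket over SSL"},
    };
    for (size_t i = 0; i < sizeof schemes / sizeof schemes[0]; i++) {
        size_t n = strlen(schemes[i].scheme);
        if (strncmp(uri, schemes[i].scheme, n) == 0)
            return schemes[i].transport;
    }
    return "unknown scheme";
}
```

`transport_for_uri("mqtt://local.ip:1883")` returns "TCP (no SSL)", so switching to a non-SSL connection would not change the configuration shown in the first post.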
