Page 1 of 1

Potential data corruption using Ethernet on ESP32 WROOVER - E

Posted: Fri Dec 29, 2023 9:51 am
by Vilius
Hi,

I am doing a project that involves relatively high speed real time data collection on ESP32. I am constantly reading 12 bit parallel ADC and putting data in a 64 element cyclical buffer. When the interrupt triggers, ESP32 sends that data to the PC via Ethernet (I am using ESP Ethernet kit.)

A bit earlier I was doing that minimal data extraction on the ESP itself: after reading both GPIO registers of the ESP32 I used bitwise operations to extract and build a ADC reading representing integer number (since I had to used various GPIO pin numbers allocated in both GPIO registers.) This was done in an infinite cycle. My python script on the PC was receiving those ADC-representing integer numbers without any problem. My device was working properly.

However, I decided to gain just a bit more speed eliminating the data extraction on the ESP32 side and moving that to the PC... The idea is just to leave the GPIO register reading for the ESP32 to minimize the single iteration time of the main while cycle and do the earlier described bitwise operations on the PC. So ESP32 would only send two 32 bit GPIO registers content to the PC and leave all the processing for the PC. And this is where my problem arises... I use the exact same code architecture as before, but somehow, my data gets corrupted when the PC receives it. When I print raw data the PC has just received, the second GPIO register is represented to be all zeros, and the first GPIO register`s values are virtually the same along the data packet (I am digitizing voltage pulses, so all the register values should vary significantly along the cyclical buffer). The line in the brackets bellow is a 32 element integer recreation of the ADC reading (each element is extracted from both registers), so no wonder why is it wrong, when the raw data is corrupted... I wonder can it be related with the fact that the second GPIO register has some bits reserved (can it cause uncertainty when transmitting?). Any other ideas? Thank you in advance:

ESP code:

Code: Select all

#include <stdio.h>
#include <stdint.h>
#include <string.h>
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "freertos/queue.h"
#include "freertos/semphr.h"
#include "driver/gpio.h"
#include "soc/soc.h"
#include "soc/gpio_struct.h"
#include "soc/gpio_reg.h"
#include "esp32/rom/ets_sys.h"
#include "soc/rtc.h"
#include "esp_system.h"
#include "esp_wifi.h"
#include "esp_event.h"
#include "esp_log.h"
#include "nvs_flash.h"
#include "esp_netif.h"
#include "protocol_examples_common.h"
#include "esp_transport.h"
#include "esp_transport_tcp.h"
#include "esp_transport_socks_proxy.h"
#include <sys/time.h>

#define TARGET_ADDR "169.254.157.27"
#define TARGET_PORT 5190

#ifdef CONFIG_EXAMPLE_ENABLE_PROXY
#define PROXY_ADDR CONFIG_EXAMPLE_PROXY_ADDR
#define PROXY_PORT CONFIG_EXAMPLE_PROXY_PORT
#endif

#define INTERRUPT_INPUT 15
#define GPIO_CLOCK 14
#define NUM_GPIO_PINS 12
#define NUM_CYCLES 64

#define GPIO_PIN_12 12   //LSB
#define GPIO_PIN_13 13
#define GPIO_PIN_2 2
#define GPIO_PIN_4 4
#define GPIO_PIN_16 16
#define GPIO_PIN_17 17
#define GPIO_PIN_32 32
#define GPIO_PIN_33 33
#define GPIO_PIN_34 34
#define GPIO_PIN_35 35
#define GPIO_PIN_36 36
#define GPIO_PIN_39 39   //MSB

uint64_t get_time_us() {
    struct timeval tv;
    gettimeofday(&tv, NULL);
    return (uint64_t)tv.tv_sec * 1000000 + tv.tv_usec;
}

static const char *TAG = "tcp_transport_client";
esp_transport_handle_t transport = NULL;
static bool is_connected = false;

static const gpio_num_t GPIO_PINS[NUM_GPIO_PINS] = {
    GPIO_PIN_12, GPIO_PIN_13, GPIO_PIN_2, GPIO_PIN_4,
    GPIO_PIN_16, GPIO_PIN_17, GPIO_PIN_32, GPIO_PIN_33,
    GPIO_PIN_34, GPIO_PIN_35, GPIO_PIN_36, GPIO_PIN_39
};

bool trigger = false;
uint32_t data_array[NUM_CYCLES] = {0};

// Task handle for the send data task
TaskHandle_t tcpTaskHandle;

// Semaphore for synchronization
SemaphoreHandle_t tcpMutex;

QueueHandle_t dataQueue;

void send_data_over_tcp(uint32_t *data_array, size_t num_elements) {
    char rx_buffer[128];

    if (transport == NULL) {
        ESP_LOGE(TAG, "Error: TCP transport not initialized");
        return;
    }

    if (!is_connected) {
        ESP_LOGE(TAG, "Error: Not connected to server");
        return;
    }

    int err = esp_transport_connect(transport, TARGET_ADDR, TARGET_PORT, 5000);
    if (err != 0) {
        ESP_LOGE(TAG, "Client unable to connect: errno %d", errno);
        is_connected = false;
        esp_transport_close(transport);
        return;
    }
    ESP_LOGI(TAG, "Successfully connected");

    // Convert data to little-endian before sending
    for (size_t i = 0; i < num_elements; ++i) {
        data_array[i] = htonl(data_array[i]);
    }

    // Assuming data is in little-endian byte order
    int bytes_written = esp_transport_write(transport, (char *)data_array, sizeof(uint32_t) * num_elements, 0);

    if (bytes_written < 0) {
        ESP_LOGE(TAG, "Error occurred during sending: esp_transport_write() returned %d, errno %d", bytes_written, errno);

        if (errno == ECONNRESET) {
            ESP_LOGE(TAG, "Connection reset by peer. Attempting to reconnect...");
            is_connected = false;
            esp_transport_close(transport);
            return;
        }

        ESP_LOGE(TAG, "Unhandled error. Closing the transport.");
        esp_transport_close(transport);
        return;
    }

    // Set a timeout of 1000 milliseconds for esp_transport_read
    uint64_t start_time = get_time_us();
    int len;
    do {
        len = esp_transport_read(transport, rx_buffer, sizeof(rx_buffer) - 1, 0);
        if (len < 0 && errno != EAGAIN) {
            ESP_LOGE(TAG, "recv failed: esp_transport_read() returned %d, errno %d", len, errno);

            if (errno == ECONNRESET) {
                ESP_LOGE(TAG, "Connection reset by peer. Attempting to reconnect...");
                is_connected = false;
                esp_transport_close(transport);
                return;
            }

            ESP_LOGE(TAG, "Unhandled error. Closing the transport.");
            esp_transport_close(transport);
            return;
        }

        if (get_time_us() - start_time > 5000000) {
            ESP_LOGE(TAG, "Timeout waiting for server response");
            esp_transport_close(transport);
            return;
        }
    } while (len < 0);

    rx_buffer[len] = 0;
    ESP_LOGI(TAG, "Received %d bytes from %s:", len, TARGET_ADDR);
    ESP_LOGI(TAG, "Received data: %s", rx_buffer);

    esp_transport_close(transport);
}

// Task that handles data transfer over TCP
void send_data_task(void *pvParameters) {
    uint32_t received_data[NUM_CYCLES];

    while (1) {
        // Wait for the semaphore to be available
        if (xSemaphoreTake(tcpMutex, portMAX_DELAY)) {
            // Wait for data in the queue with a timeout
            if (xQueueReceive(dataQueue, received_data, pdMS_TO_TICKS(5000))) {
                // Check if already connected
                if (!is_connected) {
                    ESP_LOGI(TAG, "Attempting to connect...");
                    int err = esp_transport_connect(transport, TARGET_ADDR, TARGET_PORT, 5000);
                    if (err == 0) {
                        ESP_LOGI(TAG, "Connected successfully");
                        is_connected = true;  // Mark the connection as successful
                    } else {
                        ESP_LOGE(TAG, "Connection failed: errno %d", errno);
                    }
                }

                // Perform data transfer over TCP
                send_data_over_tcp(received_data, NUM_CYCLES);
            } else {
                if (is_connected) {
                    // Attempt to reconnect after a delay
                    ESP_LOGI(TAG, "Connection lost. Attempting to reconnect...");
                    int err = esp_transport_close(transport);
                    if (err == 0) {
                        ESP_LOGI(TAG, "Closed the connection successfully");
                    } else {
                        ESP_LOGE(TAG, "Error closing the connection: errno %d", errno);
                    }
                    is_connected = false;  // Mark the connection as unsuccessful
                }
            }

            // Release the semaphore
            xSemaphoreGive(tcpMutex);
        }

        // Delay to avoid tight loop
        vTaskDelay(pdMS_TO_TICKS(1000));  // Adjust the delay as needed
    }
}

static void IRAM_ATTR gpio_interrupt_handler(void *args) {
    BaseType_t xHigherPriorityTaskWoken = pdFALSE;

    // Send the data_array to the queue from ISR
    xQueueSendFromISR(dataQueue, data_array, &xHigherPriorityTaskWoken);

    // Clear the interrupt status and exit
    if (xHigherPriorityTaskWoken == pdTRUE) {
        portYIELD_FROM_ISR();
    }
}

void app_main() {
    vTaskDelay(100 / portTICK_PERIOD_MS);

    esp_log_level_set("transport", ESP_LOG_VERBOSE);
    esp_log_level_set("transport_base", ESP_LOG_VERBOSE);
    esp_log_level_set("transport_proxy", ESP_LOG_VERBOSE);

    ESP_ERROR_CHECK(nvs_flash_init());
    ESP_ERROR_CHECK(esp_netif_init());
    ESP_ERROR_CHECK(esp_event_loop_create_default());
    ESP_ERROR_CHECK(example_connect());
    transport = esp_transport_tcp_init();

    #ifdef CONFIG_EXAMPLE_ENABLE_PROXY
    esp_transport_handle_t parent = transport;
    esp_transport_socks_proxy_config_t proxy_config = {.port = PROXY_PORT, .address = PROXY_ADDR, .version = SOCKS4};
    transport = esp_transport_socks_proxy_init(parent, &proxy_config);
    #endif

    esp_rom_gpio_pad_select_gpio(INTERRUPT_INPUT);
    gpio_set_direction(INTERRUPT_INPUT, GPIO_MODE_INPUT);
    gpio_pulldown_en(INTERRUPT_INPUT);
    gpio_pullup_dis(INTERRUPT_INPUT);
    gpio_set_intr_type(INTERRUPT_INPUT, GPIO_INTR_NEGEDGE);

    // Create a FreeRTOS queue to communicate between ISR and task
    dataQueue = xQueueCreate(1, sizeof(data_array));

    // Create the TCP data transfer task and assign it to a specific core
    xTaskCreatePinnedToCore(send_data_task, "TCP_Task", 8192, NULL, 2, &tcpTaskHandle, APP_CPU_NUM);

    // Create a FreeRTOS semaphore for synchronization
    tcpMutex = xSemaphoreCreateMutex();

    gpio_install_isr_service(0);
    gpio_isr_handler_add(INTERRUPT_INPUT, gpio_interrupt_handler, (void *)INTERRUPT_INPUT);

    gpio_config_t io_conf_output = {
        .pin_bit_mask = (1ULL << GPIO_CLOCK),
        .mode = GPIO_MODE_OUTPUT,
    };
    gpio_config(&io_conf_output);

    gpio_config_t io_conf;
    for (int i = 0; i < NUM_GPIO_PINS; ++i)
    {
        io_conf = (gpio_config_t){
            .pin_bit_mask = (1ULL << GPIO_PINS[i]),
            .mode = GPIO_MODE_INPUT,
            .intr_type = GPIO_INTR_DISABLE,
            .pull_up_en = GPIO_PULLUP_DISABLE,
            .pull_down_en = GPIO_PULLDOWN_ENABLE,  // Enable pulldown resistors
        };
        gpio_config(&io_conf);
    }

    uint8_t k = 0;

    while (1)
    {
        
        REG_WRITE(GPIO_OUT_W1TS_REG, (1 << GPIO_CLOCK));
        data_array[k] = REG_READ(GPIO_IN_REG);
        data_array[k + 1] = REG_READ(GPIO_IN1_REG);
        k = (k + 2) % NUM_CYCLES;
        REG_WRITE(GPIO_OUT_W1TC_REG, (1 << GPIO_CLOCK));
        
    }
    #ifdef CONFIG_EXAMPLE_ENABLE_PROXY
    esp_transport_destroy(parent);
    #endif
}
PC TCP server code:

Code: Select all

import socket
import struct
import concurrent.futures
import numpy as np

TARGET_IP = "169.254.157.27"
TARGET_PORT = 5190
NUM_ELEMENTS = 64
BUFFER_SIZE = 4 * NUM_ELEMENTS
NUM_WORKERS = 4
NUM_TO_PRINT = 32
BITS_PER_ELEMENT = 32

def process_data_chunk(data):
    REG0, REG1 = data

    mask = np.array([0x01, 0x01 << 1, 0x01 << 2, 0x01 << 3, 0x01 << 4, 0x01 << 5, 0x01 << 6, 0x01 << 7, 0x01 << 8, 0x01 << 9, 0x01 << 10])
    xor_mask = np.array([0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0])

    result_array = (
        ((REG0 >> 12) & mask[0]) |
        ((REG0 >> 13) & mask[1]) |
        ((REG0 >> 2) & mask[2]) |
        ((REG0 >> 4) & mask[3]) |
        (((REG0 >> 16) & mask[0]) ^ xor_mask[4]) |
        ((REG0 >> 17) & mask[5]) |
        ((REG1 >> 0) & mask[6]) |
        ((REG1 >> 1) & mask[7]) |
        ((REG1 >> 2) & mask[8]) |
        ((REG1 >> 3) & mask[9]) |
        ((REG1 >> 4) & mask[10]) |
        ((REG1 >> 7) & mask[0])
    )

    return np.expand_dims(result_array, axis=0)

def process_data(data):
    with concurrent.futures.ProcessPoolExecutor(max_workers=NUM_WORKERS) as executor:
        chunks = [(REG0, REG1) for REG0, REG1 in struct.iter_unpack("<II", data)]
        result_chunks = list(executor.map(process_data_chunk, chunks))

    result_array = np.concatenate(result_chunks)

    # Find the index of the maximum value in the resulting array
    max_index = np.argmax(result_array)

    # Determine the indices for the cyclical buffer without changing the order
    indices = np.arange(max_index, max_index + NUM_TO_PRINT) % len(result_array)

    # Extract the cyclical buffer of 32 elements
    cyclical_buffer = result_array[indices]

    return cyclical_buffer

def print_binary_chunks(data, bits_per_element):
    for element in data:
        binary_str = f'{element:0{bits_per_element}b}'
        print(" ".join(binary_str[i:i+8] for i in range(0, bits_per_element, 8)))

def receive_data():
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    try:
        s.bind((TARGET_IP, TARGET_PORT))
        s.listen()
        print(f"Server listening on {TARGET_IP}:{TARGET_PORT}")

        while True:
            conn, addr = s.accept()
            print(f"Connected by {addr}")

            try:
                buffer = bytearray(BUFFER_SIZE)

                while True:
                    additional_data = conn.recv_into(buffer)
                    if additional_data == 0:
                        # Handle connection closed by the client
                        print("Connection closed by the client.")
                        break

                    # Print raw received data in binary format
                    print("Raw received data (binary):")
                    print_binary_chunks(struct.unpack(f"<{NUM_ELEMENTS}I", buffer), BITS_PER_ELEMENT)

                    # Process and print data in real-time
                    try:
                        cyclical_buffer = process_data(buffer)
                        print("Cyclical buffer of 32 elements:")
                        print(cyclical_buffer)
                    except Exception as process_data_error:
                        print(f"Error during processing data: {process_data_error}")

            except Exception as receive_error:
                print(f"Error during data reception: {receive_error}")

            finally:
                conn.close()

    except Exception as bind_error:
        print(f"Error during socket bind: {bind_error}")

    finally:
        s.close()

if __name__ == "__main__":
    try:
        receive_data()
    except KeyboardInterrupt:
        print("Server shutting down gracefully.")
RAW data (that PC receives from the ESP32 via Ethernet) logs:
Connected by ('169.254.116.90', 52901)
Raw received data (binary):
10001010 00001010 00000101 00000000
00000000 00000000 00000000 00000000
10011011 00101010 00000101 00000000
00000000 00000000 00000000 00000000
10011011 00101010 00000101 00000000
00000000 00000000 00000000 00000000
10011110 00111010 00000101 00000000
00000000 00000000 00000000 00000000
10001011 00111010 00000101 00000000
00000000 00000000 00000000 00000000
10011110 00011010 00000101 00000000
00000000 00000000 00000000 00000000
10001010 00001010 00000101 00000000
00000000 00000000 00000000 00000000
10011011 00111010 00000101 00000000
00000000 00000000 00000000 00000000
10011110 00011010 00000101 00000000
00000000 00000000 00000000 00000000
10011111 00011010 00000101 00000000
00000000 00000000 00000000 00000000
10001111 00111010 00000101 00000000
00000000 00000000 00000000 00000000
10011110 00011010 00000101 00000000
00000000 00000000 00000000 00000000
10001111 00001010 00000101 00000000
00000000 00000000 00000000 00000000
10011010 00011010 00000101 00000000
00000000 00000000 00000000 00000000
10001010 00011010 00000101 00000000
00000000 00000000 00000000 00000000
10011111 00101010 00000101 00000000
00000000 00000000 00000000 00000000
10001010 00111010 00000101 00000000
00000000 00000000 00000000 00000000
10001111 00111010 00000101 00000000
00000000 00000000 00000000 00000000
10001011 00011010 00000101 00000000
00000000 00000000 00000000 00000000
10011110 00011010 00000101 00000000
00000000 00000000 00000000 00000000
10001011 00001010 00000101 00000000
00000000 00000000 00000000 00000000
10011110 00011010 00000101 00000000
00000000 00000000 00000000 00000000
10011010 00101010 00000101 00000000
00000000 00000000 00000000 00000000
10011111 00011010 00000101 00000000
00000000 00000000 00000000 00000000
10011010 00111010 00000101 00000000
00000000 00000000 00000000 00000000
10011011 00101010 00000101 00000000
00000000 00000000 00000000 00000000
10011011 00101010 00000101 00000000
00000000 00000000 00000000 00000000
10011010 00101010 00000101 00000000
00000000 00000000 00000000 00000000
10011111 00011010 00000101 00000000
00000000 00000000 00000000 00000000
10001110 00101010 00000101 00000000
00000000 00000000 00000000 00000000
10001110 00101010 00000101 00000000
00000000 00000000 00000000 00000000
10011111 00111010 00000101 00000000
00000000 00000000 00000000 00000000
Cyclical buffer of 32 elements:
[1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1]
Connection closed by the client.
Connected by ('169.254.116.90', 52902)
Raw received data (binary):
10001110 00011010 00000101 00000000
00000000 00000000 00000000 00000000
10001111 00111010 00000101 00000000
00000000 00000000 00000000 00000000
10011110 00101010 00000101 00000000
00000000 00000000 00000000 00000000
10011011 00001010 00000101 00000000
00000000 00000000 00000000 00000000
10011011 00111010 00000101 00000000
00000000 00000000 00000000 00000000
10011110 00011010 00000101 00000000
00000000 00000000 00000000 00000000
10001111 00001010 00000101 00000000
00000000 00000000 00000000 00000000
10001011 00001010 00000101 00000000
00000000 00000000 00000000 00000000
10011110 00001010 00000101 00000000
00000000 00000000 00000000 00000000
10011111 00111010 00000101 00000000
00000000 00000000 00000000 00000000
10001010 00101010 00000101 00000000
00000000 00000000 00000000 00000000
10001010 00001010 00000101 00000000
00000000 00000000 00000000 00000000
10001111 00101010 00000101 00000000
00000000 00000000 00000000 00000000
10001010 00001010 00000101 00000000
00000000 00000000 00000000 00000000
10011111 00101010 00000101 00000000
00000000 00000000 00000000 00000000
10011111 00101010 00000101 00000000
00000000 00000000 00000000 00000000
10011010 00101010 00000101 00000000
00000000 00000000 00000000 00000000
10001011 00011010 00000101 00000000
00000000 00000000 00000000 00000000
10011110 00001010 00000101 00000000
00000000 00000000 00000000 00000000
10001010 00101010 00000101 00000000
00000000 00000000 00000000 00000000
10011011 00111010 00000101 00000000
00000000 00000000 00000000 00000000
10001010 00001010 00000101 00000000
00000000 00000000 00000000 00000000
10011111 00011010 00000101 00000000
00000000 00000000 00000000 00000000
10011011 00111010 00000101 00000000
00000000 00000000 00000000 00000000
10011110 00001010 00000101 00000000
00000000 00000000 00000000 00000000
10001011 00111010 00000101 00000000
00000000 00000000 00000000 00000000
10001010 00111010 00000101 00000000
00000000 00000000 00000000 00000000
10011110 00001010 00000101 00000000
00000000 00000000 00000000 00000000
10011111 00011010 00000101 00000000
00000000 00000000 00000000 00000000
10011110 00011010 00000101 00000000
00000000 00000000 00000000 00000000
10011111 00011010 00000101 00000000
00000000 00000000 00000000 00000000
10001111 00001010 00000101 00000000
00000000 00000000 00000000 00000000
Cyclical buffer of 32 elements:
[1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1]
Connection closed by the client.

Re: Potential data corruption using Ethernet on ESP32 WROOVER - E

Posted: Sat Dec 30, 2023 12:14 pm
by MicroController
Haven't quite spotted the actual issue yet, but

Code: Select all

    // Convert data to little-endian before sending
    for (size_t i = 0; i < num_elements; ++i) {
        data_array[i] = htonl(data_array[i]);
    }
does the opposite of what you want, converting (the ESP's little-endian) data to 'network byte order', which is big-endian.

Also,

Code: Select all

  result_array = (
        ((REG0 >> 12) & mask[0]) |
        ((REG0 >> 13) & mask[1]) |
        ((REG0 >> 2) & mask[2]) |
        ((REG0 >> 4) & mask[3]) |
        (((REG0 >> 16) & mask[0]) ^ xor_mask[4]) |
        ((REG0 >> 17) & mask[5]) |
        ((REG1 >> 0) & mask[6]) |
        ((REG1 >> 1) & mask[7]) |
        ((REG1 >> 2) & mask[8]) |
        ((REG1 >> 3) & mask[9]) |
        ((REG1 >> 4) & mask[10]) |
        ((REG1 >> 7) & mask[0])
    )
doesn't work; the shifts and the mask values don't match. You may want to try it like

Code: Select all

  result_array = (
        (((REG0 >> 12) & 1) << 0) |
        (((REG0 >> 13) & 1) << 1) |
        (((REG0 >> 2) & 1) << 2) |
        (((REG0 >> 4) & 1) << 3) |
        ((((REG0 >> 16) & 1) ^ 1) << 4) | ...
Then, I'm not sure that

Code: Select all

    with concurrent.futures.ProcessPoolExecutor(max_workers=NUM_WORKERS) as executor:
        chunks = [(REG0, REG1) for REG0, REG1 in struct.iter_unpack("<II", data)]
        result_chunks = list(executor.map(process_data_chunk, chunks))
is correct. I am sure however that process_data_chunk() runs so quickly that the overhead of parallelizing is much greater than any potential benefit, and infinitely more so when first spawning new processes to do it.

Re: Potential data corruption using Ethernet on ESP32 WROOVER - E

Posted: Tue Jan 02, 2024 11:31 am
by Vilius
Ok, your ideas got me going. Thank you so much, I solved my problem