Page 1 of 1

recv() does not give correct read value

Posted: Fri Mar 31, 2023 8:34 pm
by volksvorg
Hi there,

At the moment i am developing a snapcast client for my ESP32. Snapcast is a multi room audio solution.
The Snapcast Server is sending a constant stream of 20ms raw pcm audio data, which is 3840 bytes.
4 bytes left and right audio multiplied by 48000/50, this is equal to 3840bytes.
my Socket Read Function gives me back the numbers of bytes read. But every now and then my Socket Read Function does not give me back the correct number of bytes read.
My code is based on the esp-adf TCP Stream Reader Example. But i gues my problem is not esp-adf specific, so i post the question
in this forum. How can i ensure that recv() always give me back the correct amount of bytes.

the read buffer for recv is 4096 bytes and it is created when the task for snapcast_stream is initialized. The pointer to that buffer is then
passed as an argument to recv();

my read function.
  1.  
  2. static int _snapcast_stream_socket_receive(const char *tag, const int sock, char * data, size_t max_len){
  3.     int len = recv(sock, data, max_len, 0);
  4.    // ESP_LOGI(tag, "[sock=%d]: MAX_LEN: %d", sock, max_len);
  5.     if (len < 0) {
  6.         if (errno == EINPROGRESS || errno == EAGAIN || errno == EWOULDBLOCK) {
  7.             return 0;   // Not an error
  8.         }
  9.         if (errno == ENOTCONN) {
  10.             ESP_LOGW(tag, "[sock=%d]: Connection closed", sock);
  11.             return -2;  // Socket has been disconnected
  12.         }
  13.         log_socket_error(tag, sock, errno, "Error occurred during receiving");
  14.         return -1;
  15.     }
  16.  
  17.     return len;
  18. }
  19.  
my open socket function
  1. static esp_err_t _snapcast_open(audio_element_handle_t self)
  2. {
  3.     AUDIO_NULL_CHECK(TAG, self, return ESP_FAIL);
  4.     snapcast_stream_t *tcp = (snapcast_stream_t *)audio_element_getdata(self);
  5.     char base_message_serialized[BASE_MESSAGE_SIZE];
  6.     char *hello_message_serialized;
  7.     int result;
  8.     struct timeval now;
  9.     int sockfd, connfd;
  10.     struct sockaddr_in servaddr, cli;
  11.  
  12.     if (tcp->is_open) {
  13.         ESP_LOGE(TAG, "Already opened");
  14.         return ESP_FAIL;
  15.     }
  16.     ESP_LOGI(TAG, "Host is %s, port is %d\n", tcp->host, tcp->port);
  17.  
  18.     // assign IP, PORT
  19.     servaddr.sin_family = AF_INET;
  20.     servaddr.sin_addr.s_addr = inet_addr(tcp->host);
  21.     servaddr.sin_port = htons(tcp->port);
  22.     sockfd = socket(AF_INET, SOCK_STREAM, 0);
  23.     if (connect(sockfd, (SA*)&servaddr, sizeof(servaddr))!= 0) {
  24.         ESP_LOGE(TAG,"connection with the server failed...\n");
  25.         return ESP_FAIL;
  26.     }
  27.     tcp->sock=sockfd;
  28.     // esp_transport_handle_t t = esp_transport_tcp_init();
  29.    // esp_transport_list_handle_t transport_list = esp_transport_list_init();
  30.    // esp_transport_list_add(transport_list, t, "http");
  31.    // AUDIO_NULL_CHECK(TAG, t, return ESP_FAIL);
  32.     //tcp->sock = esp_transport_connect(t, tcp->host, tcp->port, SNAPCAST_CONNECT_TIMEOUT_MS);
  33.     if (tcp->sock < 0) {
  34.         _get_socket_error_code_reason(__func__,  tcp->sock);
  35.         return ESP_FAIL;
  36.     }
  37.     tcp->is_open = true;
  38.    // tcp->t = t;
  39.     tcp->base_message.type = SNAPCAST_MESSAGE_BASE;  // default state, no current message
  40.     tcp->base_message.sent.sec = 0;
  41.     tcp->base_message.sent.usec = 0;
  42.     tcp->base_message.received.sec = 0;
  43.     tcp->base_message.received.usec = 0;
  44.     tcp->received_header = false;
  45.     tcp->last_sync.tv_sec = 0;
  46.     tcp->last_sync.tv_usec = 0;
  47.     tcp->id_counter = 0;
  48.     tcp->time_message.latency.sec = 0;
  49.     tcp->time_message.latency.usec = 0;
  50.  
  51.     result = gettimeofday(&now, NULL);
  52.     if (result) {
  53.         ESP_LOGI(TAG, "Failed to gettimeofday\r\n");
  54.         return ESP_FAIL;
  55.     }
  56.     esp_read_mac(base_mac, ESP_MAC_WIFI_STA);
  57.     sprintf(mac_address, "%02X:%02X:%02X:%02X:%02X:%02X", base_mac[0], base_mac[1], base_mac[2], base_mac[3], base_mac[4], base_mac[5]);
  58.     ESP_LOGI("SNAPCAST_TCP", "%02X:%02X:%02X:%02X:%02X:%02X", base_mac[0], base_mac[1], base_mac[2], base_mac[3], base_mac[4], base_mac[5]);
  59.  
  60.     base_message_t base_message = {
  61.         SNAPCAST_MESSAGE_HELLO,      // type
  62.         0x0,                         // id
  63.         0x0,                         // refersTo
  64.         { now.tv_sec, now.tv_usec }, // sent
  65.         { 0x0, 0x0 },                // received
  66.         0x0,                         // size
  67.     };
  68.  
  69.     hello_message_t hello_message = {
  70.         mac_address,
  71.         SNAPCAST_STREAM_CLIENT_NAME,  // hostname
  72.         "0.0.2",               // client version
  73.         "libsnapcast",         // client name
  74.         "esp32",               // os name
  75.         "xtensa",              // arch
  76.         1,                     // instance
  77.         mac_address,           // id
  78.         2,                     // protocol version
  79.     };
  80.     hello_message_serialized = hello_message_serialize(&hello_message, (size_t *)&(base_message.size));
  81.     if (!hello_message_serialized) {
  82.             ESP_LOGI(TAG, "Failed to serialize hello message\r\b");
  83.             return ESP_FAIL;
  84.     }
  85.     if (result) {
  86.         ESP_LOGI(TAG, "Failed to serialize base message\r\n");
  87.         return ESP_FAIL;
  88.     }
  89.     result=base_message_serialize(&base_message, base_message_serialized, BASE_MESSAGE_SIZE);
  90.     if (result) {
  91.         ESP_LOGI(TAG, "Failed to serialize base message\r\n");
  92.         return ESP_FAIL;
  93.     }
  94.     if(_snapcast_stream_socket_send(TAG, sockfd, base_message_serialized, BASE_MESSAGE_SIZE)<0){
  95.        free(hello_message_serialized);
  96.        return ESP_FAIL;
  97.     }
  98.     if(_snapcast_stream_socket_send(TAG, sockfd, hello_message_serialized, base_message.size)<0){
  99.        free(hello_message_serialized);
  100.        return ESP_FAIL;
  101.     }
  102.     //_snapcast_dispatch_event(self, tcp, NULL, 0, SNAPCAST_STREAM_STATE_CONNECTED);
  103.     free(hello_message_serialized);
  104.     sntp_snapcast=tcp;
  105.    // send_time_tm_handle = xTimerCreate( "snapclient_timer", 5000 / portTICK_RATE_MS, pdTRUE, NULL, send_time_timer_cb);
  106.    // xTimerStart(send_time_tm_handle, 0);
  107.     return ESP_OK;
  108. }
my process function, which wich runs in a while(1) loop
  1. static esp_err_t _snapcast_process(audio_element_handle_t self, char *in_buffer, int in_len){
  2.  
  3.     struct timeval now, tv2; //, last_time_sync;
  4.     int result;
  5.     int w_size = 1;
  6.     int r_len = 1;
  7.     int volume[] ={0, 0};
  8.     //char *buffer;
  9.     if (gettimeofday(&now, NULL)) {
  10.         ESP_LOGI(TAG, "Failed to gettimeofday\r\n");
  11.     }
  12.     snapcast_stream_t *tcp = (snapcast_stream_t *)audio_element_getdata(self);
  13.     r_len=_snapcast_read(self, in_buffer, BASE_MESSAGE_SIZE, tcp->timeout_ms, NULL);
  14.     if(r_len > 0) {
  15.         if(r_len > 4096){
  16.             ESP_LOGI(TAG, "Failed Write Size  %d", tcp->wire_chunk_message.size);
  17.             r_len=4096;
  18.         }else if(r_len < 0){
  19.             return ESP_FAIL;
  20.         }
  21.         switch(tcp->base_message.type){
  22.             case SNAPCAST_MESSAGE_CODEC_HEADER:
  23.                 ESP_LOGI(TAG, "SNAPCAST_MESSAGE_CODEC_HEADER");
  24.                 //tcp->bits.enabled=0;
  25.                 break;
  26.             case SNAPCAST_MESSAGE_WIRE_CHUNK:
  27.                 //ESP_LOGI(TAG, "SNAPCAST_MESSAGE_WIRE_CHUNK");
  28.                 result=wire_chunk_message_deserialize(&(tcp->wire_chunk_message), in_buffer, 0);
  29.                 if(result){
  30.                     ESP_LOGI(TAG, "Failed to read wire chunks: %d", result);
  31.                 }
  32.                 //tcp->bits.enabled=1;
  33.                 //tcp->bits.new_wire_chunk=1;
  34.  
  35.                 //buffer=in_buffer;
  36.                 //ESP_LOGI(TAG, "Message Wire Chunk Size=%d", tcp->wire_chunk_message.size);
  37.                 //ESP_LOGI(TAG, "Message Read Size      =%d", r_len - 12);
  38.                 _snapcast_write(self, in_buffer+12, r_len - 12, 0x00, NULL);
  39.                 break;
  40.             case SNAPCAST_MESSAGE_SERVER_SETTINGS:
  41.                 for(int x=4;x<tcp->base_message.size;x++){
  42.                     ESP_LOGI(TAG, "SNAPCAST_MESSAGE_SERVER_SETTINGS: Buffer [%d]=%c", x, in_buffer[x]);
  43.                 }
  44.                 result = server_settings_message_deserialize(&(tcp->server_settings_message), in_buffer+4);
  45.                 if (result) {
  46.                     ESP_LOGI(TAG, "Failed to read server settings: %d\r\n", result);
  47.                 }
  48.                 ESP_LOGI(TAG, "Buffer length:  %d", tcp->server_settings_message.buffer_ms);
  49.                 ESP_LOGI(TAG, "Ringbuffer size:%d", tcp->server_settings_message.buffer_ms*48*4);
  50.                 ESP_LOGI(TAG, "Latency:        %d", tcp->server_settings_message.latency);
  51.                 ESP_LOGI(TAG, "Mute:           %d", tcp->server_settings_message.muted);
  52.                 ESP_LOGI(TAG, "Setting volume: %d", tcp->server_settings_message.volume);
  53.                 volume[0]=tcp->server_settings_message.volume;
  54.                 volume[1]=tcp->server_settings_message.muted;
  55.                 //_snapcast_dispatch_event(self, tcp, (void*)volume, 4, SNAPCAST_STREAM_STATE_SERVER_SETTINGS_MESAGE);
  56.                 break;
  57.             case SNAPCAST_MESSAGE_TIME:
  58.                 //ESP_LOGI(TAG, "SNAPCAST_MESSAGE_TIME: Buffer Read=%d", tcp->base_message.size);
  59.                 result = time_message_deserialize(&(tcp->time_message), in_buffer, TIME_MESSAGE_SIZE);
  60.                 if (result) {
  61.                     ESP_LOGI(TAG, "Failed to deserialize time message\r\n");
  62.                     break;
  63.                 }
  64.                 tv2.tv_sec = tcp->base_message.received.sec;
  65.                 tv2.tv_usec= tcp->base_message.received.usec;
  66.                 timersub(&now,&tv2,&tcp->server_uptime);
  67.                 uint64_t server_uptime_ms = tcp->base_message.received.sec * 1000;
  68.                 server_uptime_ms += tcp->server_uptime.tv_usec / 1000;
  69.                 ESP_LOGI(TAG,"Server Up Time[ms]=%lld", server_uptime_ms);
  70.             /*  timersub(&now, &tcp->server_uptime, &tv1);
  71.  
  72.                 time_ms=tv1.tv_sec * 1000;
  73.                 time_ms+=tv1.tv_usec/1000;
  74.                 ESP_LOGI(TAG_TIME, "Base   Uptime %d.%3d", tcp->base_message.received.sec, tcp->base_message.received.usec/1000);
  75.                 ESP_LOGI(TAG_TIME, "Server Uptime %ld.%3ld", tv1.tv_sec, tv1.tv_usec/1000);
  76.                 ESP_LOGI(TAG_TIME, "Server Uptime %lldms", time_ms);*/
  77.  
  78.                 //ESP_LOGI(TAG, "Setting volume: %d", tcp->server_settings_message.volume);
  79.                 break;
  80.             case SNAPCAST_MESSAGE_STREAM_TAGS:
  81.                 ESP_LOGI(TAG, "SNAPCAST_MESSAGE_STREAM_TAGS ");
  82.                 break;
  83.             }
  84.  
  85.         /*if(size>1){
  86.  
  87.         }*/
  88.     }/* else {
  89.         w_size = r_len;
  90.     }*/
  91.     memset(in_buffer, 0x00, 4096);
  92.     return 1;
  93. }
my read function, wich is called in snapcast_process()
  1. static esp_err_t _snapcast_read(audio_element_handle_t self, char *buffer, int len, TickType_t ticks_to_wait, void *context){
  2.     int rlen = 0;
  3.     int result = 0;
  4.     char base_buffer[BASE_MESSAGE_SIZE];
  5.     snapcast_stream_t *tcp = (snapcast_stream_t *)audio_element_getdata(self);
  6.  
  7. _start:
  8.     //rlen = esp_transport_read(tcp->t, buffer, BASE_MESSAGE_SIZE, tcp->timeout_ms);
  9.     rlen = _snapcast_stream_socket_receive(TAG, tcp->sock, base_buffer, BASE_MESSAGE_SIZE);
  10.     result = base_message_deserialize(&(tcp->base_message), base_buffer, BASE_MESSAGE_SIZE);
  11.     //rlen = esp_transport_read(tcp->t, buffer, tcp->base_message.size, tcp->timeout_ms);
  12.     rlen = _snapcast_stream_socket_receive(TAG, tcp->sock, buffer, tcp->base_message.size);
  13.     if (result) {
  14.         ESP_LOGW(TAG, "Failed to read base message: %d\r\n", result);
  15.         return ESP_FAIL;
  16.     }
  17.     tcp->read_len=rlen;
  18.     if (rlen < 0) {
  19.         int result = _get_socket_error_code_reason(__func__, tcp->sock);
  20.         if (result == 0) {
  21.             ESP_LOGW(TAG, "TCP server actively closes the connection");
  22.             return ESP_OK;
  23.         }
  24.         return ESP_FAIL;
  25.     } else if (rlen == 0) {
  26.         ESP_LOGI(TAG, "Get end of the file");
  27.        // tcp->bits.new_wire_chunk=0;
  28.        // tcp->bits.enabled=0;
  29.        // tcp->bits.sync=0;
  30.       //  tcp->read_rb=snapcast_stream_ringbuffer_get_element(tcp->rb, SNAPCAST_MESSAGE_FIRST);
  31.       //  tcp->write_rb=snapcast_stream_ringbuffer_get_element(tcp->rb, SNAPCAST_MESSAGE_FIRST);
  32.         //snapcast_stream_rinbuffer_reset(self);
  33.       //  audio_element_set_byte_pos(self, 0);
  34.       //  audio_element_set_ringbuf_done(self);
  35.       //  audio_element_reset_output_ringbuf(self);
  36.        // _snapcast_dispatch_event(self, tcp, NULL, 0, SNAPCAST_STREAM_STATE_TCP_SOCKET_TIMEOUT_MESSAGE);
  37.         goto _start;
  38.     } else {
  39.         //audio_element_update_byte_pos(self, rlen);
  40.     }
  41.     return rlen;
  42. }

Re: recv() does not give correct read value

Posted: Sat Apr 01, 2023 1:56 am
by ESP_Sprite
You're working with a TCP stream. As far as I understand, TCP streams don't have a concept of 'packets': the TCP/IP stack is allowed to hack up your bytes in any order. All you're guaranteed is that your bytes arrive in the same order as they're sent.