recv() does not give correct read value
Posted: Fri Mar 31, 2023 8:34 pm
Hi there,
At the moment i am developing a snapcast client for my ESP32. Snapcast is a multi room audio solution.
The Snapcast Server is sending a constant stream of 20ms raw pcm audio data, which is 3840 bytes.
4 bytes left and right audio multiplied by 48000/50, this is equal to 3840bytes.
my Socket Read Function gives me back the numbers of bytes read. But every now and then my Socket Read Function does not give me back the correct number of bytes read.
My code is based on the esp-adf TCP Stream Reader Example. But i gues my problem is not esp-adf specific, so i post the question
in this forum. How can i ensure that recv() always give me back the correct amount of bytes.
the read buffer for recv is 4096 bytes and it is created when the task for snapcast_stream is initialized. The pointer to that buffer is then
passed as an argument to recv();
my read function.
my open socket function
my process function, which wich runs in a while(1) loop
my read function, wich is called in snapcast_process()
At the moment i am developing a snapcast client for my ESP32. Snapcast is a multi room audio solution.
The Snapcast Server is sending a constant stream of 20ms raw pcm audio data, which is 3840 bytes.
4 bytes left and right audio multiplied by 48000/50, this is equal to 3840bytes.
my Socket Read Function gives me back the numbers of bytes read. But every now and then my Socket Read Function does not give me back the correct number of bytes read.
My code is based on the esp-adf TCP Stream Reader Example. But i gues my problem is not esp-adf specific, so i post the question
in this forum. How can i ensure that recv() always give me back the correct amount of bytes.
the read buffer for recv is 4096 bytes and it is created when the task for snapcast_stream is initialized. The pointer to that buffer is then
passed as an argument to recv();
my read function.
- static int _snapcast_stream_socket_receive(const char *tag, const int sock, char * data, size_t max_len){
- int len = recv(sock, data, max_len, 0);
- // ESP_LOGI(tag, "[sock=%d]: MAX_LEN: %d", sock, max_len);
- if (len < 0) {
- if (errno == EINPROGRESS || errno == EAGAIN || errno == EWOULDBLOCK) {
- return 0; // Not an error
- }
- if (errno == ENOTCONN) {
- ESP_LOGW(tag, "[sock=%d]: Connection closed", sock);
- return -2; // Socket has been disconnected
- }
- log_socket_error(tag, sock, errno, "Error occurred during receiving");
- return -1;
- }
- return len;
- }
- static esp_err_t _snapcast_open(audio_element_handle_t self)
- {
- AUDIO_NULL_CHECK(TAG, self, return ESP_FAIL);
- snapcast_stream_t *tcp = (snapcast_stream_t *)audio_element_getdata(self);
- char base_message_serialized[BASE_MESSAGE_SIZE];
- char *hello_message_serialized;
- int result;
- struct timeval now;
- int sockfd, connfd;
- struct sockaddr_in servaddr, cli;
- if (tcp->is_open) {
- ESP_LOGE(TAG, "Already opened");
- return ESP_FAIL;
- }
- ESP_LOGI(TAG, "Host is %s, port is %d\n", tcp->host, tcp->port);
- // assign IP, PORT
- servaddr.sin_family = AF_INET;
- servaddr.sin_addr.s_addr = inet_addr(tcp->host);
- servaddr.sin_port = htons(tcp->port);
- sockfd = socket(AF_INET, SOCK_STREAM, 0);
- if (connect(sockfd, (SA*)&servaddr, sizeof(servaddr))!= 0) {
- ESP_LOGE(TAG,"connection with the server failed...\n");
- return ESP_FAIL;
- }
- tcp->sock=sockfd;
- // esp_transport_handle_t t = esp_transport_tcp_init();
- // esp_transport_list_handle_t transport_list = esp_transport_list_init();
- // esp_transport_list_add(transport_list, t, "http");
- // AUDIO_NULL_CHECK(TAG, t, return ESP_FAIL);
- //tcp->sock = esp_transport_connect(t, tcp->host, tcp->port, SNAPCAST_CONNECT_TIMEOUT_MS);
- if (tcp->sock < 0) {
- _get_socket_error_code_reason(__func__, tcp->sock);
- return ESP_FAIL;
- }
- tcp->is_open = true;
- // tcp->t = t;
- tcp->base_message.type = SNAPCAST_MESSAGE_BASE; // default state, no current message
- tcp->base_message.sent.sec = 0;
- tcp->base_message.sent.usec = 0;
- tcp->base_message.received.sec = 0;
- tcp->base_message.received.usec = 0;
- tcp->received_header = false;
- tcp->last_sync.tv_sec = 0;
- tcp->last_sync.tv_usec = 0;
- tcp->id_counter = 0;
- tcp->time_message.latency.sec = 0;
- tcp->time_message.latency.usec = 0;
- result = gettimeofday(&now, NULL);
- if (result) {
- ESP_LOGI(TAG, "Failed to gettimeofday\r\n");
- return ESP_FAIL;
- }
- esp_read_mac(base_mac, ESP_MAC_WIFI_STA);
- sprintf(mac_address, "%02X:%02X:%02X:%02X:%02X:%02X", base_mac[0], base_mac[1], base_mac[2], base_mac[3], base_mac[4], base_mac[5]);
- ESP_LOGI("SNAPCAST_TCP", "%02X:%02X:%02X:%02X:%02X:%02X", base_mac[0], base_mac[1], base_mac[2], base_mac[3], base_mac[4], base_mac[5]);
- base_message_t base_message = {
- SNAPCAST_MESSAGE_HELLO, // type
- 0x0, // id
- 0x0, // refersTo
- { now.tv_sec, now.tv_usec }, // sent
- { 0x0, 0x0 }, // received
- 0x0, // size
- };
- hello_message_t hello_message = {
- mac_address,
- SNAPCAST_STREAM_CLIENT_NAME, // hostname
- "0.0.2", // client version
- "libsnapcast", // client name
- "esp32", // os name
- "xtensa", // arch
- 1, // instance
- mac_address, // id
- 2, // protocol version
- };
- hello_message_serialized = hello_message_serialize(&hello_message, (size_t *)&(base_message.size));
- if (!hello_message_serialized) {
- ESP_LOGI(TAG, "Failed to serialize hello message\r\b");
- return ESP_FAIL;
- }
- if (result) {
- ESP_LOGI(TAG, "Failed to serialize base message\r\n");
- return ESP_FAIL;
- }
- result=base_message_serialize(&base_message, base_message_serialized, BASE_MESSAGE_SIZE);
- if (result) {
- ESP_LOGI(TAG, "Failed to serialize base message\r\n");
- return ESP_FAIL;
- }
- if(_snapcast_stream_socket_send(TAG, sockfd, base_message_serialized, BASE_MESSAGE_SIZE)<0){
- free(hello_message_serialized);
- return ESP_FAIL;
- }
- if(_snapcast_stream_socket_send(TAG, sockfd, hello_message_serialized, base_message.size)<0){
- free(hello_message_serialized);
- return ESP_FAIL;
- }
- //_snapcast_dispatch_event(self, tcp, NULL, 0, SNAPCAST_STREAM_STATE_CONNECTED);
- free(hello_message_serialized);
- sntp_snapcast=tcp;
- // send_time_tm_handle = xTimerCreate( "snapclient_timer", 5000 / portTICK_RATE_MS, pdTRUE, NULL, send_time_timer_cb);
- // xTimerStart(send_time_tm_handle, 0);
- return ESP_OK;
- }
- static esp_err_t _snapcast_process(audio_element_handle_t self, char *in_buffer, int in_len){
- struct timeval now, tv2; //, last_time_sync;
- int result;
- int w_size = 1;
- int r_len = 1;
- int volume[] ={0, 0};
- //char *buffer;
- if (gettimeofday(&now, NULL)) {
- ESP_LOGI(TAG, "Failed to gettimeofday\r\n");
- }
- snapcast_stream_t *tcp = (snapcast_stream_t *)audio_element_getdata(self);
- r_len=_snapcast_read(self, in_buffer, BASE_MESSAGE_SIZE, tcp->timeout_ms, NULL);
- if(r_len > 0) {
- if(r_len > 4096){
- ESP_LOGI(TAG, "Failed Write Size %d", tcp->wire_chunk_message.size);
- r_len=4096;
- }else if(r_len < 0){
- return ESP_FAIL;
- }
- switch(tcp->base_message.type){
- case SNAPCAST_MESSAGE_CODEC_HEADER:
- ESP_LOGI(TAG, "SNAPCAST_MESSAGE_CODEC_HEADER");
- //tcp->bits.enabled=0;
- break;
- case SNAPCAST_MESSAGE_WIRE_CHUNK:
- //ESP_LOGI(TAG, "SNAPCAST_MESSAGE_WIRE_CHUNK");
- result=wire_chunk_message_deserialize(&(tcp->wire_chunk_message), in_buffer, 0);
- if(result){
- ESP_LOGI(TAG, "Failed to read wire chunks: %d", result);
- }
- //tcp->bits.enabled=1;
- //tcp->bits.new_wire_chunk=1;
- //buffer=in_buffer;
- //ESP_LOGI(TAG, "Message Wire Chunk Size=%d", tcp->wire_chunk_message.size);
- //ESP_LOGI(TAG, "Message Read Size =%d", r_len - 12);
- _snapcast_write(self, in_buffer+12, r_len - 12, 0x00, NULL);
- break;
- case SNAPCAST_MESSAGE_SERVER_SETTINGS:
- for(int x=4;x<tcp->base_message.size;x++){
- ESP_LOGI(TAG, "SNAPCAST_MESSAGE_SERVER_SETTINGS: Buffer [%d]=%c", x, in_buffer[x]);
- }
- result = server_settings_message_deserialize(&(tcp->server_settings_message), in_buffer+4);
- if (result) {
- ESP_LOGI(TAG, "Failed to read server settings: %d\r\n", result);
- }
- ESP_LOGI(TAG, "Buffer length: %d", tcp->server_settings_message.buffer_ms);
- ESP_LOGI(TAG, "Ringbuffer size:%d", tcp->server_settings_message.buffer_ms*48*4);
- ESP_LOGI(TAG, "Latency: %d", tcp->server_settings_message.latency);
- ESP_LOGI(TAG, "Mute: %d", tcp->server_settings_message.muted);
- ESP_LOGI(TAG, "Setting volume: %d", tcp->server_settings_message.volume);
- volume[0]=tcp->server_settings_message.volume;
- volume[1]=tcp->server_settings_message.muted;
- //_snapcast_dispatch_event(self, tcp, (void*)volume, 4, SNAPCAST_STREAM_STATE_SERVER_SETTINGS_MESAGE);
- break;
- case SNAPCAST_MESSAGE_TIME:
- //ESP_LOGI(TAG, "SNAPCAST_MESSAGE_TIME: Buffer Read=%d", tcp->base_message.size);
- result = time_message_deserialize(&(tcp->time_message), in_buffer, TIME_MESSAGE_SIZE);
- if (result) {
- ESP_LOGI(TAG, "Failed to deserialize time message\r\n");
- break;
- }
- tv2.tv_sec = tcp->base_message.received.sec;
- tv2.tv_usec= tcp->base_message.received.usec;
- timersub(&now,&tv2,&tcp->server_uptime);
- uint64_t server_uptime_ms = tcp->base_message.received.sec * 1000;
- server_uptime_ms += tcp->server_uptime.tv_usec / 1000;
- ESP_LOGI(TAG,"Server Up Time[ms]=%lld", server_uptime_ms);
- /* timersub(&now, &tcp->server_uptime, &tv1);
- time_ms=tv1.tv_sec * 1000;
- time_ms+=tv1.tv_usec/1000;
- ESP_LOGI(TAG_TIME, "Base Uptime %d.%3d", tcp->base_message.received.sec, tcp->base_message.received.usec/1000);
- ESP_LOGI(TAG_TIME, "Server Uptime %ld.%3ld", tv1.tv_sec, tv1.tv_usec/1000);
- ESP_LOGI(TAG_TIME, "Server Uptime %lldms", time_ms);*/
- //ESP_LOGI(TAG, "Setting volume: %d", tcp->server_settings_message.volume);
- break;
- case SNAPCAST_MESSAGE_STREAM_TAGS:
- ESP_LOGI(TAG, "SNAPCAST_MESSAGE_STREAM_TAGS ");
- break;
- }
- /*if(size>1){
- }*/
- }/* else {
- w_size = r_len;
- }*/
- memset(in_buffer, 0x00, 4096);
- return 1;
- }
- static esp_err_t _snapcast_read(audio_element_handle_t self, char *buffer, int len, TickType_t ticks_to_wait, void *context){
- int rlen = 0;
- int result = 0;
- char base_buffer[BASE_MESSAGE_SIZE];
- snapcast_stream_t *tcp = (snapcast_stream_t *)audio_element_getdata(self);
- _start:
- //rlen = esp_transport_read(tcp->t, buffer, BASE_MESSAGE_SIZE, tcp->timeout_ms);
- rlen = _snapcast_stream_socket_receive(TAG, tcp->sock, base_buffer, BASE_MESSAGE_SIZE);
- result = base_message_deserialize(&(tcp->base_message), base_buffer, BASE_MESSAGE_SIZE);
- //rlen = esp_transport_read(tcp->t, buffer, tcp->base_message.size, tcp->timeout_ms);
- rlen = _snapcast_stream_socket_receive(TAG, tcp->sock, buffer, tcp->base_message.size);
- if (result) {
- ESP_LOGW(TAG, "Failed to read base message: %d\r\n", result);
- return ESP_FAIL;
- }
- tcp->read_len=rlen;
- if (rlen < 0) {
- int result = _get_socket_error_code_reason(__func__, tcp->sock);
- if (result == 0) {
- ESP_LOGW(TAG, "TCP server actively closes the connection");
- return ESP_OK;
- }
- return ESP_FAIL;
- } else if (rlen == 0) {
- ESP_LOGI(TAG, "Get end of the file");
- // tcp->bits.new_wire_chunk=0;
- // tcp->bits.enabled=0;
- // tcp->bits.sync=0;
- // tcp->read_rb=snapcast_stream_ringbuffer_get_element(tcp->rb, SNAPCAST_MESSAGE_FIRST);
- // tcp->write_rb=snapcast_stream_ringbuffer_get_element(tcp->rb, SNAPCAST_MESSAGE_FIRST);
- //snapcast_stream_rinbuffer_reset(self);
- // audio_element_set_byte_pos(self, 0);
- // audio_element_set_ringbuf_done(self);
- // audio_element_reset_output_ringbuf(self);
- // _snapcast_dispatch_event(self, tcp, NULL, 0, SNAPCAST_STREAM_STATE_TCP_SOCKET_TIMEOUT_MESSAGE);
- goto _start;
- } else {
- //audio_element_update_byte_pos(self, rlen);
- }
- return rlen;
- }