[FreeRTOS] Blocked task until queue is fully filled

Arkaik
Posts: 13
Joined: Mon Jun 12, 2017 12:36 pm

[FreeRTOS] Blocked task until queue is fully filled

Postby Arkaik » Mon Jun 12, 2017 1:01 pm

Hi everyone, I recently got an ESP32 DevKitC to try FreeRTOS programming.

I am new to real-time systems and I have some trouble picturing how things work.

My objective is to sample data at 100Hz from an inertial measurement unit through i2c and publish them by pack of 100 through mqtt.

The sampling task is launched on hardware interrupt pin.

At this point I'm able to acquire data with a periodic high-priority task and to send that data, packed into a structure, through the queue.
Mongoose broker seems to work well as I'm able to publish my data.

Now I would like to wake up a task which would be in blocked state until the event "Queue is full".

I tried to implement it with event groups, but I don't know how to generate an interrupt to call an ISR which would notify my blocked task.

http://www.freertos.org/RTOS_Task_Notif ... Group.html
When I look at this link, I understand that vTxISR( void ) and vRxISR( void ) are called when an interrupt occurs on UART bus am I right?
Do you know if it's possible to create an interrupt service routine which would be called when my queue is full?

Here is my code :

Code: Select all

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "esp_wifi.h"
#include "esp_system.h"
#include "esp_event.h"
#include "esp_event_loop.h"
#include "esp_log.h"
#include "nvs_flash.h"
#include "driver/i2c.h"
#include "driver/gpio.h"
#include "mongoose.h"
#include <freertos/FreeRTOS.h>
#include <freertos/task.h>
#include "lwip/inet.h"
#include "lwip/ip4_addr.h"
#include "esp_event.h"
#include "mpu.h"
#include "LSM9DS1_Reg.h"


// Defines for WiFi
// Placeholder credentials: replace with the real network name and password.
#define SSID     "<ssid>"
#define PASSWORD "<password>"

// Bit that acqTask sets in publishTask's notification value and that
// publishTask tests after xTaskNotifyWait() returns.
#define PUBLISH_BIT 0x01


// Forward declarations for the functions defined after app_main().
esp_err_t wifi_event_cb(void *ctx, system_event_t *event);
void mongooseTask(void *data);
void mongoose_event_cb(struct mg_connection *nc, int ev, void *evData);

char *mongoose_eventToString(int ev);
char *mgStrToStr(struct mg_str mgStr);

// Broker information
// s_serverAddr is a placeholder ("ip:port") handed to mg_connect() in mongooseTask.
// s_topic and s_topic_expr are not referenced by the code shown here.
const char *s_topic = "/test/slot1";
const char * s_serverAddr = "ip:port";
struct mg_mqtt_topic_expression s_topic_expr = {"/test/slot1", 0};
struct mg_connection * mqttConnection = NULL;

// WiFi network information, filled in by wifi_event_cb() on SYSTEM_EVENT_STA_GOT_IP.
ip4_addr_t net_ip;
ip4_addr_t net_gw;
ip4_addr_t net_msk;
// Set to true by wifi_event_cb() and polled by app_main().
// NOTE(review): written and read from different contexts without volatile or
// any synchronisation - confirm the polling loop really observes the update.
bool b_isConnected = false;

// I2C bus used for every sensor access
i2c_port_t i2cBus = I2C_NUM_0;

// GPIO pins whose rising edges trigger gpio_isr_acq_handler / gpio_isr_err_handler
int acqPin = GPIO_NUM_4;
int errPin = GPIO_NUM_16;

// Task handles
// acqTask_handler and errTask_handler are written by xTaskCreate() in the GPIO ISR handlers.
// publishTask_handler is never assigned in the code shown here.
static TaskHandle_t acqTask_handler = NULL;
static TaskHandle_t errTask_handler = NULL;
static TaskHandle_t publishTask_handler = NULL;

// Sample queue, created in app_main() with room for 100 struct sampleMessage items.
// NOTE(review): QueueHandle_t is normally declared in freertos/queue.h, which is
// not included explicitly above - presumably pulled in by another header; confirm.
QueueHandle_t sampleQueue_handler = NULL;

// One sample as carried through the queue
struct sampleMessage
{
	char messageId;		// sample counter value at the time of acquisition (narrowed from int)
	float gyrData[3];	// three values filled in by readGyr()
	float accData[3];	// three values filled in by readAcc()
};

// Sensor scale values: accScale and gyrScale are set in app_main();
// magScale is never assigned in the code shown here.
float accScale, gyrScale, magScale;


/*
 * One-shot acquisition task.
 *
 * gpio_isr_acq_handler() creates a new instance of this task on every acqPin
 * interrupt. Each instance reads one gyroscope and one accelerometer sample,
 * packs them into a struct sampleMessage, sends it to sampleQueue_handler and
 * then deletes itself.
 *
 * sampleCount is static, so it is shared by all instances and keeps counting
 * across them; it is never reset in this function.
 *
 * NOTE(review): this function runs as a task, yet it calls the ISR variant
 * xTaskNotifyFromISR(). The non-ISR xTaskNotify() is presumably what is
 * wanted here - confirm against the FreeRTOS task notification documentation.
 * NOTE(review): publishTask_handler is never assigned in the code shown, so the
 * notification targets a NULL handle unless the task is created elsewhere.
 */
void acqTask()
{
	float acc_xyz[3];
	float gyr_xyz[3];
	static int sampleCount = 0;
	// Only meaningful for the FromISR call below; the yield that would consume it is commented out.
	BaseType_t xHigherPriorityTaskWoken = pdFALSE;
	
	readGyr(i2cBus, gyrScale, &gyr_xyz[0]);
	readAcc(i2cBus, accScale, &acc_xyz[0]);
	
	struct sampleMessage msg;
	
	// messageId is a char, so the int counter is narrowed on assignment.
	msg.messageId = sampleCount;
	memcpy(&msg.gyrData, gyr_xyz, sizeof(gyr_xyz));
	memcpy(&msg.accData, acc_xyz, sizeof(acc_xyz));
	
	if(sampleQueue_handler != 0)
	{
		// Send the sample message. Wait for 10 ticks for space to become available if necessary.
		if(xQueueSend(sampleQueue_handler, (void *) &msg, (TickType_t) 10) != pdPASS)
		{
			printf("[Error] Failed to send message to queue\n");
		}
		else
		{
			// Count only the samples that actually reached the queue.
			sampleCount += 1;
		}
	}
	// The counter reaches 99 after the 99th successful send, i.e. one sample before
	// the 100-entry queue is full. The test is also evaluated when the send above
	// failed, so the notification can be repeated while the counter stays at 99.
	if(sampleCount == 99)
	{
		//Notify the publish task that the queue is filled by setting the PUBLISH_BIT in the task's notification value.
		xTaskNotifyFromISR(publishTask_handler, PUBLISH_BIT, eSetBits, &xHigherPriorityTaskWoken);
	}
	//portYIELD_FROM_ISR(xHigherPriorityTaskWoken);

	// Delete the calling task: every instance handles exactly one sample.
	vTaskDelete(NULL);
}

/*
 * Publishing task: loops forever waiting for a task notification and prints
 * "Publish" each time a notification carrying PUBLISH_BIT arrives.
 *
 * Each wait lasts at most 1000 ms; on timeout the loop simply waits again.
 * The actual MQTT publish is not implemented yet (placeholder printf only).
 *
 * NOTE(review): no xTaskCreate() call for this task appears in the code shown.
 * NOTE(review): ULONG_MAX is normally provided by <limits.h>, which is not
 * included explicitly - presumably it arrives through another header; confirm.
 */
void publishTask()
{
	const TickType_t xMaxBlockTime = pdMS_TO_TICKS(1000);
	BaseType_t xResult;
	uint32_t ulNotifiedValue;
	
	while(true)
	{
		//Wait for the event
		xResult = xTaskNotifyWait( pdFALSE,	// Don't clear bits on entry.
			ULONG_MAX,						// Clear all bits on exit.
			&ulNotifiedValue,				// Stores the notified value.
			xMaxBlockTime );
		
		// Act only when a notification was received (no timeout) and it carries PUBLISH_BIT.
		if((xResult == pdPASS) && ((ulNotifiedValue & PUBLISH_BIT) != 0))
		{
			/*****     Publish     *****/
			printf("Publish\n");
		}
	}
}

/*
 * One-shot error task, created by gpio_isr_err_handler() on an errPin interrupt.
 *
 * Unregisters both GPIO interrupt handlers so that no further acquisition or
 * error task is started, reports the lost sample and deletes itself.
 * Despite the message text, nothing here terminates the program: the other
 * tasks keep running.
 */
void errTask()
{
	gpio_isr_handler_remove(acqPin);
	gpio_isr_handler_remove(errPin);
	
	printf("[Error] A sample has been lost\n\tExiting program...\n");
	
	// Delete the calling task.
	vTaskDelete(NULL);
}

/*
 * GPIO interrupt handler registered for acqPin in app_main().
 * Creates a new acqTask instance on every interrupt; arg is unused.
 *
 * NOTE(review): xTaskCreate() has no "FromISR" form and allocates memory, so
 * calling it from interrupt context is presumably unsafe. The usual pattern is
 * one long-lived task that the ISR wakes with a notification - confirm against
 * the FreeRTOS documentation.
 * NOTE(review): the result of xTaskCreate() is ignored, and acqTask_handler is
 * overwritten each time, so earlier instances can no longer be referenced.
 */
void gpio_isr_acq_handler(void * arg)
{
	xTaskCreate(&acqTask, "acqTask", 2048, NULL, configMAX_PRIORITIES-1, &acqTask_handler);
}

/*
 * GPIO interrupt handler registered for errPin in app_main().
 * Creates an errTask instance on every interrupt; arg is unused.
 *
 * NOTE(review): the requested priority is configMAX_PRIORITIES, one above the
 * highest valid priority (configMAX_PRIORITIES-1) - presumably FreeRTOS caps
 * it, which would make it equal to acqTask's priority; confirm.
 * NOTE(review): as in gpio_isr_acq_handler, xTaskCreate() is called from
 * interrupt context and its result is ignored.
 */
void gpio_isr_err_handler(void * arg)
{
	xTaskCreate(&errTask, "errTask", 2048, NULL, configMAX_PRIORITIES, &errTask_handler);
}

void app_main(void)
{
	ESP_ERROR_CHECK(esp_event_loop_init(wifi_event_cb, NULL));
	
	/////////////////////// Wifi connection //////////////////////
	nvs_flash_init();
	tcpip_adapter_init();	
	
	wifi_init_config_t wifiConfig = WIFI_INIT_CONFIG_DEFAULT();
	
	ESP_ERROR_CHECK(esp_wifi_init(&wifiConfig));
	ESP_ERROR_CHECK(esp_wifi_set_storage(WIFI_STORAGE_RAM));
	ESP_ERROR_CHECK(esp_wifi_set_mode(WIFI_MODE_STA));
	
	wifi_config_t sta_config = {
		.sta = 
		{
			.ssid = SSID,
			.password = PASSWORD,
			.bssid_set = 0
		}
	};
	ESP_ERROR_CHECK(esp_wifi_set_config(WIFI_IF_STA, &sta_config));
	ESP_ERROR_CHECK(esp_wifi_start());
	ESP_ERROR_CHECK(esp_wifi_connect());
	
	while(!b_isConnected)
	{}

	// GPIO config
	gpio_config_t io_conf;
	
	//Acq pin
	io_conf.intr_type = GPIO_PIN_INTR_POSEDGE;
	io_conf.pin_bit_mask = (1 << acqPin);
	io_conf.mode = GPIO_MODE_INPUT;
	io_conf.pull_up_en = 0;
	io_conf.pull_down_en = 1;
	
	gpio_config(&io_conf);
	
	//Err pin
	io_conf.intr_type = GPIO_PIN_INTR_POSEDGE;
	io_conf.pin_bit_mask = (1 << errPin);
	io_conf.mode = GPIO_MODE_INPUT;
	io_conf.pull_up_en = 0;
	io_conf.pull_down_en = 1;
	
	gpio_config(&io_conf);
	
	// I2C config
	i2c_master_init(i2cBus, 18, 19);
	
	accSetDataRate(i2cBus, A_ODR_119_HZ);
	accScale = accSetScale(i2cBus, A_SCALE_2G);
	accEnableOutputs(i2cBus);
	
	gyrSetDataRate(i2cBus, G_ODR_119_HZ);
	gyrScale = gyrSetScale(i2cBus, G_SCALE_245DPS);
	gyrEnableOutputs(i2cBus);
	
	enableMultipleByteAccess(i2cBus);
	
	setFifoThreshold(i2cBus, 2);
	enableFifoThreshold(i2cBus);
	setFifoMode(i2cBus, FIFO_MODE_CONTINUOUS);
	enableFifo(i2cBus);
	
	setInt1Source(i2cBus, INT1_SRC_FTH);
	setInt2Source(i2cBus, INT2_SRC_OVR);
	
	//Create communication queue
	sampleQueue_handler = xQueueCreate(100, sizeof(struct sampleMessage));
	
	//ISR callbacks
	gpio_install_isr_service(0);
	gpio_isr_handler_add(acqPin, gpio_isr_acq_handler, NULL);
	gpio_isr_handler_add(errPin, gpio_isr_err_handler, NULL);
	
	clearFifo(i2cBus);
}

/*
 * System event callback (registered through esp_event_loop_init() in app_main).
 *
 * Only SYSTEM_EVENT_STA_GOT_IP is handled: the address, netmask and gateway are
 * stored in the net_* globals and printed, b_isConnected is raised, and the
 * Mongoose task is started pinned to core 0. Every other event is ignored.
 *
 * ctx is unused. Always returns ESP_OK.
 */
esp_err_t wifi_event_cb(void *ctx, system_event_t *event)
{
	// Nothing to do for any event other than "station got an IP address".
	if(event->event_id != SYSTEM_EVENT_STA_GOT_IP)
	{
		return ESP_OK;
	}

	// Remember the network parameters handed out to the station.
	net_ip = event->event_info.got_ip.ip_info.ip;
	net_gw = event->event_info.got_ip.ip_info.gw;
	net_msk = event->event_info.got_ip.ip_info.netmask;
	b_isConnected = true;

	printf("Connected to %s\n", SSID);
	printf("\tIP: %s\n", inet_ntoa(net_ip));
	printf("\tNetmask: %s\n", inet_ntoa(net_msk));
	printf("\tGateway: %s\n\n", inet_ntoa(net_gw));

	// The network is usable from here on: launch the MQTT client task on core 0.
	xTaskCreatePinnedToCore(&mongooseTask, "mongooseTask", 20000, NULL, 5, NULL,0);

	return ESP_OK;
}

/*
 * FreeRTOS task that runs the Mongoose MQTT client.
 *
 * Initialises a Mongoose manager, opens a connection to s_serverAddr with
 * mongoose_event_cb as its handler, switches it to the MQTT protocol, sends
 * the MQTT handshake with client id "dummy", then polls the manager forever.
 *
 * If the connection cannot be created, the manager is released and the task
 * deletes itself.
 *
 * data: task parameter, unused.
 */
void mongooseTask(void *data)
{
	(void) data;

	printf("Mongoose task starting\n");
	struct mg_mgr mgr;
	printf("Mongoose: Starting setup\n");
	mg_mgr_init(&mgr, NULL);
	printf("Mongoose: Succesfully inited\n");
	printf("Mongoose: Starting connection to %s\n", s_serverAddr);
	mqttConnection = mg_connect(&mgr, s_serverAddr, mongoose_event_cb);

	if (mqttConnection == NULL)
	{
		printf("Mongoose: Impossible to connect to %s\n", s_serverAddr);
		// Release what mg_mgr_init() set up before this task disappears.
		mg_mgr_free(&mgr);
		vTaskDelete(NULL);
		return;
	}
	printf("Mongoose: Successfully connected\n");

	mg_set_protocol_mqtt(mqttConnection);
	mg_send_mqtt_handshake(mqttConnection, "dummy");

	//mg_mqtt_publish(mqttConnection, "/test/slot1", 50, 0, "Test publish", 12);

	// Drive the Mongoose event loop; each poll waits at most 1000 ms.
	for(;;)
	{
		mg_mgr_poll(&mgr, 1000);
	}
}

/*
 * Mongoose event handler passed to mg_connect() in mongooseTask.
 *
 * Currently a stub: every event is ignored. The commented-out lines show how
 * the event data could be viewed as an MQTT message and how the event name
 * could be logged with mongoose_eventToString().
 *
 * nc:     connection the event belongs to (unused)
 * ev:     event code (unused)
 * evData: event-specific data (unused)
 */
void mongoose_event_cb(struct mg_connection *nc, int ev, void *evData)
{
	//struct mg_mqtt_message *msg = (struct mg_mqtt_message *) evData;
	
	//printf("Mongoose event : %s\n", mongoose_eventToString(ev));
}

/**
 * Convert a Mongoose event type to a string.
 *
 * @param ev  Event code, as received by a Mongoose event handler.
 * @return    The name of the matching MG_EV_* constant, or the text
 *            "Unknown event: <ev>" when the code has no case below.
 *
 * Known names are string literals and must not be modified by the caller.
 * The "unknown" text lives in a static buffer that the next unknown code
 * overwrites, so the result must be used before the next call.
 */
char *mongoose_eventToString(int ev)
{
	static char temp[100];
	switch (ev)
	{
		case MG_EV_CONNECT:
			return "MG_EV_CONNECT";
		case MG_EV_ACCEPT:
			return "MG_EV_ACCEPT";
		case MG_EV_CLOSE:
			return "MG_EV_CLOSE";
		case MG_EV_SEND:
			return "MG_EV_SEND";
		case MG_EV_RECV:
			return "MG_EV_RECV";
		case MG_EV_HTTP_REQUEST:
			return "MG_EV_HTTP_REQUEST";
		case MG_EV_HTTP_REPLY:
			return "MG_EV_HTTP_REPLY";
		case MG_EV_MQTT_CONNACK:
			return "MG_EV_MQTT_CONNACK";
		case MG_EV_MQTT_CONNACK_ACCEPTED:
			// Previously reported as "MG_EV_MQTT_CONNACK" (copy-paste slip).
			return "MG_EV_MQTT_CONNACK_ACCEPTED";
		case MG_EV_MQTT_CONNECT:
			return "MG_EV_MQTT_CONNECT";
		case MG_EV_MQTT_DISCONNECT:
			return "MG_EV_MQTT_DISCONNECT";
		case MG_EV_MQTT_PINGREQ:
			return "MG_EV_MQTT_PINGREQ";
		case MG_EV_MQTT_PINGRESP:
			return "MG_EV_MQTT_PINGRESP";
		case MG_EV_MQTT_PUBACK:
			return "MG_EV_MQTT_PUBACK";
		case MG_EV_MQTT_PUBCOMP:
			return "MG_EV_MQTT_PUBCOMP";
		case MG_EV_MQTT_PUBLISH:
			return "MG_EV_MQTT_PUBLISH";
		case MG_EV_MQTT_PUBREC:
			return "MG_EV_MQTT_PUBREC";
		case MG_EV_MQTT_PUBREL:
			return "MG_EV_MQTT_PUBREL";
		case MG_EV_MQTT_SUBACK:
			return "MG_EV_MQTT_SUBACK";
		case MG_EV_MQTT_SUBSCRIBE:
			return "MG_EV_MQTT_SUBSCRIBE";
		case MG_EV_MQTT_UNSUBACK:
			return "MG_EV_MQTT_UNSUBACK";
		case MG_EV_MQTT_UNSUBSCRIBE:
			return "MG_EV_MQTT_UNSUBSCRIBE";
		case MG_EV_WEBSOCKET_HANDSHAKE_REQUEST:
			return "MG_EV_WEBSOCKET_HANDSHAKE_REQUEST";
		case MG_EV_WEBSOCKET_HANDSHAKE_DONE:
			return "MG_EV_WEBSOCKET_HANDSHAKE_DONE";
		case MG_EV_WEBSOCKET_FRAME:
			return "MG_EV_WEBSOCKET_FRAME";
		default:
			break;
	}
	// Bounded formatting: the text can never run past the end of temp.
	snprintf(temp, sizeof temp, "Unknown event: %d", ev);
	return temp;
} //eventToString

/*
 * Convert a Mongoose string (pointer + length, not NUL-terminated) to a
 * NUL-terminated C string.
 *
 * mgStr: the Mongoose string to copy; mgStr.len bytes are read from mgStr.p.
 *
 * Returns a newly allocated copy that the caller must release with free(),
 * or NULL when the allocation fails.
 */
char *mgStrToStr(struct mg_str mgStr)
{
	char *retStr = malloc(mgStr.len + 1);
	if (retStr == NULL)
	{
		// Out of memory: report it to the caller instead of writing through NULL.
		return NULL;
	}
	memcpy(retStr, mgStr.p, mgStr.len);
	retStr[mgStr.len] = '\0';
	return retStr;
}

When I execute it I have the following error : Untested FreeRTOS function xTaskNotifyFromISR

From what I understand the problem is that this function should be called from an ISR, is that right ?

Anyway, I think I am missing some understanding of how FreeRTOS works. I would be glad if someone could help me a little by giving me advice on how I could achieve my goal.

Thanks in advance ;)

kurtzweber
Posts: 64
Joined: Tue Jan 10, 2017 1:09 pm

Re: [FreeRTOS] Blocked task until queue is fully filled

Postby kurtzweber » Mon Jun 12, 2017 2:18 pm

Hi

if you want to synchronize two tasks you can do it using event bits without the need of an ISR... FreeRTOS indeed offers different methods to raise/clear event bits depending on whether you are inside an ISR or not:

http://www.freertos.org/FreeRTOS-Event-Groups.html

so in your example your "mqtt_send" task can wait for a specific bit:

Code: Select all

xEventGroupWaitBits(my_event_group, BUFFER_FULL_BIT, false, true, portMAX_DELAY);
and when your high-priority task detects that the buffer is full, it can set the bit:

Code: Select all

xEventGroupSetBits(my_event_group, BUFFER_FULL_BIT);
to "unblock" the mqtt_send task and have the buffer sent to the MQTT broker.

Hope I answered your question!

Arkaik
Posts: 13
Joined: Mon Jun 12, 2017 12:36 pm

Re: [FreeRTOS] Blocked task until queue is fully filled

Postby Arkaik » Mon Jun 12, 2017 3:43 pm

Thank you, that's exactly what I wanted to do ^^.

Just another question, about software architecture.

Eventually my task will also concatenate and encode data before sending it through MQTT. So maybe it's better to have the "mqtt_sending" task run the concatenation and encoding when the high-priority one has terminated?

I'm thinking about that because my average acquisition time is 4 ms and its period is 10 ms. I want to be sure I will never lose a sample, so if the processing in the "mqtt_sending" task is long, maybe it's better to divide its execution into multiple parts?

I'm absolutely not sure about what I'm saying but I would like to understand well the pros/cons of each method.

Don't you think such an algorithm would be better?

Code: Select all

/*
 * Proposed one-shot acquisition task (second version).
 *
 * Reads one gyroscope and one accelerometer sample, packs them into a
 * struct sampleMessage and sends it to sampleQueue_handler, then deletes
 * itself. Unlike the first version, it does not notify the publish task: the
 * publish task is expected to read the queue directly.
 *
 * The "MUTEX LOCK / UNLOCK" comments are placeholders marking where a mutex
 * would be taken and released; no mutex call is present in the code shown.
 * NOTE(review): a FreeRTOS queue is presumably already safe to use from
 * several tasks, which would make the extra mutex unnecessary - confirm
 * against the FreeRTOS queue documentation.
 */
void acqTask()
{
	float acc_xyz[3];
	float gyr_xyz[3];
	// Shared by every instance of this task; counts the successfully queued samples.
	static int sampleCount = 0;

	readGyr(i2cBus, gyrScale, &gyr_xyz[0]);
	readAcc(i2cBus, accScale, &acc_xyz[0]);

	struct sampleMessage msg;
	
	// messageId is a char, so the int counter is narrowed on assignment.
	msg.messageId = sampleCount;
	memcpy(&msg.gyrData, gyr_xyz, sizeof(gyr_xyz));
	memcpy(&msg.accData, acc_xyz, sizeof(acc_xyz));
	
	if(sampleQueue_handler != 0)
	{
		//////////////// MUTEX LOCK ////////////////////
		// Send the sample message. Wait for 10 ticks for space to become available if necessary.
		if(xQueueSend(sampleQueue_handler, (void *) &msg, (TickType_t) 10) != pdPASS)
		{
			/////////////// MUTEX UNLOCK ////////////////////
			printf("[Error] Failed to send message to queue\n");
		}
		else
		{
			/////////////// MUTEX UNLOCK ////////////////////
			sampleCount += 1;
		}
	}
	// Delete the calling task: every instance handles exactly one sample.
	vTaskDelete(NULL);
}

/*
 * Proposed publishing task (second version).
 *
 * Blocks on sampleQueue_handler, encodes every received sample with
 * encodeMessage() and appends the result to a text buffer. Once 100 samples
 * have been gathered the buffer is ready to be published (the publish call is
 * still a placeholder); buffer and counter are then reset for the next pack.
 *
 * The "MUTEX LOCK / UNLOCK" comments are placeholders marking where a mutex
 * would be taken and released; no mutex call is present in the code shown.
 *
 * NOTE(review): encodeMessage() is not defined in the code shown; it is
 * assumed to return a NUL-terminated string - confirm, including who owns it.
 * NOTE(review): b64_message occupies 2400 bytes of this task's stack; the
 * stack size given at task creation has to allow for it.
 */
void publishTask()
{
	// Start from an empty string: appending needs a terminated destination.
	char b64_message[2400] = "";
	static int sampleCount = 0;
	
	struct sampleMessage receivedMessage;
	
	while(true)
	{
		if(sampleQueue_handler != 0)
		{
			//////////////// MUTEX LOCK ////////////////////
			// Receive a message on the created queue. Block indefinitely until a message is available.
			if(xQueueReceive(sampleQueue_handler, &receivedMessage, portMAX_DELAY))
			{
				/////////////// MUTEX UNLOCK ////////////////////
				//Add encoded message to the string to be published
				const char *encoded = encodeMessage(receivedMessage);
				size_t used = strlen(b64_message);
				size_t extra = (encoded != NULL) ? strlen(encoded) : 0;

				// Append only when text and terminator still fit in the buffer.
				if(encoded != NULL && used + extra < sizeof(b64_message))
				{
					memcpy(&b64_message[used], encoded, extra + 1);
				}
				else
				{
					printf("[Error] Encoded message does not fit in the publish buffer\n");
				}
				sampleCount += 1;
			}
			else
			{
				/////////////// MUTEX UNLOCK ////////////////////
				printf("[Error] Failed to receive message from queue\n");
			}
		}
		// Samples are published in packs of 100; the counter reaches 100 once
		// the 100th sample has been appended.
		if(sampleCount == 100)
		{
			//publish()
			// Begin the next pack with an empty buffer.
			b64_message[0] = '\0';
			sampleCount = 0;
		}
	}
}
Thanks again for your help ;)

EDIT :

In the end I tried it and my program deadlocks ^^. The publish task holds the mutex and is blocked waiting for a message to arrive in the queue, while the acquisition task cannot take the mutex and therefore cannot add a message to the queue....

It raise another question (sorry I have a lot of them ^^). If I block on xQueueReceive for only 10 ticks for example, my acquisition task will also be blocked during those 10 ticks because it's waiting for the mutex, right?

Also, because acquisition tasks are launched on the sensor interrupt, there can be several acquisition tasks at the same time. How do I know whether the next acquisition task to run will be the first one that was blocked because the publish task held the mutex, and not the one that was just created by the latest interrupt?

Who is online

Users browsing this forum: Emirhanuzum and 88 guests