Watchdog triggered trying to implement threadpool (c++)

rajatmo
Posts: 2
Joined: Tue Jan 03, 2023 8:55 pm

Watchdog triggered trying to implement threadpool (c++)

Postby rajatmo » Tue Jan 03, 2023 9:26 pm

I was trying to make a primitive thread pool for some applications, along the lines of the book "C++ Concurrency in Action: Practical Multithreading". I am using C++ instead of C because that is what I am comfortable with. Even though this very same code (with std::cout instead of ESP_LOGI()) runs fine on my desktop, it crashes because of a watchdog timeout on a NodeMCU esp32s. I thought the part
  1. if (work_queue_.try_pop(task))
  2. {
  3.     task();
  4. }
  5. else
  6. {
  7.     std::this_thread::yield();
  8. }
yielding the thread would be enough to keep the WDT from crashing the program. Can anyone please tell me what might be causing this? Thanks in advance :)

I am pasting the whole project here.

My threadsafe queue:

  1. #pragma once
  2.  
  3. #include <memory>
  4. #include <mutex>
  5. #include <queue>
  6. #include <condition_variable>
  7.  
  8.  
  /// Mutex-protected FIFO queue.
  ///
  /// Each element is stored behind a std::shared_ptr<T>. The allocation is done
  /// in push() before the lock is taken, and the pointer-returning pop functions
  /// hand that same pointer to the caller instead of allocating again.
  ///
  /// @tparam T element type; must be move-constructible, and move-assignable
  ///           for try_pop(T&).
  template <typename T>
  class tsqueue
  {
      mutable std::mutex mutex_;                  // guards queue_
      std::queue<std::shared_ptr<T>> queue_;
      std::condition_variable data_cond_;         // signalled by push()

  public:
      tsqueue() = default;

      /// Appends new_value to the back of the queue and wakes one thread
      /// blocked in wait_and_pop(), if any.
      void push(T new_value)
      {
          // Allocate before locking so the critical section stays short.
          std::shared_ptr<T> data(std::make_shared<T>(std::move(new_value)));
          std::lock_guard<std::mutex> lk(mutex_);
          queue_.push(std::move(data));
          data_cond_.notify_one();
      }

      /// Blocks until the queue is non-empty, then removes and returns the
      /// front element.
      ///
      /// @param value not read and not written; the parameter is kept only so
      ///              that existing call sites keep compiling. The popped
      ///              element is delivered through the return value.
      /// @return the pointer that push() stored; never null.
      std::shared_ptr<T> wait_and_pop(T& value)
      {
          (void)value;
          std::unique_lock<std::mutex> lk(mutex_);
          data_cond_.wait(lk, [this] { return !queue_.empty(); });
          // queue_ holds shared_ptr<T>, so the stored pointer is returned as-is
          // rather than being wrapped in a second make_shared<T>.
          std::shared_ptr<T> res = std::move(queue_.front());
          queue_.pop();
          return res;
      }

      /// Non-blocking pop into an out-parameter.
      ///
      /// @param value receives the front element by move-assignment on success;
      ///              left untouched when the queue is empty.
      /// @return true if an element was removed, false if the queue was empty.
      bool try_pop(T& value)
      {
          std::lock_guard<std::mutex> lk(mutex_);
          if (queue_.empty())
          {
              return false;
          }
          value = std::move(*queue_.front());
          queue_.pop();
          return true;
      }

      /// Non-blocking pop returning the element by pointer.
      ///
      /// @return the front element, or a null pointer if the queue was empty.
      std::shared_ptr<T> try_pop()
      {
          std::lock_guard<std::mutex> lk(mutex_);
          if (queue_.empty())
          {
              return std::shared_ptr<T>();
          }
          std::shared_ptr<T> res = std::move(queue_.front());
          queue_.pop();
          return res;
      }

      /// @return true if the queue held no elements at the moment of the call.
      ///         Another thread may change that immediately afterwards.
      bool empty() const
      {
          std::lock_guard<std::mutex> lk(mutex_);
          return queue_.empty();
      }
  };

My threadpool:

  1. #pragma once
  2.  
  3. #include <atomic>
  4. #include <vector>
  5. #include <functional>
  6. #include <thread>
  7.  
  8. #include "tsqueue.h"
  9.  
  /// Scope guard over a vector of threads.
  ///
  /// Holds a reference to the vector (it does not own it) and, when destroyed,
  /// joins every thread in it that is still joinable.
  class join_threads
  {
      std::vector<std::thread>& threads_;

  public:
      /// @param threads vector to guard; must outlive this object.
      explicit join_threads(std::vector<std::thread>& threads)
          : threads_(threads)
      {
      }

      /// Joins all joinable threads, in vector order.
      ~join_threads()
      {
          for (auto it = threads_.begin(); it != threads_.end(); ++it)
          {
              if (!it->joinable())
              {
                  continue;
              }
              it->join();
          }
      }
  };
  27.  
  28. class threadpool
  29. {
  30.     std::atomic_bool done_;
  31.     tsqueue<std::function<void()>> work_queue_;
  32.     std::vector<std::thread> threads_;
  33.     join_threads joiner_;
  34.  
  35.     void worker_thread()
  36.     {
  37.         while (!done_)
  38.         {
  39.             std::function<void()> task;
  40.  
  41.             if (work_queue_.try_pop(task))
  42.             {
  43.                 task();
  44.             }
  45.             else
  46.             {
  47.                 std::this_thread::yield();
  48.             }
  49.         }
  50.     }
  51.  
  52. public:
  53.     threadpool()
  54.         : done_(false)
  55.         , joiner_(threads_)
  56.     {
  57.         unsigned const thread_count = 2;
  58.  
  59.         try
  60.         {
  61.             for (unsigned i = 0; i < thread_count; i++)
  62.             {
  63.                 threads_.emplace_back(&threadpool::worker_thread, this);
  64.             }
  65.         }
  66.         catch (...)
  67.         {
  68.             done_ = true;
  69.             throw;
  70.         }
  71.     }
  72.  
  73.     ~threadpool()
  74.     {
  75.         done_ = true;
  76.     }
  77.  
  78.     template<typename FunctionType>
  79.         void submit(FunctionType f)
  80.         {
  81.             work_queue_.push(std::function<void()>(f));
  82.         }
  83.  
  84. };

My main.cpp:

  1. #include <chrono>
  2. #include <sstream>
  3. #include <esp_log.h>
  4. #include <freertos/FreeRTOS.h>
  5. #include <freertos/task.h>
  6. #include "ThreadPool.h"
  7.  
  8.  
  // Duration passed to sleep_for() on each pass of app_main's idle loop.
  const auto sleep_time = std::chrono::seconds(5);
  10.  
  11. void print_thread_info(const char *extra = nullptr)
  12. {
  13.     std::stringstream ss;
  14.     if (extra) {
  15.         ss << extra;
  16.     }
  17.     ss << "Core id: " << xPortGetCoreID()
  18.        << ", prio: " << uxTaskPriorityGet(nullptr)
  19.        << ", minimum free stack: " << uxTaskGetStackHighWaterMark(nullptr) << " bytes.";
  20.     ESP_LOGI(pcTaskGetTaskName(nullptr), "%s", ss.str().c_str());
  21. }
  22.  
  23.  
  24. extern "C" void app_main(void)
  25. {
  26.     threadpool pool;
  27.     for (int i = 0; i < 2; i++)
  28.     {
  29.         pool.submit([=]
  30.         {
  31.             print_thread_info();
  32.         });
  33.     }
  34.    
  35.     while (true)
  36.     {
  37.         std::this_thread::sleep_for(sleep_time);
  38.     }
  39.  
  40. }

The error:

rst:0x1 (POWERON_RESET),boot:0x13 (SPI_FAST_FLASH_BOOT)
configsip: 0, SPIWP:0xee
clk_drv:0x00,q_drv:0x00,d_drv:0x00,cs0_drv:0x00,hd_drv:0x00,wp_drv:0x00
mode:DIO, clock div:2
load:0x3fff0030,len:6612
load:0x40078000,len:14788
load:0x40080400,len:3792
entry 0x40080694
I (27) boot: ESP-IDF v4.4.1-dirty 2nd stage bootloader
I (27) boot: compile time 01:55:03
I (27) boot: chip revision: 1
I (30) boot_comm: chip revision: 1, min. bootloader chip revision: 0
I (37) boot.esp32: SPI Speed : 40MHz
I (42) boot.esp32: SPI Mode : DIO
I (47) boot.esp32: SPI Flash Size : 4MB
I (51) boot: Enabling RNG early entropy source...
I (57) boot: Partition Table:
I (60) boot: ## Label Usage Type ST Offset Length
I (67) boot: 0 nvs WiFi data 01 02 00009000 00006000
I (75) boot: 1 phy_init RF data 01 01 0000f000 00001000
I (82) boot: 2 factory factory app 00 00 00010000 00100000
I (90) boot: End of partition table
I (94) boot_comm: chip revision: 1, min. application chip revision: 0
I (101) esp_image: segment 0: paddr=00010020 vaddr=3f400020 size=16104h ( 90372) map
I (142) esp_image: segment 1: paddr=0002612c vaddr=3ffb0000 size=02364h ( 9060) load
I (146) esp_image: segment 2: paddr=00028498 vaddr=40080000 size=07b80h ( 31616) load
I (161) esp_image: segment 3: paddr=00030020 vaddr=400d0020 size=18e90h (102032) map
I (198) esp_image: segment 4: paddr=00048eb8 vaddr=40087b80 size=03c4ch ( 15436) load
I (205) esp_image: segment 5: paddr=0004cb0c vaddr=50000000 size=00010h ( 16) load
I (211) boot: Loaded app from partition at offset 0x10000
I (211) boot: Disabling RNG early entropy source...
I (227) cpu_start: Pro cpu up.
I (227) cpu_start: Starting app cpu, entry point is 0x400810fc
I (0) cpu_start: App cpu up.
I (241) cpu_start: Pro cpu start user code
I (241) cpu_start: cpu freq: 160000000
I (242) cpu_start: Application information:
I (246) cpu_start: Project name: ThreadPool
I (251) cpu_start: App version: 1
I (256) cpu_start: Compile time: Jan 4 2023 01:54:37
I (262) cpu_start: ELF file SHA256: a5e0f37c0d142cc5...
I (268) cpu_start: ESP-IDF: v4.4.1-dirty
I (273) heap_init: Initializing. RAM available for dynamic allocation:
I (280) heap_init: At 3FFAE6E0 len 00001920 (6 KiB): DRAM
I (286) heap_init: At 3FFB2DF8 len 0002D208 (180 KiB): DRAM
I (293) heap_init: At 3FFE0440 len 00003AE0 (14 KiB): D/IRAM
I (299) heap_init: At 3FFE4350 len 0001BCB0 (111 KiB): D/IRAM
I (305) heap_init: At 4008B7CC len 00014834 (82 KiB): IRAM
I (313) spi_flash: detected chip: generic
I (316) spi_flash: flash io: dio
I (322) cpu_start: Starting scheduler on PRO CPU.
I (0) cpu_start: Starting scheduler on APP CPU.
E (10331) task_wdt: Task watchdog got triggered. The following tasks did not reset the watchdog in time:
E (10331) task_wdt: - IDLE (CPU 0)
E (10331) task_wdt: Tasks currently running:
E (10331) task_wdt: CPU 0: pthread
E (10331) task_wdt: CPU 1: pthread
E (10331) task_wdt: Print CPU 0 (current core) backtrace


Backtrace:0x400D7957:0x3FFB0B900x400827F9:0x3FFB0BB0 0x40084229:0x3FFB79F0 0x400873B5:0x3FFB7A10 0x400D132D:0x3FFB7A30 0x400D5A9F:0x3FFB7A50 0x400E8299:0x3FFB7AA0 0x400D90BD:0x3FFB7AC0 0x400D1300:0x3FFB7AE0 0x40087F51:0x3FFB7B00

E (10331) task_wdt: Print CPU 1 backtrace


Backtrace:0x400841BD:0x3FFB11900x400827F9:0x3FFB11B0 0x4000BFED:0x3FFB8760 0x40087271:0x3FFB8770 0x40085DD9:0x3FFB8790 0x40081058:0x3FFB87D0 0x4008108A:0x3FFB87F0 0x400D59AE:0x3FFB8810 0x400E8299:0x3FFB8860 0x400D90BD:0x3FFB8880 0x400D1300:0x3FFB88A0 0x40087F51:0x3FFB88C0

E (15331) task_wdt: Task watchdog got triggered. The following tasks did not reset the watchdog in time:
E (15331) task_wdt: - IDLE (CPU 0)
E (15331) task_wdt: Tasks currently running:
E (15331) task_wdt: CPU 0: pthread
E (15331) task_wdt: CPU 1: pthread
E (15331) task_wdt: Print CPU 0 (current core) backtrace

rajatmo
Posts: 2
Joined: Tue Jan 03, 2023 8:55 pm

Re: Watchdog triggered trying to implement threadpool (c++)

Postby rajatmo » Wed Jan 04, 2023 6:54 pm

For anyone else in this situation: this problem was discussed in this thread: viewtopic.php?t=6649. yield() behaves differently here than on a desktop OS.

The modification that works for me uses condition_variable:
  1. #pragma once
  2. #include <atomic>
  3. #include <vector>
  4. #include <functional>
  5. #include <thread>
  6. #include <mutex>
  7. #include <condition_variable>
  8.  
  9. #include "tsqueue.h"
  10.  
  /// Scope guard over a vector of threads.
  ///
  /// Holds a reference to the vector (it does not own it) and, when destroyed,
  /// joins every thread in it that is still joinable.
  class join_threads
  {
      std::vector<std::thread>& threads_;

  public:
      /// @param threads vector to guard; must outlive this object.
      explicit join_threads(std::vector<std::thread>& threads)
          : threads_(threads)
      {
      }

      /// Joins all joinable threads, in vector order.
      ~join_threads()
      {
          for (auto it = threads_.begin(); it != threads_.end(); ++it)
          {
              if (!it->joinable())
              {
                  continue;
              }
              it->join();
          }
      }
  };
  28.  
  29. class threadpool
  30. {
  31.     unsigned thread_count;
  32.     std::atomic_bool done_;
  33.     std::mutex mutex_;
  34.     std::condition_variable condition_;
  35.     tsqueue<std::function<void()>> work_queue_;
  36.     std::vector<std::thread> threads_;
  37.     join_threads joiner_;
  38.  
  39.     void worker_thread()
  40.     {      
  41.         while (!done_)
  42.         {
  43.             std::function<void()> task;
  44.                        
  45.             if (work_queue_.try_pop(task))
  46.             {
  47.                 task();
  48.             }
  49.             else
  50.             {
  51.                 //Ask for main thread to tell me to work
  52.                 std::unique_lock<std::mutex> lock_(mutex_);
  53.                 condition_.wait(lock_, [this] {return !work_queue_.empty(); });
  54.             }
  55.         }
  56.     }
  57.  
  58. public:
  59.     threadpool(unsigned thread_count_ = 2) : thread_count(thread_count_), done_(false) , joiner_(threads_)
  60.     {
  61.         try
  62.         {
  63.             for (unsigned i = 0; i < thread_count; i++)
  64.             {
  65.                 threads_.emplace_back(&threadpool::worker_thread, this);
  66.             }
  67.         }
  68.         catch (...)
  69.         {
  70.             done_ = true;
  71.             throw;
  72.         }
  73.        
  74.     }
  75.  
  76.     ~threadpool()
  77.     {
  78.         done_ = true;
  79.     }
  80.  
  81.     template<typename FunctionType>
  82.         void submit(FunctionType f)
  83.         {
  84.             work_queue_.push(std::function<void()>(f));
  85.             //Notify worker thread to stop waiting
  86.             if(!work_queue_.empty())
  87.                 condition_.notify_one();
  88.         }
  89. };
Any additional suggestions are welcome.

Who is online

Users browsing this forum: Bing [Bot] and 127 guests