BehaviorTree
Core Library to create and execute Behavior Trees
Loading...
Searching...
No Matches
timer_queue.h
1#pragma once
2
3#include <atomic>
4#include <chrono>
5#include <condition_variable>
6#include <functional>
7#include <mutex>
8#include <queue>
9#include <thread>
10
11#include <assert.h>
12
13namespace BT
14{
15// http://www.crazygaze.com/blog/2016/03/24/portable-c-timer-queue/
16
17namespace details
18{
19class Semaphore
20{
21public:
22 Semaphore(unsigned int count = 0) : m_count(count)
23 {}
24
25 void notify()
26 {
27 m_count.fetch_add(1);
28 m_cv.notify_one();
29 }
30
31 template <class Clock, class Duration>
32 bool waitUntil(const std::chrono::time_point<Clock, Duration>& point)
33 {
34 std::unique_lock<std::mutex> lock(m_mtx);
35 if(!m_cv.wait_until(lock, point, [this]() { return m_count > 0 || m_unlock; }))
36 {
37 return false;
38 }
39 // Only decrement if there is a real count. If we woke because of manualUnlock,
40 // m_count may be zero and we must not decrement it.
41 if(m_count > 0)
42 {
43 m_count.fetch_sub(1);
44 }
45 // Clear the manual unlock flag
46 m_unlock = false;
47
48 return true;
49 }
50
51 void manualUnlock()
52 {
53 m_unlock = true;
54 m_cv.notify_one();
55 }
56
57private:
58 std::mutex m_mtx;
59 std::condition_variable m_cv;
60 std::atomic_uint m_count = 0;
61 std::atomic_bool m_unlock = false;
62};
63} // namespace details
64
65// Timer Queue
66//
67// Allows execution of handlers at a specified time in the future
68// Guarantees:
69// - All handlers are executed ONCE, even if canceled (aborted parameter will
70//be set to true)
71// - If TimerQueue is destroyed, it will cancel all handlers.
72// - Handlers are ALWAYS executed in the Timer Queue worker thread.
73// - Handlers execution order is NOT guaranteed
74//
75template <typename ClockT = std::chrono::steady_clock,
76 typename DurationT = std::chrono::steady_clock::duration>
77class TimerQueue
78{
79public:
80 TimerQueue()
81 {
82 m_finish.store(false);
83 m_thread = std::thread([this]() { run(); });
84 }
85
86 ~TimerQueue()
87 {
88 m_finish.store(true);
89 cancelAll();
90 if(m_thread.joinable())
91 {
92 m_thread.join();
93 }
94 }
95
96 //! Adds a new timer
97 // \return
98 // Returns the ID of the new timer. You can use this ID to cancel the
99 // timer
100 uint64_t add(std::chrono::milliseconds milliseconds, std::function<void(bool)> handler)
101 {
102 WorkItem item;
103 item.end = ClockT::now() + milliseconds;
104 item.handler = std::move(handler);
105
106 std::unique_lock<std::mutex> lk(m_mtx);
107 uint64_t id = ++m_idcounter;
108 item.id = id;
109 m_items.push(std::move(item));
110 lk.unlock();
111
112 // Something changed, so wake up timer thread
113 m_checkWork.notify();
114 return id;
115 }
116
117 //! Cancels the specified timer
118 // \return
119 // 1 if the timer was cancelled.
120 // 0 if you were too late to cancel (or the timer ID was never valid to
121 // start with)
123 {
124 // Instead of removing the item from the container (thus breaking the
125 // heap integrity), we set the item as having no handler, and put
126 // that handler on a new item at the top for immediate execution
127 // The timer thread will then ignore the original item, since it has no
128 // handler.
129 std::unique_lock<std::mutex> lk(m_mtx);
130 for(auto&& item : m_items.getContainer())
131 {
132 if(item.id == id && item.handler)
133 {
134 WorkItem newItem;
135 // Zero time, so it stays at the top for immediate execution
136 newItem.end = std::chrono::time_point<ClockT, DurationT>();
137 newItem.id = 0; // Means it is a canceled item
138 // Move the handler from item to newItem.
139 // Also, we need to manually set the handler to nullptr, since
140 // the standard does not guarantee moving an std::function will
141 // empty it. Some STL implementation will empty it, others will
142 // not.
143 newItem.handler = std::move(item.handler);
144 item.handler = nullptr;
145 m_items.push(std::move(newItem));
146
147 lk.unlock();
148 // Something changed, so wake up timer thread
149 m_checkWork.notify();
150 return 1;
151 }
152 }
153 return 0;
154 }
155
156 //! Cancels all timers
157 // \return
158 // The number of timers cancelled
160 {
161 // Setting all "end" to 0 (for immediate execution) is ok,
162 // since it maintains the heap integrity
163 std::unique_lock<std::mutex> lk(m_mtx);
164 for(auto&& item : m_items.getContainer())
165 {
166 if(item.id)
167 {
168 item.end = std::chrono::time_point<ClockT, DurationT>();
169 item.id = 0;
170 }
171 }
172 auto ret = m_items.size();
173
174 lk.unlock();
175 m_checkWork.notify();
176 return ret;
177 }
178
179 TimerQueue(const TimerQueue&) = delete;
180 TimerQueue& operator=(const TimerQueue&) = delete;
181 TimerQueue(TimerQueue&&) = delete;
182 TimerQueue& operator=(TimerQueue&&) = delete;
183
184private:
185 void run()
186 {
187 while(!m_finish.load())
188 {
189 auto end = calcWaitTime();
190 if(end.first)
191 {
192 // Timers found, so wait until it expires (or something else
193 // changes)
194 m_checkWork.waitUntil(end.second);
195 }
196 else
197 {
198 // No timers exist, so wait an arbitrary amount of time
199 m_checkWork.waitUntil(ClockT::now() + std::chrono::milliseconds(10));
200 }
201
202 // Check and execute as much work as possible, such as, all expired
203 // timers
204 checkWork();
205 }
206
207 // If we are shutting down, we should not have any items left,
208 // since the shutdown cancels all items
209 assert(m_items.size() == 0);
210 }
211
212 std::pair<bool, std::chrono::time_point<ClockT, DurationT>> calcWaitTime()
213 {
214 std::lock_guard<std::mutex> lk(m_mtx);
215 while(m_items.size())
216 {
217 if(m_items.top().handler)
218 {
219 // Item present, so return the new wait time
220 return std::make_pair(true, m_items.top().end);
221 }
222 else
223 {
224 // Discard empty handlers (they were cancelled)
225 m_items.pop();
226 }
227 }
228
229 // No items found, so return no wait time (causes the thread to wait
230 // indefinitely)
231 return std::make_pair(false, std::chrono::time_point<ClockT, DurationT>());
232 }
233
234 void checkWork()
235 {
236 std::unique_lock<std::mutex> lk(m_mtx);
237 while(m_items.size() && m_items.top().end <= ClockT::now())
238 {
239 WorkItem item(std::move(m_items.top()));
240 m_items.pop();
241
242 lk.unlock();
243 if(item.handler)
244 {
245 item.handler(item.id == 0);
246 }
247 lk.lock();
248 }
249 }
250
251 details::Semaphore m_checkWork;
252 std::thread m_thread;
253 std::atomic_bool m_finish = false;
254 uint64_t m_idcounter = 0;
255
256 struct WorkItem
257 {
258 std::chrono::time_point<ClockT, DurationT> end;
259 uint64_t id = 0; // id==0 means it was cancelled
260 std::function<void(bool)> handler;
261 bool operator>(const WorkItem& other) const
262 {
263 return end > other.end;
264 }
265 };
266
267 std::mutex m_mtx;
268 // Inheriting from priority_queue, so we can access the internal container
269 class Queue
270 : public std::priority_queue<WorkItem, std::vector<WorkItem>, std::greater<WorkItem>>
271 {
272 public:
273 std::vector<WorkItem>& getContainer()
274 {
275 return this->c;
276 }
277 } m_items;
278};
279} // namespace BT
Definition: timer_queue.h:78
size_t cancelAll()
Cancels all timers.
Definition: timer_queue.h:159
size_t cancel(uint64_t id)
Cancels the specified timer.
Definition: timer_queue.h:122
uint64_t add(std::chrono::milliseconds milliseconds, std::function< void(bool)> handler)
Adds a new timer.
Definition: timer_queue.h:100
Definition: timer_queue.h:20
The SwitchNode is equivalent to a switch statement, where a certain branch (child) is executed accord...
Definition: basic_types.h:515
Definition: action_node.h:24