Fawkes API Fawkes Development Version
thread_manager.cpp
1
2/***************************************************************************
3 * thread_manager.cpp - Thread manager
4 *
5 * Created: Thu Nov 3 19:11:31 2006 (on train to Cologne)
6 * Copyright 2006-2009 Tim Niemueller [www.niemueller.de]
7 *
8 ****************************************************************************/
9
10/* This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version. A runtime exception applies to
14 * this software (see LICENSE.GPL_WRE file mentioned below for details).
15 *
16 * This program is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 * GNU Library General Public License for more details.
20 *
21 * Read the full text in the LICENSE.GPL_WRE file in the doc directory.
22 */
23
24#include <aspect/blocked_timing.h>
25#include <baseapp/thread_manager.h>
26#include <core/exceptions/software.h>
27#include <core/exceptions/system.h>
28#include <core/threading/mutex_locker.h>
29#include <core/threading/thread.h>
30#include <core/threading/thread_finalizer.h>
31#include <core/threading/thread_initializer.h>
32#include <core/threading/wait_condition.h>
33
34namespace fawkes {
35
36/** @class ThreadManager <baseapp/thread_manager.h>
37 * Base application thread manager.
38 * This class provides a manager for the threads. Threads are memorized by
39 * their wakeup hook. When the thread manager is deleted, all threads are
40 * appropriately cancelled, joined and deleted. Thus the thread manager
41 * can be used for "garbage collection" of threads.
42 *
43 * The thread manager allows easy wakeup of threads of a given wakeup hook.
44 *
45 * The thread manager needs a thread initializer. Each thread that is added
46 * to the thread manager is initialized with this. The runtime type information
47 * (RTTI) supplied by C++ can be used to initialize threads if appropriate
48 * (if the thread has certain aspects that need special treatment).
49 *
50 * @author Tim Niemueller
51 */
52
53/** Constructor.
54 * @param parent_manager parent thread manager
55 */
56ThreadManager::ThreadManagerAspectCollector::ThreadManagerAspectCollector(
57 ThreadManager *parent_manager)
58{
59 parent_manager_ = parent_manager;
60}
61
62void
63ThreadManager::ThreadManagerAspectCollector::add(ThreadList &tl)
64{
65 BlockedTimingAspect *timed_thread;
66
67 for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
68 if ((timed_thread = dynamic_cast<BlockedTimingAspect *>(*i)) != NULL) {
69 throw IllegalArgumentException(
70 "ThreadProducerAspect may not add threads with BlockedTimingAspect");
71 }
72 }
73
74 parent_manager_->add_maybelocked(tl, /* lock */ false);
75}
76
77void
78ThreadManager::ThreadManagerAspectCollector::add(Thread *t)
79{
80 BlockedTimingAspect *timed_thread;
81
82 if ((timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL) {
83 throw IllegalArgumentException(
84 "ThreadProducerAspect may not add threads with BlockedTimingAspect");
85 }
86
87 parent_manager_->add_maybelocked(t, /* lock */ false);
88}
89
90void
91ThreadManager::ThreadManagerAspectCollector::remove(ThreadList &tl)
92{
93 BlockedTimingAspect *timed_thread;
94
95 for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
96 if ((timed_thread = dynamic_cast<BlockedTimingAspect *>(*i)) != NULL) {
97 throw IllegalArgumentException(
98 "ThreadProducerAspect may not remove threads with BlockedTimingAspect");
99 }
100 }
101
102 parent_manager_->remove_maybelocked(tl, /* lock */ false);
103}
104
105void
106ThreadManager::ThreadManagerAspectCollector::remove(Thread *t)
107{
108 BlockedTimingAspect *timed_thread;
109
110 if ((timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL) {
111 throw IllegalArgumentException(
112 "ThreadProducerAspect may not remove threads with BlockedTimingAspect");
113 }
114
115 parent_manager_->remove_maybelocked(t, /* lock */ false);
116}
117
118void
119ThreadManager::ThreadManagerAspectCollector::force_remove(fawkes::ThreadList &tl)
120{
121 throw AccessViolationException("ThreadManagerAspect threads may not force removal of threads");
122}
123
124void
125ThreadManager::ThreadManagerAspectCollector::force_remove(fawkes::Thread *t)
126{
127 throw AccessViolationException("ThreadManagerAspect threads may not force removal of threads");
128}
129
130/** Constructor.
131 * When using this constructor you need to make sure to call set_inifin()
132 * before any thread is added.
133 */
135{
136 initializer_ = NULL;
137 finalizer_ = NULL;
138 threads_.clear();
139 waitcond_timedthreads_ = new WaitCondition();
140 interrupt_timed_thread_wait_ = false;
141 aspect_collector_ = new ThreadManagerAspectCollector(this);
142}
143
144/** Constructor.
145 * This contsructor is equivalent to the one without parameters followed
146 * by a call to set_inifins().
147 * @param initializer thread initializer
148 * @param finalizer thread finalizer
149 */
151{
152 initializer_ = NULL;
153 finalizer_ = NULL;
154 threads_.clear();
155 waitcond_timedthreads_ = new WaitCondition();
156 interrupt_timed_thread_wait_ = false;
157 aspect_collector_ = new ThreadManagerAspectCollector(this);
158 set_inifin(initializer, finalizer);
159}
160
161/** Destructor. */
163{
164 // stop all threads, we call finalize, and we run through it as long as there are
165 // still running threads, after that, we force the thread's death.
166 for (tit_ = threads_.begin(); tit_ != threads_.end(); ++tit_) {
167 try {
168 tit_->second.force_stop(finalizer_);
169 } catch (Exception &e) {
170 } // ignore
171 }
172 try {
173 untimed_threads_.force_stop(finalizer_);
174 } catch (Exception &e) {
175 } // ignore
176 threads_.clear();
177
178 delete waitcond_timedthreads_;
179 delete aspect_collector_;
180}
181
182/** Set initializer/finalizer.
183 * This method has to be called before any thread is added/removed.
184 * @param initializer thread initializer
185 * @param finalizer thread finalizer
186 */
187void
189{
190 initializer_ = initializer;
191 finalizer_ = finalizer;
192}
193
194/** Remove the given thread from internal structures.
195 * Thread is removed from the internal structures. If the thread has the
196 * BlockedTimingAspect then the hook is added to the changed list.
197 *
198 * @param t thread to remove
199 * @param changed list of changed hooks, appropriate hook is added if necessary
200 */
201void
202ThreadManager::internal_remove_thread(Thread *t)
203{
204 BlockedTimingAspect *timed_thread;
205
206 if ((timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL) {
207 // find thread and remove
209 if (threads_.find(hook) != threads_.end()) {
210 threads_[hook].remove_locked(t);
211 if (threads_[hook].empty())
212 threads_.erase(hook);
213 }
214 } else {
215 untimed_threads_.remove_locked(t);
216 }
217}
218
219/** Add the given thread to internal structures.
220 * Thread is added to the internal structures. If the thread has the
221 * BlockedTimingAspect then the hook is added to the changed list.
222 *
223 * @param t thread to add
224 * @param changed list of changed hooks, appropriate hook is added if necessary
225 */
226void
227ThreadManager::internal_add_thread(Thread *t)
228{
229 BlockedTimingAspect *timed_thread;
230 if ((timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL) {
231 BlockedTimingAspect::WakeupHook hook = timed_thread->blockedTimingAspectHook();
232
233 if (threads_.find(hook) == threads_.end()) {
234 threads_[hook].set_name("ThreadManagerList Hook %i", hook);
235 threads_[hook].set_maintain_barrier(true);
236 }
237 threads_[hook].push_back_locked(t);
238
239 waitcond_timedthreads_->wake_all();
240 } else {
241 untimed_threads_.push_back_locked(t);
242 }
243}
244
245/** Add threads.
246 * Add the given threads to the thread manager. The threads are initialised
247 * as appropriate and started. See the class documentation for supported
248 * specialisations of threads and the performed initialisation steps.
249 * If the thread initializer cannot initalize one or more threads no thread
250 * is added. In this regard the operation is atomic, either all threads are
251 * added or none.
252 * @param tl thread list with threads to add
253 * @exception CannotInitializeThreadException thrown if at least one of the
254 * threads could not be initialised
255 */
256void
257ThreadManager::add_maybelocked(ThreadList &tl, bool lock)
258{
259 if (!(initializer_ && finalizer_)) {
260 throw NullPointerException("ThreadManager: initializer/finalizer not set");
261 }
262
263 if (tl.sealed()) {
264 throw Exception("Not accepting new threads from list that is not fresh, "
265 "list '%s' already sealed",
266 tl.name());
267 }
268
269 tl.lock();
270
271 // Try to initialise all threads
272 try {
273 tl.init(initializer_, finalizer_);
274 } catch (Exception &e) {
275 tl.unlock();
276 throw;
277 }
278
279 tl.seal();
280 tl.start();
281
282 // All thread initialized, now add threads to internal structure
283 MutexLocker locker(threads_.mutex(), lock);
284 for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
285 internal_add_thread(*i);
286 }
287
288 tl.unlock();
289}
290
291/** Add one thread.
292 * Add the given thread to the thread manager. The thread is initialized
293 * as appropriate and started. See the class documentation for supported
294 * specialisations of threads and the performed initialisation steps.
295 * If the thread initializer cannot initalize the thread it is not added.
296 * @param thread thread to add
297 * @param lock if true the environment is locked before adding the thread
298 * @exception CannotInitializeThreadException thrown if at least the
299 * thread could not be initialised
300 */
301void
302ThreadManager::add_maybelocked(Thread *thread, bool lock)
303{
304 if (thread == NULL) {
305 throw NullPointerException("FawkesThreadMananger: cannot add NULL as thread");
306 }
307
308 if (!(initializer_ && finalizer_)) {
309 throw NullPointerException("ThreadManager: initializer/finalizer not set");
310 }
311
312 try {
313 initializer_->init(thread);
314 } catch (CannotInitializeThreadException &e) {
315 thread->notify_of_failed_init();
316 e.append("Adding thread in ThreadManager failed");
317 throw;
318 }
319
320 // if the thread's init() method fails, we need to finalize that very
321 // thread only with the finalizer, already initialized threads muts be
322 // fully finalized
323 try {
324 thread->init();
325 } catch (CannotInitializeThreadException &e) {
326 thread->notify_of_failed_init();
327 finalizer_->finalize(thread);
328 throw;
329 } catch (Exception &e) {
330 thread->notify_of_failed_init();
331 CannotInitializeThreadException cite(e);
332 cite.append("Could not initialize thread '%s' (ThreadManager)", thread->name());
333 finalizer_->finalize(thread);
334 throw cite;
335 } catch (std::exception &e) {
336 thread->notify_of_failed_init();
337 CannotInitializeThreadException cite;
338 cite.append("Caught std::exception: %s", e.what());
339 cite.append("Could not initialize thread '%s' (ThreadManager)", thread->name());
340 finalizer_->finalize(thread);
341 throw cite;
342 } catch (...) {
343 thread->notify_of_failed_init();
344 CannotInitializeThreadException cite("Could not initialize thread '%s' (ThreadManager)",
345 thread->name());
346 cite.append("Unknown exception caught");
347 finalizer_->finalize(thread);
348 throw cite;
349 }
350
351 thread->start();
352 MutexLocker locker(threads_.mutex(), lock);
353 internal_add_thread(thread);
354}
355
356/** Remove the given threads.
357 * The thread manager tries to finalize and stop the threads and then removes the
358 * threads from the internal structures.
359 *
360 * This may fail if at least one thread of the given list cannot be finalized, for
361 * example if prepare_finalize() returns false or if the thread finalizer cannot
362 * finalize the thread. In this case a CannotFinalizeThreadException is thrown.
363 *
364 * @param tl threads to remove.
365 * @exception CannotFinalizeThreadException At least one thread cannot be safely
366 * finalized
367 * @exception ThreadListNotSealedException if the given thread lits tl is not
368 * sealed the thread manager will refuse to remove it
369 */
370void
371ThreadManager::remove_maybelocked(ThreadList &tl, bool lock)
372{
373 if (!(initializer_ && finalizer_)) {
374 throw NullPointerException("ThreadManager: initializer/finalizer not set");
375 }
376
377 if (!tl.sealed()) {
378 throw ThreadListNotSealedException("(ThreadManager) Cannot remove unsealed thread "
379 "list. Not accepting unsealed list '%s' for removal",
380 tl.name());
381 }
382
383 tl.lock();
384 MutexLocker locker(threads_.mutex(), lock);
385
386 try {
387 if (!tl.prepare_finalize(finalizer_)) {
388 tl.cancel_finalize();
389 tl.unlock();
390 throw CannotFinalizeThreadException("One or more threads in list '%s' cannot be "
391 "finalized",
392 tl.name());
393 }
394 } catch (CannotFinalizeThreadException &e) {
395 tl.unlock();
396 throw;
397 } catch (Exception &e) {
398 tl.unlock();
399 e.append("One or more threads in list '%s' cannot be finalized", tl.name());
400 throw CannotFinalizeThreadException(e);
401 }
402
403 tl.stop();
404 try {
405 tl.finalize(finalizer_);
406 } catch (Exception &e) {
407 tl.unlock();
408 throw;
409 }
410
411 for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
412 internal_remove_thread(*i);
413 }
414
415 tl.unlock();
416}
417
418/** Remove the given thread.
419 * The thread manager tries to finalize and stop the thread and then removes the
420 * thread from the internal structures.
421 *
422 * This may fail if the thread cannot be finalized, for
423 * example if prepare_finalize() returns false or if the thread finalizer cannot
424 * finalize the thread. In this case a CannotFinalizeThreadException is thrown.
425 *
426 * @param thread thread to remove.
427 * @exception CannotFinalizeThreadException At least one thread cannot be safely
428 * finalized
429 */
430void
431ThreadManager::remove_maybelocked(Thread *thread, bool lock)
432{
433 if (thread == NULL)
434 return;
435
436 if (!(initializer_ && finalizer_)) {
437 throw NullPointerException("ThreadManager: initializer/finalizer not set");
438 }
439
440 MutexLocker locker(threads_.mutex(), lock);
441 try {
442 if (!thread->prepare_finalize()) {
443 thread->cancel_finalize();
444 throw CannotFinalizeThreadException("Thread '%s'cannot be finalized", thread->name());
445 }
446 } catch (CannotFinalizeThreadException &e) {
447 e.append("ThreadManager cannot stop thread '%s'", thread->name());
448 thread->cancel_finalize();
449 throw;
450 }
451
452 thread->cancel();
453 thread->join();
454 thread->finalize();
455 finalizer_->finalize(thread);
456
457 internal_remove_thread(thread);
458}
459
460/** Force removal of the given threads.
461 * The thread manager tries to finalize and stop the threads and then removes the
462 * threads from the internal structures.
463 *
464 * This will succeed even if a thread of the given list cannot be finalized, for
465 * example if prepare_finalize() returns false or if the thread finalizer cannot
466 * finalize the thread.
467 *
468 * <b>Caution, using this function may damage your robot.</b>
469 *
470 * @param tl threads to remove.
471 * @exception ThreadListNotSealedException if the given thread lits tl is not
472 * sealed the thread manager will refuse to remove it
473 * The threads are removed from thread manager control. The threads will be stopped
474 * before they are removed (may cause unpredictable results otherwise).
475 */
476void
478{
479 if (!tl.sealed()) {
480 throw ThreadListNotSealedException("Not accepting unsealed list '%s' for removal", tl.name());
481 }
482
483 tl.lock();
484 threads_.mutex()->stopby();
485 bool caught_exception = false;
486 Exception exc("Forced removal of thread list %s failed", tl.name());
487 try {
488 tl.force_stop(finalizer_);
489 } catch (Exception &e) {
490 caught_exception = true;
491 exc = e;
492 }
493
494 for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
495 internal_remove_thread(*i);
496 }
497
498 tl.unlock();
499
500 if (caught_exception) {
501 throw exc;
502 }
503}
504
505/** Force removal of the given thread.
506 * The thread manager tries to finalize and stop the thread and then removes the
507 * thread from the internal structures.
508 *
509 * This will succeed even if the thread cannot be finalized, for
510 * example if prepare_finalize() returns false or if the thread finalizer cannot
511 * finalize the thread.
512 *
513 * <b>Caution, using this function may damage your robot.</b>
514 *
515 * @param thread thread to remove.
516 * @exception ThreadListNotSealedException if the given thread lits tl is not
517 * sealed the thread manager will refuse to remove it
518 * The threads are removed from thread manager control. The threads will be stopped
519 * before they are removed (may cause unpredictable results otherwise).
520 */
521void
523{
524 MutexLocker lock(threads_.mutex());
525 try {
526 thread->prepare_finalize();
527 } catch (Exception &e) {
528 // ignore
529 }
530
531 thread->cancel();
532 thread->join();
533 thread->finalize();
534 if (finalizer_)
535 finalizer_->finalize(thread);
536
537 internal_remove_thread(thread);
538}
539
540void
542{
543 MutexLocker lock(threads_.mutex());
544
545 unsigned int timeout_sec = 0;
546 if (timeout_usec >= 1000000) {
547 timeout_sec = timeout_usec / 1000000;
548 timeout_usec -= timeout_sec * 1000000;
549 }
550
551 // Note that the following lines might throw an exception, we just pass it on
552 if (threads_.find(hook) != threads_.end()) {
553 threads_[hook].wakeup_and_wait(timeout_sec, timeout_usec * 1000);
554 }
555}
556
557void
559{
560 MutexLocker lock(threads_.mutex());
561
562 if (threads_.find(hook) != threads_.end()) {
563 if (barrier) {
564 threads_[hook].wakeup(barrier);
565 } else {
566 threads_[hook].wakeup();
567 }
568 if (threads_[hook].size() == 0) {
569 threads_.erase(hook);
570 }
571 }
572}
573
574void
575ThreadManager::try_recover(std::list<std::string> &recovered_threads)
576{
577 threads_.lock();
578 for (tit_ = threads_.begin(); tit_ != threads_.end(); ++tit_) {
579 tit_->second.try_recover(recovered_threads);
580 }
581 threads_.unlock();
582}
583
584bool
586{
587 return (threads_.size() > 0);
588}
589
590void
592{
593 interrupt_timed_thread_wait_ = false;
594 waitcond_timedthreads_->wait();
595 if (interrupt_timed_thread_wait_) {
596 interrupt_timed_thread_wait_ = false;
597 throw InterruptedException("Waiting for timed threads was interrupted");
598 }
599}
600
601void
603{
604 interrupt_timed_thread_wait_ = true;
605 waitcond_timedthreads_->wake_all();
606}
607
608/** Get a thread collector to be used for an aspect initializer.
609 * @return thread collector instance to use for ThreadProducerAspect.
610 */
613{
614 return aspect_collector_;
615}
616
617} // end namespace fawkes
A barrier is a synchronization tool which blocks until a given number of threads have reached the bar...
Definition: barrier.h:32
Thread aspect to use blocked timing.
WakeupHook blockedTimingAspectHook() const
Get the wakeup hook.
WakeupHook
Type to define at which hook the thread is woken up.
Base class for exceptions in Fawkes.
Definition: exception.h:36
The current system call has been interrupted (for instance by a signal).
Definition: system.h:39
Mutex locking helper.
Definition: mutex_locker.h:34
Thread finalizer interface.
virtual void finalize(Thread *thread)=0
Finalize a thread.
Thread initializer interface.
virtual void init(Thread *thread)=0
This method is called by the ThreadManager for each newly added Thread.
Thread list not sealed exception.
Definition: thread_list.h:50
List of threads.
Definition: thread_list.h:56
void remove_locked(Thread *thread)
Remove with lock protection.
void push_back_locked(Thread *thread)
Add thread to the end with lock protection.
const char * name()
Name of the thread list.
void force_stop(ThreadFinalizer *finalizer)
Force stop of all threads.
bool sealed()
Check if list is sealed.
virtual void interrupt_timed_thread_wait()
Interrupt any currently running wait_for_timed_threads() and cause it to throw an InterruptedExceptio...
virtual ~ThreadManager()
Destructor.
virtual void try_recover(std::list< std::string > &recovered_threads)
Try to recover threads.
virtual void wait_for_timed_threads()
Wait for timed threads.
virtual void wakeup_and_wait(BlockedTimingAspect::WakeupHook hook, unsigned int timeout_usec=0)
Wakeup thread for given hook and wait for completion.
virtual void wakeup(BlockedTimingAspect::WakeupHook hook, Barrier *barrier=0)
Wakeup thread for given hook.
ThreadCollector * aspect_collector() const
Get a thread collector to be used for an aspect initializer.
virtual bool timed_threads_exist()
Check if any timed threads exist.
ThreadManager()
Constructor.
virtual void force_remove(ThreadList &tl)
Force removal of the given threads.
void set_inifin(ThreadInitializer *initializer, ThreadFinalizer *finalizer)
Set initializer/finalizer.
Thread class encapsulation of pthreads.
Definition: thread.h:46
void join()
Join the thread.
Definition: thread.cpp:597
bool prepare_finalize()
Prepare finalization.
Definition: thread.cpp:375
void cancel()
Cancel a thread.
Definition: thread.cpp:646
virtual void finalize()
Finalize the thread.
Definition: thread.cpp:463
Wait until a given condition holds.
void wait()
Wait for the condition forever.
void wake_all()
Wake up all waiting threads.
Fawkes library namespace.