22#include <core/threading/mutex_locker.h>
23#include <syncpoint/exceptions.h>
24#include <syncpoint/syncpoint.h>
25#include <utils/time/time.h>
61 uint max_waittime_sec ,
62 uint max_waittime_nsec )
63: identifier_(identifier),
67 creation_time_(
Time()),
69 mutex_next_wait_(new
Mutex()),
71 mutex_wait_for_one_(new
Mutex()),
73 mutex_wait_for_all_(new
Mutex()),
75 wait_for_all_timer_running_(false),
76 max_waittime_sec_(max_waittime_sec),
77 max_waittime_nsec_(max_waittime_nsec),
79 last_emitter_reset_(
Time(0l))
81 if (identifier.empty()) {
85 if (identifier.compare(0, 1,
"/")) {
92 if (identifier !=
"/" && !identifier.compare(identifier.size() - 1, 1,
"/")) {
98SyncPoint::~SyncPoint()
152 emit(component,
true);
164 if (!emit_locker_.empty()) {
179 if (!emitters_.count(component)) {
188 bool pred_remove_from_pending =
false;
189 if (remove_from_pending) {
190 multiset<string>::iterator it_pending = pending_emitters_.find(component);
191 if (it_pending != pending_emitters_.end()) {
192 pending_emitters_.erase(it_pending);
194 if (last_emitter_reset_ <= predecessor_->last_emitter_reset_) {
195 pred_remove_from_pending =
true;
200 if (pending_emitters_.empty()) {
213 predecessor_->
emit(component, pred_remove_from_pending);
248 std::set<std::string> * watchers;
252 bool * timer_running;
253 string * timer_owner;
255 if (type == WAIT_FOR_ONE) {
260 timer_running = NULL;
261 }
else if (type == WAIT_FOR_ALL) {
281 if (watchers->count(component)) {
290 bool need_to_wait = !emitters_.empty() || type == WAIT_FOR_ONE;
292 watchers->insert(component);
296 if (emit_locker_ == component) {
302 if (type == WAIT_FOR_ONE) {
305 pthread_cleanup_push(cleanup_mutex, mutex_cond);
307 pthread_cleanup_pop(1);
310 handle_default(component, type);
314 if (*timer_running) {
316 pthread_cleanup_push(cleanup_mutex, mutex_cond);
318 pthread_cleanup_pop(1);
320 *timer_running =
true;
321 *timer_owner = component;
322 if (wait_sec != 0 || wait_nsec != 0) {
328 pthread_cleanup_push(cleanup_mutex, mutex_cond);
330 pthread_cleanup_pop(1);
332 *timer_running =
false;
335 handle_default(component, type);
358 wait(component, WAIT_FOR_ONE);
367 wait(component, WAIT_FOR_ALL);
378 wait(component, SyncPoint::WAIT_FOR_ONE, wait_sec, wait_nsec);
389 wait(component, SyncPoint::WAIT_FOR_ALL, wait_sec, wait_nsec);
422 if (emit_locker_.empty()) {
423 emit_locker_ = component;
426 "%s tried to call lock_until_next_wait, "
427 "but %s already did the same. Ignoring.",
429 emit_locker_.c_str());
442 emitters_.insert(component);
443 pending_emitters_.insert(component);
459 multiset<string>::iterator it_emitter = emitters_.find(component);
460 if (it_emitter == emitters_.end()) {
465 if (emit_if_pending && is_pending(component)) {
472 emitters_.erase(it_emitter);
487 return emitters_.count(component) > 0;
507pair<set<string>::iterator,
bool>
532 if (type == WAIT_FOR_ONE) {
534 }
else if (type == WAIT_FOR_ALL) {
570 case SyncPoint::WAIT_FOR_ONE: {
574 case SyncPoint::WAIT_FOR_ALL: {
583SyncPoint::reset_emitters()
585 last_emitter_reset_ =
Time();
586 pending_emitters_ = emitters_;
590SyncPoint::is_pending(
string component)
592 return pending_emitters_.count(component) > 0;
596SyncPoint::handle_default(
string component, WakeupType type)
599 "Thread time limit exceeded while waiting for syncpoint '%s'. "
600 "Time limit: %f sec.",
603 bad_components_.insert(pending_emitters_.begin(), pending_emitters_.end());
604 if (!bad_components_.empty()) {
605 stringstream message;
606 for (set<string>::const_iterator it = bad_components_.begin(); it != bad_components_.end();
608 message <<
" " << *it;
609 const auto &last_call =
611 return call.get_caller() == *it;
614 message <<
" (" << Time().in_sec() - last_call->get_call_time().in_sec() <<
"s)";
617 logger_->
log_warn(component.c_str(),
"bad components:%s", message.str().c_str());
618 }
else if (type == SyncPoint::WAIT_FOR_ALL) {
619 throw Exception(
"SyncPoints: component %s defaulted, "
620 "but there is no pending emitter. This is probably a bug.",
Circular buffer with a fixed size.
void push_back(const Type &val)
Insert an element at the end of the buffer and delete the first element if necessary.
Base class for exceptions in Fawkes.
Log through multiple loggers.
virtual void log_warn(const char *component, const char *format,...)
Log warning message.
void relock()
Lock this mutex, again.
void unlock()
Unlock the mutex.
Mutex mutual exclusion lock.
void lock()
Lock this mutex.
void unlock()
Unlock the mutex.
A call (wait() or emit()) to a SyncPoint.
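Invalid identifier used (i.e. one that is empty, does not start with "/", or ends with "/" although it is not the root identifier "/").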
A component called wait() but is already waiting.
Emit was called on a SyncBarrier, but the calling component is not registered as an emitter.
Emit was called by a component which isn't in the watcher set (or a wrong component argument was passed).
Emit was called by a component which isn't in the watcher set (or a wrong component argument was passed).
bool operator<(const SyncPoint &other) const
LessThan Operator.
virtual void reltime_wait_for_one(const std::string &component, uint wait_sec, uint wait_nsec)
wait for the sync point, but abort after given time
CircularBuffer< SyncPointCall > get_wait_calls(WakeupType type=WAIT_FOR_ONE) const
virtual void wait_for_one(const std::string &component)
Wait for a single emitter.
virtual void wait(const std::string &component, WakeupType=WAIT_FOR_ONE, uint wait_sec=0, uint wait_nsec=0)
wait for the sync point to be emitted by any other component
std::multiset< std::string > get_emitters() const
virtual void reltime_wait_for_all(const std::string &component, uint wait_sec, uint wait_nsec)
Wait for all registered emitters for the given time.
std::set< std::string > watchers_wait_for_one_
Set of all components which are currently waiting for a single emitter.
bool wait_for_all_timer_running_
true if the wait for all timer is running
virtual void unwait(const std::string &component)
abort waiting
CircularBuffer< SyncPointCall > wait_for_all_calls_
A buffer of the most recent wait calls of type WAIT_FOR_ALL.
Mutex * mutex_wait_for_one_
Mutex used for cond_wait_for_one_.
WaitCondition * cond_wait_for_one_
WaitCondition which is used for wait_for_one()
virtual void unregister_emitter(const std::string &component, bool emit_if_pending=true)
unregister as emitter
uint max_waittime_sec_
maximum waiting time in secs
std::string wait_for_all_timer_owner_
the component that started the wait-for-all timer
Mutex * mutex_next_wait_
Mutex used to allow lock_until_next_wait.
uint max_waittime_nsec_
maximum waiting time in nsecs
Mutex * mutex_wait_for_all_
Mutex used for cond_wait_for_all_.
std::set< std::string > get_watchers() const
std::set< std::string > watchers_wait_for_all_
Set of all components which are currently waiting on the barrier.
bool is_emitter(const std::string &component) const
Check if the given component is an emitter.
std::pair< std::set< std::string >::iterator, bool > add_watcher(std::string watcher)
Add a watcher to the watch list.
virtual void register_emitter(const std::string &component)
register as emitter
bool watcher_is_waiting(std::string watcher, WakeupType type) const
Check if the given waiter is currently waiting with the given type.
virtual void emit(const std::string &component)
send a signal to all waiting threads
WaitCondition * cond_next_wait_
WaitCondition used for lock_until_next_wait.
MultiLogger * logger_
Logger.
CircularBuffer< SyncPointCall > get_emit_calls() const
SyncPoint(std::string identifier, MultiLogger *logger, uint max_waittime_sec=0, uint max_waittime_nsec=0)
Constructor.
void lock_until_next_wait(const std::string &component)
Lock the SyncPoint for emitters until the specified component does the next wait() call.
std::set< std::string > watchers_
Set of all components which use this SyncPoint.
CircularBuffer< SyncPointCall > emit_calls_
A buffer of the most recent emit calls.
CircularBuffer< SyncPointCall > wait_for_one_calls_
A buffer of the most recent wait calls of type WAIT_FOR_ONE.
std::string get_identifier() const
const std::string identifier_
The unique identifier of the SyncPoint.
bool is_watcher(const std::string &component) const
Check if the given component is a watcher.
virtual void wait_for_all(const std::string &component)
Wait for all registered emitters.
WakeupType
Type to define when a thread wakes up after waiting for a SyncPoint.
WaitCondition * cond_wait_for_all_
WaitCondition which is used for wait_for_all()
bool operator==(const SyncPoint &other) const
EqualOperator.
Mutex * mutex_
Mutex used to protect all member variables.
A class for handling time.
Wait until a given condition holds.
void wait()
Wait for the condition forever.
void wake_all()
Wake up all waiting threads.
bool reltimed_wait(unsigned int sec, unsigned int nanosec)
Wait with relative timeout.
Fawkes library namespace.