Fawkes API Fawkes Development Version
test_syncpoint.cpp
1/***************************************************************************
2 * test_syncpoint.cpp - SyncPoint Unit Test
3 *
4 * Created: Wed Jan 22 11:17:43 2014
5 * Copyright 2014-2018 Till Hofmann
6 *
7 ****************************************************************************/
8
9/* This program is free software; you can redistribute it and/or modify
10 * it under the terms of the GNU General Public License as published by
11 * the Free Software Foundation; either version 2 of the License, or
12 * (at your option) any later version.
13 *
14 * This program is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU Library General Public License for more details.
18 *
19 * Read the full text in the LICENSE.GPL file in the doc directory.
20 */
21
22#include <gtest/gtest.h>
23
24#include <pthread.h>
25#ifdef __FreeBSD__
26# include <pthread_np.h>
27#endif
28#include <core/threading/barrier.h>
29#include <core/threading/mutex.h>
30#include <core/threading/mutex_locker.h>
31#include <core/threading/wait_condition.h>
32#include <core/utils/refptr.h>
33#include <libs/syncpoint/exceptions.h>
34#include <libs/syncpoint/syncpoint.h>
35#include <libs/syncpoint/syncpoint_manager.h>
36#include <logging/cache.h>
37#include <logging/multi.h>
38#include <sys/time.h>
39
40#include <atomic>
41#include <cmath>
42#include <errno.h>
43#include <string>
44#include <time.h>
45#include <unistd.h>
46
47using namespace fawkes;
48using namespace std;
49
50/** @class SyncPointTest
51 * Test class for SyncPoint
52 * This class tests basic functionality of SyncPoints
53 */
// Fixture for the SyncPointTest cases: creates a logger and three SyncPoints before each test.
54class SyncPointTest : public ::testing::Test
55{
56protected:
57 /**
58 * Initialize the test class
59 */
60 virtual void
// NOTE(review): listing line 61 is absent from this extract; presumably it is the
// SetUp() declarator this body belongs to -- confirm against the original file.
62 {
63 logger_ = new MultiLogger();
64 string id1 = "/id1";
65 string id2 = "/id2";
// sp1 and sp2 are built from the same identifier, sp3 from a different one;
// the Equals/LessThan/SyncPointSets tests depend on exactly this arrangement.
66 sp1 = new SyncPoint(id1, logger_);
67 sp2 = new SyncPoint(id1, logger_);
68 sp3 = new SyncPoint(id2, logger_);
69 }
70
71 /** Clean up */
72 virtual void
// NOTE(review): listing line 73 is absent; presumably the TearDown() declarator.
74 {
// Only the logger is deleted here; sp1..sp3 are not released explicitly in this block.
75 delete logger_;
76 }
77
// NOTE(review): the member declarations (listing lines 79, 81, 83 and 86) are absent
// from this extract; the tests use them as sp1, sp2, sp3 and logger_.
78 /** A syncpoint for testing */
80 /** A syncpoint for testing */
82 /** A syncpoint for testing */
84
85 /** Logger for testing */
87};
88
89/** @class SyncPointManagerTest
90 * Test class for SyncPointManager
91 * This class tests basic functionality of the SyncPointManager
92 */
// Fixture for the SyncPointManager tests: owns the logger and the pthread attributes
// that the tests pass to pthread_create().
93class SyncPointManagerTest : public ::testing::Test
94{
95protected:
96 /**
97 * Initialize the test class
98 */
// NOTE(review): the declarator of this initializer (listing line 99) is absent.
100 {
101 logger_ = new MultiLogger();
// NOTE(review): listing lines 102-104 are absent; the tests rely on `manager` being
// set up, so presumably that happens there -- confirm against the original file.
105
106 pthread_attr_init(&attrs);
107 }
108
109 /**
110 * Deinitialize the test class
111 */
// NOTE(review): the declarator of this cleanup routine (listing line 112) is absent.
113 {
// Mirrors the initializer: destroy the thread attributes, then delete the logger.
114 pthread_attr_destroy(&attrs);
115 delete logger_;
// The cache logger is intentionally not deleted here (the delete is commented out);
// NOTE(review): presumably the MultiLogger owns it -- verify to rule out a leak.
116 // delete cache_logger_;
117 }
118
119 /**
120 * A Pointer to a SyncPointManager
121 */
// NOTE(review): the declarations documented by the next three comments
// (listing lines 122, 125, 128) are absent from this extract.
123
124 /** Logger used to initialize SyncPoints */
126
127 /** Cache Logger used for testing */
129
130 /** Thread attributes */
131 pthread_attr_t attrs;
132};
133
134/** @class SyncBarrierTest
135 * Test SyncBarriers
136 */
// NOTE(review): the class header (listing line 137) and the constructor declarator
// (listing line 141) are absent from this extract. The SyncBarrierTest cases use
// `manager` and `attrs`, so the fixture presumably derives from SyncPointManagerTest.
138{
139protected:
140 /** Constructor. */
142 {
// Intentionally empty: no additional set-up is done in this body.
143 }
144};
145
146TEST_F(SyncPointTest, CreateSyncPoint)
147{
148 ASSERT_TRUE(*sp1 != NULL);
149}
150
151TEST_F(SyncPointTest, Equals)
152{
153 // RefPtr<SyncPoint>
154 ASSERT_NE(sp1, sp2);
155 // SyncPoint*
156 ASSERT_NE(*sp1, *sp2);
157 // SyncPoint
158 ASSERT_EQ(**sp1, **sp2);
159}
160
161TEST_F(SyncPointTest, LessThan)
162{
163 ASSERT_LT(**sp1, **sp3);
164 ASSERT_FALSE(**sp3 < **sp1);
165 ASSERT_FALSE(**sp1 < **sp2);
166 ASSERT_FALSE(**sp2 < **sp1);
167}
168
169TEST_F(SyncPointTest, SyncPointSets)
170{
171 using namespace std;
172 set<RefPtr<SyncPoint>, SyncPointSetLessThan> sp_set;
173 pair<set<RefPtr<SyncPoint>>::iterator, bool> ret;
174
175 // insert sp1
176 ret = sp_set.insert(sp1);
177 ASSERT_TRUE(ret.second);
178 ASSERT_EQ(sp1->get_identifier(), (*(ret.first))->get_identifier());
179
180 // insert sp3
181 ret = sp_set.insert(sp3);
182 ASSERT_TRUE(ret.second);
183 ASSERT_EQ(sp3->get_identifier(), (*(ret.first))->get_identifier());
184
185 // insert sp1 again
186 ret = sp_set.insert(sp1);
187 ASSERT_FALSE(ret.second);
188 ASSERT_EQ(sp1->get_identifier(), (*(ret.first))->get_identifier());
189
190 // insert sp2 (same as sp1)
191 ret = sp_set.insert(sp2);
192 ASSERT_FALSE(ret.second);
193 ASSERT_EQ(sp2->get_identifier(), (*(ret.first))->get_identifier());
194}
195
// NOTE(review): the TEST_F(...) header of this body (listing line 196) is absent
// from this extract; the body uses `manager` and `logger_`, so it presumably
// belongs to the SyncPointManagerTest fixture.
197{
// A fresh manager holds no SyncPoints.
198 ASSERT_EQ(0u, manager->get_syncpoints().size());
199 manager->get_syncpoint("test", "/test/1");
// Requesting "/test/1" yields three entries; the final assertions of this test
// show that "/" and "/test" are present alongside the requested identifier.
200 ASSERT_EQ(3u, manager->get_syncpoints().size());
201 ASSERT_EQ(1u,
202 manager->get_syncpoints().count(RefPtr<SyncPoint>(new SyncPoint("/test/1", logger_))));
203 manager->get_syncpoint("test2", "/test/2");
// A sibling under "/test" adds exactly one more entry.
204 ASSERT_EQ(4u, manager->get_syncpoints().size());
205 ASSERT_EQ(1u,
206 manager->get_syncpoints().count(RefPtr<SyncPoint>(new SyncPoint("/test/1", logger_))));
207 ASSERT_EQ(1u,
208 manager->get_syncpoints().count(RefPtr<SyncPoint>(new SyncPoint("/test/2", logger_))));
209 manager->get_syncpoint("test3", "/test/1");
// A second component asking for an existing identifier must not add an entry.
210 ASSERT_EQ(4u, manager->get_syncpoints().size());
211 ASSERT_EQ(1u,
212 manager->get_syncpoints().count(RefPtr<SyncPoint>(new SyncPoint("/test/1", logger_))));
213 ASSERT_EQ(1u,
214 manager->get_syncpoints().count(RefPtr<SyncPoint>(new SyncPoint("/test/2", logger_))));
215 ASSERT_EQ(1u, manager->get_syncpoints().count(RefPtr<SyncPoint>(new SyncPoint("/", logger_))));
216 ASSERT_EQ(1u,
217 manager->get_syncpoints().count(RefPtr<SyncPoint>(new SyncPoint("/test", logger_))));
218}
219
220TEST_F(SyncPointManagerTest, WatcherSet)
221{
222 ASSERT_NO_THROW(manager->get_syncpoint("component 1", "/test"));
223 ASSERT_NO_THROW(manager->get_syncpoint("component 2", "/test"));
224 ASSERT_NO_THROW(manager->get_syncpoint("component 3", "/test"));
225}
226
227/** Test what happens if we acquire a SyncPoint, release it, and then acquire it
228 * again. If release_syncpoint works properly, this should not throw. Otherwise,
229 * we would expect a SyncPointAlreadyOpenedException
230 */
231TEST_F(SyncPointManagerTest, ReleaseAndReacquire)
232{
233 string comp = "component";
234 string id = "/test/sp1";
235 RefPtr<SyncPoint> sp = manager->get_syncpoint(comp, id);
236 set<RefPtr<SyncPoint>, SyncPointSetLessThan> syncpoints = manager->get_syncpoints();
237 ASSERT_EQ(1, syncpoints.count(RefPtr<SyncPoint>(new SyncPoint("/test", logger_))));
238 for (set<RefPtr<SyncPoint>>::const_iterator sp_it = syncpoints.begin(); sp_it != syncpoints.end();
239 sp_it++) {
240 EXPECT_EQ(1, (*sp_it)->get_watchers().count(comp))
241 << "for component '" << comp << "' and SyncPoint '" << (*sp_it)->get_identifier() << "'";
242 }
243 manager->release_syncpoint(comp, sp);
244 for (set<RefPtr<SyncPoint>>::const_iterator sp_it = syncpoints.begin(); sp_it != syncpoints.end();
245 sp_it++) {
246 EXPECT_EQ(0, (*sp_it)->get_watchers().count(comp))
247 << "for component '" << comp << "' and SyncPoint '" << (*sp_it)->get_identifier() << "'";
248 }
249 ASSERT_NO_THROW(manager->get_syncpoint(comp, id));
250}
251
252TEST_F(SyncPointTest, EmptyIdentifier)
253{
254 ASSERT_THROW(sp1 = new SyncPoint("", NULL), SyncPointInvalidIdentifierException);
255}
256
257TEST_F(SyncPointTest, InvalidIdentifier)
258{
259 EXPECT_THROW(sp1 = new SyncPoint("invalid", NULL), SyncPointInvalidIdentifierException);
260 EXPECT_NO_THROW(sp1 = new SyncPoint("/", NULL));
261 EXPECT_THROW(sp1 = new SyncPoint("/test/", NULL), SyncPointInvalidIdentifierException);
262}
263
// Checks that get_syncpoint() throws for an empty component name and for
// invalid SyncPoint identifiers.
// NOTE(review): the second argument of every ASSERT_THROW below (listing lines
// 268, 272, 274 -- the expected exception types) is absent from this extract.
264TEST_F(SyncPointManagerTest, SyncPointManagerExceptions)
265{
266 RefPtr<SyncPoint> invalid_sp;
// empty component name, valid identifier
267 ASSERT_THROW(invalid_sp = manager->get_syncpoint("", "/test/sp1"),
269
270 // make sure syncpoint_manager doesn't catch the exceptions thrown by SyncPoint
// valid component name, empty identifier
271 ASSERT_THROW(invalid_sp = manager->get_syncpoint("waiter", ""),
// valid component name, identifier without a leading '/'
273 ASSERT_THROW(invalid_sp = manager->get_syncpoint("waiter", "invalid"),
275}
276
277TEST_F(SyncPointManagerTest, SyncPointHierarchyRegisteredWatchers)
278{
279 string comp = "component1";
280 string id = "/test/sp1";
281 RefPtr<SyncPoint> sp = manager->get_syncpoint(comp, "/test/sp1");
282 set<RefPtr<SyncPoint>, SyncPointSetLessThan> syncpoints = manager->get_syncpoints();
283 set<RefPtr<SyncPoint>>::iterator sp_test_it =
284 syncpoints.find(RefPtr<SyncPoint>(new SyncPoint("/test", logger_)));
285 set<RefPtr<SyncPoint>>::iterator sp_root_it =
286 syncpoints.find(RefPtr<SyncPoint>(new SyncPoint("/", logger_)));
287 ASSERT_NE(syncpoints.end(), sp_test_it);
288 ASSERT_NE(syncpoints.end(), sp_root_it);
289 RefPtr<SyncPoint> sp_test = *sp_test_it;
290 RefPtr<SyncPoint> sp_root = *sp_root_it;
291 EXPECT_EQ(1, syncpoints.count(sp_test));
292 EXPECT_EQ(1, syncpoints.count(sp_root));
293 EXPECT_EQ(1, sp->get_watchers().count(comp));
294 EXPECT_EQ(1, sp_test->get_watchers().count(comp));
295 EXPECT_EQ(0, sp_test->get_watchers().count(id));
296 EXPECT_EQ(1, sp_root->get_watchers().count(comp));
297 EXPECT_EQ(0, sp_root->get_watchers().count(id));
298 EXPECT_EQ(0, sp_root->get_watchers().count(sp_test->get_identifier()));
299
300 manager->release_syncpoint(comp, sp);
301 EXPECT_EQ(0, sp_test->get_watchers().count(id));
302}
303
304TEST_F(SyncPointManagerTest, SyncPointComponentRegistersForMultipleSyncPoints)
305{
306 string comp = "component1";
307 string sp1_id = "/test/sp1";
308 string sp2_id = "/test/sp2";
309 RefPtr<SyncPoint> sp1 = manager->get_syncpoint(comp, sp1_id);
310 // the following should not throw
311 // if it does, registering for the predecessor '/test' may be broken
312 RefPtr<SyncPoint> sp2 = manager->get_syncpoint(comp, sp2_id);
313 RefPtr<SyncPoint> predecessor =
314 *manager->get_syncpoints().find(RefPtr<SyncPoint>(new SyncPoint("/test", logger_)));
315 EXPECT_EQ(1, sp1->get_watchers().count(comp))
316 << comp << " is not registered for " << sp1->get_identifier() << ", but should be!";
317 EXPECT_EQ(1, sp2->get_watchers().count(comp))
318 << comp << " is not registered for " << sp2->get_identifier() << ", but should be!";
319 EXPECT_EQ(1, predecessor->get_watchers().count(comp))
320 << comp << " is not registered for " << predecessor->get_identifier() << ", but should be!";
321
322 manager->release_syncpoint(comp, sp1);
323 EXPECT_EQ(1, sp2->get_watchers().count(comp));
324 EXPECT_EQ(1, predecessor->get_watchers().count(comp))
325 << comp << " is not registered for " << predecessor->get_identifier() << ", but should be!";
326}
327
/** Life-cycle state of a helper thread as seen by the test runner. */
enum ThreadStatus {
  PENDING  = 0, /**< first enumerator; the thread functions in this file never assign it */
  RUNNING  = 1, /**< set by the waiter thread once it has obtained its SyncPoint */
  FINISHED = 2  /**< set by the waiter thread after its last wait() call returned */
};
329
330/** struct used for multithreading tests */
// Parameter/state block handed to the helper threads.
// NOTE(review): the struct header (listing line 331) and the declarations at
// listing lines 334, 340, 342, 352, 354, 356, 358 and 360 are absent from this
// extract; other code in this file accesses them as manager, num_wait_calls,
// sp_identifier, mutex_running, cond_running, mutex_finished, cond_finished
// and start_barrier.
332{
333 /** SyncPointManager passed to the thread */
335 /** Thread number */
336 uint thread_nr = 0;
337 /** Wait type */
338 SyncPoint::WakeupType type = SyncPoint::WAIT_FOR_ONE;
339 /** Number of wait calls the thread should make */
341 /** Name of the SyncPoint */
343 /** Name of the component */
344 string component = "";
345 /** timeout in sec */
346 uint timeout_sec = 0;
347 /** timeout in nsec */
348 uint timeout_nsec = 0;
349 /** current status of the thread */
// NOTE(review): no initializer is visible for `status`, unlike the fields above;
// consider initializing it explicitly (e.g. to PENDING) -- verify against the original.
350 atomic<ThreadStatus> status;
351 /** Mutex to protect cond_running */
353 /** WaitCondition to indicate that the thread is running */
355 /** Mutex to protect cond_finished */
357 /** WaitCondition to indicate that the thread has finished */
359 /** Barrier for startup synchronization. */
361};
362
363/** Helper function to wait for a thread to be running */
364bool
365wait_for_running(waiter_thread_params *params, long int sec = 1, long int nanosec = 0)
366{
367 RefPtr<SyncPoint> sp = params->manager->get_syncpoint("test_runner", params->sp_identifier);
368 const int wait_time_us = 1000;
369 for (uint i = 0; i < (sec * pow(10, 9) + nanosec) / (wait_time_us * pow(10, 3)); i++) {
370 if (sp->watcher_is_waiting(params->component, params->type)) {
371 return true;
372 }
373 usleep(wait_time_us);
374 }
375 return false;
376}
377
378/** Helper function to wait for a thread to be finished */
379bool
380wait_for_finished(waiter_thread_params *params, long int sec = 1, long int nanosec = 0)
381{
382 MutexLocker ml(params->mutex_finished);
383 if (params->status == FINISHED) {
384 return true;
385 } else {
386 return params->cond_finished.reltimed_wait(sec, nanosec);
387 }
388}
389
390/** get a SyncPoint and wait for it */
// pthread entry point: obtains the SyncPoint named in the parameters, reports
// RUNNING, performs num_wait_calls wait() calls and finally reports FINISHED.
391void *
392start_waiter_thread(void *data)
393{
// NOTE(review): listing line 394 is absent from this extract; presumably it
// converts `data` into the `params` pointer used below -- confirm.
395 string component = params->component;
396 RefPtr<SyncPoint> sp = params->manager->get_syncpoint(component, params->sp_identifier);
397 params->status = RUNNING;
// Optional rendezvous: only used when the caller supplied a start barrier.
398 if (params->start_barrier) {
399 params->start_barrier->wait();
400 }
// Wake anyone blocked on cond_running, holding its mutex while signalling.
401 params->mutex_running.lock();
402 params->cond_running.wake_all();
403 params->mutex_running.unlock();
404 for (uint i = 0; i < params->num_wait_calls; i++) {
405 sp->wait(component, params->type, params->timeout_sec, params->timeout_nsec);
406 }
// Status is updated before the wake-up so that wait_for_finished(), which checks
// the status under mutex_finished, can observe FINISHED.
407 params->status = FINISHED;
408 params->mutex_finished.lock();
409 params->cond_finished.wake_all();
410 params->mutex_finished.unlock();
411 pthread_exit(NULL);
412}
413
// A second wait() by a component that is already waiting on the same SyncPoint
// (from another thread) must throw SyncPointMultipleWaitCallsException.
414TEST_F(SyncPointManagerTest, MultipleWaits)
415{
416 RefPtr<SyncPoint> sp_ref = manager->get_syncpoint("component", "/test/sp1");
417 pthread_t thread1;
// NOTE(review): listing line 418 is absent; presumably it declares `params`
// (used by value below and passed as &params) -- confirm.
419 params.component = "component";
420 params.manager = manager;
421 params.num_wait_calls = 1;
422 params.sp_identifier = "/test/sp1";
423 pthread_create(&thread1, &attrs, start_waiter_thread, &params);
// NOTE(review): the result of wait_for_running() is ignored here; if the thread
// is not yet waiting, the assertion below may not exercise the intended path.
424 wait_for_running(&params);
425 ASSERT_THROW(sp_ref->wait("component"), SyncPointMultipleWaitCallsException);
// Nothing in this test emits the SyncPoint, so the thread is cancelled, then joined.
426 pthread_cancel(thread1);
427 pthread_join(thread1, NULL);
428}
429
430/** Create multiple threads which will all call get_syncpoint
431 * for the same SyncPoint. Do not wait for the SyncPoint but return
432 * immediately.
433 */
434TEST_F(SyncPointManagerTest, MultipleManagerRequests)
435{
436 uint num_threads = 50;
437 pthread_t threads[num_threads];
438 waiter_thread_params *params[num_threads];
439 string sp_identifier = "/test/sp1";
440 for (uint i = 0; i < num_threads; i++) {
441 params[i] = new waiter_thread_params();
442 params[i]->component = "component " + to_string(i);
443 params[i]->manager = manager;
444 params[i]->thread_nr = i;
445 params[i]->num_wait_calls = 0;
446 params[i]->sp_identifier = sp_identifier;
447 pthread_create(&threads[i], &attrs, start_waiter_thread, params[i]);
448 pthread_yield();
449 ASSERT_LE(manager->get_syncpoints().size(), 3u);
450 }
451
452 for (uint i = 0; i < num_threads; i++) {
453 pthread_join(threads[i], NULL);
454 delete params[i];
455 }
456}
457
458/** start multiple threads and let them wait.
459 * This just tests whether there are any segfaults.
460 * No assertions are made.
461 */
462TEST_F(SyncPointManagerTest, ParallelWaitCalls)
463{
464 uint num_threads = 50;
465 uint num_wait_calls = 10;
466 pthread_t threads[num_threads];
467 waiter_thread_params *params[num_threads];
468 string sp_identifier = "/test/sp1";
469 for (uint i = 0; i < num_threads; i++) {
470 params[i] = new waiter_thread_params();
471 params[i]->component = "component " + to_string(i);
472 params[i]->manager = manager;
473 params[i]->thread_nr = i;
474 params[i]->num_wait_calls = num_wait_calls;
475 params[i]->sp_identifier = sp_identifier;
476 pthread_create(&threads[i], &attrs, start_waiter_thread, params[i]);
477 pthread_yield();
478 ASSERT_LE(manager->get_syncpoints().size(), 3u);
479 }
480
481 for (uint i = 0; i < num_threads; i++) {
482 EXPECT_TRUE(wait_for_running(params[i]));
483 }
484 for (uint i = 0; i < num_threads; i++) {
485 pthread_cancel(threads[i]);
486 ASSERT_EQ(0, pthread_join(threads[i], NULL));
487 delete params[i];
488 }
489}
490
491/** start multiple threads, let them wait for a SyncPoint,
492 * emit the SyncPoint and verify that they all returned
493 */
494TEST_F(SyncPointManagerTest, ParallelWaitsReturn)
495{
496 uint num_threads = 10;
497 uint num_wait_calls = 5;
498 pthread_t threads[num_threads];
499 waiter_thread_params *params[num_threads];
500 string sp_identifier = "/test/sp1";
501 for (uint i = 0; i < num_threads; i++) {
502 params[i] = new waiter_thread_params();
503 params[i]->component = "component " + to_string(i);
504 params[i]->manager = manager;
505 params[i]->thread_nr = i;
506 params[i]->num_wait_calls = num_wait_calls;
507 params[i]->sp_identifier = sp_identifier;
508 pthread_create(&threads[i], &attrs, start_waiter_thread, params[i]);
509 pthread_yield();
510 }
511
512 for (uint i = 0; i < num_threads; i++) {
513 EXPECT_TRUE(wait_for_running(params[i]));
514 }
515
516 string component = "emitter";
517 RefPtr<SyncPoint> sp = manager->get_syncpoint(component, sp_identifier);
518 sp->register_emitter(component);
519 for (uint i = 0; i < num_wait_calls; i++) {
520 sp->emit(component);
521 usleep(20000);
522 }
523
524 for (uint i = 0; i < num_threads; i++) {
525 ASSERT_TRUE(wait_for_finished(params[i]));
526 pthread_join(threads[i], NULL);
527 delete params[i];
528 }
529}
530
531/** start multiple threads, let them wait for a SyncPoint,
532 * but don't emit the SyncPoint. Verify that they have not returned
533 */
534TEST_F(SyncPointManagerTest, WaitDoesNotReturnImmediately)
535{
536 uint num_threads = 50;
537 pthread_t threads[num_threads];
538 waiter_thread_params *params[num_threads];
539 for (uint i = 0; i < num_threads; i++) {
540 params[i] = new waiter_thread_params();
541 params[i]->component = "component " + to_string(i);
542 params[i]->manager = manager;
543 params[i]->thread_nr = i;
544 params[i]->num_wait_calls = 1;
545 params[i]->sp_identifier = "/test/sp1";
546 pthread_create(&threads[i], &attrs, start_waiter_thread, params[i]);
547 }
548
549 for (uint i = 0; i < num_threads; i++) {
550 EXPECT_TRUE(wait_for_running(params[i]));
551 }
552
553 for (uint i = 0; i < num_threads; i++) {
554 EXPECT_EQ(RUNNING, params[i]->status);
555 pthread_cancel(threads[i]);
556 ASSERT_EQ(0, pthread_join(threads[i], NULL));
557 delete params[i];
558 }
559}
560
561/**
562 * Test the SyncPoint hierarchy.
563 * This creates a SyncPoint, an emitter and waiters which wait for the
564 * SyncPoint's predecessor, the predecessor's predecessor (grandparent),
565 * and the root SyncPoint ("/").
566 */
567TEST_F(SyncPointManagerTest, SyncPointHierarchy)
568{
569 vector<string> identifiers = {"/test/topic", "/test", "/", "/other/topic"};
570 uint num_threads = identifiers.size();
571 pthread_t threads[num_threads];
572 waiter_thread_params *params[num_threads];
573 for (uint i = 0; i < num_threads; i++) {
574 params[i] = new waiter_thread_params();
575 params[i]->component = "component " + to_string(i);
576 params[i]->manager = manager;
577 params[i]->thread_nr = i;
578 params[i]->num_wait_calls = 1;
579 params[i]->sp_identifier = identifiers.at(i);
580 pthread_create(&threads[i], &attrs, start_waiter_thread, params[i]);
581 }
582
583 for (uint i = 0; i < num_threads; i++) {
584 EXPECT_TRUE(wait_for_running(params[i]));
585 }
586 RefPtr<SyncPoint> sp = manager->get_syncpoint("emitter", "/test/topic/sp");
587 sp->register_emitter("emitter");
588 sp->emit("emitter");
589
590 /* The first waiters should be unblocked */
591 for (uint i = 0; i < num_threads - 1; i++) {
592 ASSERT_TRUE(wait_for_finished(params[i]));
593 pthread_join(threads[i], NULL);
594 delete params[i];
595 }
596
597 /* The last waiter should still wait */
598 pthread_t last_thread = threads[num_threads - 1];
599 EXPECT_FALSE(wait_for_finished(params[num_threads - 1], 0, pow(10, 6)));
600 pthread_cancel(last_thread);
601 ASSERT_EQ(0, pthread_join(last_thread, NULL));
602}
603
604/** Emit a barrier without registering */
605TEST_F(SyncBarrierTest, EmitWithoutRegister)
606{
607 string component = "emitter";
608 RefPtr<SyncPoint> barrier = manager->get_syncpoint(component, "/test/barrier");
609 ASSERT_THROW(barrier->emit(component), SyncPointNonEmitterCalledEmitException);
610}
611
612/** Register multiple times
613 * This is allowed, but the component should then also emit multiple times */
614TEST_F(SyncBarrierTest, MultipleRegisterCalls)
615{
616 string component = "emitter";
617 RefPtr<SyncPoint> barrier = manager->get_syncpoint(component, "/test/barrier");
618 EXPECT_NO_THROW(barrier->register_emitter(component));
619 EXPECT_NO_THROW(barrier->register_emitter(component));
620}
621
622/** get a SyncBarrier, register as emitter and emit */
// pthread entry point: obtains the SyncPoint named in the parameters, registers
// as emitter "emitter <thread_nr>" and emits it num_wait_calls times.
623void *
624start_barrier_emitter_thread(void *data)
625{
// NOTE(review): listing lines 626 and 628 are absent from this extract; presumably
// they declare `params` (derived from `data`) and `sp`, both used below -- confirm.
627 string component = "emitter " + to_string(params->thread_nr);
629 EXPECT_NO_THROW(sp = params->manager->get_syncpoint(component, params->sp_identifier));
630 sp->register_emitter(component);
// num_wait_calls is reused here as the number of emit calls.
631 for (uint i = 0; i < params->num_wait_calls; i++) {
632 sp->emit(component);
633 }
634 pthread_exit(NULL);
635}
636
637/** Helper class which registers and emits a given SyncBarrier */
// Registers as emitter of one SyncPoint on construction and undoes that on destruction.
// NOTE(review): the class header (listing line 638), the declarator of the emit
// method (listing line 662) and the declaration of manager_ (listing line 670)
// are absent from this extract.
639{
640public:
641 /** Constructor.
642 * @param identifier The identifier of this emitter.
643 * @param syncbarrier The identifier of the SyncBarrier to register for.
644 * @param manager Pointer to the SyncPointManager to use.
645 */
646 Emitter(string identifier, string syncbarrier, RefPtr<SyncPointManager> manager)
647 : identifier_(identifier), manager_(manager)
648 {
// Obtain the SyncPoint under this emitter's own identifier, then register as its emitter.
649 barrier_ = manager->get_syncpoint(identifier_, syncbarrier);
650 barrier_->register_emitter(identifier_);
651 }
652
653 /** Destructor. */
654 virtual ~Emitter()
655 {
// Reverse order of the constructor: unregister as emitter first, then release.
656 barrier_->unregister_emitter(identifier_);
657 manager_->release_syncpoint(identifier_, barrier_);
658 }
659
660 /** emit the SyncBarrier */
661 void
663 {
664 barrier_->emit(identifier_);
665 }
666
667private:
668 string identifier_;
669 RefPtr<SyncPoint> barrier_;
671};
672
673/** Barrier: wait() returns immediately if no emitter is registered */
674TEST_F(SyncBarrierTest, WaitWithNoRegisteredEmitter)
675{
676 string barrier_id = "/test/barrier";
677 RefPtr<SyncPoint> barrier = manager->get_syncpoint("main loop", barrier_id);
678 const uint num_waiter_threads = 1;
679 const uint num_wait_calls = 1;
680 pthread_t waiter_threads[num_waiter_threads];
681 waiter_thread_params *params[num_waiter_threads];
682 for (uint i = 0; i < num_waiter_threads; i++) {
683 params[i] = new waiter_thread_params();
684 params[i]->type = SyncPoint::WAIT_FOR_ALL;
685 params[i]->component = "component " + to_string(i);
686 params[i]->manager = manager;
687 params[i]->thread_nr = i;
688 params[i]->num_wait_calls = num_wait_calls;
689 params[i]->sp_identifier = barrier_id;
690 pthread_create(&waiter_threads[i], &attrs, start_waiter_thread, params[i]);
691 }
692 for (uint i = 0; i < num_waiter_threads; i++) {
693 ASSERT_TRUE(wait_for_finished(params[i]));
694 pthread_join(waiter_threads[i], NULL);
695 delete params[i];
696 }
697}
698
699/** Start multiple threads, let them wait for a SyncBarrier,
700 * also have two threads registered as emitter.
701 * Let the first thread emit the barrier, assert the waiters did not unblock,
702 * then let the second thread emit.
703 * This tests the fundamental difference to a SyncPoint: With a SyncPoint,
704 * wait() returns if the SyncPoint is emitted by one component.
705 * With a SyncBarrier, all registered emitters need to emit the SyncBarrier
706 * before wait() returns.
707 */
708TEST_F(SyncBarrierTest, WaitForAllEmitters)
709{
710 string barrier_id = "/test/barrier";
711 Emitter em1("emitter 1", barrier_id, manager);
712 Emitter em2("emitter 2", barrier_id, manager);
713
714 RefPtr<SyncPoint> barrier = manager->get_syncpoint("main loop", barrier_id);
715
716 const uint num_waiter_threads = 50;
717 const uint num_wait_calls = 1;
718 pthread_t waiter_threads[num_waiter_threads];
719 waiter_thread_params *params[num_waiter_threads];
720 for (uint i = 0; i < num_waiter_threads; i++) {
721 params[i] = new waiter_thread_params();
722 params[i]->component = "component " + to_string(i);
723 params[i]->type = SyncPoint::WAIT_FOR_ALL;
724 params[i]->manager = manager;
725 params[i]->thread_nr = i;
726 params[i]->num_wait_calls = num_wait_calls;
727 params[i]->sp_identifier = barrier_id;
728 pthread_create(&waiter_threads[i], &attrs, start_waiter_thread, params[i]);
729 }
730
731 for (uint i = 0; i < num_waiter_threads; i++) {
732 EXPECT_TRUE(wait_for_running(params[i]));
733 }
734
735 em1.emit();
736
737 for (uint i = 0; i < num_waiter_threads; i++) {
738 EXPECT_EQ(RUNNING, params[i]->status);
739 }
740
741 em1.emit();
742 em2.emit();
743
744 for (uint i = 0; i < num_waiter_threads; i++) {
745 ASSERT_TRUE(wait_for_finished(params[i]));
746 pthread_join(waiter_threads[i], NULL);
747 delete params[i];
748 }
749}
750
751/** two barriers, emit the first one. Only the threads waiting on the first
752 * barrier should unblock
753 */
754TEST_F(SyncBarrierTest, BarriersAreIndependent)
755{
756 string barrier1_id = "/test/barrier1";
757 string barrier2_id = "/test/barrier2";
758 Emitter em1("em1", barrier1_id, manager);
759 Emitter em2("em2", barrier2_id, manager);
760
761 RefPtr<SyncPoint> barrier1 = manager->get_syncpoint("m1", barrier1_id);
762
763 RefPtr<SyncPoint> barrier2 = manager->get_syncpoint("m2", barrier2_id);
764
765 const uint num_waiter_threads = 50;
766 const uint num_wait_calls = 1;
767 pthread_t waiter_threads1[num_waiter_threads];
768 waiter_thread_params *params1[num_waiter_threads];
769 for (uint i = 0; i < num_waiter_threads; i++) {
770 params1[i] = new waiter_thread_params();
771 params1[i]->component = "component " + to_string(i);
772 params1[i]->type = SyncPoint::WAIT_FOR_ALL;
773 params1[i]->manager = manager;
774 params1[i]->thread_nr = i;
775 params1[i]->num_wait_calls = num_wait_calls;
776 params1[i]->sp_identifier = barrier1_id;
777 pthread_create(&waiter_threads1[i], &attrs, start_waiter_thread, params1[i]);
778 }
779
780 pthread_t waiter_threads2[num_waiter_threads];
781 waiter_thread_params *params2[num_waiter_threads];
782 for (uint i = 0; i < num_waiter_threads; i++) {
783 params2[i] = new waiter_thread_params();
784 params2[i]->component = "component " + to_string(i);
785 params2[i]->type = SyncPoint::WAIT_FOR_ALL;
786 params2[i]->manager = manager;
787 params2[i]->thread_nr = num_waiter_threads + i;
788 params2[i]->num_wait_calls = num_wait_calls;
789 params2[i]->sp_identifier = barrier2_id;
790 pthread_create(&waiter_threads2[i], &attrs, start_waiter_thread, params2[i]);
791 }
792
793 for (uint i = 0; i < num_waiter_threads; i++) {
794 EXPECT_TRUE(wait_for_running(params1[i]));
795 }
796
797 for (uint i = 0; i < num_waiter_threads; i++) {
798 EXPECT_TRUE(wait_for_running(params2[i]));
799 }
800
801 em1.emit();
802
803 for (uint i = 0; i < num_waiter_threads; i++) {
804 ASSERT_TRUE(wait_for_finished(params1[i]));
805 pthread_join(waiter_threads1[i], NULL);
806 delete params1[i];
807 }
808
809 for (uint i = 0; i < num_waiter_threads; i++) {
810 EXPECT_EQ(RUNNING, params2[i]->status);
811 }
812
813 em2.emit();
814
815 for (uint i = 0; i < num_waiter_threads; i++) {
816 ASSERT_TRUE(wait_for_finished(params2[i]));
817 pthread_join(waiter_threads2[i], NULL);
818 delete params2[i];
819 }
820}
821
822/**
823 * Test the SyncBarrier hierarchy, similar to the SyncPoint hierarchy test.
824 * This creates a SyncBarrier, an emitter and waiters which wait for the
825 * SyncBarrier's predecessor, the predecessor's predecessor (grandparent),
826 * and the root SyncBarrier ("/").
827 */
828TEST_F(SyncBarrierTest, SyncBarrierHierarchy)
829{
830 Emitter em1("emitter 1", "/test/topic/b1", manager);
831 Emitter em2("emitter 2", "/test/topic/b2", manager);
832 Emitter em3("emitter 3", "/other/topic", manager);
833
834 vector<string> identifiers = {"/test/topic", "/test", "/", "/other/topic"};
835 uint num_threads = identifiers.size();
836 pthread_t threads[num_threads];
837 waiter_thread_params *params[num_threads];
838 Barrier * barrier = new Barrier(num_threads + 1);
839 for (uint i = 0; i < num_threads; i++) {
840 params[i] = new waiter_thread_params();
841 params[i]->component = "component " + to_string(i);
842 params[i]->type = SyncPoint::WAIT_FOR_ALL;
843 params[i]->manager = manager;
844 params[i]->thread_nr = i;
845 params[i]->num_wait_calls = 1;
846 params[i]->sp_identifier = identifiers.at(i);
847 params[i]->start_barrier = barrier;
848 pthread_create(&threads[i], &attrs, start_waiter_thread, params[i]);
849 }
850
851 barrier->wait();
852 delete barrier;
853
854 for (uint i = 0; i < num_threads; i++) {
855 EXPECT_TRUE(wait_for_running(params[i]));
856 }
857
858 em1.emit();
859 for (uint i = 0; i < num_threads; i++) {
860 ASSERT_EQ(RUNNING, params[i]->status);
861 }
862 em2.emit();
863 /* The first waiters should be unblocked */
864 for (uint i = 0; i < num_threads - 2; i++) {
865 ASSERT_TRUE(wait_for_finished(params[i]));
866 pthread_join(threads[i], NULL);
867 delete params[i];
868 }
869 /* The last two waiters should still be waiting */
870 for (uint i = num_threads - 2; i < num_threads; i++) {
871 EXPECT_EQ(RUNNING, params[i]->status);
872 pthread_cancel(threads[i]);
873 ASSERT_EQ(0, pthread_join(threads[i], NULL));
874 delete params[i];
875 }
876}
877
878/** One component registers as emitter for two syncpoints, two other components
879 * wait for the first and second syncpoint respectively.
880 * Then, the first component unregisters for the first syncpoint.
881 * Test whether it is still registered for the second syncpoint.
882 * A third waiter waits for the predecessor syncpoint and should also still be
883 * waiting after the emitter has unregistered for the first syncpoint.
884 */
// Two phases: (1) the emitter is registered for sp1 and sp2 and emits both;
// (2) the emitter unregisters from sp2 and the waiters are started again.
885TEST_F(SyncPointManagerTest, OneEmitterRegistersForMultipleSyncPointsHierarchyTest)
886{
887 string id_sp1 = "/test/sp1";
888 string id_sp2 = "/test/sp2";
889 string id_sp_pred = "/test";
890 string id_emitter = "component_emitter";
891 string id_waiter1 = "component_waiter1";
892 string id_waiter2 = "component_waiter2";
893 string id_waiter3 = "component_waiter_on_predecessor";
894
895 RefPtr<SyncPoint> sp1 = manager->get_syncpoint(id_emitter, id_sp1);
896 RefPtr<SyncPoint> sp2 = manager->get_syncpoint(id_emitter, id_sp2);
897 manager->get_syncpoint(id_waiter1, id_sp1);
898 manager->get_syncpoint(id_waiter2, id_sp2);
899 RefPtr<SyncPoint> pred = manager->get_syncpoint(id_waiter3, id_sp_pred);
900 sp1->register_emitter(id_emitter);
901 sp2->register_emitter(id_emitter);
902 EXPECT_EQ(1, sp1->get_emitters().count(id_emitter));
903 EXPECT_EQ(1, sp2->get_emitters().count(id_emitter));
904 // this should be 2 as the emitter has registered twice
905 EXPECT_EQ(2, pred->get_emitters().count(id_emitter));
906
// NOTE(review): listing lines 907, 914 and 921 are absent from this extract;
// presumably they allocate params1, params2 and params3 (all three are used
// through `->` and deleted at the end of the test) -- confirm.
908 params1->manager = manager;
909 params1->component = id_waiter1;
910 params1->type = SyncPoint::WAIT_FOR_ALL;
911 params1->num_wait_calls = 1;
912 params1->sp_identifier = id_sp1;
913
915 params2->manager = manager;
916 params2->component = id_waiter2;
917 params2->type = SyncPoint::WAIT_FOR_ALL;
918 params2->num_wait_calls = 1;
919 params2->sp_identifier = id_sp2;
920
922 params3->manager = manager;
923 params3->component = id_waiter3;
924 params3->type = SyncPoint::WAIT_FOR_ALL;
925 params3->num_wait_calls = 1;
926 params3->sp_identifier = id_sp_pred;
927
// Phase 1: one waiter each on sp1, sp2 and the predecessor.
928 pthread_t pthread1;
929 pthread_create(&pthread1, &attrs, start_waiter_thread, params1);
930 pthread_t pthread2;
931 pthread_create(&pthread2, &attrs, start_waiter_thread, params2);
932 pthread_t pthread3;
933 pthread_create(&pthread3, &attrs, start_waiter_thread, params3);
934 EXPECT_TRUE(wait_for_running(params1));
935 EXPECT_TRUE(wait_for_running(params2));
936 EXPECT_TRUE(wait_for_running(params3));
937
938 sp1->emit(id_emitter);
939
// The short timeouts below are 10 * 10^6 ns = 10 ms.
940 ASSERT_TRUE(wait_for_finished(params1));
941 ASSERT_FALSE(wait_for_finished(params2, 0, 10 * pow(10, 6)));
942 // this should be waiting as the component has registered twice for '/test'
943 // and thus should emit '/test' also twice (by hierarchical emit calls)
944 ASSERT_FALSE(wait_for_finished(params3, 0, 10 * pow(10, 6)));
945 sp2->emit(id_emitter);
946 ASSERT_TRUE(wait_for_finished(params2, 0, 10 * pow(10, 6)));
947 ASSERT_TRUE(wait_for_finished(params3, 0, 10 * pow(10, 6)));
948
949 pthread_join(pthread1, NULL);
950 pthread_join(pthread2, NULL);
951 pthread_join(pthread3, NULL);
952
// Phase 2: drop the registration for sp2; sp1 and the predecessor keep one each.
953 sp2->unregister_emitter(id_emitter);
954 EXPECT_EQ(1, sp1->get_emitters().count(id_emitter));
955 EXPECT_EQ(0, sp2->get_emitters().count(id_emitter));
956 EXPECT_EQ(1, pred->get_emitters().count(id_emitter));
957
// The same parameter blocks are reused; their status still reads FINISHED from
// phase 1 until the restarted thread sets RUNNING.
958 pthread_create(&pthread1, &attrs, start_waiter_thread, params1);
959 pthread_create(&pthread2, &attrs, start_waiter_thread, params2);
960 pthread_create(&pthread3, &attrs, start_waiter_thread, params3);
961
// waiter 2 is not awaited here: it is expected to finish (asserted below)
// rather than stay blocked, since sp2 has no registered emitter left.
962 ASSERT_TRUE(wait_for_running(params1));
963 ASSERT_TRUE(wait_for_running(params3));
964
965 ASSERT_FALSE(wait_for_finished(params1, 0, 10 * pow(10, 6)));
966 ASSERT_TRUE(wait_for_finished(params2));
967 ASSERT_FALSE(wait_for_finished(params3, 0, 10 * pow(10, 6)));
968
969 sp1->emit(id_emitter);
970 ASSERT_TRUE(wait_for_finished(params1));
971 ASSERT_TRUE(wait_for_finished(params3));
972 pthread_join(pthread1, NULL);
973 pthread_join(pthread2, NULL);
974 pthread_join(pthread3, NULL);
975 delete params1;
976 delete params2;
977 delete params3;
978}
979
980/** Test if an exception is thrown if a registered emitter is currently not
981 * pending
982 */
983TEST_F(SyncBarrierTest, NonPendingEmitterEmits)
984{
985 Emitter em1("em1", "/barrier", manager);
986 // register a second emitter to avoid immediate reset after emit
987 Emitter em2("em2", "/barrier", manager);
988 EXPECT_NO_THROW(em1.emit());
989 EXPECT_NO_THROW(em1.emit());
990}
991
992/** Test if a component waiting for a syncpoint is woken up
993 * if an emitter is registered for two successor syncpoints and the emitter
994 * emits the same syncpoint twice
995 */
996TEST_F(SyncPointManagerTest, EmitterEmitsSameSyncPointTwiceTest)
997{
998 RefPtr<SyncPoint> sp1 = manager->get_syncpoint("emitter", "/test/sp1");
999 RefPtr<SyncPoint> sp2 = manager->get_syncpoint("emitter", "/test/sp2");
1000 RefPtr<SyncPoint> sp_pred = manager->get_syncpoint("waiter", "/test");
1001
1002 sp1->register_emitter("emitter");
1003 sp2->register_emitter("emitter");
1004
1006 params1->manager = manager;
1007 params1->component = "waiter";
1008 params1->type = SyncPoint::WAIT_FOR_ALL;
1009 params1->num_wait_calls = 1;
1010 params1->sp_identifier = "/test";
1011
1012 pthread_t pthread1;
1013 pthread_create(&pthread1, &attrs, start_waiter_thread, params1);
1014
1015 EXPECT_FALSE(wait_for_finished(params1, 0, 10 * pow(10, 6)));
1016
1017 sp1->emit("emitter");
1018
1019 EXPECT_FALSE(wait_for_finished(params1, 0, 10 * pow(10, 6)));
1020
1021 sp1->emit("emitter");
1022 EXPECT_FALSE(wait_for_finished(params1, 0, 10 * pow(10, 6)));
1023
1024 sp2->emit("emitter");
1025 ASSERT_TRUE(wait_for_finished(params1));
1026 pthread_join(pthread1, NULL);
1027
1028 delete params1;
1029}
1030
1031/** Test if the component returns when using reltime_wait */
1032TEST_F(SyncPointManagerTest, RelTimeWaitTest)
1033{
1034 RefPtr<SyncPoint> sp1 = manager->get_syncpoint("emitter", "/test/sp1");
1035 manager->get_syncpoint("waiter", "/test/sp1");
1036 sp1->register_emitter("emitter");
1037 pthread_t thread;
1038 waiter_thread_params params;
1039 params.manager = manager;
1040 params.type = SyncPoint::WAIT_FOR_ALL;
1041 params.num_wait_calls = 1;
1042 params.timeout_sec = 0;
1043 params.timeout_nsec = 100000;
1044 params.component = "waiter";
1045 params.sp_identifier = "/test/sp1";
1046 pthread_create(&thread, NULL, start_waiter_thread, &params);
1047 ASSERT_TRUE(wait_for_finished(&params));
1048 /* The SyncPoint should have logged the error */
1049 ASSERT_GT(cache_logger_->get_messages().size(), 0);
1050}
1051
1052/// @cond INTERNALS
1053struct emitter_thread_data
1054{
1056 std::string name;
1057 std::string sp_name;
1058 atomic<ThreadStatus> status;
1059 Mutex mutex_running;
1060 WaitCondition cond_running = WaitCondition(&mutex_running);
1061 Mutex mutex_finished;
1062 WaitCondition cond_finished = WaitCondition(&mutex_finished);
1063};
1064/// @endcond
1065
1066/** helper function to call emit in a thread */
1067void *
1068call_emit(void *data)
1069{
1070 emitter_thread_data *tdata = (emitter_thread_data *)data;
1071 tdata->status = RUNNING;
1072 tdata->mutex_running.lock();
1073 tdata->cond_running.wake_all();
1074 tdata->mutex_running.unlock();
1075 RefPtr<SyncPoint> sp = tdata->manager->get_syncpoint(tdata->name, tdata->sp_name);
1076 sp->register_emitter(tdata->name);
1077 sp->emit(tdata->name);
1078 tdata->status = FINISHED;
1079 tdata->mutex_finished.lock();
1080 tdata->cond_finished.wake_all();
1081 tdata->mutex_finished.unlock();
1082 return NULL;
1083}
1084
1085/** Test the functionality of lock_until_next_wait */
1086TEST_F(SyncPointManagerTest, LockUntilNextWaitTest)
1087{
1088 RefPtr<SyncPoint> sp = manager->get_syncpoint("component", "/test");
1089
1090 sp->lock_until_next_wait("component");
1091 pthread_t thread;
1092 emitter_thread_data *emitter_params = new emitter_thread_data();
1093 emitter_params->manager = manager;
1094 emitter_params->name = "emitter";
1095 emitter_params->sp_name = "/test";
1096 pthread_create(&thread, NULL, call_emit, (void *)emitter_params);
1097
1098 emitter_params->mutex_running.lock();
1099 if (emitter_params->status != RUNNING) {
1100 ASSERT_TRUE(emitter_params->cond_running.reltimed_wait(1, 0));
1101 }
1102 emitter_params->mutex_running.unlock();
1103 emitter_params->mutex_finished.lock();
1104 EXPECT_FALSE(emitter_params->cond_finished.reltimed_wait(0, 100000));
1105 emitter_params->mutex_finished.unlock();
1106
1107 pthread_t waiter_thread;
1108 waiter_thread_params waiter_params;
1109 waiter_params.manager = manager;
1110 waiter_params.component = "component";
1111 waiter_params.num_wait_calls = 1;
1112 waiter_params.sp_identifier = "/test";
1113 pthread_create(&waiter_thread, NULL, start_waiter_thread, &waiter_params);
1114
1115 emitter_params->mutex_finished.lock();
1116 ASSERT_TRUE(emitter_params->status == FINISHED
1117 || emitter_params->cond_finished.reltimed_wait(1, 0));
1118 emitter_params->mutex_finished.unlock();
1119 pthread_join(thread, NULL);
1120 pthread_join(waiter_thread, NULL);
1121 delete emitter_params;
1122}
1123
1124/** helper function used for testing wait() */
1125void *
1126call_wait_for_all(void *data)
1127{
1128 SyncPoint *sp = (SyncPoint *)(data);
1129 sp->wait_for_all("waiter");
1130 return NULL;
1131}
1132
1133/** Test the functionality of lock_until_next_wait
1134 * Test whether the waiter really calls wait before ALL emitters call emit
1135 * This tests a potential race condition between wait() and emit() */
1136TEST_F(SyncPointManagerTest, LockUntilNextWaitWaiterComesFirstTest)
1137{
1138 RefPtr<SyncPoint> sp = manager->get_syncpoint("waiter", "/test");
1139
1140 sp->lock_until_next_wait("waiter");
1141
1142 uint num_emitters = 100;
1143 pthread_t emitter_thread[num_emitters];
1144 emitter_thread_data *params[num_emitters];
1145 for (uint i = 0; i < num_emitters; i++) {
1146 params[i] = new emitter_thread_data();
1147 params[i]->manager = manager;
1148 string emitter_name = "emitter" + to_string(i);
1149 params[i]->name = emitter_name;
1150 params[i]->sp_name = "/test";
1151 pthread_create(&emitter_thread[i], NULL, call_emit, (void *)params[i]);
1152 }
1153
1154 for (uint i = 0; i < num_emitters; i++) {
1155 params[i]->mutex_running.lock();
1156 if (params[i]->status != RUNNING) {
1157 ASSERT_TRUE(params[i]->cond_running.reltimed_wait(1, 0));
1158 }
1159 params[i]->mutex_running.unlock();
1160 }
1161
1162 pthread_t waiter_thread;
1164 thread_params.component = "waiter";
1165 thread_params.type = SyncPoint::WAIT_FOR_ALL;
1166 thread_params.manager = manager;
1167 thread_params.thread_nr = 1;
1168 thread_params.num_wait_calls = 1;
1169 thread_params.sp_identifier = "/test";
1170 pthread_create(&waiter_thread, &attrs, start_waiter_thread, &thread_params);
1171
1172 for (uint i = 0; i < num_emitters; i++) {
1173 params[i]->mutex_finished.lock();
1174 ASSERT_TRUE(params[i]->status == FINISHED || params[i]->cond_finished.reltimed_wait(1, 0));
1175 params[i]->mutex_finished.unlock();
1176 pthread_join(emitter_thread[i], NULL);
1177 delete params[i];
1178 }
1179
1180 ASSERT_TRUE(wait_for_finished(&thread_params));
1181 pthread_join(waiter_thread, NULL);
1182}
1183
1184/** Test whether all waiters are always released at the same time, even if one
1185 * waiter called wait after one emitter already emitted. In particular, this
1186 * tests the following scenario:
1187 * 1. waiter1: wait
1188 * 2. emitter1: emit
1189 * 3. waiter2: wait
1190 * 4. emitter2: emit
1191 * 5. both waiter1 and waiter2 are released
1192 */
1193TEST_F(SyncPointManagerTest, WaitersAreAlwaysReleasedSimultaneouslyTest)
1194{
1195 string sp_identifier = "/test";
1196 RefPtr<SyncPoint> sp = manager->get_syncpoint("emitter1", sp_identifier);
1197 manager->get_syncpoint("emitter2", sp_identifier);
1198 sp->register_emitter("emitter1");
1199 sp->register_emitter("emitter2");
1200 uint num_threads = 2;
1201 pthread_t threads[num_threads];
1202 waiter_thread_params params[num_threads];
1203 for (uint i = 0; i < num_threads; i++) {
1204 params[i].component = "component " + to_string(i);
1205 params[i].manager = manager;
1206 params[i].type = SyncPoint::WAIT_FOR_ALL;
1207 params[i].thread_nr = i;
1208 params[i].num_wait_calls = 1;
1209 params[i].sp_identifier = sp_identifier;
1210 }
1211 pthread_create(&threads[0], &attrs, start_waiter_thread, &params[0]);
1212 ASSERT_FALSE(wait_for_finished(&params[0], 0, 10 * pow(10, 6)));
1213 sp->emit("emitter1");
1214 ASSERT_FALSE(wait_for_finished(&params[0], 0, 10 * pow(10, 6)));
1215 pthread_create(&threads[1], &attrs, start_waiter_thread, &params[1]);
1216 for (uint i = 0; i < num_threads; i++) {
1217 ASSERT_FALSE(wait_for_finished(&params[i], 0, 10 * pow(10, 6)));
1218 }
1219 sp->emit("emitter2");
1220 for (uint i = 0; i < num_threads; i++) {
1221 ASSERT_TRUE(wait_for_finished(&params[i]));
1222 pthread_join(threads[i], NULL);
1223 }
1224}
1225
1226/** Test whether all syncpoints are released simultaneously if a timeout occurs;
1227 * i.e. make sure that only the first waiter's timeout matters and all
1228 * subsequent waiters are released when the first waiter times out.
1229 */
1230TEST_F(SyncPointManagerTest, WaitersTimeoutSimultaneousReleaseTest)
1231{
1232 RefPtr<SyncPoint> sp = manager->get_syncpoint("emitter1", "/test");
1233 sp->register_emitter("emitter1");
1234 uint num_threads = 2;
1235 pthread_t threads[num_threads];
1236 string sp_identifier = "/test";
1237 waiter_thread_params params[num_threads];
1238 for (uint i = 0; i < num_threads; i++) {
1239 params[i].component = "component " + to_string(i);
1240 params[i].type = SyncPoint::WAIT_FOR_ALL;
1241 params[i].manager = manager;
1242 params[i].thread_nr = i;
1243 params[i].num_wait_calls = 1;
1244 params[i].timeout_sec = 0;
1245 params[i].timeout_nsec = 100 * pow(10, 6);
1246 params[i].sp_identifier = sp_identifier;
1247 }
1248 pthread_create(&threads[0], &attrs, start_waiter_thread, &params[0]);
1249 EXPECT_TRUE(wait_for_running(&params[0]));
1250 params[1].timeout_sec = 5;
1251 params[1].timeout_nsec = 0;
1252 pthread_create(&threads[1], &attrs, start_waiter_thread, &params[1]);
1253 for (uint i = 0; i < num_threads; i++) {
1254 EXPECT_TRUE(wait_for_running(&params[i]));
1255 }
1256 wait_for_finished(&params[0], params[0].timeout_sec, params[0].timeout_nsec);
1257 wait_for_finished(&params[1], 0, pow(10, 6));
1258 for (uint i = 0; i < num_threads; i++) {
1259 pthread_join(threads[i], NULL);
1260 }
1261}
1262
1263/** Similar as before, test if the timeout is handled properly. This time, let
1264 * a wait_for_one with a short timeout step by. The other waiters should not be
1265 * affected, i.e. they should still be waiting even when the timeout for the
1266 * wait_for_one occurred.
1267 * In other words, wait_for_one waiters are handled completeley separately.
1268 */
1269TEST_F(SyncPointManagerTest, WaitForOneSeparateTimeoutTest)
1270{
1271 RefPtr<SyncPoint> sp = manager->get_syncpoint("emitter1", "/test");
1272 sp->register_emitter("emitter1");
1273 string sp_identifier = "/test";
1274 uint num_threads = 2;
1275 Barrier * barrier = new Barrier(num_threads + 2);
1276 pthread_t wait_for_one_thread;
1277 waiter_thread_params wait_for_one_params;
1278 wait_for_one_params.component = "wait_for_one";
1279 wait_for_one_params.type = SyncPoint::WAIT_FOR_ONE;
1280 wait_for_one_params.manager = manager;
1281 wait_for_one_params.thread_nr = 2;
1282 wait_for_one_params.num_wait_calls = 1;
1283 wait_for_one_params.timeout_sec = 0;
1284 wait_for_one_params.timeout_nsec = 100 * pow(10, 6);
1285 wait_for_one_params.status = PENDING;
1286 wait_for_one_params.sp_identifier = sp_identifier;
1287 wait_for_one_params.start_barrier = barrier;
1288 pthread_create(&wait_for_one_thread, &attrs, start_waiter_thread, &wait_for_one_params);
1289 pthread_t threads[num_threads];
1290 waiter_thread_params params[num_threads];
1291 for (uint i = 0; i < num_threads; i++) {
1292 params[i].component = "component " + to_string(i);
1293 params[i].type = SyncPoint::WAIT_FOR_ALL;
1294 params[i].manager = manager;
1295 params[i].thread_nr = i;
1296 params[i].num_wait_calls = 1;
1297 params[i].timeout_sec = 1;
1298 params[i].timeout_nsec = 0;
1299 params[i].sp_identifier = sp_identifier;
1300 params[i].start_barrier = barrier;
1301 pthread_create(&threads[i], &attrs, start_waiter_thread, &params[i]);
1302 }
1303 barrier->wait();
1304 EXPECT_TRUE(wait_for_running(&wait_for_one_params));
1305 for (uint i = 0; i < num_threads; i++) {
1306 EXPECT_TRUE(wait_for_running(&params[i]));
1307 }
1308 EXPECT_TRUE(wait_for_finished(&wait_for_one_params));
1309 for (uint i = 0; i < num_threads; i++) {
1310 EXPECT_EQ(RUNNING, params[i].status);
1311 }
1312 for (uint i = 0; i < num_threads; i++) {
1313 EXPECT_TRUE(wait_for_finished(&params[i], params[i].timeout_sec, params[i].timeout_nsec));
1314 pthread_join(threads[i], NULL);
1315 }
1316 pthread_join(wait_for_one_thread, NULL);
1317}
1318
1319TEST_F(SyncPointManagerTest, MultipleWaitsWithoutEmitters)
1320{
1321 RefPtr<SyncPoint> sp = manager->get_syncpoint("waiter", "/test");
1322 pthread_t waiter_thread;
1324 thread_params.component = "waiter";
1325 thread_params.type = SyncPoint::WAIT_FOR_ALL;
1326 thread_params.manager = manager;
1327 thread_params.thread_nr = 1;
1328 thread_params.num_wait_calls = 2;
1329 thread_params.sp_identifier = "/test";
1330 pthread_create(&waiter_thread, &attrs, start_waiter_thread, &thread_params);
1331 ASSERT_TRUE(wait_for_finished(&thread_params));
1332 pthread_join(waiter_thread, NULL);
1333}
1334
1335TEST_F(SyncPointManagerTest, ReleaseOfEmitterThrowsException)
1336{
1337 RefPtr<SyncPoint> sp = manager->get_syncpoint("emitter", "/test");
1338 sp->register_emitter("emitter");
1339 ASSERT_THROW(manager->release_syncpoint("emitter", sp), SyncPointCannotReleaseEmitter);
1340}
1341
1342TEST_F(SyncPointManagerTest, UnregisterNonEmitter)
1343{
1344 RefPtr<SyncPoint> sp = manager->get_syncpoint("emitter", "/test");
1345 // "emitter" is a watcher but not an emitter
1346 EXPECT_NO_THROW(sp->unregister_emitter("emitter"));
1347 // "foo" is not known to the syncpoint
1348 EXPECT_NO_THROW(sp->unregister_emitter("foo"));
1349}
1350
1351TEST_F(SyncPointManagerTest, ReleaseBarrierWaiter)
1352{
1353 RefPtr<SyncPoint> sp = manager->get_syncpoint("emitter", "/test");
1354 sp->register_emitter("emitter");
1355 pthread_t waiter_thread;
1357 thread_params.component = "component 1";
1358 thread_params.type = SyncPoint::WAIT_FOR_ALL;
1359 thread_params.manager = manager;
1360 thread_params.thread_nr = 1;
1361 thread_params.num_wait_calls = 1;
1362 thread_params.sp_identifier = "/test";
1363 thread_params.timeout_sec = 2;
1364 pthread_create(&waiter_thread, &attrs, start_waiter_thread, &thread_params);
1365 EXPECT_TRUE(wait_for_running(&thread_params));
1366 ASSERT_TRUE(sp->watcher_is_waiting("component 1", SyncPoint::WAIT_FOR_ALL));
1367 pthread_cancel(waiter_thread);
1368 pthread_join(waiter_thread, NULL);
1369 ASSERT_TRUE(sp->watcher_is_waiting("component 1", SyncPoint::WAIT_FOR_ALL));
1370 manager->release_syncpoint("component 1", sp);
1371 sp = manager->get_syncpoint("component 1", "/test");
1372 EXPECT_NO_THROW(sp->reltime_wait_for_all("component 1", 0, pow(10, 6)));
1373}
Helper class which registers and emits a given SyncBarrier.
virtual ~Emitter()
Destructor.
Emitter(string identifier, string syncbarrier, RefPtr< SyncPointManager > manager)
Constructor.
void emit()
emit the SyncBarrier
Test SyncBarriers.
SyncBarrierTest()
Constructor.
Test class for SyncPointManager. This class tests basic functionality of the SyncPointManager.
SyncPointManagerTest()
Initialize the test class.
virtual ~SyncPointManagerTest()
Deinitialize the test class.
RefPtr< SyncPointManager > manager
A Pointer to a SyncPointManager.
pthread_attr_t attrs
Thread attributes.
CacheLogger * cache_logger_
Cache Logger used for testing.
MultiLogger * logger_
Logger used to initialize SyncPoints.
Test class for SyncPoint. This class tests basic functionality of SyncPoints.
MultiLogger * logger_
Logger for testing.
virtual void SetUp()
Initialize the test class.
RefPtr< SyncPoint > sp1
A syncpoint for testing.
RefPtr< SyncPoint > sp3
A syncpoint for testing.
RefPtr< SyncPoint > sp2
A syncpoint for testing.
virtual void TearDown()
Clean up.
A barrier is a synchronization tool which blocks until a given number of threads have reached the bar...
Definition: barrier.h:32
virtual void wait()
Wait for other threads.
Definition: barrier.cpp:153
Logging Cache.
Definition: cache.h:38
Log through multiple loggers.
Definition: multi.h:35
void add_logger(Logger *logger)
Add a logger.
Definition: multi.cpp:110
Mutex locking helper.
Definition: mutex_locker.h:34
Mutex mutual exclusion lock.
Definition: mutex.h:33
void lock()
Lock this mutex.
Definition: mutex.cpp:87
void unlock()
Unlock the mutex.
Definition: mutex.cpp:131
The component called release but is still registered as emitter.
Definition: exceptions.h:203
Invalid component name used (i.e.
Definition: exceptions.h:140
Invalid identifier used (i.e.
Definition: exceptions.h:122
This class gives access to SyncPoints.
void release_syncpoint(const std::string &component, RefPtr< SyncPoint > syncpoint)
Release a SyncPoint.
RefPtr< SyncPoint > get_syncpoint(const std::string &component, const std::string &identifier)
Get a SyncPoint.
A component called wait() but is already waiting.
Definition: exceptions.h:156
Emit was called on a SyncBarrier but the calling component is not registered as emitter.
Definition: exceptions.h:174
Compare sets of syncpoints.
Definition: syncpoint.h:44
The SyncPoint class.
Definition: syncpoint.h:50
virtual void wait(const std::string &component, WakeupType=WAIT_FOR_ONE, uint wait_sec=0, uint wait_nsec=0)
wait for the sync point to be emitted by any other component
Definition: syncpoint.cpp:241
std::multiset< std::string > get_emitters() const
Definition: syncpoint.cpp:545
virtual void reltime_wait_for_all(const std::string &component, uint wait_sec, uint wait_nsec)
Wait for all registered emitters for the given time.
Definition: syncpoint.cpp:387
virtual void unregister_emitter(const std::string &component, bool emit_if_pending=true)
unregister as emitter
Definition: syncpoint.cpp:456
std::set< std::string > get_watchers() const
Definition: syncpoint.cpp:518
virtual void register_emitter(const std::string &component)
register as emitter
Definition: syncpoint.cpp:439
bool watcher_is_waiting(std::string watcher, WakeupType type) const
Check if the given waiter is currently waiting with the given type.
Definition: syncpoint.cpp:567
virtual void emit(const std::string &component)
send a signal to all waiting threads
Definition: syncpoint.cpp:150
void lock_until_next_wait(const std::string &component)
Lock the SyncPoint for emitters until the specified component does the next wait() call.
Definition: syncpoint.cpp:418
std::string get_identifier() const
Definition: syncpoint.cpp:107
virtual void wait_for_all(const std::string &component)
Wait for all registered emitters.
Definition: syncpoint.cpp:365
WakeupType
Type to define when a thread wakes up after waiting for a SyncPoint.
Definition: syncpoint.h:56
Wait until a given condition holds.
void wake_all()
Wake up all waiting threads.
bool reltimed_wait(unsigned int sec, unsigned int nanosec)
Wait with relative timeout.
Fawkes library namespace.
The parameters passed to the threads.
struct used for multithreading tests
uint num_wait_calls
Number of wait calls the thread should make.
Mutex mutex_finished
Mutex to protect cond_finished.
Barrier * start_barrier
Barrier for startup synchronization.
WaitCondition cond_finished
WaitCondition to indicate that the thread has finished.
RefPtr< SyncPointManager > manager
SyncPointManager passed to the thread.
WaitCondition cond_running
WaitCondition to indicate that the thread is running.
uint timeout_nsec
timeout in nsec
SyncPoint::WakeupType type
Wait type.
uint timeout_sec
timeout in sec
uint thread_nr
Thread number.
string component
Name of the component.
atomic< ThreadStatus > status
current status of the thread
string sp_identifier
Name of the SyncPoint.
Mutex mutex_running
Mutex to protect cond_running.