Fawkes API Fawkes Development Version
mongodb_log_tf_thread.cpp
1
2/***************************************************************************
3 * mongodb_log_tf_thread.cpp - MongoDB transforms logging Thread
4 *
5 * Created: Tue Dec 11 14:58:00 2012 (Freiburg)
6 * Copyright 2010-2017 Tim Niemueller [www.niemueller.de]
7 ****************************************************************************/
8
9/* This program is free software; you can redistribute it and/or modify
10 * it under the terms of the GNU General Public License as published by
11 * the Free Software Foundation; either version 2 of the License, or
12 * (at your option) any later version.
13 *
14 * This program is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU Library General Public License for more details.
18 *
19 * Read the full text in the LICENSE.GPL file in the doc directory.
20 */
21
22#include "mongodb_log_tf_thread.h"
23
24#include <core/threading/mutex_locker.h>
25#include <plugins/mongodb/aspect/mongodb_conncreator.h>
26#include <tf/time_cache.h>
27#include <utils/time/wait.h>
28
29#include <bsoncxx/builder/basic/document.hpp>
30#include <cstdlib>
31#include <mongocxx/client.hpp>
32#include <mongocxx/exception/operation_exception.hpp>
33
34using namespace mongocxx;
35using namespace fawkes;
36
37/** @class MongoLogTransformsThread "mongodb_log_tf_thread.h"
38 * MongoDB transforms logging thread.
39 * This thread periodically queries the transformer for recent transforms
40 * and stores them to the database.
41 *
42 * @author Tim Niemueller
43 */
44
45/** Constructor. */
47: Thread("MongoLogTransformsThread", Thread::OPMODE_CONTINUOUS),
48 MongoDBAspect("default"),
49 TransformAspect(TransformAspect::ONLY_LISTENER)
50{
52}
53
54/** Destructor. */
56{
57}
58
59void
61{
62 database_ = "fflog";
63 collection_ = "tf";
64 try {
65 database_ = config->get_string("/plugins/mongodb-log/database");
66 } catch (Exception &e) {
67 logger->log_info(name(), "No database configured, writing to %s", database_.c_str());
68 }
69
70 try {
71 collection_ = config->get_string("/plugins/mongodb-log/transforms/collection");
72 } catch (Exception &e) {
73 logger->log_info(name(), "No transforms collection configured, using %s", collection_.c_str());
74 }
75 collection_ = database_ + "." + collection_;
76
77 cfg_storage_interval_ = config->get_float("/plugins/mongodb-log/transforms/storage-interval");
78
79 if (cfg_storage_interval_ <= 0.) {
80 // 50% of the cache time, assume 50% is enough to store the data
81 cfg_storage_interval_ = tf_listener->get_cache_time() * 0.5;
82 }
83
84 wait_ = new TimeWait(clock, cfg_storage_interval_ * 1000000.);
85 mutex_ = new Mutex();
86}
87
88bool
90{
91 mutex_->lock();
92 return true;
93}
94
95void
97{
98 delete wait_;
99 delete mutex_;
100}
101
102void
104{
105 mutex_->lock();
106 fawkes::Time loop_start(clock);
107 wait_->mark_start();
108 std::vector<fawkes::Time> tf_range_start;
109 std::vector<fawkes::Time> tf_range_end;
110 ;
111
112 tf_listener->lock();
113 std::vector<tf::TimeCacheInterfacePtr> caches = tf_listener->get_frame_caches();
114 std::vector<tf::TimeCacheInterfacePtr> copies(caches.size(), tf::TimeCacheInterfacePtr());
115
116 const size_t n_caches = caches.size();
117 tf_range_start.resize(n_caches, fawkes::Time(0, 0));
118 tf_range_end.resize(n_caches, fawkes::Time(0, 0));
119 if (last_tf_range_end_.size() != n_caches) {
120 last_tf_range_end_.resize(n_caches, fawkes::Time(0, 0));
121 }
122
123 unsigned int num_transforms = 0;
124 unsigned int num_upd_caches = 0;
125
126 for (size_t i = 0; i < n_caches; ++i) {
127 if (caches[i]) {
128 tf_range_end[i] = caches[i]->get_latest_timestamp();
129 if (last_tf_range_end_[i] != tf_range_end[i]) {
130 // we have new data
131 if (!tf_range_end[i].is_zero()) {
132 tf_range_start[i] = tf_range_end[i] - cfg_storage_interval_;
133 if (last_tf_range_end_[i] > tf_range_start[i]) {
134 tf_range_start[i] = last_tf_range_end_[i];
135 }
136 }
137 copies[i] = caches[i]->clone(tf_range_start[i]);
138 last_tf_range_end_[i] = tf_range_end[i];
139 num_upd_caches += 1;
140 num_transforms += copies[i]->get_list_length();
141 }
142 }
143 }
145
146 store(copies, tf_range_start, tf_range_end);
147
148 mutex_->unlock();
149 // -1 to subtract "NO PARENT" pseudo cache
150 fawkes::Time loop_end(clock);
152 "%u transforms for %u updated frames stored in %.1f ms",
153 num_transforms,
154 num_upd_caches,
155 (loop_end - &loop_start) * 1000.);
156 wait_->wait();
157}
158
159void
160MongoLogTransformsThread::store(std::vector<tf::TimeCacheInterfacePtr> &caches,
161 std::vector<fawkes::Time> & from,
162 std::vector<fawkes::Time> & to)
163{
164 std::vector<std::string> frame_map = tf_listener->get_frame_id_mappings();
165
166 for (size_t i = 0; i < caches.size(); ++i) {
167 tf::TimeCacheInterfacePtr tc = caches[i];
168 if (!tc)
169 continue;
170
171 using namespace bsoncxx::builder;
172 basic::document document;
173
174 document.append(basic::kvp("timestamp", static_cast<int64_t>(from[i].in_msec())));
175 document.append(basic::kvp("timestamp_from", static_cast<int64_t>(from[i].in_msec())));
176 document.append(basic::kvp("timestamp_to", static_cast<int64_t>(to[i].in_msec())));
177 const tf::TimeCache::L_TransformStorage &storage = tc->get_storage();
178
179 if (storage.empty()) {
180 /*
181 fawkes::Time now(clock);
182 logger->log_warn(name(), "Empty storage for %s (start: %lu end: %lu now: %lu",
183 frame_map[i].c_str(), from[i].in_msec(), to[i].in_msec(),
184 now.in_msec());
185 */
186 continue;
187 }
188
189 document.append(basic::kvp("frame", frame_map[storage.front().frame_id]));
190 document.append(basic::kvp("child_frame", frame_map[storage.front().child_frame_id]));
191
192 /*
193 logger->log_debug(name(), "Writing %zu transforms for child frame %s",
194 storage.size(), frame_map[i].c_str());
195 */
196
197 document.append(basic::kvp("transforms", [storage, frame_map](basic::sub_array array) {
198 for (auto s = storage.begin(); s != storage.end(); ++s) {
199 /*
200 "frame" : "/bl_caster_rotation_link",
201 "child_frame" : "/bl_caster_l_wheel_link",
202 "translation" : [
203 0,
204 0.049,
205 0
206 ],
207 "rotation" : [
208 0,
209 0.607301261865804,
210 0,
211 0.7944716340664417
212 ]
213 */
214 basic::document tf_doc;
215 tf_doc.append(basic::kvp("timestamp", static_cast<int64_t>(s->stamp.in_msec())));
216 tf_doc.append(basic::kvp("frame", frame_map[s->frame_id]));
217 tf_doc.append(basic::kvp("child_frame", frame_map[s->child_frame_id]));
218 tf_doc.append(basic::kvp("rotation", [s](basic::sub_array rot_array) {
219 rot_array.append(s->rotation.x());
220 rot_array.append(s->rotation.y());
221 rot_array.append(s->rotation.z());
222 rot_array.append(s->rotation.w());
223 }));
224 tf_doc.append(basic::kvp("translation", [s](basic::sub_array trans_array) {
225 trans_array.append(s->translation.x());
226 trans_array.append(s->translation.y());
227 trans_array.append(s->translation.z());
228 }));
229 array.append(tf_doc);
230 }
231 }));
232
233 try {
234 mongodb_client->database(database_)[collection_].insert_one(document.view());
235 } catch (operation_exception &e) {
236 logger->log_warn(name(), "Inserting TF failed: %s", e.what());
237
238 } catch (std::exception &e) {
239 // for old and broken compilers
240 logger->log_warn(name(), "Inserting TF failed: %s (*)", e.what());
241 }
242 }
243}
virtual ~MongoLogTransformsThread()
Destructor.
virtual bool prepare_finalize_user()
Prepare finalization user implementation.
virtual void finalize()
Finalize the thread.
virtual void init()
Initialize the thread.
virtual void loop()
Code to execute in the thread.
Clock * clock
By means of this member access to the clock is given.
Definition: clock.h:42
Configuration * config
This is the Configuration member used to access the configuration.
Definition: configurable.h:41
virtual float get_float(const char *path)=0
Get value from configuration which is of type float.
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
Base class for exceptions in Fawkes.
Definition: exception.h:36
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:41
Thread aspect to access MongoDB.
Definition: mongodb.h:39
mongocxx::client * mongodb_client
MongoDB client to use to interact with the database.
Definition: mongodb.h:54
Mutex mutual exclusion lock.
Definition: mutex.h:33
void lock()
Lock this mutex.
Definition: mutex.cpp:87
void unlock()
Unlock the mutex.
Definition: mutex.cpp:131
Thread class encapsulation of pthreads.
Definition: thread.h:46
void set_prepfin_conc_loop(bool concurrent=true)
Set concurrent execution of prepare_finalize() and loop().
Definition: thread.cpp:716
const char * name() const
Get name of thread.
Definition: thread.h:100
Time wait utility.
Definition: wait.h:33
void mark_start()
Mark start of loop.
Definition: wait.cpp:68
void wait()
Wait until minimum loop time has been reached.
Definition: wait.cpp:78
A class for handling time.
Definition: time.h:93
Thread aspect to access the transform system.
Definition: tf.h:39
tf::Transformer * tf_listener
This is the transform listener which saves transforms published by other threads in the system.
Definition: tf.h:67
std::list< TransformStorage > L_TransformStorage
List of stored transforms.
Definition: time_cache.h:74
float get_cache_time() const
Get cache time.
void lock()
Lock transformer.
std::vector< TimeCacheInterfacePtr > get_frame_caches() const
Get all currently existing caches.
std::vector< std::string > get_frame_id_mappings() const
Get mappings from frame ID to names.
Fawkes library namespace.