23#include "mongodb_log_pcl_thread.h"
26#include <core/threading/mutex_locker.h>
27#include <utils/time/wait.h>
31#include <mongocxx/client.hpp>
32#include <mongocxx/exception/operation_exception.hpp>
33#include <mongocxx/gridfs/uploader.hpp>
37using namespace mongocxx;
67 cfg_storage_interval_ =
config->
get_float(
"/plugins/mongodb-log/pointclouds/storage-interval");
69 cfg_chunk_size_ = 2097152;
71 cfg_chunk_size_ =
config->
get_uint(
"/plugins/mongodb-log/pointclouds/chunk-size");
76 cfg_flush_after_write_ =
false;
78 cfg_flush_after_write_ =
config->
get_uint(
"/plugins/mongodb-log/pointclouds/flush-after-write");
82 std::vector<std::string> includes;
87 std::vector<std::string> excludes;
95 gridfs_ = mongodb_->database(database_).gridfs_bucket();
102 std::vector<std::string>::iterator p;
103 std::vector<std::string>::iterator f;
104 for (p = pcls.begin(); p != pcls.end(); ++p) {
105 bool include = includes.empty();
107 for (f = includes.begin(); f != includes.end(); ++f) {
108 if (fnmatch(f->c_str(), p->c_str(), 0) != FNM_NOMATCH) {
116 for (f = excludes.begin(); f != excludes.end(); ++f) {
117 if (fnmatch(f->c_str(), p->c_str(), 0) != FNM_NOMATCH) {
131 std::string topic_name = std::string(
"PointClouds.") + *p;
133 while ((pos = topic_name.find_first_of(
" -", pos)) != std::string::npos) {
134 topic_name.replace(pos, 1,
"_");
138 pi.topic_name = topic_name;
142 std::string frame_id;
143 unsigned int width, height;
146 adapter_->
get_info(*p, width, height, frame_id, is_dense, fieldinfo);
147 pi.msg.header.frame_id = frame_id;
148 pi.msg.width = width;
149 pi.msg.height = height;
150 pi.msg.is_dense = is_dense;
151 pi.msg.fields.clear();
152 pi.msg.fields.resize(fieldinfo.size());
153 for (
unsigned int i = 0; i < fieldinfo.size(); ++i) {
154 pi.msg.fields[i].name = fieldinfo[i].name;
155 pi.msg.fields[i].offset = fieldinfo[i].offset;
156 pi.msg.fields[i].datatype = fieldinfo[i].datatype;
157 pi.msg.fields[i].count = fieldinfo[i].count;
163 wait_ =
new TimeWait(
clock, cfg_storage_interval_ * 1000000.);
164 mutex_ =
new Mutex();
188 std::map<std::string, PointCloudInfo>::iterator p;
189 unsigned int num_stored = 0;
190 for (p = pcls_.begin(); p != pcls_.end(); ++p) {
191 PointCloudInfo &pi = p->second;
192 std::string frame_id;
193 unsigned int width, height;
195 size_t point_size, num_points;
198 p->first, frame_id, width, height, time, &point_data, point_size, num_points);
199 size_t data_size = point_size /
sizeof(uint8_t) * num_points;
201 if (pi.last_sent != time) {
206 using namespace bsoncxx::builder;
207 basic::document document;
208 std::stringstream
name;
210 auto uploader = gridfs_.open_upload_stream(
name.str());
211 uploader.write((uint8_t *)point_data, data_size);
212 auto result = uploader.close();
213 document.append(basic::kvp(
"timestamp",
static_cast<int64_t
>(time.
in_msec())));
214 document.append(basic::kvp(
"pointcloud", [&](basic::sub_document subdoc) {
215 subdoc.append(basic::kvp(
"frame_id", pi.msg.header.frame_id));
216 subdoc.append(basic::kvp(
"is_dense", pi.msg.is_dense));
217 subdoc.append(basic::kvp(
"width",
static_cast<int32_t
>(width)));
218 subdoc.append(basic::kvp(
"height",
static_cast<int32_t
>(height)));
219 subdoc.append(basic::kvp(
"point_size",
static_cast<int32_t
>(point_size)));
220 subdoc.append(basic::kvp(
"num_points",
static_cast<int32_t
>(num_points)));
222 subdoc.append(basic::kvp(
"data", result.id()));
223 subdoc.append(basic::kvp(
"field_info", [pi](basic::sub_array array) {
224 for (uint i = 0; i < pi.msg.fields.size(); i++) {
225 basic::document field_info_doc;
226 field_info_doc.append(basic::kvp(
"name", pi.msg.fields[i].name));
227 field_info_doc.append(
228 basic::kvp(
"offset", static_cast<int32_t>(pi.msg.fields[i].offset)));
229 field_info_doc.append(
230 basic::kvp(
"datatype", static_cast<int32_t>(pi.msg.fields[i].datatype)));
231 field_info_doc.append(
232 basic::kvp(
"count", static_cast<int32_t>(pi.msg.fields[i].count)));
233 array.append(field_info_doc);
239 mongodb_->database(database_)[pi.topic_name].insert_one(document.view());
241 }
catch (operation_exception &e) {
243 "Failed to insert into %s: %s",
249 float diff = (end - &
start) * 1000.;
251 "Stored point cloud %s (time %li) in %.1f ms",
265 "Stored %u of %zu point clouds in %.1f ms",
268 (loop_end - &loop_start) * 1000.);
270 if (cfg_flush_after_write_) {
272 using namespace bsoncxx::builder;
273 basic::document flush_cmd;
274 flush_cmd.append(basic::kvp(
"fsync", 1));
275 flush_cmd.append(basic::kvp(
"async", 1));
276 auto reply = mongodb_client->database(
"admin").run_command(flush_cmd.view());
277 if (reply.view()[
"ok"].get_double() != 1) {
280 reply.view()[
"errmsg"].get_utf8().value.to_string().c_str());
MongoLogPointCloudThread()
Constructor.
virtual void init()
Initialize the thread.
virtual ~MongoLogPointCloudThread()
Destructor.
virtual void loop()
Code to execute in the thread.
virtual bool prepare_finalize_user()
Prepare finalization user implementation.
virtual void finalize()
Finalize the thread.
Point cloud adapter class.
void get_data(const std::string &id, std::string &frame_id, unsigned int &width, unsigned int &height, fawkes::Time &time, void **data_ptr, size_t &point_size, size_t &num_points)
Get current data of point cloud.
void get_info(const std::string &id, unsigned int &width, unsigned int &height, std::string &frame_id, bool &is_dense, V_PointFieldInfo &pfi)
Get info about point cloud.
std::vector< PointFieldInfo > V_PointFieldInfo
Vector of PointFieldInfo.
Clock * clock
By means of this member access to the clock is given.
Configuration * config
This is the Configuration member used to access the configuration.
virtual unsigned int get_uint(const char *path)=0
Get value from configuration which is of type unsigned int.
virtual float get_float(const char *path)=0
Get value from configuration which is of type float.
virtual std::vector< std::string > get_strings(const char *path)=0
Get list of values from configuration which is of type string.
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
Base class for exceptions in Fawkes.
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Logger * logger
This is the Logger member used to access the logger.
Thread aspect to access MongoDB.
mongocxx::client * mongodb_client
MongoDB client to use to interact with the database.
virtual void log_warn(const char *component, const char *format,...)
Log warning message.
virtual void log_debug(const char *component, const char *format,...)
Log debug message.
Mutex mutual exclusion lock.
void lock()
Lock this mutex.
PointCloudManager * pcl_manager
Manager to distribute and access point clouds.
std::vector< std::string > get_pointcloud_list() const
Get list of point cloud IDs.
Thread class encapsulation of pthreads.
void set_prepfin_conc_loop(bool concurrent=true)
Set concurrent execution of prepare_finalize() and loop().
const char * name() const
Get name of thread.
void start(bool wait=true)
Call this method to start the thread.
void mark_start()
Mark start of loop.
A class for handling time.
long in_msec() const
Convert the stored time into milli-seconds.
Fawkes library namespace.