Fawkes API Fawkes Development Version
pcl_db_store_thread.cpp
1
2/***************************************************************************
3 * pcl_db_store_thread.cpp - Store point clouds to MongoDB
4 *
5 * Created: Mon May 05 14:26:22 2014
6 * Copyright 2012-2014 Tim Niemueller [www.niemueller.de]
7 ****************************************************************************/
8
9/* This program is free software; you can redistribute it and/or modify
10 * it under the terms of the GNU General Public License as published by
11 * the Free Software Foundation; either version 2 of the License, or
12 * (at your option) any later version.
13 *
14 * This program is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU Library General Public License for more details.
18 *
19 * Read the full text in the LICENSE.GPL file in the doc directory.
20 */
21
22#include "pcl_db_store_thread.h"
23
24#include <blackboard/utils/on_message_waker.h>
25#include <interfaces/PclDatabaseStoreInterface.h>
26#include <pcl_utils/pcl_adapter.h>
27
28// from MongoDB
29#include <bsoncxx/builder/basic/document.hpp>
30#include <mongocxx/client.hpp>
31#include <mongocxx/exception/operation_exception.hpp>
32#include <mongocxx/gridfs/bucket.hpp>
33#include <mongocxx/gridfs/uploader.hpp>
34
35#define CFG_PREFIX "/perception/pcl-db/"
36
37using namespace fawkes;
38using namespace mongocxx;
39
40/** @class PointCloudDBStoreThread "pcl_db_store_thread.h"
41 * Thread to store point clouds from database on request.
42 * @author Tim Niemueller
43 */
44
45/** Constructor. */
47: Thread("PointCloudDBStoreThread", Thread::OPMODE_WAITFORWAKEUP), MongoDBAspect("default")
48{
49}
50
51/** Destructor. */
53{
54}
55
56void
58{
59 store_if_ = NULL;
60 adapter_ = NULL;
61 msg_waker_ = NULL;
62
63 cfg_database_ = config->get_string(CFG_PREFIX "database-name");
64
65 adapter_ = new PointCloudAdapter(pcl_manager, logger);
66
67 try {
68 store_if_ = blackboard->open_for_writing<PclDatabaseStoreInterface>("PCL Database Store");
69
70 msg_waker_ = new BlackBoardOnMessageWaker(blackboard, store_if_, this);
71 } catch (Exception &e) {
72 finalize();
73 throw;
74 }
75}
76
77void
79{
80 delete msg_waker_;
81 blackboard->close(store_if_);
82 delete adapter_;
83}
84
85void
87{
88 if (store_if_->msgq_empty())
89 return;
90
91 if (PclDatabaseStoreInterface::StoreMessage *msg = store_if_->msgq_first_safe(msg)) {
92 store_if_->set_final(false);
93 store_if_->set_msgid(msg->id());
94 store_if_->set_error("");
95 store_if_->write();
96
97 std::string msg_database = msg->database();
98 std::string msg_collection = msg->collection();
99
100 bool store = true;
101 std::string errmsg;
102 std::string database = (msg_database != "") ? msg_database : cfg_database_;
103 std::string collection = database + ".";
104 if (msg_collection == "") {
105 collection += "pcls";
106 } else if (msg_collection.compare(0, 3, "fs.") == 0) {
107 errmsg = "Passed in collection uses GridFS namespace";
108 store = false;
109 } else {
110 collection += msg->collection();
111 }
112
113 if (store)
114 store_pointcloud(msg->pcl_id(), database, collection, errmsg);
115
116 store_if_->set_error(errmsg.c_str());
117 store_if_->set_final(true);
118 store_if_->write();
119 } else {
120 logger->log_warn(name(), "Unhandled message received");
121 }
122 store_if_->msgq_pop();
123}
124
125bool
126PointCloudDBStoreThread::store_pointcloud(std::string pcl_id,
127 std::string database,
128 std::string collection,
129 std::string &errmsg)
130{
131 if (!pcl_manager->exists_pointcloud(pcl_id.c_str())) {
132 errmsg = "PointCloud does not exist";
133 return false;
134 }
135
136 std::string frame_id;
137 unsigned int width, height;
138 bool is_dense;
139 void * point_data;
140 size_t point_size, num_points;
141 fawkes::Time time;
143
144 adapter_->get_data_and_info(pcl_id,
145 frame_id,
146 is_dense,
147 width,
148 height,
149 time,
150 fieldinfo,
151 &point_data,
152 point_size,
153 num_points);
154
155 size_t data_size = point_size * num_points;
156
157 auto gridfs = mongodb_client->database(database).gridfs_bucket();
158
159 std::stringstream name;
160 name << "pcl_" << time.in_msec();
161 auto uploader = gridfs.open_upload_stream(name.str());
162 uploader.write(static_cast<uint8_t *>(point_data), data_size);
163 auto result = uploader.close();
164 using namespace bsoncxx::builder;
165 basic::document document;
166 document.append(basic::kvp("timestamp", static_cast<int64_t>(time.in_msec())));
167 document.append(basic::kvp("pointcloud", [&](basic::sub_document subdoc) {
168 subdoc.append(basic::kvp("frame_id", frame_id));
169 subdoc.append(basic::kvp("is_dense", is_dense));
170 subdoc.append(basic::kvp("width", static_cast<int64_t>(width)));
171 subdoc.append(basic::kvp("height", static_cast<int64_t>(height)));
172 subdoc.append(basic::kvp("point_size", static_cast<int64_t>(point_size)));
173 subdoc.append(basic::kvp("num_points", static_cast<int64_t>(num_points)));
174 subdoc.append(basic::kvp("data", [&](basic::sub_document datadoc) {
175 datadoc.append(basic::kvp("id", result.id()));
176 datadoc.append(basic::kvp("filename", name.str()));
177 }));
178 subdoc.append(basic::kvp("field_info", [fieldinfo](basic::sub_array fi_array) {
179 for (auto fi : fieldinfo) {
180 basic::document fi_doc;
181 fi_doc.append(basic::kvp("name", fi.name));
182 fi_doc.append(basic::kvp("offset", static_cast<int64_t>(fi.offset)));
183 fi_doc.append(basic::kvp("datatype", fi.datatype));
184 fi_doc.append(basic::kvp("count", static_cast<int64_t>(fi.count)));
185 }
186 }));
187 }));
188
189 try {
190 mongodb_client->database(database)[collection].insert_one(document.view());
191 } catch (mongocxx::operation_exception &e) {
192 logger->log_warn(this->name(), "Failed to insert into %s: %s", collection.c_str(), e.what());
193 errmsg = e.what();
194 return false;
195 }
196
197 return true;
198}
Point cloud adapter class.
Definition: pcl_adapter.h:39
void get_data_and_info(const std::string &id, std::string &frame_id, bool &is_dense, unsigned int &width, unsigned int &height, fawkes::Time &time, V_PointFieldInfo &pfi, void **data_ptr, size_t &point_size, size_t &num_points)
Get data and info of point cloud.
std::vector< PointFieldInfo > V_PointFieldInfo
Vector of PointFieldInfo.
Definition: pcl_adapter.h:66
virtual void loop()
Code to execute in the thread.
virtual ~PointCloudDBStoreThread()
Destructor.
virtual void init()
Initialize the thread.
virtual void finalize()
Finalize the thread.
BlackBoard * blackboard
This is the BlackBoard instance you can use to interact with the BlackBoard.
Definition: blackboard.h:44
Wake threads on receiving a blackboard message.
virtual Interface * open_for_writing(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for writing.
virtual void close(Interface *interface)=0
Close interface.
Configuration * config
This is the Configuration member used to access the configuration.
Definition: configurable.h:41
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
Base class for exceptions in Fawkes.
Definition: exception.h:36
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:41
Thread aspect to access MongoDB.
Definition: mongodb.h:39
mongocxx::client * mongodb_client
MongoDB client to use to interact with the database.
Definition: mongodb.h:54
PointCloudManager * pcl_manager
Manager to distribute and access point clouds.
Definition: pointcloud.h:47
bool exists_pointcloud(const char *id)
Check if point cloud exists.
Thread class encapsulation of pthreads.
Definition: thread.h:46
const char * name() const
Get name of thread.
Definition: thread.h:100
A class for handling time.
Definition: time.h:93
long in_msec() const
Convert the stored time into milli-seconds.
Definition: time.cpp:228
Fawkes library namespace.