Fawkes API Fawkes Development Version
pcl_db_retrieve_thread.cpp
1
2/***************************************************************************
3 * pcl_db_retrieve_thread.cpp - Restore and retrieve point clouds from MongoDB
4 *
5 * Created: Thu Aug 22 12:04:09 2013
6 * Copyright 2012 Tim Niemueller [www.niemueller.de]
7 ****************************************************************************/
8
9/* This program is free software; you can redistribute it and/or modify
10 * it under the terms of the GNU General Public License as published by
11 * the Free Software Foundation; either version 2 of the License, or
12 * (at your option) any later version.
13 *
14 * This program is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU Library General Public License for more details.
18 *
19 * Read the full text in the LICENSE.GPL file in the doc directory.
20 */
21
22#include "pcl_db_retrieve_thread.h"
23
24#include <blackboard/utils/on_message_waker.h>
25#include <interfaces/PclDatabaseRetrieveInterface.h>
26
27using namespace fawkes;
28
29/** @class PointCloudDBRetrieveThread "pcl_db_retrieve_thread.h"
30 * Thread to retrieve point clouds from database on request.
31 * @author Tim Niemueller
32 */
33
34/** Constructor. */
36: Thread("PointCloudDBRetrieveThread", Thread::OPMODE_WAITFORWAKEUP), MongoDBAspect("default")
37{
38}
39
40/** Destructor. */
42{
43}
44
45void
47{
48 pl_xyz_ = NULL;
49 pl_xyzrgb_ = NULL;
50 retrieve_if_ = NULL;
51 msg_waker_ = NULL;
52
53 cfg_database_ = config->get_string(CFG_PREFIX "database-name");
54 cfg_output_id_ = config->get_string(CFG_PREFIX_RETRV "output-pcl-id");
55 cfg_original_id_ = config->get_string(CFG_PREFIX_RETRV "original-pcl-id");
56
57 foutput_ = new pcl::PointCloud<pcl::PointXYZRGB>();
58 foutput_->is_dense = false;
59 pcl_manager->add_pointcloud<pcl::PointXYZRGB>(cfg_output_id_.c_str(), foutput_);
60 output_ = pcl_utils::cloudptr_from_refptr(foutput_);
61
62 foriginal_ = new pcl::PointCloud<pcl::PointXYZRGB>();
63 foriginal_->is_dense = false;
64 pcl_manager->add_pointcloud<pcl::PointXYZRGB>(cfg_original_id_.c_str(), foriginal_);
65 original_ = pcl_utils::cloudptr_from_refptr(foriginal_);
66
68 mongodb_client, config, logger, tf_listener, original_, output_);
69
71 mongodb_client, config, logger, tf_listener, original_, output_);
72
73 try {
74 retrieve_if_ =
75 blackboard->open_for_writing<PclDatabaseRetrieveInterface>("PCL Database Retrieve");
76
77 msg_waker_ = new BlackBoardOnMessageWaker(blackboard, retrieve_if_, this);
78 } catch (Exception &e) {
79 finalize();
80 throw;
81 }
82}
83
84void
86{
87 delete msg_waker_;
88 blackboard->close(retrieve_if_);
89
90 delete pl_xyz_;
91 delete pl_xyzrgb_;
92
93 output_.reset();
94 pcl_manager->remove_pointcloud(cfg_output_id_.c_str());
95 foutput_.reset();
96
97 original_.reset();
98 pcl_manager->remove_pointcloud(cfg_original_id_.c_str());
99 foriginal_.reset();
100}
101
102void
104{
105 long timestamp = 0;
106 std::vector<long long> times(1);
107 std::string database;
108 std::string collection;
109 std::string target_frame;
110 bool original_timestamp;
111
112 if (retrieve_if_->msgq_empty())
113 return;
114
115 if (PclDatabaseRetrieveInterface::RetrieveMessage *msg = retrieve_if_->msgq_first_safe(msg)) {
116 retrieve_if_->set_final(false);
117 retrieve_if_->set_msgid(msg->id());
118 retrieve_if_->set_error("");
119 retrieve_if_->write();
120
121 timestamp = msg->timestamp();
122 times[0] = timestamp;
123 database = (strcmp(msg->database(), "") != 0) ? msg->database() : cfg_database_;
124 collection = msg->collection();
125 target_frame = msg->target_frame();
126 original_timestamp = msg->is_original_timestamp();
127 } else {
128 logger->log_warn(name(), "Unhandled message received");
129 retrieve_if_->msgq_pop();
130 return;
131 }
132 retrieve_if_->msgq_pop();
133
134 logger->log_info(name(), "Restoring from '%s' for the time %li", collection.c_str(), timestamp);
135
136 ApplicabilityStatus st_xyz, st_xyzrgb;
137 long actual_time = 0;
138
139 pl_xyz_->applicable(times, database, collection);
140 if ((st_xyz = pl_xyz_->applicable(times, database, collection)) == APPLICABLE) {
141 logger->log_info(name(), "Restoring XYZ");
142 pl_xyz_->retrieve(timestamp, database, collection, target_frame, actual_time);
143 } else if ((st_xyzrgb = pl_xyzrgb_->applicable(times, database, collection)) == APPLICABLE) {
144 logger->log_info(name(), "Restoring XYZRGB");
145 pl_xyzrgb_->retrieve(timestamp, database, collection, target_frame, actual_time);
146 if (!original_timestamp) {
147 Time now(clock);
148 pcl_utils::set_time(foutput_, now);
149 }
150 } else {
151 logger->log_warn(name(), "No applicable merging pipeline known:");
152 logger->log_warn(name(), " XYZ: %s", to_string(st_xyz));
153 logger->log_warn(name(), " XYZ/RGB: %s", to_string(st_xyzrgb));
154 retrieve_if_->set_error("No applicable merging pipeline known (for details see log)");
155 }
156
157 if (actual_time != 0) {
158 if (original_timestamp) {
159 Time now((long)actual_time);
160 Time last;
161 pcl_utils::get_time(foutput_, last);
162 // force sending with one microsecond offset if same than last time
163 if (last == now)
164 now += 1L;
165 pcl_utils::set_time(foutput_, now);
166 } else {
167 Time now(clock);
168 pcl_utils::set_time(foutput_, now);
169 }
170 }
171
172 retrieve_if_->set_final(true);
173 retrieve_if_->write();
174}
ApplicabilityStatus applicable(std::vector< long long > &times, std::string &database, std::string &collection)
Check if this pipeline instance is suitable for the given times.
void retrieve(long timestamp, std::string &database, std::string &collection, std::string &target_frame, long &actual_time)
Retrieve point clouds.
virtual ~PointCloudDBRetrieveThread()
Destructor.
virtual void loop()
Code to execute in the thread.
virtual void finalize()
Finalize the thread.
virtual void init()
Initialize the thread.
BlackBoard * blackboard
This is the BlackBoard instance you can use to interact with the BlackBoard.
Definition: blackboard.h:44
Wake threads on receiving a blackboard message.
virtual Interface * open_for_writing(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for writing.
virtual void close(Interface *interface)=0
Close interface.
Clock * clock
By means of this member access to the clock is given.
Definition: clock.h:42
Configuration * config
This is the Configuration member used to access the configuration.
Definition: configurable.h:41
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
Base class for exceptions in Fawkes.
Definition: exception.h:36
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:41
Thread aspect to access MongoDB.
Definition: mongodb.h:39
mongocxx::client * mongodb_client
MongoDB client to use to interact with the database.
Definition: mongodb.h:54
PointCloudManager * pcl_manager
Manager to distribute and access point clouds.
Definition: pointcloud.h:47
void remove_pointcloud(const char *id)
Remove the point cloud.
void add_pointcloud(const char *id, RefPtr< pcl::PointCloud< PointT > > cloud)
Add point cloud.
void reset()
Reset pointer.
Definition: refptr.h:455
Thread class encapsulation of pthreads.
Definition: thread.h:46
const char * name() const
Get name of thread.
Definition: thread.h:100
A class for handling time.
Definition: time.h:93
tf::Transformer * tf_listener
This is the transform listener which saves transforms published by other threads in the system.
Definition: tf.h:67
Fawkes library namespace.