Fawkes API Fawkes Development Version
pcl_db_merge_thread.cpp
1
2/***************************************************************************
3 * pcl_db_merge_thread.cpp - Restore and merge point clouds from MongoDB
4 *
5 * Created: Wed Nov 28 10:56:10 2012 (Freiburg)
6 * Copyright 2012 Tim Niemueller [www.niemueller.de]
7 ****************************************************************************/
8
9/* This program is free software; you can redistribute it and/or modify
10 * it under the terms of the GNU General Public License as published by
11 * the Free Software Foundation; either version 2 of the License, or
12 * (at your option) any later version.
13 *
14 * This program is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU Library General Public License for more details.
18 *
19 * Read the full text in the LICENSE.GPL file in the doc directory.
20 */
21
22#include "pcl_db_merge_thread.h"
23
24#include <blackboard/utils/on_message_waker.h>
25#include <interfaces/PclDatabaseMergeInterface.h>
26
27using namespace fawkes;
28
29/** @class PointCloudDBMergeThread "pcl_db_merge_thread.h"
30 * Thread to merge point clouds from database on request.
31 * @author Tim Niemueller
32 */
33
34/** Constructor. */
36: Thread("PointCloudDBMergeThread", Thread::OPMODE_WAITFORWAKEUP), MongoDBAspect("default")
37{
38}
39
40/** Destructor. */
42{
43}
44
45void
47{
48 pl_xyz_ = NULL;
49 pl_xyzrgb_ = NULL;
50 merge_if_ = NULL;
51 msg_waker_ = NULL;
52
53 cfg_database_ = config->get_string(CFG_PREFIX "database-name");
54 cfg_output_id_ = config->get_string(CFG_PREFIX_MERGE "output-pcl-id");
55
56 foutput_ = new pcl::PointCloud<pcl::PointXYZRGB>();
57 //foutput_->header.frame_id = finput_->header.frame_id;
58 foutput_->is_dense = false;
59 pcl_manager->add_pointcloud<pcl::PointXYZRGB>(cfg_output_id_.c_str(), foutput_);
60 output_ = pcl_utils::cloudptr_from_refptr(foutput_);
61
64
67
68 try {
69 merge_if_ = blackboard->open_for_writing<PclDatabaseMergeInterface>("PCL Database Merge");
70
71 msg_waker_ = new BlackBoardOnMessageWaker(blackboard, merge_if_, this);
72 } catch (Exception &e) {
73 finalize();
74 throw;
75 }
76}
77
78void
80{
81 delete msg_waker_;
82 blackboard->close(merge_if_);
83
84 delete pl_xyz_;
85 delete pl_xyzrgb_;
86
87 output_.reset();
88 pcl_manager->remove_pointcloud(cfg_output_id_.c_str());
89 foutput_.reset();
90}
91
92void
94{
95 std::vector<long> times;
96 std::string database;
97 std::string collection;
98 //= "PointClouds.openni_pointcloud_xyz";
99
100 if (merge_if_->msgq_empty())
101 return;
102
103 if (PclDatabaseMergeInterface::MergeMessage *msg = merge_if_->msgq_first_safe(msg)) {
104 merge_if_->set_final(false);
105 merge_if_->set_msgid(msg->id());
106 merge_if_->set_error("");
107 merge_if_->write();
108
109 int64_t *timestamps = msg->timestamps();
110 for (size_t i = 0; i < msg->maxlenof_timestamps(); ++i) {
111 if (timestamps[i] > 0) {
112 times.push_back(timestamps[i]);
113 }
114 }
115 database = (strcmp(msg->database(), "") != 0) ? msg->database() : cfg_database_;
116 collection = msg->collection();
117 }
118
119 merge_if_->msgq_pop();
120
121 /* For testing
122 collection = "PointClouds.openni_pointcloud_xyz";
123 times.clear();
124 times.push_back(1354200347715);
125 times.push_back(1354200406578);
126 times.push_back(1354200473345);
127 */
128
129 if (times.empty()) {
130 logger->log_warn(name(), "Called for merge from %s, but no times given", collection.c_str());
131 merge_if_->set_final(true);
132 merge_if_->set_error("Called for merge, but no non-zero times given");
133 merge_if_->write();
134 return;
135 }
136
137 logger->log_info(name(), "Restoring from '%s' for the following times", collection.c_str());
138 for (size_t i = 0; i < times.size(); ++i) {
139 logger->log_info(name(), " %li", times[i]);
140 }
141
142 ApplicabilityStatus st_xyz, st_xyzrgb;
143
144 std::vector<long long> ll_times(times.begin(), times.end());
145 pl_xyz_->applicable(ll_times, database, collection);
146 if ((st_xyz = pl_xyz_->applicable(ll_times, database, collection)) == APPLICABLE) {
147 pl_xyz_->merge(times, database, collection);
148 Time now(clock);
149 pcl_utils::set_time(foutput_, now);
150 } else if ((st_xyzrgb = pl_xyzrgb_->applicable(ll_times, database, collection)) == APPLICABLE) {
151 pl_xyzrgb_->merge(times, database, collection);
152 Time now(clock);
153 pcl_utils::set_time(foutput_, now);
154 } else {
155 logger->log_warn(name(), "No applicable merging pipeline known:");
156 logger->log_warn(name(), " XYZ: %s", to_string(st_xyz));
157 logger->log_warn(name(), " XYZ/RGB: %s", to_string(st_xyzrgb));
158 merge_if_->set_error("Merge failed, see pcl-db-merge log");
159 }
160
161 merge_if_->set_final(true);
162 merge_if_->write();
163}
void merge(std::vector< long > &times, std::string &database, std::string &collection)
Merge point clouds.
virtual ~PointCloudDBMergeThread()
Destructor.
virtual void loop()
Code to execute in the thread.
virtual void finalize()
Finalize the thread.
virtual void init()
Initialize the thread.
ApplicabilityStatus applicable(std::vector< long long > &times, std::string &database, std::string &collection)
Check if this pipeline instance is suitable for the given times.
BlackBoard * blackboard
This is the BlackBoard instance you can use to interact with the BlackBoard.
Definition: blackboard.h:44
Wake threads on receiving a blackboard message.
virtual Interface * open_for_writing(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for writing.
virtual void close(Interface *interface)=0
Close interface.
Clock * clock
By means of this member access to the clock is given.
Definition: clock.h:42
Configuration * config
This is the Configuration member used to access the configuration.
Definition: configurable.h:41
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
Base class for exceptions in Fawkes.
Definition: exception.h:36
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:41
Thread aspect to access MongoDB.
Definition: mongodb.h:39
mongocxx::client * mongodb_client
MongoDB client to use to interact with the database.
Definition: mongodb.h:54
PointCloudManager * pcl_manager
Manager to distribute and access point clouds.
Definition: pointcloud.h:47
void remove_pointcloud(const char *id)
Remove the point cloud.
void add_pointcloud(const char *id, RefPtr< pcl::PointCloud< PointT > > cloud)
Add point cloud.
void reset()
Reset pointer.
Definition: refptr.h:455
Thread class encapsulation of pthreads.
Definition: thread.h:46
const char * name() const
Get name of thread.
Definition: thread.h:100
A class for handling time.
Definition: time.h:93
tf::Transformer * tf_listener
This is the transform listener which saves transforms published by other threads in the system.
Definition: tf.h:67
Fawkes library namespace.