Fawkes API Fawkes Development Version
tf_thread.cpp
1
2/***************************************************************************
3 * tf_thread.cpp - Thread to exchange transforms
4 *
5 * Created: Wed Oct 26 01:02:59 2011
6 * Copyright 2011 Tim Niemueller [www.niemueller.de]
7 ****************************************************************************/
8
9/* This program is free software; you can redistribute it and/or modify
10 * it under the terms of the GNU General Public License as published by
11 * the Free Software Foundation; either version 2 of the License, or
12 * (at your option) any later version.
13 *
14 * This program is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU Library General Public License for more details.
18 *
19 * Read the full text in the LICENSE.GPL file in the doc directory.
20 */
21
22#include "tf_thread.h"
23
24#include <core/threading/mutex_locker.h>
25#include <ros/this_node.h>
26
27using namespace fawkes;
28
29/** @class RosTfThread "tf_thread.h"
30 * Thread to exchange transforms between Fawkes and ROS.
31 * This threads connects to Fawkes and ROS to read and write transforms.
32 * Transforms received on one end are republished to the other side. To
33 * Fawkes new frames are published during the sensor hook.
34 * @author Tim Niemueller
35 */
36
37/** Constructor. */
39: Thread("RosTfThread", Thread::OPMODE_WAITFORWAKEUP),
40 BlockedTimingAspect(BlockedTimingAspect::WAKEUP_HOOK_SENSOR_ACQUIRE),
41 TransformAspect(TransformAspect::DEFER_PUBLISHER),
42 BlackBoardInterfaceListener("RosTfThread")
43{
44 tf_msg_queue_mutex_ = new Mutex();
45 seq_num_mutex_ = new Mutex();
46 last_update_ = new Time();
47}
48
49/** Destructor. */
51{
52 delete tf_msg_queue_mutex_;
53 delete seq_num_mutex_;
54 delete last_update_;
55}
56
57void
59{
60 active_queue_ = 0;
61 seq_num_ = 0;
62 last_update_->set_clock(clock);
63 last_update_->set_time(0, 0);
64
65 cfg_use_tf2_ = config->get_bool("/ros/tf/use_tf2");
66 cfg_update_interval_ = 1.0;
67 try {
68 cfg_update_interval_ = config->get_float("/ros/tf/static-update-interval");
69 } catch (Exception &e) {
70 } // ignored, use default
71
72 // Must do that before registering listener because we might already
73 // get events right away
74 if (cfg_use_tf2_) {
75#ifndef HAVE_TF2_MSGS
76 throw Exception("tf2 enabled in config but not available at compile time");
77#else
78 sub_tf_ = rosnode->subscribe("tf", 100, &RosTfThread::tf_message_cb_dynamic, this);
79 sub_static_tf_ = rosnode->subscribe("tf_static", 100, &RosTfThread::tf_message_cb_static, this);
80 pub_tf_ = rosnode->advertise<tf2_msgs::TFMessage>("tf", 100);
81 pub_static_tf_ = rosnode->advertise<tf2_msgs::TFMessage>("tf_static", 100, /* latch */ true);
82#endif
83 } else {
84 sub_tf_ = rosnode->subscribe("tf", 100, &RosTfThread::tf_message_cb, this);
85 pub_tf_ = rosnode->advertise<::tf::tfMessage>("tf", 100);
86 }
87
89 std::list<TransformInterface *>::iterator i;
90 for (i = tfifs_.begin(); i != tfifs_.end(); ++i) {
94 }
96
97 publish_static_transforms_to_ros();
98
99 bbio_add_observed_create("TransformInterface", "/tf*");
101}
102
103void
105{
108
109 sub_tf_.shutdown();
110 pub_tf_.shutdown();
111
112 std::list<TransformInterface *>::iterator i;
113 for (i = tfifs_.begin(); i != tfifs_.end(); ++i) {
114 blackboard->close(*i);
115 }
116 tfifs_.clear();
117}
118
119void
121{
122 tf_msg_queue_mutex_->lock();
123 unsigned int queue = active_queue_;
124 active_queue_ = 1 - active_queue_;
125 tf_msg_queue_mutex_->unlock();
126
127 if (cfg_use_tf2_) {
128#ifdef HAVE_TF2_MSGS
129 while (!tf2_msg_queues_[queue].empty()) {
130 const std::pair<bool, tf2_msgs::TFMessage::ConstPtr> &q = tf2_msg_queues_[queue].front();
131 const tf2_msgs::TFMessage::ConstPtr & msg = q.second;
132 const size_t tsize = msg->transforms.size();
133 for (size_t i = 0; i < tsize; ++i) {
134 publish_transform_to_fawkes(msg->transforms[i], q.first);
135 }
136 tf2_msg_queues_[queue].pop();
137 }
138#endif
139 } else {
140 while (!tf_msg_queues_[queue].empty()) {
141 const ::tf::tfMessage::ConstPtr &msg = tf_msg_queues_[queue].front();
142 const size_t tsize = msg->transforms.size();
143 for (size_t i = 0; i < tsize; ++i) {
144 geometry_msgs::TransformStamped ts = msg->transforms[i];
145 if (!ts.header.frame_id.empty() && ts.header.frame_id[0] == '/') {
146 ts.header.frame_id = ts.header.frame_id.substr(1);
147 }
148 if (!ts.child_frame_id.empty() && ts.child_frame_id[0] == '/') {
149 ts.child_frame_id = ts.child_frame_id.substr(1);
150 }
151 publish_transform_to_fawkes(ts);
152 }
153 tf_msg_queues_[queue].pop();
154 }
155
156 fawkes::Time now(clock);
157 if ((now - last_update_) > cfg_update_interval_) {
158 last_update_->stamp();
159
160 publish_static_transforms_to_ros();
161 }
162 }
163}
164
165void
167{
168 TransformInterface *tfif = dynamic_cast<TransformInterface *>(interface);
169 if (!tfif)
170 return;
171
172 tfif->read();
173
174 if (cfg_use_tf2_ && tfif->is_static_transform()) {
175 publish_static_transforms_to_ros();
176 } else {
177 if (cfg_use_tf2_) {
178#ifdef HAVE_TF2_MSGS
179 tf2_msgs::TFMessage tmsg;
180 tmsg.transforms.push_back(create_transform_stamped(tfif));
181 pub_tf_.publish(tmsg);
182#endif
183 } else {
184 ::tf::tfMessage tmsg;
185 if (tfif->is_static_transform()) {
186 // date time stamps slightly into the future so they are valid
187 // for longer and need less frequent updates.
188 fawkes::Time timestamp = fawkes::Time(clock) + (cfg_update_interval_ * 1.1);
189
190 tmsg.transforms.push_back(create_transform_stamped(tfif, &timestamp));
191 } else {
192 tmsg.transforms.push_back(create_transform_stamped(tfif));
193 }
194 pub_tf_.publish(tmsg);
195 }
196 }
197}
198
199void
200RosTfThread::bb_interface_created(const char *type, const char *id) noexcept
201{
202 if (strncmp(type, "TransformInterface", INTERFACE_TYPE_SIZE_) != 0)
203 return;
204
205 for (const auto &f : ros_frames_) {
206 // ignore interfaces that we publish ourself
207 if (f == id)
208 return;
209 }
210
211 TransformInterface *tfif;
212 try {
213 //logger->log_info(name(), "Opening %s:%s", type, id);
214 tfif = blackboard->open_for_reading<TransformInterface>(id);
215 } catch (Exception &e) {
216 // ignored
217 logger->log_warn(name(), "Failed to open %s:%s: %s", type, id, e.what());
218 return;
219 }
220
221 try {
222 bbil_add_data_interface(tfif);
223 bbil_add_reader_interface(tfif);
224 bbil_add_writer_interface(tfif);
225 blackboard->update_listener(this);
226 tfifs_.push_back(tfif);
227 } catch (Exception &e) {
228 blackboard->close(tfif);
229 logger->log_warn(name(), "Failed to register for %s:%s: %s", type, id, e.what());
230 return;
231 }
232}
233
234void
236 fawkes::Uuid instance_serial) noexcept
237{
238 conditional_close(interface);
239}
240
241void
243 fawkes::Uuid instance_serial) noexcept
244{
245 conditional_close(interface);
246}
247
248void
249RosTfThread::conditional_close(Interface *interface) noexcept
250{
251 // Verify it's a TransformInterface
252 TransformInterface *tfif = dynamic_cast<TransformInterface *>(interface);
253 if (!tfif)
254 return;
255
256 std::list<TransformInterface *>::iterator i;
257 for (i = tfifs_.begin(); i != tfifs_.end(); ++i) {
258 if (*interface == **i) {
259 if (!interface->has_writer() && (interface->num_readers() == 1)) {
260 // It's only us
261 logger->log_info(name(), "Last on %s, closing", interface->uid());
262 bbil_remove_data_interface(*i);
263 bbil_remove_reader_interface(*i);
264 bbil_remove_writer_interface(*i);
265 blackboard->update_listener(this);
266 blackboard->close(*i);
267 tfifs_.erase(i);
268 break;
269 }
270 }
271 }
272}
273
274geometry_msgs::TransformStamped
275RosTfThread::create_transform_stamped(TransformInterface *tfif, const Time *time)
276{
277 double *translation = tfif->translation();
278 double *rotation = tfif->rotation();
279 if (!time)
280 time = tfif->timestamp();
281
282 geometry_msgs::Vector3 t;
283 t.x = translation[0];
284 t.y = translation[1];
285 t.z = translation[2];
286 geometry_msgs::Quaternion r;
287 r.x = rotation[0];
288 r.y = rotation[1];
289 r.z = rotation[2];
290 r.w = rotation[3];
291 geometry_msgs::Transform tr;
292 tr.translation = t;
293 tr.rotation = r;
294
295 geometry_msgs::TransformStamped ts;
296 seq_num_mutex_->lock();
297 ts.header.seq = ++seq_num_;
298 seq_num_mutex_->unlock();
299 ts.header.stamp = ros::Time(time->get_sec(), time->get_nsec());
300 ts.header.frame_id = tfif->frame();
301 ts.child_frame_id = tfif->child_frame();
302 ts.transform = tr;
303
304 return ts;
305}
306
307void
308RosTfThread::publish_static_transforms_to_ros()
309{
310 std::list<fawkes::TransformInterface *>::iterator t;
311 fawkes::Time now(clock);
312
313 if (cfg_use_tf2_) {
314#ifdef HAVE_TF2_MSGS
315 tf2_msgs::TFMessage tmsg;
316 for (t = tfifs_.begin(); t != tfifs_.end(); ++t) {
318 tfif->read();
319 if (tfif->is_static_transform()) {
320 tmsg.transforms.push_back(create_transform_stamped(tfif, &now));
321 }
322 }
323 pub_static_tf_.publish(tmsg);
324#endif
325 } else {
326 // date time stamps slightly into the future so they are valid
327 // for longer and need less frequent updates.
328 fawkes::Time timestamp = now + (cfg_update_interval_ * 1.1);
329
330 ::tf::tfMessage tmsg;
331 for (t = tfifs_.begin(); t != tfifs_.end(); ++t) {
333 tfif->read();
334 if (tfif->is_static_transform()) {
335 tmsg.transforms.push_back(create_transform_stamped(tfif, &timestamp));
336 }
337 }
338 pub_tf_.publish(tmsg);
339 }
340}
341
342void
343RosTfThread::publish_transform_to_fawkes(const geometry_msgs::TransformStamped &ts, bool static_tf)
344{
345 const geometry_msgs::Vector3 & t = ts.transform.translation;
346 const geometry_msgs::Quaternion &r = ts.transform.rotation;
347
348 fawkes::Time time(ts.header.stamp.sec, ts.header.stamp.nsec / 1000);
349
350 fawkes::tf::Transform tr(fawkes::tf::Quaternion(r.x, r.y, r.z, r.w),
351 fawkes::tf::Vector3(t.x, t.y, t.z));
352 fawkes::tf::StampedTransform st(tr, time, ts.header.frame_id, ts.child_frame_id);
353
354 if (tf_publishers.find(ts.child_frame_id) == tf_publishers.end()) {
355 try {
356 ros_frames_.push_back(std::string("/tf/") + ts.child_frame_id);
357 tf_add_publisher("%s", ts.child_frame_id.c_str());
358 tf_publishers[ts.child_frame_id]->send_transform(st, static_tf);
359 } catch (Exception &e) {
360 ros_frames_.pop_back();
362 "Failed to create Fawkes transform publisher for frame %s from ROS",
363 ts.child_frame_id.c_str());
364 logger->log_warn(name(), e);
365 }
366 } else {
367 tf_publishers[ts.child_frame_id]->send_transform(st, static_tf);
368 }
369}
370
371/** Callback function for ROS tf message subscription.
372 * @param msg incoming message
373 */
374void
375RosTfThread::tf_message_cb(const ros::MessageEvent<::tf::tfMessage const> &msg_evt)
376{
377 MutexLocker lock(tf_msg_queue_mutex_);
378
379 const ::tf::tfMessage::ConstPtr &msg = msg_evt.getConstMessage();
380 std::string authority = msg_evt.getPublisherName();
381
382 if (authority == "") {
383 logger->log_warn(name(), "Message received without callerid");
384 } else if (authority != ros::this_node::getName()) {
385 tf_msg_queues_[active_queue_].push(msg);
386 }
387}
388
389#ifdef HAVE_TF2_MSGS
390void
391RosTfThread::tf_message_cb_static(const ros::MessageEvent<tf2_msgs::TFMessage const> &msg_evt)
392{
393 tf_message_cb(msg_evt, true);
394}
395
396void
397RosTfThread::tf_message_cb_dynamic(const ros::MessageEvent<tf2_msgs::TFMessage const> &msg_evt)
398{
399 tf_message_cb(msg_evt, false);
400}
401
402void
403RosTfThread::tf_message_cb(const ros::MessageEvent<tf2_msgs::TFMessage const> &msg_evt,
404 bool static_tf)
405{
406 MutexLocker lock(tf_msg_queue_mutex_);
407
408 const tf2_msgs::TFMessage::ConstPtr &msg = msg_evt.getConstMessage();
409 std::string authority = msg_evt.getPublisherName();
410
411 if (authority == "") {
412 logger->log_warn(name(), "Message received without callerid");
413 } else if (authority != ros::this_node::getName()) {
414 tf2_msg_queues_[active_queue_].push(std::make_pair(static_tf, msg));
415 }
416}
417#endif
virtual void bb_interface_reader_removed(fawkes::Interface *interface, fawkes::Uuid instance_serial) noexcept
A reading instance has been closed for a watched interface.
Definition: tf_thread.cpp:242
RosTfThread()
Constructor.
Definition: tf_thread.cpp:38
virtual void loop()
Code to execute in the thread.
Definition: tf_thread.cpp:120
virtual void init()
Initialize the thread.
Definition: tf_thread.cpp:58
virtual void bb_interface_writer_removed(fawkes::Interface *interface, fawkes::Uuid instance_serial) noexcept
A writing instance has been closed for a watched interface.
Definition: tf_thread.cpp:235
virtual void finalize()
Finalize the thread.
Definition: tf_thread.cpp:104
virtual void bb_interface_created(const char *type, const char *id) noexcept
BlackBoard interface created notification.
Definition: tf_thread.cpp:200
virtual ~RosTfThread()
Destructor.
Definition: tf_thread.cpp:50
virtual void bb_interface_data_refreshed(fawkes::Interface *interface) noexcept
BlackBoard data refreshed notification.
Definition: tf_thread.cpp:166
BlackBoard * blackboard
This is the BlackBoard instance you can use to interact with the BlackBoard.
Definition: blackboard.h:44
BlackBoard interface listener.
void bbil_add_reader_interface(Interface *interface)
Add an interface to the reader addition/removal watch list.
void bbil_add_writer_interface(Interface *interface)
Add an interface to the writer addition/removal watch list.
void bbil_add_data_interface(Interface *interface)
Add an interface to the data modification watch list.
void bbio_add_observed_create(const char *type_pattern, const char *id_pattern="*") noexcept
Add interface creation type to watch list.
virtual void unregister_observer(BlackBoardInterfaceObserver *observer)
Unregister BB interface observer.
Definition: blackboard.cpp:240
virtual Interface * open_for_reading(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for reading.
virtual void update_listener(BlackBoardInterfaceListener *listener, ListenerRegisterFlag flag=BBIL_FLAG_ALL)
Update BB event listener.
Definition: blackboard.cpp:197
virtual void register_observer(BlackBoardInterfaceObserver *observer)
Register BB interface observer.
Definition: blackboard.cpp:225
virtual void unregister_listener(BlackBoardInterfaceListener *listener)
Unregister BB interface listener.
Definition: blackboard.cpp:212
virtual std::list< Interface * > open_multiple_for_reading(const char *type_pattern, const char *id_pattern="*", const char *owner=NULL)=0
Open multiple interfaces for reading.
virtual void register_listener(BlackBoardInterfaceListener *listener, ListenerRegisterFlag flag=BBIL_FLAG_ALL)
Register BB event listener.
Definition: blackboard.cpp:185
virtual void close(Interface *interface)=0
Close interface.
Thread aspect to use blocked timing.
Clock * clock
By means of this member access to the clock is given.
Definition: clock.h:42
Configuration * config
This is the Configuration member used to access the configuration.
Definition: configurable.h:41
virtual bool get_bool(const char *path)=0
Get value from configuration which is of type bool.
virtual float get_float(const char *path)=0
Get value from configuration which is of type float.
Base class for exceptions in Fawkes.
Definition: exception.h:36
virtual const char * what() const noexcept
Get primary string.
Definition: exception.cpp:639
Base class for all Fawkes BlackBoard interfaces.
Definition: interface.h:80
const Time * timestamp() const
Get timestamp of last write.
Definition: interface.cpp:714
void read()
Read from BlackBoard into local copy.
Definition: interface.cpp:479
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:41
virtual void log_info(const char *component, const char *format,...)
Log informational message.
Definition: multi.cpp:195
virtual void log_warn(const char *component, const char *format,...)
Log warning message.
Definition: multi.cpp:216
Mutex locking helper.
Definition: mutex_locker.h:34
Mutex mutual exclusion lock.
Definition: mutex.h:33
void lock()
Lock this mutex.
Definition: mutex.cpp:87
void unlock()
Unlock the mutex.
Definition: mutex.cpp:131
LockPtr< ros::NodeHandle > rosnode
Central ROS node handle.
Definition: ros.h:47
Thread class encapsulation of pthreads.
Definition: thread.h:46
const char * name() const
Get name of thread.
Definition: thread.h:100
A class for handling time.
Definition: time.h:93
void set_clock(Clock *clock)
Set clock for this instance.
Definition: time.cpp:308
Time & stamp()
Set this time to the current time.
Definition: time.cpp:704
long get_nsec() const
Get nanoseconds.
Definition: time.h:132
void set_time(const timeval *tv)
Sets the time.
Definition: time.cpp:246
long get_sec() const
Get seconds.
Definition: time.h:117
Thread aspect to access the transform system.
Definition: tf.h:39
std::map< std::string, tf::TransformPublisher * > tf_publishers
Map of transform publishers created through the aspect.
Definition: tf.h:70
void tf_add_publisher(const char *frame_id_format,...)
Late add of publisher.
Definition: tf.cpp:186
TransformInterface Fawkes BlackBoard Interface.
char * frame() const
Get frame value.
char * child_frame() const
Get child_frame value.
bool is_static_transform() const
Get static_transform value.
double * rotation() const
Get rotation value.
double * translation() const
Get translation value.
A convenience class for universally unique identifiers (UUIDs).
Definition: uuid.h:29
Transform that contains a timestamp and frame IDs.
Definition: types.h:92
Fawkes library namespace.