Fawkes API Fawkes Development Version
mongodb_log_image_thread.cpp
1
2/***************************************************************************
3 * mongodb_log_image_thread.cpp - Thread to log images to MongoDB
4 *
5 * Created: Tue Apr 10 22:12:38 2012
6 * Copyright 2011-2017 Tim Niemueller [www.niemueller.de]
7 * 2012 Bastian Klingen
8 ****************************************************************************/
9
10/* This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version.
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU Library General Public License for more details.
19 *
20 * Read the full text in the LICENSE.GPL file in the doc directory.
21 */
22
23#include "mongodb_log_image_thread.h"
24
25#include <core/threading/mutex_locker.h>
26#include <fvutils/color/colorspaces.h>
27#include <fvutils/ipc/shm_image.h>
28#include <utils/time/wait.h>
29
30// from MongoDB
31#include <bsoncxx/builder/basic/document.hpp>
32#include <fnmatch.h>
33#include <mongocxx/client.hpp>
34#include <mongocxx/exception/operation_exception.hpp>
35#include <mongocxx/gridfs/uploader.hpp>
36
37using namespace fawkes;
38using namespace firevision;
39using namespace mongocxx;
40
41/** @class MongoLogImagesThread "mongodb_log_image_thread.h"
42 * Thread to export Fawkes images to MongoDB.
43 * @author Tim Niemueller
44 * @author Bastian Klingen
45 */
46
47/** Constructor. */
49: Thread("MongoLogImagesThread", Thread::OPMODE_CONTINUOUS), MongoDBAspect("default")
50{
52}
53
54/** Destructor. */
56{
57}
58
59void
61{
62 database_ = "fflog";
63 try {
64 database_ = config->get_string("/plugins/mongodb-log/database");
65 } catch (Exception &e) {
66 logger->log_info(name(), "No database configured, writing to %s", database_.c_str());
67 }
68
69 cfg_storage_interval_ = config->get_float("/plugins/mongodb-log/images/storage-interval");
70
71 cfg_chunk_size_ = 2097152; // 2 MB
72 try {
73 cfg_chunk_size_ = config->get_uint("/plugins/mongodb-log/images/chunk-size");
74 } catch (Exception &e) {
75 } // ignored, use default
76 logger->log_info(name(), "Chunk size: %u", cfg_chunk_size_);
77
78 try {
79 includes_ = config->get_strings("/plugins/mongodb-log/images/includes");
80 } catch (Exception &e) {
81 } // ignored, no include rules
82 try {
83 excludes_ = config->get_strings("/plugins/mongodb-log/images/excludes");
84 } catch (Exception &e) {
85 } // ignored, no include rules
86
87 mongodb_ = mongodb_client;
88 gridfs_ = mongodb_->database(database_).gridfs_bucket();
89
90 last_update_ = new Time(clock);
91 now_ = new Time(clock);
92 wait_ = new TimeWait(clock, cfg_storage_interval_ * 1000000.);
93 mutex_ = new Mutex();
94 update_images();
95}
96
97bool
99{
100 mutex_->lock();
101 return true;
102}
103
104void
106{
107 logger->log_debug(name(), "Finalizing MongoLogImagesThread");
108 std::map<std::string, ImageInfo>::iterator p;
109 for (p = imgs_.begin(); p != imgs_.end(); ++p) {
110 delete p->second.img;
111 }
112 imgs_.clear();
113 delete wait_;
114 delete mutex_;
115 delete now_;
116 delete last_update_;
117}
118
119void
121{
122 MutexLocker lock(mutex_);
123 fawkes::Time loop_start(clock);
124 wait_->mark_start();
125 unsigned int num_stored = 0;
126
127 now_->stamp();
128 if (*now_ - last_update_ >= 5.0) {
129 *last_update_ = now_;
130 update_images();
131 }
132
133 std::map<std::string, ImageInfo>::iterator p;
134 for (p = imgs_.begin(); p != imgs_.end(); ++p) {
135 ImageInfo &imginfo = p->second;
136
137 fawkes::Time cap_time = imginfo.img->capture_time();
138
139 if ((imginfo.last_sent != cap_time)) {
140 using namespace bsoncxx::builder;
141 basic::document document;
142 imginfo.last_sent = cap_time;
143 document.append(basic::kvp("timestamp", static_cast<int64_t>(cap_time.in_msec())));
144
145 document.append(basic::kvp("image", [&](basic::sub_document subdoc) {
146 subdoc.append(basic::kvp("image_id", imginfo.img->image_id()));
147 subdoc.append(basic::kvp("width", static_cast<int32_t>(imginfo.img->width())));
148 subdoc.append(basic::kvp("height", static_cast<int32_t>(imginfo.img->height())));
149 subdoc.append(basic::kvp("colorspace", colorspace_to_string(imginfo.img->colorspace())));
150
151 std::stringstream name;
152 name << imginfo.topic_name << "_" << cap_time.in_msec();
153 auto uploader = gridfs_.open_upload_stream(name.str());
154 uploader.write((uint8_t *)imginfo.img->buffer(), imginfo.img->data_size());
155 auto result = uploader.close();
156 subdoc.append(basic::kvp("data", [&](basic::sub_document subdoc) {
157 subdoc.append(basic::kvp("id", result.id()));
158 subdoc.append(basic::kvp("filename", name.str()));
159 }));
160 }));
161
162 try {
163 mongodb_->database(database_)[imginfo.topic_name].insert_one(document.view());
164 ++num_stored;
165 } catch (operation_exception &e) {
166 logger->log_warn(this->name(),
167 "Failed to insert image %s into %s.%s: %s",
168 imginfo.img->image_id(),
169 database_.c_str(),
170 imginfo.topic_name.c_str(),
171 e.what());
172 }
173 }
174 }
175
176 mutex_->unlock();
177 fawkes::Time loop_end(clock);
179 "Stored %u of %zu images in %.1f ms",
180 num_stored,
181 imgs_.size(),
182 (loop_end - &loop_start) * 1000.);
183 wait_->wait();
184}
185
186void
187MongoLogImagesThread::update_images()
188{
189 std::set<std::string> missing_images;
190 std::set<std::string> unbacked_images;
191 get_sets(missing_images, unbacked_images);
192
193 if (!unbacked_images.empty()) {
194 std::set<std::string>::iterator i;
195 for (i = unbacked_images.begin(); i != unbacked_images.end(); ++i) {
197 "Shutting down MongoLog for no longer available image %s",
198 i->c_str());
199 ImageInfo &imginfo = imgs_[*i];
200 delete imginfo.img;
201 imgs_.erase(*i);
202 }
203 }
204
205 if (!missing_images.empty()) {
206 std::set<std::string>::iterator i;
207 for (i = missing_images.begin(); i != missing_images.end(); ++i) {
208 std::vector<std::string>::iterator f;
209 bool include = includes_.empty();
210 if (!include) {
211 for (f = includes_.begin(); f != includes_.end(); ++f) {
212 if (fnmatch(f->c_str(), i->c_str(), 0) != FNM_NOMATCH) {
213 include = true;
214 break;
215 }
216 }
217 }
218 if (include) {
219 for (f = excludes_.begin(); f != excludes_.end(); ++f) {
220 if (fnmatch(f->c_str(), i->c_str(), 0) != FNM_NOMATCH) {
221 include = false;
222 break;
223 }
224 }
225 }
226 if (!include) {
227 //logger->log_info(name(), "Excluding image %s", i->c_str());
228 continue;
229 }
230
231 logger->log_info(name(), "Starting to log image %s", i->c_str());
232
233 std::string topic_name = std::string("Images.") + *i;
234 size_t pos = 0;
235 while ((pos = topic_name.find_first_of(" -", pos)) != std::string::npos) {
236 topic_name.replace(pos, 1, "_");
237 pos = pos + 1;
238 }
239
240 ImageInfo imginfo;
241 imginfo.topic_name = topic_name;
242 imginfo.img = new SharedMemoryImageBuffer(i->c_str());
243 imgs_[*i] = imginfo;
244 }
245 }
246}
247
248void
249MongoLogImagesThread::get_sets(std::set<std::string> &missing_images,
250 std::set<std::string> &unbacked_images)
251{
252 std::set<std::string> published_images;
253 std::map<std::string, ImageInfo>::iterator p;
254 for (p = imgs_.begin(); p != imgs_.end(); ++p) {
255 if (p->second.img->num_attached() > 1) {
256 published_images.insert(p->first);
257 }
258 }
259
260 std::set<std::string> image_buffers;
262 SharedMemory::SharedMemoryIterator i = SharedMemory::find(FIREVISION_SHM_IMAGE_MAGIC_TOKEN, h);
263 SharedMemory::SharedMemoryIterator endi = SharedMemory::end();
264
265 while (i != endi) {
267 dynamic_cast<const SharedMemoryImageBufferHeader *>(*i);
268 if (ih) {
269 image_buffers.insert(ih->image_id());
270 }
271 ++i;
272 }
273 delete h;
274
275 missing_images.clear();
276 unbacked_images.clear();
277
278 std::set_difference(image_buffers.begin(),
279 image_buffers.end(),
280 published_images.begin(),
281 published_images.end(),
282 std::inserter(missing_images, missing_images.end()));
283
284 std::set_difference(published_images.begin(),
285 published_images.end(),
286 image_buffers.begin(),
287 image_buffers.end(),
288 std::inserter(unbacked_images, unbacked_images.end()));
289}
ImageInfo representation for JSON transfer.
Definition: ImageInfo.h:28
std::optional< std::string > colorspace() const
Get colorspace value.
Definition: ImageInfo.h:143
std::optional< int64_t > width() const
Get width value.
Definition: ImageInfo.h:177
std::optional< int64_t > height() const
Get height value.
Definition: ImageInfo.h:194
virtual void finalize()
Finalize the thread.
virtual void init()
Initialize the thread.
virtual bool prepare_finalize_user()
Prepare finalization user implementation.
virtual ~MongoLogImagesThread()
Destructor.
virtual void loop()
Code to execute in the thread.
Clock * clock
By means of this member access to the clock is given.
Definition: clock.h:42
Configuration * config
This is the Configuration member used to access the configuration.
Definition: configurable.h:41
virtual unsigned int get_uint(const char *path)=0
Get value from configuration which is of type unsigned int.
virtual float get_float(const char *path)=0
Get value from configuration which is of type float.
virtual std::vector< std::string > get_strings(const char *path)=0
Get list of values from configuration which is of type string.
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
Base class for exceptions in Fawkes.
Definition: exception.h:36
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:41
Thread aspect to access MongoDB.
Definition: mongodb.h:39
mongocxx::client * mongodb_client
MongoDB client to use to interact with the database.
Definition: mongodb.h:54
Mutex locking helper.
Definition: mutex_locker.h:34
Mutex mutual exclusion lock.
Definition: mutex.h:33
void lock()
Lock this mutex.
Definition: mutex.cpp:87
void unlock()
Unlock the mutex.
Definition: mutex.cpp:131
Shared Memory iterator.
Definition: shm.h:119
Thread class encapsulation of pthreads.
Definition: thread.h:46
void set_prepfin_conc_loop(bool concurrent=true)
Set concurrent execution of prepare_finalize() and loop().
Definition: thread.cpp:716
const char * name() const
Get name of thread.
Definition: thread.h:100
Time wait utility.
Definition: wait.h:33
void mark_start()
Mark start of loop.
Definition: wait.cpp:68
void wait()
Wait until minimum loop time has been reached.
Definition: wait.cpp:78
A class for handling time.
Definition: time.h:93
Time & stamp()
Set this time to the current time.
Definition: time.cpp:704
long in_msec() const
Convert the stored time into milli-seconds.
Definition: time.cpp:228
Shared memory image buffer header.
Definition: shm_image.h:67
const char * image_id() const
Get image ID.
Definition: shm_image.cpp:838
Shared memory image buffer.
Definition: shm_image.h:184
Fawkes library namespace.