Fawkes API Fawkes Development Version
mongodb_log_bb_thread.cpp
1
2/***************************************************************************
3 * mongodb_log_bb_thread.cpp - MongoDB blackboard logging Thread
4 *
5 * Created: Wed Dec 08 23:09:29 2010
6 * Copyright 2010-2017 Tim Niemueller [www.niemueller.de]
7 * 2012 Bastian Klingen
8 ****************************************************************************/
9
10/* This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version.
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU Library General Public License for more details.
19 *
20 * Read the full text in the LICENSE.GPL file in the doc directory.
21 */
22
23#include "mongodb_log_bb_thread.h"
24
25#include <core/threading/mutex_locker.h>
26#include <plugins/mongodb/aspect/mongodb_conncreator.h>
27
28#include <cstdlib>
29#include <fnmatch.h>
30#include <mongocxx/client.hpp>
31#include <mongocxx/exception/operation_exception.hpp>
32
33using namespace mongocxx;
34using namespace fawkes;
35
36/** @class MongoLogBlackboardThread "mongodb_thread.h"
37 * MongoDB Logging Thread.
38 * This thread registers to interfaces specified with patterns in the
39 * configurationa and logs any changes to MongoDB.
40 *
41 * @author Tim Niemueller
42 */
43
44/** Constructor. */
46: Thread("MongoLogBlackboardThread", Thread::OPMODE_WAITFORWAKEUP), MongoDBAspect("default")
47{
48}
49
50/** Destructor. */
52{
53}
54
55void
57{
58 now_ = new Time(clock);
59 database_ = "fflog";
60 try {
61 database_ = config->get_string("/plugins/mongodb-log/database");
62 } catch (Exception &e) {
63 logger->log_info(name(), "No database configured, writing to %s", database_.c_str());
64 }
65
66 std::vector<std::string> includes;
67 try {
68 includes = config->get_strings("/plugins/mongodb-log/blackboard/includes");
69 } catch (Exception &e) {
70 } // ignored, no include rules
71 try {
72 excludes_ = config->get_strings("/plugins/mongodb-log/blackboard/excludes");
73 } catch (Exception &e) {
74 } // ignored, no include rules
75
76 if (includes.empty()) {
77 includes.push_back("*");
78 }
79
80 std::vector<std::string>::iterator i;
81 std::vector<std::string>::iterator e;
82 for (i = includes.begin(); i != includes.end(); ++i) {
83 bbio_add_observed_create("*", i->c_str());
84
85 std::list<Interface *> current_interfaces =
86 blackboard->open_multiple_for_reading("*", i->c_str());
87
88 std::list<Interface *>::iterator i;
89 for (i = current_interfaces.begin(); i != current_interfaces.end(); ++i) {
90 bool exclude = false;
91 for (e = excludes_.begin(); e != excludes_.end(); ++e) {
92 if (fnmatch(e->c_str(), (*i)->id(), 0) != FNM_NOMATCH) {
93 logger->log_debug(name(), "Excluding '%s' by config rule", (*i)->uid());
94 blackboard->close(*i);
95 exclude = true;
96 break;
97 }
98 }
99 if (exclude)
100 continue;
101
102 logger->log_debug(name(), "Adding %s", (*i)->uid());
103 client * mc = mongodb_connmgr->create_client();
104 std::string agent_name = config->get_string_or_default("/fawkes/agent/name", "");
105 listeners_[(*i)->uid()] = new InterfaceListener(
106 blackboard, *i, mc, database_, collections_, agent_name, logger, now_);
107 }
108 }
109
111}
112
113void
115{
117
118 std::map<std::string, InterfaceListener *>::iterator i;
119 for (i = listeners_.begin(); i != listeners_.end(); ++i) {
120 client *mc = i->second->mongodb_client();
121 delete i->second;
123 }
124 listeners_.clear();
125}
126
127void
129{
130}
131
132// for BlackBoardInterfaceObserver
133void
134MongoLogBlackboardThread::bb_interface_created(const char *type, const char *id) noexcept
135{
136 MutexLocker lock(listeners_.mutex());
137
138 std::vector<std::string>::iterator e;
139 for (e = excludes_.begin(); e != excludes_.end(); ++e) {
140 if (fnmatch(e->c_str(), id, 0) != FNM_NOMATCH) {
141 logger->log_debug(name(), "Ignoring excluded interface '%s::%s'", type, id);
142 return;
143 }
144 }
145
146 try {
147 Interface *interface = blackboard->open_for_reading(type, id);
148 if (listeners_.find(interface->uid()) == listeners_.end()) {
149 logger->log_debug(name(), "Opening new %s", interface->uid());
150 client * mc = mongodb_connmgr->create_client();
151 std::string agent_name = config->get_string_or_default("/fawkes/agent/name", "");
152 listeners_[interface->uid()] = new InterfaceListener(
153 blackboard, interface, mc, database_, collections_, agent_name, logger, now_);
154 } else {
155 logger->log_warn(name(), "Interface %s already opened", interface->uid());
156 blackboard->close(interface);
157 }
158 } catch (Exception &e) {
159 logger->log_warn(name(), "Failed to open interface %s::%s, exception follows", type, id);
160 logger->log_warn(name(), e);
161 }
162}
163
164/** Constructor.
165 * @param blackboard blackboard
166 * @param interface interface to listen for
167 * @param mongodb MongoDB client to write to
168 * @param database name of database to write to
169 * @param colls collections
170 * @param agent_name agent belonging to the fawkes instance.
171 * @param logger logger
172 * @param now Time
173 */
174MongoLogBlackboardThread::InterfaceListener::InterfaceListener(BlackBoard * blackboard,
175 Interface * interface,
176 client * mongodb,
177 std::string & database,
179 const std::string & agent_name,
180 Logger * logger,
181 Time * now)
182: BlackBoardInterfaceListener("MongoLogListener-%s", interface->uid()),
183 database_(database),
184 collections_(colls),
185 agent_name_(agent_name)
186{
187 blackboard_ = blackboard;
188 interface_ = interface;
189 mongodb_ = mongodb;
190 logger_ = logger;
191 now_ = now;
192
193 // sanitize interface ID to be suitable for MongoDB
194 std::string id = interface->id();
195 size_t pos = 0;
196 while ((pos = id.find_first_of(" -", pos)) != std::string::npos) {
197 id.replace(pos, 1, "_");
198 pos = pos + 1;
199 }
200 collection_ = std::string(interface->type()) + "." + id;
201 if (collections_.find(collection_) != collections_.end()) {
202 throw Exception("Collection named %s already used, cannot log %s",
203 collection_.c_str(),
204 interface->uid());
205 }
206
207 bbil_add_data_interface(interface);
208 blackboard_->register_listener(this, BlackBoard::BBIL_FLAG_DATA);
209}
210
211/** Destructor. */
212MongoLogBlackboardThread::InterfaceListener::~InterfaceListener()
213{
214 blackboard_->unregister_listener(this);
215}
216
217void
218MongoLogBlackboardThread::InterfaceListener::bb_interface_data_refreshed(
219 Interface *interface) noexcept
220{
221 now_->stamp();
222 interface->read();
223
224 try {
225 // write interface data
226 using namespace bsoncxx::builder;
227 basic::document document;
228 document.append(basic::kvp("timestamp", static_cast<int64_t>(now_->in_msec())));
230 for (i = interface->fields(); i != interface->fields_end(); ++i) {
231 size_t length = i.get_length();
232 bool is_array = (length > 1);
233
234 std::string key{i.get_name()};
235 switch (i.get_type()) {
236 case IFT_BOOL:
237 if (is_array) {
238 bool *bools = i.get_bools();
239 document.append(basic::kvp(key, [bools, length](basic::sub_array subarray) {
240 for (size_t l = 0; l < length; ++l) {
241 subarray.append(bools[l]);
242 }
243 }));
244 } else {
245 document.append(basic::kvp(key, i.get_bool()));
246 }
247 break;
248
249 case IFT_INT8:
250 if (is_array) {
251 int8_t *ints = i.get_int8s();
252 document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
253 for (size_t l = 0; l < length; ++l) {
254 subarray.append(ints[l]);
255 }
256 }));
257 } else {
258 document.append(basic::kvp(key, i.get_int8()));
259 }
260 break;
261
262 case IFT_UINT8:
263 if (is_array) {
264 uint8_t *ints = i.get_uint8s();
265 document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
266 for (size_t l = 0; l < length; ++l) {
267 subarray.append(ints[l]);
268 }
269 }));
270 } else {
271 document.append(basic::kvp(key, i.get_uint8()));
272 }
273 break;
274
275 case IFT_INT16:
276 if (is_array) {
277 int16_t *ints = i.get_int16s();
278 document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
279 for (size_t l = 0; l < length; ++l) {
280 subarray.append(ints[l]);
281 }
282 }));
283 } else {
284 document.append(basic::kvp(key, i.get_int16()));
285 }
286 break;
287
288 case IFT_UINT16:
289 if (is_array) {
290 uint16_t *ints = i.get_uint16s();
291 document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
292 for (size_t l = 0; l < length; ++l) {
293 subarray.append(ints[l]);
294 }
295 }));
296 } else {
297 document.append(basic::kvp(key, i.get_uint16()));
298 }
299 break;
300
301 case IFT_INT32:
302 if (is_array) {
303 int32_t *ints = i.get_int32s();
304 document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
305 for (size_t l = 0; l < length; ++l) {
306 subarray.append(ints[l]);
307 }
308 }));
309 } else {
310 document.append(basic::kvp(key, i.get_int32()));
311 }
312 break;
313
314 case IFT_UINT32:
315 if (is_array) {
316 uint32_t *ints = i.get_uint32s();
317 document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
318 for (size_t l = 0; l < length; ++l) {
319 subarray.append(static_cast<int64_t>(ints[l]));
320 }
321 }));
322 } else {
323 document.append(basic::kvp(key, static_cast<int64_t>(i.get_uint32())));
324 }
325 break;
326
327 case IFT_INT64:
328 if (is_array) {
329 int64_t *ints = i.get_int64s();
330 document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
331 for (size_t l = 0; l < length; ++l) {
332 subarray.append(ints[l]);
333 }
334 }));
335 } else {
336 document.append(basic::kvp(key, i.get_int64()));
337 }
338 break;
339
340 case IFT_UINT64:
341 if (is_array) {
342 uint64_t *ints = i.get_uint64s();
343 document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
344 for (size_t l = 0; l < length; ++l) {
345 subarray.append(static_cast<int64_t>(ints[l]));
346 }
347 }));
348 } else {
349 document.append(basic::kvp(key, static_cast<int64_t>(i.get_uint64())));
350 }
351 break;
352
353 case IFT_FLOAT:
354 if (is_array) {
355 float *floats = i.get_floats();
356 document.append(basic::kvp(key, [floats, length](basic::sub_array subarray) {
357 for (size_t l = 0; l < length; ++l) {
358 subarray.append(floats[l]);
359 }
360 }));
361 } else {
362 document.append(basic::kvp(key, i.get_float()));
363 }
364 break;
365
366 case IFT_DOUBLE:
367 if (is_array) {
368 double *doubles = i.get_doubles();
369 document.append(basic::kvp(key, [doubles, length](basic::sub_array subarray) {
370 for (size_t l = 0; l < length; ++l) {
371 subarray.append(doubles[l]);
372 }
373 }));
374 } else {
375 document.append(basic::kvp(key, i.get_double()));
376 }
377 break;
378
379 case IFT_STRING: document.append(basic::kvp(key, i.get_string())); break;
380
381 case IFT_BYTE:
382 if (is_array) {
383 uint8_t *bytes = i.get_bytes();
384 document.append(basic::kvp(key, [bytes, length](basic::sub_array subarray) {
385 for (size_t l = 0; l < length; ++l) {
386 subarray.append(bytes[l]);
387 }
388 }));
389 } else {
390 document.append(basic::kvp(key, i.get_byte()));
391 }
392 break;
393
394 case IFT_ENUM:
395 if (is_array) {
396 int32_t *ints = i.get_enums();
397 document.append(basic::kvp(key, [ints, length](basic::sub_array subarray) {
398 for (size_t l = 0; l < length; ++l) {
399 subarray.append(ints[l]);
400 }
401 }));
402 } else {
403 document.append(basic::kvp(key, i.get_enum()));
404 }
405 break;
406 }
407 }
408
409 document.append(basic::kvp("agent-name", agent_name_));
410 mongodb_->database(database_)[collection_].insert_one(document.view());
411 } catch (operation_exception &e) {
412 logger_->log_warn(
413 bbil_name(), "Failed to log to %s.%s: %s", database_.c_str(), collection_.c_str(), e.what());
414 } catch (std::exception &e) {
415 logger_->log_warn(bbil_name(),
416 "Failed to log to %s.%s: %s (*)",
417 database_.c_str(),
418 collection_.c_str(),
419 e.what());
420 }
421}
virtual void finalize()
Finalize the thread.
virtual ~MongoLogBlackboardThread()
Destructor.
virtual void bb_interface_created(const char *type, const char *id) noexcept
BlackBoard interface created notification.
virtual void init()
Initialize the thread.
virtual void loop()
Code to execute in the thread.
BlackBoard * blackboard
This is the BlackBoard instance you can use to interact with the BlackBoard.
Definition: blackboard.h:44
BlackBoard interface listener.
void bbio_add_observed_create(const char *type_pattern, const char *id_pattern="*") noexcept
Add interface creation type to watch list.
The BlackBoard abstract class.
Definition: blackboard.h:46
virtual void unregister_observer(BlackBoardInterfaceObserver *observer)
Unregister BB interface observer.
Definition: blackboard.cpp:240
virtual void register_observer(BlackBoardInterfaceObserver *observer)
Register BB interface observer.
Definition: blackboard.cpp:225
virtual std::list< Interface * > open_multiple_for_reading(const char *type_pattern, const char *id_pattern="*", const char *owner=NULL)=0
Open multiple interfaces for reading.
virtual void close(Interface *interface)=0
Close interface.
Clock * clock
By means of this member access to the clock is given.
Definition: clock.h:42
Configuration * config
This is the Configuration member used to access the configuration.
Definition: configurable.h:41
virtual std::string get_string_or_default(const char *path, const std::string &default_val)
Get value from configuration which is of type string, or the given default if the path does not exist...
Definition: config.cpp:736
virtual std::vector< std::string > get_strings(const char *path)=0
Get list of values from configuration which is of type string.
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
Base class for exceptions in Fawkes.
Definition: exception.h:36
Interface field iterator.
float get_float(unsigned int index=0) const
Get value of current field as float.
int16_t get_int16(unsigned int index=0) const
Get value of current field as integer.
int8_t get_int8(unsigned int index=0) const
Get value of current field as integer.
int8_t * get_int8s() const
Get value of current field as integer array.
float * get_floats() const
Get value of current field as float array.
int32_t get_int32(unsigned int index=0) const
Get value of current field as integer.
uint8_t * get_bytes() const
Get value of current field as byte array.
size_t get_length() const
Get length of current field.
int32_t * get_int32s() const
Get value of current field as integer array.
int64_t get_int64(unsigned int index=0) const
Get value of current field as integer.
uint64_t get_uint64(unsigned int index=0) const
Get value of current field as unsigned integer.
int32_t get_enum(unsigned int index=0) const
Get value of current enum field as integer.
uint16_t get_uint16(unsigned int index=0) const
Get value of current field as unsigned integer.
double get_double(unsigned int index=0) const
Get value of current field as double.
int32_t * get_enums() const
Get value of current enum field as integer array.
int64_t * get_int64s() const
Get value of current field as integer array.
uint64_t * get_uint64s() const
Get value of current field as unsigned integer array.
uint32_t get_uint32(unsigned int index=0) const
Get value of current field as unsigned integer.
interface_fieldtype_t get_type() const
Get type of current field.
const char * get_name() const
Get name of current field.
uint8_t * get_uint8s() const
Get value of current field as unsigned integer array.
uint16_t * get_uint16s() const
Get value of current field as unsigned integer array.
const char * get_string() const
Get value of current field as string.
uint8_t get_byte(unsigned int index=0) const
Get value of current field as byte.
bool * get_bools() const
Get value of current field as bool array.
uint8_t get_uint8(unsigned int index=0) const
Get value of current field as unsigned integer.
bool get_bool(unsigned int index=0) const
Get value of current field as bool.
uint32_t * get_uint32s() const
Get value of current field as unsigned integer array.
double * get_doubles() const
Get value of current field as double array.
int16_t * get_int16s() const
Get value of current field as integer array.
Base class for all Fawkes BlackBoard interfaces.
Definition: interface.h:80
const char * type() const
Get type of interface.
Definition: interface.cpp:652
InterfaceFieldIterator fields_end()
Invalid iterator.
Definition: interface.cpp:1240
const char * id() const
Get identifier of interface.
Definition: interface.cpp:661
InterfaceFieldIterator fields()
Get iterator over all fields of this interface instance.
Definition: interface.cpp:1231
const char * uid() const
Get unique identifier of interface.
Definition: interface.cpp:686
void read()
Read from BlackBoard into local copy.
Definition: interface.cpp:479
Interface for logging.
Definition: logger.h:42
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:41
Thread aspect to access MongoDB.
Definition: mongodb.h:39
MongoDBConnCreator * mongodb_connmgr
Connection manager to retrieve more client connections from if necessary.
Definition: mongodb.h:55
virtual mongocxx::client * create_client(const std::string &config_name="")=0
Create a new MongoDB client.
virtual void delete_client(mongocxx::client *client)=0
Delete a client.
virtual void log_warn(const char *component, const char *format,...)
Log warning message.
Definition: multi.cpp:216
virtual void log_debug(const char *component, const char *format,...)
Log debug message.
Definition: multi.cpp:174
Mutex locking helper.
Definition: mutex_locker.h:34
Thread class encapsulation of pthreads.
Definition: thread.h:46
const char * name() const
Get name of thread.
Definition: thread.h:100
A class for handling time.
Definition: time.h:93
Fawkes library namespace.
@ IFT_INT8
8 bit integer field
Definition: types.h:38
@ IFT_UINT32
32 bit unsigned integer field
Definition: types.h:43
@ IFT_FLOAT
float field
Definition: types.h:46
@ IFT_BYTE
byte field, alias for uint8
Definition: types.h:49
@ IFT_UINT64
64 bit unsigned integer field
Definition: types.h:45
@ IFT_UINT16
16 bit unsigned integer field
Definition: types.h:41
@ IFT_INT32
32 bit integer field
Definition: types.h:42
@ IFT_INT64
64 bit integer field
Definition: types.h:44
@ IFT_DOUBLE
double field
Definition: types.h:47
@ IFT_INT16
16 bit integer field
Definition: types.h:40
@ IFT_STRING
string field
Definition: types.h:48
@ IFT_BOOL
boolean field
Definition: types.h:37
@ IFT_ENUM
field with interface specific enum type
Definition: types.h:50
@ IFT_UINT8
8 bit unsigned integer field
Definition: types.h:39