Fawkes API Fawkes Development Version
metrics_thread.cpp
1/***************************************************************************
2 * metrics_thread.cpp - Metrics exporter for Prometheus plugin
3 *
4 * Created: Sat May 06 19:44:55 2017 (German Open 2017)
5 * Copyright 2017 Tim Niemueller [www.niemueller.de]
6 ****************************************************************************/
7
8/* This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation; either version 2 of the License, or
11 * (at your option) any later version.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU Library General Public License for more details.
17 *
18 * Read the full text in the LICENSE.GPL file in the doc directory.
19 */
20
21#include "metrics_thread.h"
22
23#include "metrics_processor.h"
24
25#include <core/threading/mutex_locker.h>
26#include <interfaces/MetricCounterInterface.h>
27#include <interfaces/MetricGaugeInterface.h>
28#include <interfaces/MetricHistogramInterface.h>
29#include <interfaces/MetricUntypedInterface.h>
30#include <utils/misc/string_split.h>
31#include <webview/url_manager.h>
32
33#include <algorithm>
34#include <chrono>
35#include <functional>
36
37using namespace fawkes;
38
39#define CFG_PREFIX "/metrics/"
40#define URL_PREFIX "/metrics"
41
42/** @class MetricsThread "metrics_thread.h"
43 * Thread to export metrics for Prometheus.
44 * @author Tim Niemueller
45 */
46
47/** Constructor. */
49: Thread("MetricsThread", Thread::OPMODE_WAITFORWAKEUP),
50 BlockedTimingAspect(BlockedTimingAspect::WAKEUP_HOOK_POST_LOOP),
51 AspectProviderAspect(&metrics_aspect_inifin_),
52 BlackBoardInterfaceListener("MetricsThread")
53{
54}
55
56/** Destructor. */
58{
59}
60
61void
63{
64 metrics_aspect_inifin_.set_manager(this);
65
66 bbio_add_observed_create("MetricFamilyInterface", "*");
68
69 MutexLocker lock(metric_bbs_.mutex());
70 std::list<MetricFamilyInterface *> ifaces =
71 blackboard->open_multiple_for_reading<MetricFamilyInterface>("*");
72
73 for (auto &i : ifaces) {
74 logger->log_info(name(), "Got metric family %s", i->id());
75 i->read();
76 MetricFamilyBB mfbb{.metric_family = i, .metric_type = i->metric_type()};
77 metric_bbs_[i->id()] = mfbb;
78
79 if (!conditional_open(i->id(), metric_bbs_[i->id()])) {
81 }
82 }
83
85
86 lock.unlock();
87
88 imf_loop_count_ = std::make_shared<io::prometheus::client::MetricFamily>();
89 imf_loop_count_->set_name("fawkes_loop_count");
90 imf_loop_count_->set_help("Number of Fawkes main loop iterations");
91 imf_loop_count_->set_type(io::prometheus::client::COUNTER);
92 imf_loop_count_->add_metric();
93 internal_metrics_.push_back(imf_loop_count_);
94
95 imf_metrics_requests_ = std::make_shared<io::prometheus::client::MetricFamily>();
96 imf_metrics_requests_->set_name("fawkes_metrics_requests");
97 imf_metrics_requests_->set_help("Number of requests for metrics");
98 imf_metrics_requests_->set_type(io::prometheus::client::COUNTER);
99 imf_metrics_requests_->add_metric();
100 internal_metrics_.push_back(imf_metrics_requests_);
101
102 try {
103 std::vector<float> buckets_le =
104 config->get_floats("/metrics/internal/metrics_requests/buckets");
105
106 if (!buckets_le.empty()) {
107 std::sort(buckets_le.begin(), buckets_le.end());
108
109 imf_metrics_proctime_ = std::make_shared<io::prometheus::client::MetricFamily>();
110 imf_metrics_proctime_->set_name("fawkes_metrics_proctime");
111 imf_metrics_proctime_->set_help("Time required to process metrics");
112 imf_metrics_proctime_->set_type(io::prometheus::client::HISTOGRAM);
113 auto m = imf_metrics_proctime_->add_metric();
114 auto h = m->mutable_histogram();
115 for (float &b : buckets_le) {
116 h->add_bucket()->set_upper_bound(b);
117 }
118 internal_metrics_.push_back(imf_metrics_proctime_);
119 }
120 } catch (Exception &e) {
122 "Internal metric metrics_proctime bucket bounds not configured, disabling");
123 }
124
125 metrics_suppliers_.push_back(this);
126
127 req_proc_ = new MetricsRequestProcessor(this, logger, URL_PREFIX);
128 webview_url_manager->add_handler(WebRequest::METHOD_GET,
129 URL_PREFIX,
131 req_proc_,
132 std::placeholders::_1));
133}
134
135void
137{
138 webview_url_manager->remove_handler(WebRequest::METHOD_GET, URL_PREFIX);
139 delete req_proc_;
140}
141
142void
144{
145 imf_loop_count_->mutable_metric(0)->mutable_counter()->set_value(
146 imf_loop_count_->metric(0).counter().value() + 1);
147}
148
149void
150MetricsThread::bb_interface_created(const char *type, const char *id) noexcept
151{
152 MutexLocker lock(metric_bbs_.mutex());
153 MetricFamilyInterface *mfi;
154 try {
155 mfi = blackboard->open_for_reading<MetricFamilyInterface>(id);
156 logger->log_info(name(), "Opened %s:%s", type, id);
157 } catch (Exception &e) {
158 // ignored
159 logger->log_warn(name(), "Failed to open %s:%s: %s", type, id, e.what_no_backtrace());
160 return;
161 }
162
163 try {
164 bbil_add_reader_interface(mfi);
165 bbil_add_writer_interface(mfi);
166 bbil_add_data_interface(mfi);
167 blackboard->update_listener(this);
168 } catch (Exception &e) {
169 logger->log_warn(name(), "Failed to register for %s:%s: %s", type, id, e.what());
170 try {
171 bbil_remove_reader_interface(mfi);
172 bbil_remove_writer_interface(mfi);
173 blackboard->update_listener(this);
174 blackboard->close(mfi);
175 } catch (Exception &e) {
176 logger->log_error(
177 name(), "Failed to deregister %s:%s during error recovery: %s", type, id, e.what());
178 }
179 return;
180 }
181 MetricFamilyBB mfbb{.metric_family = mfi, .metric_type = mfi->metric_type()};
182 metric_bbs_[id] = mfbb;
183}
184
185std::list<io::prometheus::client::MetricFamily>
186MetricsThread::metrics()
187{
188 std::chrono::high_resolution_clock::time_point proc_start =
189 std::chrono::high_resolution_clock::now();
190
191 imf_metrics_requests_->mutable_metric(0)->mutable_counter()->set_value(
192 imf_metrics_requests_->metric(0).counter().value() + 1);
193
194 std::list<io::prometheus::client::MetricFamily> rv;
195
196 MutexLocker lock(metric_bbs_.mutex());
197 for (auto &mbbp : metric_bbs_) {
198 auto &mfbb = mbbp.second;
199
200 io::prometheus::client::MetricFamily mf;
201 mfbb.metric_family->read();
202 mf.set_name(mfbb.metric_family->name());
203 mf.set_help(mfbb.metric_family->help());
204
205 switch (mfbb.metric_type) {
206 case MetricFamilyInterface::COUNTER:
207 mf.set_type(io::prometheus::client::COUNTER);
208 for (const auto &d : mfbb.data) {
209 d.counter->read();
210 io::prometheus::client::Metric *m = mf.add_metric();
211 parse_labels(d.counter->labels(), m);
212 m->mutable_counter()->set_value(d.counter->value());
213 }
214 break;
215
216 case MetricFamilyInterface::GAUGE:
217 mf.set_type(io::prometheus::client::GAUGE);
218 for (const auto &d : mfbb.data) {
219 d.gauge->read();
220 io::prometheus::client::Metric *m = mf.add_metric();
221 parse_labels(d.gauge->labels(), m);
222 m->mutable_gauge()->set_value(d.gauge->value());
223 }
224 break;
225
226 case MetricFamilyInterface::UNTYPED:
227 mf.set_type(io::prometheus::client::UNTYPED);
228 for (const auto &d : mfbb.data) {
229 d.untyped->read();
230 io::prometheus::client::Metric *m = mf.add_metric();
231 parse_labels(d.untyped->labels(), m);
232 m->mutable_untyped()->set_value(d.untyped->value());
233 }
234 break;
235
236 case MetricFamilyInterface::HISTOGRAM:
237 mf.set_type(io::prometheus::client::HISTOGRAM);
238 for (const auto &d : mfbb.data) {
239 d.histogram->read();
240 io::prometheus::client::Metric *m = mf.add_metric();
241 parse_labels(d.histogram->labels(), m);
242 io::prometheus::client::Histogram *h = m->mutable_histogram();
243 h->set_sample_count(d.histogram->sample_count());
244 h->set_sample_sum(d.histogram->sample_sum());
245 for (unsigned int i = 0; i < d.histogram->bucket_count(); ++i) {
246 io::prometheus::client::Bucket *b = h->add_bucket();
247 b->set_cumulative_count(d.histogram->bucket_cumulative_count(i));
248 b->set_upper_bound(d.histogram->bucket_upper_bound(i));
249 }
250 }
251 break;
252
253 case MetricFamilyInterface::NOT_INITIALIZED:
254 // ignore
255 break;
256 }
257 rv.push_back(std::move(mf));
258 }
259
260 if (imf_metrics_proctime_) {
261 std::chrono::high_resolution_clock::time_point proc_end =
262 std::chrono::high_resolution_clock::now();
263 const std::chrono::duration<double> proc_diff = proc_end - proc_start;
264 for (int i = 0; i < imf_metrics_proctime_->metric(0).histogram().bucket_size(); ++i) {
265 io::prometheus::client::Histogram *h =
266 imf_metrics_proctime_->mutable_metric(0)->mutable_histogram();
267 if (proc_diff.count() < h->bucket(i).upper_bound()) {
268 io::prometheus::client::Bucket *b = h->mutable_bucket(i);
269 b->set_cumulative_count(b->cumulative_count() + 1);
270 h->set_sample_count(h->sample_count() + 1);
271 h->set_sample_sum(h->sample_sum() + proc_diff.count());
272 break;
273 }
274 }
275 }
276
277 for (auto &im : internal_metrics_) {
278 rv.push_back(std::move(*im));
279 }
280
281 return rv;
282}
283
284std::list<io::prometheus::client::MetricFamily>
285MetricsThread::all_metrics()
286{
287 std::list<io::prometheus::client::MetricFamily> metrics;
288
289 for (auto &s : metrics_suppliers_) {
290 metrics.splice(metrics.begin(), std::move(s->metrics()));
291 }
292
293 return metrics;
294}
295
296void
297MetricsThread::add_supplier(MetricsSupplier *supplier)
298{
299 MutexLocker lock(metrics_suppliers_.mutex());
300 auto i = std::find(metrics_suppliers_.begin(), metrics_suppliers_.end(), supplier);
301 if (i == metrics_suppliers_.end()) {
302 metrics_suppliers_.push_back(supplier);
303 }
304}
305
306void
307MetricsThread::remove_supplier(MetricsSupplier *supplier)
308{
309 MutexLocker lock(metrics_suppliers_.mutex());
310 auto i = std::find(metrics_suppliers_.begin(), metrics_suppliers_.end(), supplier);
311 if (i != metrics_suppliers_.end()) {
312 metrics_suppliers_.erase(i);
313 }
314}
315
317MetricsThread::metrics_suppliers() const
318{
319 return metrics_suppliers_;
320}
321
322void
323MetricsThread::parse_labels(const std::string &labels, io::prometheus::client::Metric *m)
324{
325 std::vector<std::string> labelv = str_split(labels, ',');
326 for (const std::string &l : labelv) {
327 std::vector<std::string> key_value = str_split(l, '=');
328 if (key_value.size() == 2) {
329 io::prometheus::client::LabelPair *lp = m->add_label();
330 lp->set_name(key_value[0]);
331 lp->set_value(key_value[1]);
332 } else {
333 logger->log_warn(name(), "Invalid label '%s'", l.c_str());
334 }
335 }
336}
337
338void
339MetricsThread::bb_interface_writer_removed(fawkes::Interface *interface,
340 unsigned int instance_serial) noexcept
341{
342 conditional_close(interface);
343}
344
345void
346MetricsThread::bb_interface_reader_removed(fawkes::Interface *interface,
347 unsigned int instance_serial) noexcept
348{
349 conditional_close(interface);
350}
351
352void
353MetricsThread::bb_interface_data_refreshed(fawkes::Interface *interface) noexcept
354{
355 MetricFamilyInterface *mfi = dynamic_cast<MetricFamilyInterface *>(interface);
356 if (!mfi)
357 return;
358 if (!mfi->has_writer())
359 return;
360
361 mfi->read();
362 if (mfi->metric_type() == MetricFamilyInterface::NOT_INITIALIZED) {
363 logger->log_warn(name(),
364 "Got data changed event for %s which is not yet initialized",
365 mfi->uid());
366 return;
367 }
368
369 MutexLocker lock(metric_bbs_.mutex());
370 if (metric_bbs_.find(mfi->id()) == metric_bbs_.end()) {
371 logger->log_warn(name(), "Got data changed event for %s which is not registered", mfi->uid());
372 return;
373 }
374
375 metric_bbs_[mfi->id()].metric_type = mfi->metric_type();
376 if (conditional_open(mfi->id(), metric_bbs_[mfi->id()])) {
377 bbil_remove_data_interface(mfi);
378 blackboard->update_listener(this);
379 }
380}
381
382bool
383MetricsThread::conditional_open(const std::string &id, MetricFamilyBB &mfbb)
384{
385 mfbb.metric_family->read();
386
387 std::string data_id_pattern = id + "/*";
388
389 switch (mfbb.metric_type) {
390 case MetricFamilyInterface::COUNTER: {
391 std::list<MetricCounterInterface *> ifaces =
392 blackboard->open_multiple_for_reading<MetricCounterInterface>(data_id_pattern.c_str());
393 if (ifaces.empty())
394 return false;
395 std::transform(ifaces.begin(),
396 ifaces.end(),
397 std::back_inserter(mfbb.data),
398 [](MetricCounterInterface *iface) {
399 return MetricFamilyData{.counter = iface};
400 });
401 } break;
402
403 case MetricFamilyInterface::GAUGE: {
404 std::list<MetricGaugeInterface *> ifaces =
405 blackboard->open_multiple_for_reading<MetricGaugeInterface>(data_id_pattern.c_str());
406 if (ifaces.empty())
407 return false;
408 std::transform(ifaces.begin(),
409 ifaces.end(),
410 std::back_inserter(mfbb.data),
411 [](MetricGaugeInterface *iface) { return MetricFamilyData{.gauge = iface}; });
412 } break;
413
414 case MetricFamilyInterface::UNTYPED: {
415 std::list<MetricUntypedInterface *> ifaces =
416 blackboard->open_multiple_for_reading<MetricUntypedInterface>(data_id_pattern.c_str());
417 if (ifaces.empty())
418 return false;
419 std::transform(ifaces.begin(),
420 ifaces.end(),
421 std::back_inserter(mfbb.data),
422 [](MetricUntypedInterface *iface) {
423 return MetricFamilyData{.untyped = iface};
424 });
425 } break;
426
427 case MetricFamilyInterface::HISTOGRAM: {
428 std::list<MetricHistogramInterface *> ifaces =
429 blackboard->open_multiple_for_reading<MetricHistogramInterface>(data_id_pattern.c_str());
430 if (ifaces.empty())
431 return false;
432 std::transform(ifaces.begin(),
433 ifaces.end(),
434 std::back_inserter(mfbb.data),
435 [](MetricHistogramInterface *iface) {
436 return MetricFamilyData{.histogram = iface};
437 });
438 } break;
439
440 case MetricFamilyInterface::NOT_INITIALIZED:
441 logger->log_info(name(), "Metric family %s not yet initialized", id.c_str());
442 return false;
443 }
444
445 logger->log_info(name(), "Initialized metric %s", id.c_str());
446 return true;
447}
448
449void
450MetricsThread::conditional_close(Interface *interface) noexcept
451{
452 MetricFamilyInterface *mfi = dynamic_cast<MetricFamilyInterface *>(interface);
453 if (!mfi)
454 return;
455
456 MutexLocker lock(metric_bbs_.mutex());
457
458 if (metric_bbs_.find(mfi->id()) == metric_bbs_.end()) {
459 logger->log_warn(name(), "Called to close %s whic was not opened", mfi->uid());
460 return;
461 }
462
463 logger->log_info(name(), "Last on metric family %s, closing", interface->id());
464 auto &mfbb(metric_bbs_[mfi->id()]);
465
466 switch (mfbb.metric_type) {
467 case MetricFamilyInterface::COUNTER:
468 std::for_each(mfbb.data.begin(), mfbb.data.end(), [this](auto &d) {
469 this->blackboard->close(d.counter);
470 });
471 break;
472
473 case MetricFamilyInterface::GAUGE:
474 std::for_each(mfbb.data.begin(), mfbb.data.end(), [this](auto &d) {
475 this->blackboard->close(d.gauge);
476 });
477 break;
478 case MetricFamilyInterface::UNTYPED:
479 std::for_each(mfbb.data.begin(), mfbb.data.end(), [this](auto &d) {
480 this->blackboard->close(d.untyped);
481 });
482 break;
483 case MetricFamilyInterface::HISTOGRAM:
484 std::for_each(mfbb.data.begin(), mfbb.data.end(), [this](auto &d) {
485 this->blackboard->close(d.histogram);
486 });
487 break;
488 case MetricFamilyInterface::NOT_INITIALIZED: bbil_remove_data_interface(mfi); break;
489 }
490
491 metric_bbs_.erase(mfi->id());
492 lock.unlock();
493
494 std::string uid = interface->uid();
495 try {
496 bbil_remove_reader_interface(interface);
497 bbil_remove_writer_interface(interface);
498 blackboard->update_listener(this);
499 blackboard->close(interface);
500 } catch (Exception &e) {
501 logger->log_error(name(), "Failed to unregister or close %s: %s", uid.c_str(), e.what());
502 }
503}
Metrics web request processor.
fawkes::WebReply * process_request(const fawkes::WebRequest *request)
Process request.
virtual ~MetricsThread()
Destructor.
MetricsThread()
Constructor.
virtual void finalize()
Finalize the thread.
virtual void loop()
Code to execute in the thread.
virtual void init()
Initialize the thread.
Thread aspect provide a new aspect.
BlackBoard * blackboard
This is the BlackBoard instance you can use to interact with the BlackBoard.
Definition: blackboard.h:44
BlackBoard interface listener.
void bbil_add_data_interface(Interface *interface)
Add an interface to the data modification watch list.
void bbio_add_observed_create(const char *type_pattern, const char *id_pattern="*") noexcept
Add interface creation type to watch list.
virtual Interface * open_for_reading(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for reading.
virtual void update_listener(BlackBoardInterfaceListener *listener, ListenerRegisterFlag flag=BBIL_FLAG_ALL)
Update BB event listener.
Definition: blackboard.cpp:197
virtual void register_observer(BlackBoardInterfaceObserver *observer)
Register BB interface observer.
Definition: blackboard.cpp:225
virtual std::list< Interface * > open_multiple_for_reading(const char *type_pattern, const char *id_pattern="*", const char *owner=NULL)=0
Open multiple interfaces for reading.
virtual void register_listener(BlackBoardInterfaceListener *listener, ListenerRegisterFlag flag=BBIL_FLAG_ALL)
Register BB event listener.
Definition: blackboard.cpp:185
virtual void close(Interface *interface)=0
Close interface.
Thread aspect to use blocked timing.
Configuration * config
This is the Configuration member used to access the configuration.
Definition: configurable.h:41
virtual std::vector< float > get_floats(const char *path)=0
Get list of values from configuration which is of type float.
Base class for exceptions in Fawkes.
Definition: exception.h:36
virtual const char * what() const noexcept
Get primary string.
Definition: exception.cpp:639
virtual const char * what_no_backtrace() const noexcept
Get primary string (does not implicitly print the back trace).
Definition: exception.cpp:663
Base class for all Fawkes BlackBoard interfaces.
Definition: interface.h:80
RefPtr< Mutex > mutex() const
Get access to the internal mutex.
Definition: lock_map.h:133
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:41
void set_manager(MetricsManager *supplier_mgr)
Set Metrics environment manager.
Metrics supplier class.
virtual void log_info(const char *component, const char *format,...)
Log informational message.
Definition: multi.cpp:195
virtual void log_warn(const char *component, const char *format,...)
Log warning message.
Definition: multi.cpp:216
virtual void log_error(const char *component, const char *format,...)
Log error message.
Definition: multi.cpp:237
Mutex locking helper.
Definition: mutex_locker.h:34
void unlock()
Unlock the mutex.
Thread class encapsulation of pthreads.
Definition: thread.h:46
const char * name() const
Get name of thread.
Definition: thread.h:100
void remove_handler(WebRequest::Method method, const std::string &path)
Remove a request processor.
Definition: url_manager.cpp:84
void add_handler(WebRequest::Method method, const std::string &path, Handler handler)
Add a request processor.
Definition: url_manager.cpp:54
WebUrlManager * webview_url_manager
Webview request processor manager.
Definition: webview.h:49
Fawkes library namespace.