Fawkes API Fawkes Development Version
openprs_server_proxy.cpp
1
2/***************************************************************************
3 * openprs_server_proxy.cpp - OpenPRS server proxy
4 *
5 * Created: Tue Aug 19 16:59:27 2014
6 * Copyright 2014 Tim Niemueller [www.niemueller.de]
7 ****************************************************************************/
8
9/* This program is free software; you can redistribute it and/or modify
10 * it under the terms of the GNU General Public License as published by
11 * the Free Software Foundation; either version 2 of the License, or
12 * (at your option) any later version. A runtime exception applies to
13 * this software (see LICENSE.GPL_WRE file mentioned below for details).
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU Library General Public License for more details.
19 *
20 * Read the full text in the LICENSE.GPL_WRE file in the doc directory.
21 */
22
23#include "openprs_server_proxy.h"
24
25#include <core/exception.h>
26#include <core/exceptions/system.h>
27#include <core/threading/mutex_locker.h>
28#include <logging/logger.h>
29
30#include <boost/bind/bind.hpp>
31#include <boost/lexical_cast.hpp>
32
33using namespace boost::asio;
34
35// Types copied from OPRS because they are not public there
36/// @cond EXTERN
namespace OPRS {

/// Message type identifiers; numbering starts at 1.
enum Message_Type { MESSAGE_MT = 1, BROADCAST_MT, MULTICAST_MT, DISCONNECT_MT };

/// Registration result identifiers; numbering starts at 0.
enum Register_Type { REGISTER_OK, REGISTER_NAME_CONFLICT, REGISTER_DENIED };

/// Protocol identifiers; numbering starts at 0.
enum Protocol_Type { MESSAGES_PT, STRINGS_PT };

} // namespace OPRS
42/// @endcond
43
44namespace fawkes {
45
46/** @class OpenPRSServerProxy "openprs_server_proxy.h"
47 * Proxy for the OpenPRS server communication.
48 * Using this proxy allows injecting commands into the communication between
49 * oprs-server and oprs (or xoprs).
50 * @author Tim Niemueller
51 */
52
53/** Constructor.
54 * @param tcp_port port to listen on for incoming connections
55 * @param server_host host of oprs-server to connect to
56 * @param server_port TCP port that oprs-server listens on
57 * @param logger logger for informational messages
58 */
60 const std::string &server_host,
61 unsigned short server_port,
62 fawkes::Logger * logger)
63: io_service_work_(io_service_),
64 acceptor_(io_service_, ip::tcp::endpoint(ip::tcp::v6(), tcp_port)),
65 server_host_(server_host),
66 server_port_(server_port),
67 logger_(logger)
68{
69 acceptor_.set_option(socket_base::reuse_address(true));
70 io_service_thread_ = std::thread([this]() { this->io_service_.run(); });
71 start_accept();
72}
73
74/** Destructor. */
76{
77 io_service_.stop();
78 io_service_thread_.join();
79}
80
81/** Check if a kernel connected to the proxy.
82 * @param kernel_name name of the kernel to look for
83 * @return true if the kernel connected, false otherwise
84 */
85bool
86OpenPRSServerProxy::has_kernel(const std::string &kernel_name)
87{
88 auto map_it =
89 find_if(mappings_.begin(), mappings_.end(), [&kernel_name](const Mapping::Ptr &mapping) {
90 return mapping->client_name == kernel_name;
91 });
92 return (map_it != mappings_.end());
93}
94
95OpenPRSServerProxy::Mapping::Ptr
96OpenPRSServerProxy::find_mapping(const std::string &recipient)
97{
98 auto map_it =
99 find_if(mappings_.begin(), mappings_.end(), [&recipient](const Mapping::Ptr &mapping) {
100 return mapping->client_name == recipient;
101 });
102 if (map_it != mappings_.end()) {
103 return *map_it;
104 } else {
105 throw Exception("Client %s is not connected to OpenPRS server proxy", recipient.c_str());
106 }
107}
108
109/** Transmit a command to an OpenPRS kernel.
110 * This works equivalent to the transmit oprs-server console command.
111 * @param recipient OpenPRS kernel name to send to
112 * @param command command to send, cf. OpenPRS manual for valid commands
113 */
114void
115OpenPRSServerProxy::transmit_command(const std::string &recipient, const std::string &command)
116{
117 MutexLocker lock(mappings_.mutex());
118 Mapping::Ptr mapping = find_mapping(recipient);
119 mapping->transmit_command(command);
120}
121
122/** Transmit a command to an OpenPRS kernel.
123 * This works equivalent to the transmit oprs-server console command.
124 * This function allows to pass a format according to the sprintf()
125 * format and its arguments.
126 * @param recipient OpenPRS kernel name to send to
127 * @param format format string for the command, must be followed by the
128 * appropriate number and types of arguments.
129 */
130void
131OpenPRSServerProxy::transmit_command_f(const std::string &recipient, const char *format, ...)
132{
133 MutexLocker lock(mappings_.mutex());
134 Mapping::Ptr mapping = find_mapping(recipient);
135
136 va_list arg;
137 va_start(arg, format);
138
139 char *msg;
140 if (vasprintf(&msg, format, arg) == -1) {
141 va_end(arg);
142 throw OutOfMemoryException("Cannot format OpenPRS client command string");
143 }
144 va_end(arg);
145 std::string command = msg;
146 free(msg);
147
148 mapping->transmit_command(command);
149}
150
151/** Transmit a command to an OpenPRS kernel.
152 * This works equivalent to the transmit oprs-server console command.
153 * This function allows to pass a format according to the sprintf()
154 * format and its arguments. The arguments are read from the @p arg list.
155 * @param recipient OpenPRS kernel name to send to
156 * @param format format string for the command, must be followed by the
157 * appropriate number and types of arguments.
158 * @param arg argument list for the string format
159 */
160void
161OpenPRSServerProxy::transmit_command_v(const std::string &recipient,
162 const char * format,
163 va_list arg)
164{
165 MutexLocker lock(mappings_.mutex());
166 Mapping::Ptr mapping = find_mapping(recipient);
167
168 char *msg;
169 if (vasprintf(&msg, format, arg) == -1) {
170 throw OutOfMemoryException("Cannot format OpenPRS client command string");
171 }
172 std::string command = msg;
173 free(msg);
174
175 mapping->transmit_command(command);
176}
177
178/** Start accepting connections. */
179void
180OpenPRSServerProxy::start_accept()
181{
182 Mapping::Ptr mapping(new Mapping(io_service_, server_host_, server_port_, logger_));
183 acceptor_.async_accept(mapping->client_socket,
184 boost::bind(&OpenPRSServerProxy::handle_accept,
185 this,
186 mapping,
187 boost::asio::placeholders::error));
188}
189
190void
191OpenPRSServerProxy::handle_accept(Mapping::Ptr mapping, const boost::system::error_code &error)
192{
193 if (!error) {
194 MutexLocker lock(mappings_.mutex());
195 mappings_.push_back(mapping);
196 mapping->start();
197 }
198
199 start_accept();
200}
201
202OpenPRSServerProxy::Mapping::Mapping(boost::asio::io_service &io_service,
203 const std::string & server_host,
204 unsigned short server_port,
205 fawkes::Logger * logger)
206: io_service_(io_service),
207 resolver_(io_service_),
208 server_host_(server_host),
209 server_port_(server_port),
210 logger_(logger),
211 client_socket(io_service_),
212 server_socket(io_service_)
213{
214}
215
216/** Destruct mapping.
217 * This closes both, client and server sockets. This destructor
218 * assumes that the io_service has been cancelled.
219 */
220OpenPRSServerProxy::Mapping::~Mapping()
221{
222 boost::system::error_code err;
223 client_socket.shutdown(ip::tcp::socket::shutdown_both, err);
224 client_socket.close();
225 server_socket.shutdown(ip::tcp::socket::shutdown_both, err);
226 server_socket.close();
227}
228
229/** A client has connected, start this mapping. */
230void
231OpenPRSServerProxy::Mapping::start()
232{
233 logger_->log_info("OPRS-server-proxy", "Client connected, connecting to server");
234 ip::tcp::resolver::query query(server_host_, boost::lexical_cast<std::string>(server_port_));
235 resolver_.async_resolve(query,
236 boost::bind(&OpenPRSServerProxy::Mapping::handle_resolve,
237 this,
238 boost::asio::placeholders::error,
239 boost::asio::placeholders::iterator));
240}
241
242bool
243OpenPRSServerProxy::Mapping::alive() const
244{
245 return client_socket.is_open();
246}
247
248void
249OpenPRSServerProxy::Mapping::disconnect()
250{
251 logger_->log_info("OPRS-server-proxy", "Disconnecting %s", client_name.c_str());
252 boost::system::error_code ec;
253 client_socket.shutdown(ip::tcp::socket::shutdown_both, ec);
254 client_socket.close();
255}
256
257void
258OpenPRSServerProxy::Mapping::handle_resolve(const boost::system::error_code &err,
259 ip::tcp::resolver::iterator endpoint_iterator)
260{
261 if (!err) {
262 // Attempt a connection to each endpoint in the list until we
263 // successfully establish a connection.
264#if BOOST_ASIO_VERSION > 100409
265 boost::asio::async_connect(server_socket,
266 endpoint_iterator,
267#else
268 server_socket.async_connect(*endpoint_iterator,
269#endif
270 boost::bind(&OpenPRSServerProxy::Mapping::handle_connect,
271 this,
272 boost::asio::placeholders::error));
273 } else {
274 disconnect();
275 }
276}
277
278void
279OpenPRSServerProxy::Mapping::handle_connect(const boost::system::error_code &err)
280{
281 if (!err) {
282 try {
283 // forward greeting
284 std::string greeting = read_string_from_socket(server_socket);
285 logger_->log_info("OPRS-server-proxy", "Forwarding greeting '%s'", greeting.c_str());
286 write_string_to_socket(client_socket, greeting);
287
288 int client_pid = 0;
289 int client_use_x = 0;
290
291 logger_->log_info("OPRS-server-proxy", "Reading client details");
292 // now read connection details
293 client_name = read_string_from_socket(client_socket);
294 client_pid = read_int_from_socket(client_socket);
295 client_use_x = read_int_from_socket(client_socket);
296
297 logger_->log_info("OPRS-server-proxy",
298 "Got client info: %s %i %s",
299 client_name.c_str(),
300 client_pid,
301 client_use_x ? "XOPRS" : "OPRS");
302
303 // forward to server
304 write_string_to_socket(server_socket, client_name);
305 write_int_to_socket(server_socket, client_pid);
306 write_int_to_socket(server_socket, client_use_x);
307
308 start_recv_client();
309 start_recv_server();
310 } catch (Exception &e) {
311 disconnect();
312 }
313 } else {
314 disconnect();
315 }
316}
317
318void
319OpenPRSServerProxy::Mapping::start_recv_client()
320{
321 boost::asio::async_read(client_socket,
322 boost::asio::buffer(&client_in_num_completions_,
323 sizeof(client_in_num_completions_)),
324 boost::bind(&OpenPRSServerProxy::Mapping::handle_recv_client,
325 this,
326 boost::asio::placeholders::error));
327}
328
329void
330OpenPRSServerProxy::Mapping::start_recv_server()
331{
332 boost::asio::async_read_until(server_socket,
333 server_buffer_,
334 '\n',
335 boost::bind(&OpenPRSServerProxy::Mapping::handle_recv_server,
336 this,
337 boost::asio::placeholders::error));
338}
339
340void
341OpenPRSServerProxy::Mapping::handle_recv_server(const boost::system::error_code &err)
342{
343 if (!err) {
344 std::string line;
345 std::istream in_stream(&server_buffer_);
346 std::getline(in_stream, line);
347
348 logger_->log_info("OPRS-server-proxy", "Forwarding S->C line '%s'", line.c_str());
349 write_string_newline_to_socket(client_socket, line);
350
351 start_recv_server();
352 } else {
353 disconnect();
354 }
355}
356
357void
358OpenPRSServerProxy::Mapping::handle_recv_client(const boost::system::error_code &err)
359{
360 if (!err) {
361 client_in_num_completions_ = ntohl(client_in_num_completions_);
362 for (int i = 0; i < client_in_num_completions_; ++i) {
363 std::string c = read_string_from_socket(client_socket);
364 write_string_to_socket(server_socket, c);
365 }
366
367 start_recv_client();
368 } else {
369 disconnect();
370 }
371}
372
373void
374OpenPRSServerProxy::Mapping::transmit_command(const std::string &command)
375{
376 write_string_newline_to_socket(client_socket, command);
377}
378
379/** Read an int from a given socket.
380 * @param socket socket to read from
381 * @return read value
382 */
383int
384OpenPRSServerProxy::read_int_from_socket(boost::asio::ip::tcp::socket &socket)
385{
386 int32_t value;
387 boost::system::error_code ec;
388 boost::asio::read(socket, boost::asio::buffer(&value, sizeof(value)), ec);
389 if (ec) {
390 throw Exception("Failed to read int from socket: %s", ec.message().c_str());
391 } else {
392 return ntohl(value);
393 }
394}
395
396/** Read a string from a given socket.
397 * @param socket socket to read from
398 * @return read value
399 */
400std::string
401OpenPRSServerProxy::read_string_from_socket(boost::asio::ip::tcp::socket &socket)
402{
403 uint32_t s_size = 0;
404 boost::system::error_code ec;
405 boost::asio::read(socket, boost::asio::buffer(&s_size, sizeof(s_size)), ec);
406 if (ec) {
407 throw Exception("Failed to read string size from socket: %s", ec.message().c_str());
408 }
409 s_size = ntohl(s_size);
410
411 char s[s_size + 1];
412 boost::asio::read(socket, boost::asio::buffer(s, s_size), ec);
413 if (ec) {
414 throw Exception("Failed to read string content from socket: %s", ec.message().c_str());
415 }
416 s[s_size] = 0;
417
418 return s;
419}
420
421/** Write an int to a given socket.
422 * @param socket socket to write to
423 * @param i value to write
424 */
425void
426OpenPRSServerProxy::write_int_to_socket(boost::asio::ip::tcp::socket &socket, int i)
427{
428 boost::system::error_code ec;
429 int32_t value = htonl(i);
430 boost::asio::write(socket, boost::asio::buffer(&value, sizeof(value)), ec);
431 if (ec) {
432 throw Exception("Failed to write int to socket: %s", ec.message().c_str());
433 }
434}
435
436/** Write a string to a given socket.
437 * @param socket socket to write to
438 * @param str string value to write
439 */
440void
441OpenPRSServerProxy::write_string_to_socket(boost::asio::ip::tcp::socket &socket,
442 const std::string & str)
443{
444 boost::system::error_code ec;
445 uint32_t s_size = htonl(str.size());
446 std::array<boost::asio::const_buffer, 2> buffers;
447 buffers[0] = boost::asio::buffer(&s_size, sizeof(s_size));
448 buffers[1] = boost::asio::buffer(str.c_str(), str.size());
449
450 boost::asio::write(socket, buffers, ec);
451 if (ec) {
452 throw Exception("Failed to write string to socket: %s", ec.message().c_str());
453 }
454}
455
456/** Write a string followed by a newline character to a given socket.
457 * @param socket socket to write to
458 * @param str string value to write
459 */
460void
461OpenPRSServerProxy::write_string_newline_to_socket(boost::asio::ip::tcp::socket &socket,
462 const std::string & str)
463{
464 boost::system::error_code ec;
465 std::string s = str + "\n";
466 boost::asio::write(socket, boost::asio::buffer(s.c_str(), s.size()), ec);
467 if (ec) {
468 throw Exception("Failed to write string to socket: %s", ec.message().c_str());
469 }
470}
471
472} // end namespace fawkes
Base class for exceptions in Fawkes.
Definition: exception.h:36
RefPtr< Mutex > mutex() const
Get access to the internal mutex.
Definition: lock_list.h:172
Interface for logging.
Definition: logger.h:42
Mutex locking helper.
Definition: mutex_locker.h:34
void transmit_command_f(const std::string &client_name, const char *format,...)
Transmit a command to an OpenPRS kernel.
virtual ~OpenPRSServerProxy()
Destructor.
static void write_string_newline_to_socket(boost::asio::ip::tcp::socket &socket, const std::string &str)
Write a string followed by a newline character to a given socket.
static void write_int_to_socket(boost::asio::ip::tcp::socket &socket, int i)
Write an int to a given socket.
static int read_int_from_socket(boost::asio::ip::tcp::socket &socket)
Read an int from a given socket.
static std::string read_string_from_socket(boost::asio::ip::tcp::socket &socket)
Read a string from a given socket.
void transmit_command(const std::string &client_name, const std::string &command)
Transmit a command to an OpenPRS kernel.
void transmit_command_v(const std::string &client_name, const char *format, va_list arg)
Transmit a command to an OpenPRS kernel.
bool has_kernel(const std::string &kernel_name)
Check if a kernel connected to the proxy.
OpenPRSServerProxy(unsigned short tcp_port, const std::string &server_host, unsigned short server_port, fawkes::Logger *logger)
Constructor.
static void write_string_to_socket(boost::asio::ip::tcp::socket &socket, const std::string &str)
Write a string to a given socket.
System ran out of memory and desired operation could not be fulfilled.
Definition: system.h:32
Fawkes library namespace.