Fawkes API Fawkes Development Version
oprs_protobuf.h
1
2/***************************************************************************
3 * oprs_protobuf.h - protobuf network communication for OpenPRS
4 *
5 * Created: Tue Sep 02 16:34:09 2014 (based on CLIPS version)
6 * Copyright 2013-2014 Tim Niemueller [www.niemueller.de]
7 ****************************************************************************/
8
9/* Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions
11 * are met:
12 *
13 * - Redistributions of source code must retain the above copyright
14 * notice, this list of conditions and the following disclaimer.
15 * - Redistributions in binary form must reproduce the above copyright
16 * notice, this list of conditions and the following disclaimer in
17 * the documentation and/or other materials provided with the
18 * distribution.
19 * - Neither the name of the authors nor the names of its contributors
20 * may be used to endorse or promote products derived from this
21 * software without specific prior written permission.
22 *
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
26 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
27 * COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
28 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
29 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
30 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
31 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
32 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
34 * OF THE POSSIBILITY OF SUCH DAMAGE.
35 */
36
37#ifndef _OPENPRS_AGENT_OPRS_PROTOBUF_H_
38#define _OPENPRS_AGENT_OPRS_PROTOBUF_H_
39
40#include <core/threading/mutex.h>
41#include <core/utils/lock_queue.h>
42#include <protobuf_comm/server.h>
43
44#include <list>
45#include <map>
46#include <memory>
47#include <oprs-type-pub.h>
48#include <oprs-type_f-pub.h>
49
50namespace protobuf_comm {
51class ProtobufStreamClient;
52class ProtobufBroadcastPeer;
53} // namespace protobuf_comm
54
55namespace oprs_protobuf {
56
58{
59public:
60 OpenPRSProtobuf(std::vector<std::string> &proto_path);
62
63 /** Get Protobuf server.
64 * @return protobuf server */
65 protobuf_comm::ProtobufStreamServer *
66 server() const
67 {
68 return server_;
69 }
70
71 /** Get protobuf_comm peers.
72 * @return protobuf_comm peer */
73 const std::map<long int, protobuf_comm::ProtobufBroadcastPeer *> &
74 peers() const
75 {
76 return peers_;
77 }
78
79 /** Get the communicator's message register.
80 * @return message register */
81 protobuf_comm::MessageRegister &
83 {
84 return *message_register_;
85 }
86
87 /** Signal invoked for a message that has been sent to a server client.
88 * @return signal
89 */
90 boost::signals2::signal<void(protobuf_comm::ProtobufStreamServer::ClientID,
91 std::shared_ptr<google::protobuf::Message>)> &
93 {
94 return sig_server_sent_;
95 }
96
97 /** Signal invoked for a message that has been sent to a client.
98 * @return signal
99 */
100 boost::signals2::signal<
101 void(std::string, unsigned short, std::shared_ptr<google::protobuf::Message>)> &
103 {
104 return sig_client_sent_;
105 }
106
107 /** Signal invoked for a message that has been sent via broadcast.
108 * @return signal
109 */
110 boost::signals2::signal<void(long int, std::shared_ptr<google::protobuf::Message>)> &
112 {
113 return sig_peer_sent_;
114 }
115
116 bool oprs_pb_register_type(std::string full_name);
117 Term *oprs_pb_field_names(void *msgptr);
118 bool oprs_pb_has_field(void *msgptr, std::string field_name);
119 Term *oprs_pb_field_value(void *msgptr, std::string field_name);
120 Term *oprs_pb_field_type(void *msgptr, std::string field_name);
121 Term *oprs_pb_field_label(void *msgptr, std::string field_name);
122 Term *oprs_pb_field_list(void *msgptr, std::string field_name);
123 bool oprs_pb_field_is_list(void *msgptr, std::string field_name);
124 std::shared_ptr<google::protobuf::Message> *oprs_create_msg(std::string full_name);
125 Term * oprs_pb_ref(void *msgptr);
126 Term * oprs_pb_destroy(void *msgptr);
127 void oprs_pb_set_field(void *msgptr, std::string field_name, Term *value);
128 void oprs_pb_add_list(void *msgptr, std::string field_name, Term *value);
129 void oprs_pb_send(long int client_id, void *msgptr);
130 Term *oprs_pb_client_connect(std::string host, int port);
131 void oprs_pb_disconnect(long int client_id);
132 void oprs_pb_broadcast(long int peer_id, void *msgptr);
133 void oprs_pb_enable_server(int port);
135
136 Term *oprs_pb_peer_create(const std::string &host, int port);
137 Term *oprs_pb_peer_create_local(const std::string &host, int send_port, int recv_port);
138 Term *oprs_pb_peer_create_crypto(const std::string &host,
139 int port,
140 const std::string &crypto_key = "",
141 const std::string &cipher = "");
142 Term *oprs_pb_peer_create_local_crypto(const std::string &host,
143 int send_port,
144 int recv_port,
145 const std::string &crypto_key = "",
146 const std::string &cipher = "");
147 void oprs_pb_peer_destroy(long int peer_id);
148 void oprs_pb_peer_setup_crypto(long int peer_id,
149 const std::string &crypto_key,
150 const std::string &cipher);
151
153 void oprs_pb_process();
154
155private:
156 typedef enum { CT_SERVER, CT_CLIENT, CT_PEER } ClientType;
157 void clips_assert_message(std::pair<std::string, unsigned short> & endpoint,
158 uint16_t comp_id,
159 uint16_t msg_type,
160 std::shared_ptr<google::protobuf::Message> &msg,
161 ClientType ct,
162 unsigned int client_id = 0);
163 void handle_server_client_connected(protobuf_comm::ProtobufStreamServer::ClientID client,
164 boost::asio::ip::tcp::endpoint & endpoint);
165 void handle_server_client_disconnected(protobuf_comm::ProtobufStreamServer::ClientID client,
166 const boost::system::error_code & error);
167
168 void handle_server_client_msg(protobuf_comm::ProtobufStreamServer::ClientID client,
169 uint16_t component_id,
170 uint16_t msg_type,
171 std::shared_ptr<google::protobuf::Message> msg);
172
173 void handle_server_client_fail(protobuf_comm::ProtobufStreamServer::ClientID client,
174 uint16_t component_id,
175 uint16_t msg_type,
176 std::string msg);
177
178 void handle_peer_msg(long int peer_id,
179 boost::asio::ip::udp::endpoint & endpoint,
180 uint16_t component_id,
181 uint16_t msg_type,
182 std::shared_ptr<google::protobuf::Message> msg);
183 void handle_peer_recv_error(long int peer_id,
184 boost::asio::ip::udp::endpoint &endpoint,
185 std::string msg);
186 void handle_peer_send_error(long int peer_id, const std::string &msg);
187
188 void handle_client_connected(long int client_id);
189 void handle_client_disconnected(long int client_id, const boost::system::error_code &error);
190 void handle_client_msg(long int client_id,
191 uint16_t comp_id,
192 uint16_t msg_type,
193 std::shared_ptr<google::protobuf::Message> msg);
194 void handle_client_receive_fail(long int client_id,
195 uint16_t comp_id,
196 uint16_t msg_type,
197 const std::string &msg);
198 void oprs_assert_message(std::string & endpoint_host,
199 unsigned short endpoint_port,
200 uint16_t comp_id,
201 uint16_t msg_type,
202 std::shared_ptr<google::protobuf::Message> &msg,
203 OpenPRSProtobuf::ClientType ct,
204 unsigned int client_id);
205 void oprs_assert_server_client_event(long int client_id,
206 std::string & host,
207 unsigned short port,
208 bool connect);
209 void oprs_assert_client_event(long int client_id, bool connect);
210
211private:
212 std::shared_ptr<protobuf_comm::MessageRegister> message_register_;
213 protobuf_comm::ProtobufStreamServer * server_;
214
215 boost::signals2::signal<void(protobuf_comm::ProtobufStreamServer::ClientID,
216 std::shared_ptr<google::protobuf::Message>)>
217 sig_server_sent_;
218 boost::signals2::signal<
219 void(std::string, unsigned short, std::shared_ptr<google::protobuf::Message>)>
220 sig_client_sent_;
221 boost::signals2::signal<void(long int, std::shared_ptr<google::protobuf::Message>)>
222 sig_peer_sent_;
223
224 fawkes::Mutex map_mutex_;
225 long int next_client_id_;
226
227 std::map<long int, protobuf_comm::ProtobufStreamServer::ClientID> server_clients_;
228 typedef std::map<protobuf_comm::ProtobufStreamServer::ClientID, long int> RevServerClientMap;
229 RevServerClientMap rev_server_clients_;
230 std::map<long int, protobuf_comm::ProtobufStreamClient *> clients_;
231 std::map<long int, protobuf_comm::ProtobufBroadcastPeer *> peers_;
232
233 std::map<long int, std::pair<std::string, unsigned short>> client_endpoints_;
234
235 fawkes::LockQueue<std::tuple<std::string,
236 unsigned short,
237 uint16_t,
238 uint16_t,
239 std::shared_ptr<google::protobuf::Message>,
240 ClientType,
241 unsigned int>>
242 q_msgs_;
245};
246
247} // namespace oprs_protobuf
248
249#endif
Queue with a lock.
Definition: lock_queue.h:45
Mutex mutual exclusion lock.
Definition: mutex.h:33
OpenPRS protobuf integration class.
Definition: oprs_protobuf.h:58
Term * oprs_pb_peer_create_local_crypto(const std::string &host, int send_port, int recv_port, const std::string &crypto_key="", const std::string &cipher="")
Enable protobuf peer.
Term * oprs_pb_destroy(void *msgptr)
Destroy given message (reference).
bool oprs_pb_register_type(std::string full_name)
Register a new message type.
void oprs_pb_broadcast(long int peer_id, void *msgptr)
Broadcast a message through a peer.
Term * oprs_pb_ref(void *msgptr)
Create new reference to message.
Term * oprs_pb_peer_create_local(const std::string &host, int send_port, int recv_port)
Enable protobuf peer.
Term * oprs_pb_peer_create_crypto(const std::string &host, int port, const std::string &crypto_key="", const std::string &cipher="")
Enable protobuf peer.
const std::map< long int, protobuf_comm::ProtobufBroadcastPeer * > & peers() const
Get protobuf_comm peers.
Definition: oprs_protobuf.h:74
Term * oprs_pb_field_label(void *msgptr, std::string field_name)
Get a fields label.
void oprs_pb_set_field(void *msgptr, std::string field_name, Term *value)
Set a field.
std::shared_ptr< google::protobuf::Message > * oprs_create_msg(std::string full_name)
Create a new message of given type.
bool oprs_pb_field_is_list(void *msgptr, std::string field_name)
Check if a given field is a list (repeated field).
OpenPRSProtobuf(std::vector< std::string > &proto_path)
Constructor.
void oprs_pb_process()
Process all pending events.
protobuf_comm::ProtobufStreamServer * server() const
Get Protobuf server.
Definition: oprs_protobuf.h:66
bool oprs_pb_events_pending()
Check if there are pending events.
void oprs_pb_disable_server()
Disable protobuf stream server.
void oprs_pb_send(long int client_id, void *msgptr)
Send message to a specific client.
void oprs_pb_enable_server(int port)
Enable protobuf stream server.
Term * oprs_pb_client_connect(std::string host, int port)
Connect as a client to the given server.
Term * oprs_pb_peer_create(const std::string &host, int port)
Enable protobuf peer.
bool oprs_pb_has_field(void *msgptr, std::string field_name)
Check if message has a specific field.
boost::signals2::signal< void(long int, std::shared_ptr< google::protobuf::Message >)> & signal_peer_sent()
Signal invoked for a message that has been sent via broadcast.
void oprs_pb_peer_setup_crypto(long int peer_id, const std::string &crypto_key, const std::string &cipher)
Setup crypto for peer.
boost::signals2::signal< void(std::string, unsigned short, std::shared_ptr< google::protobuf::Message >)> & signal_client_sent()
Signal invoked for a message that has been sent to a client.
Term * oprs_pb_field_value(void *msgptr, std::string field_name)
Get properly typed field value.
protobuf_comm::MessageRegister & message_register()
Get the communicator's message register.
Definition: oprs_protobuf.h:82
Term * oprs_pb_field_names(void *msgptr)
Get field names of message.
boost::signals2::signal< void(protobuf_comm::ProtobufStreamServer::ClientID, std::shared_ptr< google::protobuf::Message >)> & signal_server_sent()
Signal invoked for a message that has been sent to a server client.
Definition: oprs_protobuf.h:92
void oprs_pb_add_list(void *msgptr, std::string field_name, Term *value)
Add value to a repeated field.
Term * oprs_pb_field_type(void *msgptr, std::string field_name)
Get type if a specific field.
void oprs_pb_disconnect(long int client_id)
Disconnect a given client.
Term * oprs_pb_field_list(void *msgptr, std::string field_name)
Get list of values of a given message field.
void oprs_pb_peer_destroy(long int peer_id)
Disable peer.