-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathsubscribe_server.cpp
327 lines (262 loc) · 8.16 KB
/
subscribe_server.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
#include <websocketpp/config/asio_no_tls.hpp>
#include <websocketpp/server.hpp>
#include <iostream>
/*#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>*/
#include <websocketpp/common/thread.hpp>
#include <chrono>
#define NUMBEROFTOPICS 6 //defines the number of possible subscribe topics
#include "actions.h"
#include "jsonparser.h"
std::string scenefile;
server::message_ptr initial_scene;
class SubscribeServer
{
public:
SubscribeServer()
{
// Initialize Asio Transport
_server.init_asio();
// Register handler callbacks
_server.set_open_handler(bind(&SubscribeServer::on_open
, this, ::_1));
_server.set_close_handler(bind(&SubscribeServer::on_close
, this, ::_1));
_server.set_message_handler(bind(&SubscribeServer::on_message
, this, ::_1, ::_2));
_server.set_http_handler(bind(&SubscribeServer::on_http
, this, ::_1));
}
/**
*
*
**/
void run(uint16_t port)
{
// listen on specified port
_server.listen(port);
// Start the server accept loop
_server.start_accept();
// Start the ASIO io_service run loop
try
{
_server.run();
}
catch (const std::exception & e)
{
std::cout << e.what() << std::endl;
}
catch (websocketpp::lib::error_code e)
{
std::cout << e.message() << std::endl;
}
catch (...)
{
std::cout << "other exception" << std::endl;
}
}
void on_http(connection_hdl hdl)
{
server::connection_ptr con = _server.get_con_from_hdl(hdl);
std::stringstream out;
out << "<!doctype html><html><body>Request: "
<< con->get_resource()
<< "</body></html>";
con->set_status(websocketpp::http::status_code::ok);
con->set_body(out.str());
std::cout << "Sorry, Websocket Connections only." << std::endl;
}
void on_open(connection_hdl hdl)
{
std::cout << "on_open" <<std::endl;
_connections.insert(hdl);
// unique_lock<mutex> lock(_action_lock);
// //std::cout << "on_open" << std::endl;
// _actions.push(action(SIGNIN,hdl));
// lock.unlock();
// _action_cond.notify_one();
}
void on_close(connection_hdl hdl)
{
std::cout << "on_close" <<std::endl;
_connections.erase(hdl);
// unique_lock<mutex> lock(_action_lock);
// //std::cout << "on_close" << std::endl;
// _actions.push(action(SIGNOUT,hdl));
// lock.unlock();
// _action_cond.notify_one();
}
void on_message(connection_hdl hdl, server::message_ptr msg)
{
try
{
std::cout << "on_message" <<std::endl;
std::string incoming = msg->get_payload();
std::cout << incoming <<std::endl;
struct action newAction = parseMessage(incoming, hdl);
std::cout << "Message parsed" << std::endl;
process_action(newAction);
}
catch (std::string e)
{
std::cout << e << std::endl;
}
catch (std::exception& e)
{
std::cout << e.what() << std::endl;
}
// // queue message up for sending by processing thread
// unique_lock<mutex> lock(_action_lock);
// //std::cout << "on_message" << std::endl;
// _actions.push(action(MESSAGE,msg));
// lock.unlock();
// _action_cond.notify_one();
}
void process_action(struct action a)
{
if (a.type == SUBSCRIBE)
{
std::cout << "Adding Client to Subscriberlists." <<std::endl;
for (std::vector<subscribe_topic>::iterator it = a.topics.begin()
; it != a.topics.end(); ++it)
{
if(*it == SOURCES) _source_subs.insert(a.hdl);
else if(*it == GLOBAL) _global_subs.insert(a.hdl);
else if(*it == REFERENCE) _reference_subs.insert(a.hdl);
else if(*it == MASTERLEVEL) _masterlevel_subs.insert(a.hdl);
else if(*it == SOURCELEVEL) _sourcelevel_subs.insert(a.hdl);
else if(*it == LOUDSPEAKERLEVEL) _loudspeakerlevel_subs.insert(a.hdl);
}
}
if (a.type == UNSUBSCRIBE)
{
std::cout << "Removing Client from Subscriberlists." <<std::endl;
for (std::vector<subscribe_topic>::iterator it = a.topics.begin()
; it != a.topics.end(); ++it)
{
if(*it == SOURCES) _source_subs.erase(a.hdl);
else if(*it == GLOBAL) _global_subs.erase(a.hdl);
else if(*it == REFERENCE) _reference_subs.erase(a.hdl);
else if(*it == MASTERLEVEL) _masterlevel_subs.erase(a.hdl);
else if(*it == SOURCELEVEL) _sourcelevel_subs.erase(a.hdl);
else if(*it == LOUDSPEAKERLEVEL) _loudspeakerlevel_subs.erase(a.hdl);
}
}
if (a.type == MESSAGE)
{
for (std::map<std::string, map_value>::iterator it = a.load.begin()
; it != a.load.end(); ++it)
{
std::cout << it->first << " : ";
if (it->second.type == BOOL)
std::cout << it->second.getValue<bool>() << std::endl;
if (it->second.type == STRING)
std::cout << it->second.getValue<std::string>() << std::endl;
if (it->second.type == INTEGER)
std::cout << it->second.getValue<int>() << std::endl;
if (it->second.type == DOUBLE)
std::cout << it->second.getValue<double>() << std::endl;
}
}
}
// void process_messages()
// {
// while(1)
// {
// unique_lock<mutex> lock(_action_lock);
// while(_actions.empty()) _action_cond.wait(lock);
// action a = _actions.front();
// _actions.pop();
// lock.unlock();
// if (a.type == SIGNIN) //add new Client and send it the current scene
// {
// unique_lock<mutex> lock(_connection_lock);
// _connections.insert(a.hdl);
// //TODO: wait until client sends
// //add the client to the corresponding subscriber lists
// for (std::vector<subscribe_topic>::iterator it = a.topics.begin()
// ; it != a.topics.end(); ++it)
// {
// }
// //send current scene to new Subscriber
// _server.send(a.hdl,initial_scene);
// lock.unlock();
// }
// else if (a.type == SIGNOUT) //remove client from list
// {
// unique_lock<mutex> lock(_connection_lock);
// _connections.erase(a.hdl);
// lock.unlock();
// }
// else if (a.type == MESSAGE) //store changes to scene in queue
// {
// unique_lock<mutex> lock(_msg_lock);
// _msgs.push(a.msg);
// lock.unlock();
// }
// else
// {
// // undefined.
// }
// }
// }
// void update_scene()
// {
// while (1)
// {
// server::message_ptr messageToSend;
// //std::string teststring = "This is a Test.";
// //messageToSend->set_payload(teststring);
// //wait for 100 ms
// std::this_thread::sleep_for(std::chrono::milliseconds(100));
// unique_lock<mutex> lock(_connection_lock);
// con_list::iterator it;
// for (it = _connections.begin(); it != _connections.end(); ++it)
// {
// _server.send(*it,messageToSend);
// }
// }
// }
private:
typedef std::set<connection_hdl,std::owner_less<connection_hdl>> con_list;
server _server;
con_list _connections;
std::queue<action> _actions;
std::queue<server::message_ptr> _msgs;
mutex _action_lock;
mutex _connection_lock;
mutex _msg_lock;
condition_variable _action_cond;
//For every Subscribe topic, theres a seperate list containing all clients
//who subscribed to that topic
con_list _source_subs;
con_list _global_subs;
con_list _reference_subs;
con_list _masterlevel_subs;
con_list _sourcelevel_subs;
con_list _loudspeakerlevel_subs;
};
int main()
{
try
{
::scenefile = "enviroment.json";
SubscribeServer server;
// Start a thread to run the processing loop
// thread processing_t(bind(&SubscribeServer::process_messages,&server));
// Start a thread to run the update loop
// thread update_t(bind(&SubscribeServer::update_scene,&server));
// Run the asio loop with the main thread
std::cout << "TestServer running..." << std::endl;
server.run(9002);
// processing_t.join();
// update_t.join();
}
catch (std::exception & e)
{
std::cout << e.what() << std::endl;
}
}
// Settings for Vim (http://www.vim.org/), please do not remove:
// vim:softtabstop=2:shiftwidth=2:expandtab:textwidth=80:cindent