-
Notifications
You must be signed in to change notification settings - Fork 86
/
Copy pathsession_manager.hpp
267 lines (227 loc) · 8.22 KB
/
session_manager.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
//
// session_manager.hpp
//
// Copyright (c) 2019 2020 Andrea Bondavalli. All rights reserved.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//
#ifndef _SESSION_MANAGER_HPP_
#define _SESSION_MANAGER_HPP_
#include <future>
#include <list>
#include <map>
#include <shared_mutex>
#include <thread>
#include <chrono>
#include "config.hpp"
#include "driver_interface.hpp"
#include "browser.hpp"
#include "igmp.hpp"
#include "sap.hpp"
// Description of a stream source as passed to SessionManager::add_source()
// and filled in by SessionManager::get_source(). Every scalar field has an
// explicit default, so a default-constructed StreamSource is fully
// initialized.
struct StreamSource {
  uint8_t id = 0;        // source identifier (SessionManager keys sources by it)
  bool enabled = false;
  std::string name;
  std::string io;
  uint32_t max_samples_per_packet = 0;
  std::string codec;
  std::string address;
  uint8_t ttl = 0;
  uint8_t payload_type = 0;
  uint8_t dscp = 0;
  bool refclk_ptp_traceable = false;
  std::vector<uint8_t> map;  // NOTE(review): presumably the channel map — confirm
};
// Description of a stream sink as passed to SessionManager::add_sink() and
// filled in by SessionManager::get_sink().
struct StreamSink {
  // Explicitly zero-initialized: without an initializer a default-constructed
  // StreamSink would carry an indeterminate id (StreamSource::id is
  // initialized the same way).
  uint8_t id{0};
  std::string name;
  std::string io;
  bool use_sdp{false};
  std::string source;
  std::string sdp;
  uint32_t delay{0};
  bool ignore_refclk_gmid{false};
  std::vector<uint8_t> map;  // NOTE(review): presumably the channel map — confirm
};
// Status snapshot of a sink, filled in by SessionManager::get_sink_status().
// All flags start cleared and min_time starts at 0.
struct SinkStreamStatus {
  bool is_rtp_seq_id_error = false;
  bool is_rtp_ssrc_error = false;
  bool is_rtp_payload_type_error = false;
  bool is_rtp_sac_error = false;
  bool is_receiving_rtp_packet = false;
  bool is_muted = false;
  bool is_some_muted = false;
  bool is_all_muted = false;
  int min_time = 0;
};
// PTP settings exchanged through SessionManager::set_ptp_config() and
// SessionManager::get_ptp_config(). Both values default to 0.
struct PTPConfig {
  uint8_t domain = 0;
  uint8_t dscp = 0;
};
// PTP state reported by SessionManager::get_ptp_status(). Strings start
// empty and jitter starts at 0.
struct PTPStatus {
  std::string status;
  std::string gmid;  // NOTE(review): presumably the grandmaster id — confirm
  int32_t jitter = 0;
};
// Per-stream record stored by SessionManager in its sources_ and sinks_ maps.
// It wraps the driver-level TRTP_stream_info together with additional
// bookkeeping fields; the sink_* members mirror the sink-only settings.
struct StreamInfo {
  TRTP_stream_info stream;
  uint64_t handle = 0;
  bool enabled = false;
  bool refclk_ptp_traceable = false;
  bool ignore_refclk_gmid = false;
  std::string io;
  bool sink_use_sdp = true;  // note: defaults to true, unlike StreamSink::use_sdp
  std::string sink_source;
  std::string sink_sdp;
  uint32_t session_id = 0;
  uint32_t session_version = 0;
  SDPOrigin origin;
};
// Central manager for stream sources, stream sinks and PTP state.
//
// Instances are built through create(); the constructor is protected and the
// class is neither default-constructible nor copyable. init() launches the
// asynchronous worker() task and terminate() stops it and removes every
// registered source and sink.
class SessionManager {
 public:
  constexpr static uint8_t stream_id_max = 63;

  static std::shared_ptr<SessionManager> create(
      std::shared_ptr<DriverManager> driver,
      std::shared_ptr<Browser> browser,
      std::shared_ptr<Config> config);
  SessionManager() = delete;
  SessionManager(const SessionManager&) = delete;
  SessionManager& operator=(const SessionManager&) = delete;
  virtual ~SessionManager() = default;

  // session manager interface

  // Starts the worker task if it is not already running. Always returns true.
  //
  // running_ is flipped with a single atomic exchange so that two concurrent
  // callers cannot both observe "not running" and launch two workers.
  bool init() {
    if (!running_.exchange(true)) {
      // seed with seconds since the epoch to have increasing session
      // versions between restarts
      g_session_version = static_cast<uint32_t>(
          std::chrono::system_clock::now().time_since_epoch() /
          std::chrono::seconds(1));
      res_ = std::async(std::launch::async, &SessionManager::worker, this);
    }
    return true;
  }

  // Stops the worker task, waits for it to finish, then removes all sources
  // and sinks. Returns the worker's result, or true if it was not running.
  //
  // The atomic exchange guarantees only one caller reaches res_.get();
  // calling get() twice on the same future is undefined behavior.
  bool terminate() {
    if (running_.exchange(false)) {
      const bool ret = res_.get();
      for (const auto& source : get_sources()) {
        remove_source(source.id);
      }
      for (const auto& sink : get_sinks()) {
        remove_sink(sink.id);
      }
      return ret;
    }
    return true;
  }

  // sources
  std::error_code add_source(const StreamSource& source);
  std::error_code get_source(uint8_t id, StreamSource& source) const;
  std::list<StreamSource> get_sources() const;
  std::error_code get_source_sdp(uint32_t id, std::string& sdp) const;
  std::error_code remove_source(uint32_t id);
  uint8_t get_source_id(const std::string& name) const;

  // source observers: callbacks registered per event type
  enum class SourceObserverType { add_source, remove_source, update_source };
  using SourceObserver = std::function<
      bool(uint8_t id, const std::string& name, const std::string& sdp)>;
  void add_source_observer(SourceObserverType type, const SourceObserver& cb);

  // sink observers: callbacks registered per event type
  enum class SinkObserverType { add_sink, remove_sink };
  using SinkObserver = std::function<
      bool(uint8_t id, const std::string& name)>;
  void add_sink_observer(SinkObserverType type, const SinkObserver& cb);

  // PTP status observers
  using PtpStatusObserver = std::function<bool(const std::string& status)>;
  void add_ptp_status_observer(const PtpStatusObserver& cb);

  // sinks
  std::error_code add_sink(const StreamSink& sink);
  std::error_code get_sink(uint8_t id, StreamSink& sink) const;
  std::list<StreamSink> get_sinks() const;
  std::error_code get_sink_status(uint32_t id, SinkStreamStatus& status) const;
  std::error_code remove_sink(uint32_t id);
  uint8_t get_sink_id(const std::string& name) const;

  // PTP and driver configuration
  std::error_code set_ptp_config(const PTPConfig& config);
  std::error_code set_driver_config(std::string_view name,
                                    uint32_t value) const;
  void get_ptp_config(PTPConfig& config) const;
  void get_ptp_status(PTPStatus& status) const;

  // status persistence
  bool load_status();
  bool save_status() const;

  size_t process_sap();

 protected:
  constexpr static const char ptp_primary_mcast_addr[] = "224.0.1.129";
  constexpr static const char ptp_pdelay_mcast_addr[] = "224.0.1.107";

  std::list<StreamSink> get_updated_sinks(
      const std::list<RemoteSource>& sources_list);
  void update_sinks();

  // event hooks
  void on_add_source(const StreamSource& source, const StreamInfo& info);
  void on_remove_source(const StreamInfo& info);
  void on_add_sink(const StreamSink& sink, const StreamInfo& info);
  void on_remove_sink(const StreamInfo& info);
  void on_ptp_status_changed(const std::string& status) const;
  void on_update_sources();

  // internal helpers (trailing underscore)
  std::string get_removed_source_sdp_(uint32_t id,
                                      uint32_t src_addr,
                                      uint32_t session_id,
                                      uint32_t session_version) const;
  std::string get_source_sdp_(uint32_t id, const StreamInfo& info) const;
  StreamSource get_source_(uint8_t id, const StreamInfo& info) const;
  StreamSink get_sink_(uint8_t id, const StreamInfo& info) const;

  bool sink_is_still_valid(const std::string sdp,
                           const std::list<RemoteSource> sources_list) const;
  bool parse_sdp(const std::string& sdp, StreamInfo& info) const;

  // body of the asynchronous task launched by init()
  bool worker();

  // singleton, use create() to build
  // The initial PTP configuration is read from the supplied Config.
  explicit SessionManager(std::shared_ptr<DriverManager> driver,
                          std::shared_ptr<Browser> browser,
                          std::shared_ptr<Config> config)
      : browser_(browser), driver_(driver), config_(config) {
    ptp_config_.domain = config->get_ptp_domain();
    ptp_config_.dscp = config->get_ptp_dscp();
  }

 private:
  std::shared_ptr<Browser> browser_;
  std::shared_ptr<DriverManager> driver_;
  std::shared_ptr<Config> config_;
  std::future<bool> res_;  // result of the worker task started by init()
  std::atomic_bool running_{false};

  /* current sources */
  std::map<uint8_t /* id */, StreamInfo> sources_;
  std::map<std::string, uint8_t /* id */> source_names_;
  mutable std::shared_mutex sources_mutex_;

  /* current sinks */
  std::map<uint8_t /* id */, StreamInfo> sinks_;
  std::map<std::string, uint8_t /* id */> sink_names_;
  mutable std::shared_mutex sinks_mutex_;

  /* current announced sources */
  std::map<uint32_t /* msg_id_hash */,
           std::tuple<uint32_t /* src_addr */,
                      uint32_t /* session_id */,
                      uint32_t /* session_version */> >
      announced_sources_;

  /* number of deletions sent for a deleted source */
  std::unordered_map<uint32_t /* msg_id_hash */, int /* count */>
      deleted_sources_count_;

  PTPConfig ptp_config_;
  PTPStatus ptp_status_;
  mutable std::shared_mutex ptp_mutex_;

  std::list<SourceObserver> add_source_observers_;
  std::list<SourceObserver> remove_source_observers_;
  std::list<SourceObserver> update_source_observers_;
  std::list<PtpStatusObserver> ptp_status_observers_;
  std::list<SinkObserver> add_sink_observers_;
  std::list<SinkObserver> remove_sink_observers_;
  std::list<SinkObserver> update_sink_observers_;

  // config_ is declared above, so it is already initialized when this
  // default member initializer runs
  SAP sap_{config_->get_sap_mcast_addr()};
  IGMP igmp_;
  uint32_t last_sink_update_{0};

  /* used to handle session versioning */
  inline static std::atomic<uint32_t> g_session_version{0};
};
#endif