Horizon Official Technical Documentation
NetworkThread.hpp
Go to the documentation of this file.
1/***************************************************
2 * _ _ _ *
3 * | | | | (_) *
4 * | |_| | ___ _ __ _ _______ _ __ *
5 * | _ |/ _ \| '__| |_ / _ \| '_ \ *
6 * | | | | (_) | | | |/ / (_) | | | | *
7 * \_| |_/\___/|_| |_/___\___/|_| |_| *
8 ***************************************************
9 * This file is part of Horizon (c).
10 *
11 * Copyright (c) 2019 Sagun K. (sagunxp@gmail.com).
12 * Copyright (c) 2019 Horizon Dev Team.
13 *
14 * Base Author - Sagun K. (sagunxp@gmail.com)
15 *
16 * This library is free software; you can redistribute it and/or modify
17 * it under the terms of the GNU General Public License as published by
18 * the Free Software Foundation, either version 3 of the License, or
19 * (at your option) any later version.
20 *
21 * This library is distributed in the hope that it will be useful,
22 * but WITHOUT ANY WARRANTY; without even the implied warranty of
23 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
24 * GNU General Public License for more details.
25 *
26 * You should have received a copy of the GNU General Public License
27 * along with this library. If not, see <http://www.gnu.org/licenses/>.
28 **************************************************/
29
30#ifndef HORIZON_NETWORKING_NETWORKTHREAD_HPP
31#define HORIZON_NETWORKING_NETWORKTHREAD_HPP
32
34
35#include <thread>
36#include <atomic>
37#include <mutex>
38#include <cassert>
39#include <boost/asio/ip/tcp.hpp>
40#include <boost/asio/deadline_timer.hpp>
41
42using boost::asio::ip::tcp;
43
44namespace Horizon
45{
46namespace Networking
47{
53template <class SocketType>
55{
56 typedef std::vector<std::shared_ptr<SocketType>> SocketContainer;
57public:
60 {
61 // Constructor
62 }
63
69 {
70 finalize();
71 }
72
76 virtual void finalize()
77 {
78 _finalizing.exchange(true);
79 }
80
81 void join()
82 {
83 try {
84 _thread->join();
85 } catch (std::system_error &error) {
86 HLog(error) << "Error joining network thread2: " << error.what();
87 }
88 }
89
94 virtual bool start(int segment_number = 1)
95 {
96 if (_thread != nullptr)
97 return false;
98
99 _segment_number = segment_number;
100 _thread.reset(new std::thread(&NetworkThread::run, this));
101 return true;
102 }
103
110 virtual void add_socket(std::shared_ptr<SocketType> sock)
111 {
112 std::lock_guard<std::mutex> lock(_new_socket_queue_lock);
113
114 _new_socket_queue.push_back(sock); // Add socket to queue.
115 }
116
123 std::shared_ptr<tcp::socket> get_new_socket() { return std::make_shared<tcp::socket>(_io_context); }
124
130 int32_t connection_count() const { return _connections; }
131
136 bool is_finalizing() { return _finalizing; }
137protected:
143 virtual void run()
144 {
145 _update_timer.expires_from_now(boost::posix_time::microseconds(500));
146 _update_timer.async_wait(std::bind(&NetworkThread<SocketType>::update, this));
147
148 _io_context.run();
149
150 _new_socket_queue.clear();
151 _active_sockets.clear();
152 }
153
154 virtual void on_socket_removed(std::shared_ptr<SocketType> sock) = 0;
155 virtual void on_socket_added(std::shared_ptr<SocketType> sock) = 0;
156
163 virtual void update()
164 {
165 _update_timer.expires_from_now(boost::posix_time::microseconds(500));
166 _update_timer.async_wait(std::bind(&NetworkThread<SocketType>::update, this));
167
169
170 // The following code removes all non-active from the active sockets list by shifting all
171 // active sockets to the left and then erasing the extra
172 _active_sockets.erase(std::remove_if(_active_sockets.begin(), _active_sockets.end(),
173 [this] (std::shared_ptr<SocketType> sock)
174 {
175 if (!sock->update() || is_finalizing()) {
176
177 if (sock->is_open())
178 sock->close_socket();
179
180 on_socket_removed(sock);
181
182 --_connections;
183
184 HLog(info) << "Socket closed in networking thread " << (void *) (_thread.get()) << " (Connections: " << _connections << ")";
185
186 return true;
187 }
188
189 return false;
190 }), _active_sockets.end());
191
192 if (is_finalizing()) {
193 _io_context.stop();
194 _finalizing.exchange(false);
195 HLog(info) << "Network thread " << (void *) (_thread.get()) << " has been finalized.";
196 }
197 }
198
206 virtual void add_new_sockets()
207 {
208 if (is_finalizing())
209 return;
210
211 std::lock_guard<std::mutex> lock(_new_socket_queue_lock);
212
213 if (_new_socket_queue.empty())
214 return;
215
216 for (std::shared_ptr<SocketType> sock : _new_socket_queue) {
217 if (sock->is_open()) {
218 _active_sockets.push_back(sock);
219 // Start receiving from the socket.
220 sock->start();
221
222 on_socket_added(sock);
223
224 ++_connections; // Increment network connections.
225
226 HLog(trace) << "A new socket has been added to network thread " << (void *) (_thread.get()) << " (Connections: " << _connections << ")";
227 }
228 }
229
230 _new_socket_queue.clear();
231 }
232
233protected:
235private:
236 int _segment_number{1};
237 std::atomic<int32_t> _connections;
238 std::atomic<bool> _finalizing;
239
240 std::unique_ptr<std::thread> _thread;
241
243
245
246 boost::asio::io_context _io_context;
247 boost::asio::deadline_timer _update_timer;
248};
249}
250}
251
252#endif // HORIZON_NETWORKING_NETWORKTHREAD_HPP
#define HLog(type)
Definition: Logger.hpp:122
A Network Thread object that handles a number of sockets.
Definition: NetworkThread.hpp:55
virtual void on_socket_added(std::shared_ptr< SocketType > sock)=0
std::atomic< int32_t > _connections
Definition: NetworkThread.hpp:237
void join()
Definition: NetworkThread.hpp:81
SocketContainer _new_socket_queue
Definition: NetworkThread.hpp:242
SocketContainer _active_sockets
Definition: NetworkThread.hpp:234
std::vector< std::shared_ptr< SocketType > > SocketContainer
Definition: NetworkThread.hpp:56
virtual void on_socket_removed(std::shared_ptr< SocketType > sock)=0
std::mutex _new_socket_queue_lock
Definition: NetworkThread.hpp:244
boost::asio::deadline_timer _update_timer
Definition: NetworkThread.hpp:247
std::shared_ptr< tcp::socket > get_new_socket()
Gets a socket for a new connection.
Definition: NetworkThread.hpp:123
virtual bool start(int segment_number=1)
Initializes the network thread and runs.
Definition: NetworkThread.hpp:94
NetworkThread()
Definition: NetworkThread.hpp:58
std::atomic< bool > _finalizing
Definition: NetworkThread.hpp:238
virtual void finalize()
Marks the network thread as finalizing; the IO service is halted by the next update() cycle.
Definition: NetworkThread.hpp:76
boost::asio::io_context _io_context
Definition: NetworkThread.hpp:246
virtual ~NetworkThread()
Destructor of the network thread, performs a clean network finalisation routine before deleting.
Definition: NetworkThread.hpp:68
virtual void update()
Updates the network thread and schedules a recursive call to itself.
Definition: NetworkThread.hpp:163
virtual void add_new_sockets()
Processes the new socket queue.
Definition: NetworkThread.hpp:206
int32_t connection_count() const
Gets the total number of network connections or sockets handled by this network thread.
Definition: NetworkThread.hpp:130
virtual void add_socket(std::shared_ptr< SocketType > sock)
Adds a new socket to a queue that is processed frequently within this network thread.
Definition: NetworkThread.hpp:110
virtual void run()
Run the I/O Service loop within this network thread.
Definition: NetworkThread.hpp:143
std::unique_ptr< std::thread > _thread
Definition: NetworkThread.hpp:240
int _segment_number
Definition: NetworkThread.hpp:236
bool is_finalizing()
Reports whether the network thread is currently finalizing.
Definition: NetworkThread.hpp:136
Definition: Element.hpp:7