Horizon Official Technical Documentation
Socket.hpp
Go to the documentation of this file.
1/***************************************************
2 * _ _ _ *
3 * | | | | (_) *
4 * | |_| | ___ _ __ _ _______ _ __ *
5 * | _ |/ _ \| '__| |_ / _ \| '_ \ *
6 * | | | | (_) | | | |/ / (_) | | | | *
7 * \_| |_/\___/|_| |_/___\___/|_| |_| *
8 ***************************************************
9 * This file is part of Horizon (c).
10 *
11 * Copyright (c) 2019 Sagun K. (sagunxp@gmail.com).
12 * Copyright (c) 2019 Horizon Dev Team.
13 *
14 * Base Author - Sagun K. (sagunxp@gmail.com)
15 *
16 * This library is free software; you can redistribute it and/or modify
17 * it under the terms of the GNU General Public License as published by
18 * the Free Software Foundation, either version 3 of the License, or
19 * (at your option) any later version.
20 *
21 * This library is distributed in the hope that it will be useful,
22 * but WITHOUT ANY WARRANTY; without even the implied warranty of
23 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
24 * GNU General Public License for more details.
25 *
26 * You should have received a copy of the GNU General Public License
27 * along with this library. If not, see <http://www.gnu.org/licenses/>.
28 **************************************************/
29
30#ifndef HORIZON_NETWORKING_SOCKET_HPP
31#define HORIZON_NETWORKING_SOCKET_HPP
32
34#include "Buffer/ByteBuffer.hpp"
36
37#include <atomic>
38#include <queue>
39#include <memory>
40#include <functional>
41#include <future>
42#include <type_traits>
43#include <boost/asio/ip/tcp.hpp>
44#include <boost/bind/bind.hpp>
45#include <boost/asio/write.hpp>
46#include <boost/asio/use_future.hpp>
47
48#include <iostream>
49
50using boost::asio::ip::tcp;
51
52#define READ_BLOCK_SIZE 0x1000
53#define BOOST_BIND_GLOBAL_PLACEHOLDERS 1
54
55namespace Horizon
56{
57namespace Networking
58{
80template <class SocketType>
81class Socket : public std::enable_shared_from_this<SocketType>
82{
83public:
// Constructs a Socket that has an id but no underlying tcp::socket yet:
// _socket is initialised to nullptr and the closed / closing / async-writing flags start out false.
// NOTE(review): listing line 87 (inside the body) is absent from this extract — confirm the body against the real header.
84 explicit Socket(uint64_t socket_id)
85 : _socket_id(socket_id), _socket(nullptr), _read_buffer(), _closed(false), _closing(false), _is_writing_async(false)
86 {
88 }
89
// Constructs a Socket around an existing tcp::socket and caches the peer's address and port.
// The initialisers read the remote endpoint through the *member* _socket (not the parameter), so they
// rely on _socket being declared before _remote_ip_address / _remote_port in the class.
// NOTE(review): remote_endpoint() is called without an error_code here — confirm the socket is always connected
// when this constructor runs. Listing line 94 (inside the body) is absent from this extract.
90 explicit Socket(uint64_t socket_id, std::shared_ptr<tcp::socket> socket)
91 : _socket_id(socket_id), _socket(socket), _remote_ip_address(_socket->remote_endpoint().address().to_string()),
92 _remote_port(_socket->remote_endpoint().port()), _read_buffer(), _closed(false), _closing(false), _is_writing_async(false)
93 {
95 }
96
97 virtual ~Socket()
98 {
99 boost::system::error_code error;
100 _socket->close(error);
101 }
102
// Pure virtual entry point that the concrete socket type must implement.
// The page index describes it as the initial method invoked once from the network thread that handles the socket.
107 virtual void start() = 0;
108
// Socket update loop; per the page index it is called from the socket's NetworkThread.
// Returns false once the socket has been closed (_closed is set); otherwise it ends by returning true.
// NOTE(review): listing line 119 is missing from this extract. The `return true;` / `}` at 120-121 appear to
// belong to a conditional whose opening line was dropped, so the text below is not well-formed as shown —
// restore it from the real header before relying on it.
114 virtual bool update()
115 {
116 if (_closed)
117 return false;
118
120 return true;
121 }
122
// Drain the write queue: keep calling handle_queue() until it reports false.
123 while (handle_queue())
124 ;
125
126 return true;
127 }
128
129 /* Socket Id */
130 uint64_t get_socket_id() { return _socket_id; }
131
132 /* Remote IP and Port */
133 std::string &remote_ip_address() { return _remote_ip_address; }
134 uint16_t remote_port() const { return _remote_port; }
135
// Body of async_read(). The signature (listing line 140) is not present in this extract; the page index
// gives it as `void async_read()` and describes it as an asynchronous read operation run on the NetworkThread.
// Does nothing when the socket is closed or closing (is_open() is false).
141 {
142 if (!is_open())
143 return;
144
// NOTE(review): listing lines 145-146 are missing from this extract — confirm what happens to _read_buffer here.
147
// Read into the free tail of _read_buffer; completion is delivered to read_handler_internal(error, bytes).
// NOTE(review): the handler is bound to the raw `this` pointer — confirm the object is guaranteed to outlive
// the pending operation.
148 _socket->async_read_some(boost::asio::buffer(_read_buffer.get_write_pointer(), _read_buffer.remaining_space()),
149 boost::bind(&Socket<SocketType>::read_handler_internal, this, boost::placeholders::_1, boost::placeholders::_2));
150 }
151
// Asynchronous read into a caller-supplied buffer (described in the page index as "with callback handler",
// run on the NetworkThread). Does nothing when is_open() is false.
// NOTE(review): the member-function-pointer parameter is unnamed and never used — the completion is always
// routed to read_handler_internal, exactly as in async_read(). Confirm whether the callback was meant to be bound.
156 void async_read_with_callback(ByteBuffer &buf, void (Socket<SocketType>::*)(boost::system::error_code, std::size_t))
157 {
158 if (!is_open())
159 return;
160
// NOTE(review): listing lines 161-162 are missing from this extract.
163
// Read into the free tail of `buf`; the handler is bound to the raw `this` pointer.
164 _socket->async_read_some(boost::asio::buffer(buf.get_write_pointer(), buf.remaining_space()),
165 boost::bind(&Socket<SocketType>::read_handler_internal, this, boost::placeholders::_1, boost::placeholders::_2));
166 }
167
168 virtual void queue_buffer(ByteBuffer &&buffer) { _write_queue.push(std::move(buffer)); }
169
170 bool is_open() { return !_closed && !_closing; }
171
// Body of close_socket(). The signature (listing line 176) is not present in this extract; the page index
// gives it as `void close_socket()` and describes it as the close operation that performs cleanups before
// shutting down the connection.
177 {
178 boost::system::error_code socket_error;
179
// exchange() returns the previous value, so only the first caller proceeds past this point.
180 if (_closed.exchange(true))
181 return;
182
183 // Finalise the child-class socket first.
184 on_close();
185
// NOTE(review): listing lines 186-188 are missing from this extract.
189 // Shutdown the socket.
190 _socket->shutdown(boost::asio::socket_base::shutdown_send, socket_error);
191 // Close the socket.
// NOTE(review): this close() overload takes no error_code, unlike the shutdown() call above — presumably it
// reports failure by throwing; confirm that is intended.
192 _socket->close();
193
// Only the shutdown() result is reported; it is logged after the socket has already been closed.
194 if (socket_error) {
195 HLog(error) << "Error when shutting down socket from IP " <<
196 remote_ip_address() << "(error code:" << socket_error.value() << " - " << socket_error.message().c_str() << ")";
197 }
198 }
199
200 void delayed_close_socket() { if (_closing.exchange(true)) return; }
201
203
204protected:
// Hook invoked by close_socket() before the underlying socket is shut down and closed.
205 virtual void on_close() = 0;
// Hook invoked by read_handler_internal() after newly received bytes have been committed to _read_buffer.
206 virtual void read_handler() = 0;
// Hook invoked by read_handler_internal() when a read fails, just before close_socket() is called.
207 virtual void on_error() = 0;
208
// Body of async_process_queue(). The signature (listing line 213) is not present in this extract; the page
// index gives it as `bool async_process_queue()` and describes it as the socket write operation.
214 {
// NOTE(review): the guard condition on listing line 215 is missing from this extract; when it holds the
// function returns false without starting anything.
216 return false;
217
// Record that an asynchronous write wait is now outstanding.
218 _is_writing_async = true;
219
// A zero-length (null_buffers) async write is started on the socket.
// NOTE(review): listing line 221, the first part of the completion-handler bind, is missing — presumably it
// targets write_handler_wrapper; confirm against the real header.
220 _socket->async_write_some(boost::asio::null_buffers(),
222 this, boost::placeholders::_1, boost::placeholders::_2));
223
224 return true;
225 }
226
231 void set_no_delay(bool enable)
232 {
233 boost::system::error_code error;
234 _socket->set_option(tcp::no_delay(enable), error);
235 if (error) {
236 HLog(error) << "Networking: Socket::set_no_delay: failed to set_option(boost::asio::ip::tcp::no_delay) for IP " << remote_ip_address() << " (error_code: " << error.value() << " - " << error.message().c_str() << ")";
237 }
238 }
239
246 std::size_t write_buffer_and_send(ByteBuffer &to_send, boost::system::error_code &error)
247 {
248 int16_t packet_id;
249 std::size_t bytes_to_send = to_send.active_length();
250 memcpy(&packet_id, to_send.get_read_pointer(), sizeof(int16_t));
251 std::size_t bytes_sent = _socket->write_some(boost::asio::buffer(to_send.get_read_pointer(), bytes_to_send), error);
252 return bytes_sent;
253 }
254private:
255
262 void read_handler_internal(boost::system::error_code error, size_t transferredBytes)
263 {
264 // If there is an error on the socket, we need to close it.
265 if (error) {
266 // If the socket is already closed, we don't need to do anything.
267 if (error.value() == boost::asio::error::eof) {
268 close_socket();
269 // If the connection was reset or timed out, we need to close the socket.
270 } else if (error.value() == boost::system::errc::connection_reset
271 || error.value() == boost::system::errc::timed_out) {
272 on_error();
273 close_socket();
274 // If there was any unknown error on the socket, we need to close it.
275 } else {
276 on_error();
277 close_socket();
278 }
279 // Retreive the error code and handle it accordingly.
280 switch (error.value())
281 {
282 case 2: // End of file
283 break;
284 default:
285 HLog(debug) << "Socket::read_handler_internal: " << error.value() << " (Code: " << error.message() << ").";
286 }
287 return;
288 }
289
290 if (transferredBytes > 0) {
291 // write_completed will move the write pointer forward, so we need to save the current position of the write pointer.
292 _read_buffer.write_completed(transferredBytes);
293 // read handler will process the data that has been written to the buffer.
294 read_handler();
295 }
296
297 // Invoke the next read operation.
298 async_read();
299 }
300
306 void write_handler_wrapper(boost::system::error_code /*error*/, std::size_t /*transferedBytes*/)
307 {
308 _is_writing_async = false;
309 handle_queue();
310 }
311
// Body of handle_queue(). The signature (listing line 317) is not present in this extract; the page index
// gives it as `bool handle_queue()`. It attempts to send the buffer at the front of the write queue.
318 {
319 boost::system::error_code error;
320
// Nothing queued: report that there is no more work.
321 if (_write_queue.empty())
322 return false;
323
324 std::shared_ptr<ByteBuffer> to_send = _write_queue.front();
325
326 //HLog(debug) << "Sent bytes: " << to_send->to_string();
327
328 std::size_t bytes_sent = write_buffer_and_send(*to_send, error);
329
// NOTE(review): listing lines 330-332 are missing from this extract.
// The socket could not accept data right now: hand over to the asynchronous write path.
333 if (error == boost::asio::error::would_block || error == boost::asio::error::try_again)
334 return async_process_queue();
335
// NOTE(review): listing lines 336-338 are missing from this extract.
// Partial write: mark the sent bytes as consumed and hand over to the asynchronous write path.
339 if (!error && bytes_sent < to_send->active_length()) {
340 to_send->read_completed(bytes_sent);
341 return async_process_queue();
342 }
343
344 // Pop the front element.
// NOTE(review): the statement on listing line 345 is missing from this extract — presumably the pop itself
// (the page index lists ThreadSafeQueue::try_pop as referenced); confirm against the real header.
346
347 // Close if required.
348 if (_closing && _write_queue.empty())
349 close_socket();
350
351 // Return true if the queue is not empty. Indicating that the sending operation was successful.
// The result is also false when bytes_sent is 0, even if buffers remain queued.
352 return !_write_queue.empty() && bytes_sent;
353 }
354
355 std::shared_ptr<tcp::socket> get_socket() { return _socket; }
356
357private:
// NOTE(review): several member declarations are missing from this extract. The page index lists
// _remote_ip_address (line 360), _read_buffer (362), _write_queue (363) and _is_writing_async (366).
// Identifier passed to the constructor.
358 uint64_t _socket_id;
// Underlying TCP socket; nullptr when constructed without one. The page index notes that after accepting,
// the reference count of this pointer should be 1.
359 std::shared_ptr<tcp::socket> _socket;
// Remote peer's port, read from the socket's remote endpoint at construction.
361 uint16_t _remote_port;
// Set once by close_socket(); update() returns false and is_open() reports false afterwards.
364 std::atomic<bool> _closed;
// Set by delayed_close_socket(); handle_queue() closes the socket once the write queue is empty.
365 std::atomic<bool> _closing;
367};
368}
369}
370
371#endif //HORIZON_NETWORKING_SOCKET_HPP
#define HLog(type)
Definition: Logger.hpp:122
#define READ_BLOCK_SIZE
Definition: Socket.hpp:52
Definition: ByteBuffer.hpp:78
void ensure_free_space()
Definition: ByteBuffer.hpp:350
size_t remaining_space() const
Definition: ByteBuffer.hpp:334
void resize(size_t new_size)
Definition: ByteBuffer.hpp:357
uint8_t * get_write_pointer()
Definition: ByteBuffer.hpp:328
uint8_t * get_read_pointer()
Definition: ByteBuffer.hpp:327
void flush()
Definition: ByteBuffer.hpp:339
void write_completed(size_t bytes)
Definition: ByteBuffer.hpp:322
size_t active_length() const
Definition: ByteBuffer.hpp:333
A Socket object that handles a single connection.
Definition: Socket.hpp:82
ByteBuffer & get_read_buffer()
Definition: Socket.hpp:202
virtual void start()=0
Initial method invoked once from the network thread that handles the AuthSocket.
ThreadSafeQueue< ByteBuffer > _write_queue
Definition: Socket.hpp:363
virtual ~Socket()
Definition: Socket.hpp:97
uint16_t _remote_port
Definition: Socket.hpp:361
bool _is_writing_async
Definition: Socket.hpp:366
void close_socket()
Socket close operation that performs cleanups before shutting down the connection.
Definition: Socket.hpp:176
bool async_process_queue()
Socket write operation.
Definition: Socket.hpp:213
virtual void on_close()=0
Socket(uint64_t socket_id, std::shared_ptr< tcp::socket > socket)
Definition: Socket.hpp:90
ByteBuffer _read_buffer
Definition: Socket.hpp:362
uint64_t get_socket_id()
Definition: Socket.hpp:130
void async_read_with_callback(ByteBuffer &buf, void(Socket< SocketType >::*)(boost::system::error_code, std::size_t))
Asynchronous read operation with callback handler @thread NetworkThread.
Definition: Socket.hpp:156
Socket(uint64_t socket_id)
Definition: Socket.hpp:84
void delayed_close_socket()
Definition: Socket.hpp:200
void write_handler_wrapper(boost::system::error_code, std::size_t)
Write handler wrapper.
Definition: Socket.hpp:306
uint16_t remote_port() const
Definition: Socket.hpp:134
virtual void read_handler()=0
virtual void on_error()=0
void async_read()
Asynchronous read operation @thread NetworkThread.
Definition: Socket.hpp:140
void read_handler_internal(boost::system::error_code error, size_t transferredBytes)
Asynchronous reading handler method.
Definition: Socket.hpp:262
std::size_t write_buffer_and_send(ByteBuffer &to_send, boost::system::error_code &error)
Write a message to the buffer.
Definition: Socket.hpp:246
uint64_t _socket_id
Definition: Socket.hpp:358
std::atomic< bool > _closing
Definition: Socket.hpp:365
std::string & remote_ip_address()
Definition: Socket.hpp:133
std::shared_ptr< tcp::socket > get_socket()
Definition: Socket.hpp:355
std::string _remote_ip_address
Definition: Socket.hpp:360
bool handle_queue()
Handle the queue.
Definition: Socket.hpp:317
void set_no_delay(bool enable)
Disable the Nagle Algorithm on our socket.
Definition: Socket.hpp:231
bool is_open()
Definition: Socket.hpp:170
std::atomic< bool > _closed
Definition: Socket.hpp:364
virtual void queue_buffer(ByteBuffer &&buffer)
Definition: Socket.hpp:168
std::shared_ptr< tcp::socket > _socket
After accepting, the reference count of this pointer should be 1.
Definition: Socket.hpp:359
virtual bool update()
Socket update loop called from its NetworkThread every n nanoseconds.
Definition: Socket.hpp:114
bool empty()
Definition: ThreadSafeQueue.hpp:116
std::shared_ptr< T > front()
Definition: ThreadSafeQueue.hpp:126
void push(T &&new_value)
Definition: ThreadSafeQueue.hpp:90
std::shared_ptr< T > try_pop()
Definition: ThreadSafeQueue.hpp:84
Definition: Element.hpp:7