eris 1.4.0
A WorldForge client library.
StreamSocket_impl.h
1/*
2 Copyright (C) 2014 Erik Ogenvik
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software Foundation,
16 Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
19#ifndef STREAMSOCKET_IMPL_H_
20#define STREAMSOCKET_IMPL_H_
21
22#ifdef HAVE_CONFIG_H
23#include "config.h"
24#endif
25
26#include "StreamSocket.h"
27
28#include <Atlas/Codec.h>
29
30static const int CONNECT_TIMEOUT_SECONDS = 5;
31
32namespace Eris
33{
34
35template<typename ProtocolT>
36AsioStreamSocket<ProtocolT>::AsioStreamSocket(
37 boost::asio::io_service& io_service, const std::string& client_name,
38 Atlas::Bridge& bridge, StreamSocket::Callbacks callbacks) :
39 StreamSocket(io_service, client_name, bridge, std::move(callbacks)),
40 m_socket(io_service)
41{
42}
43
44template<typename ProtocolT>
45AsioStreamSocket<ProtocolT>::~AsioStreamSocket()
46{
47 if (m_socket.is_open()) {
48 if (m_is_connected) {
49 try {
50 m_socket.shutdown(ProtocolT::socket::shutdown_both);
51 } catch (const std::exception& e) {
52 warning() << "Error when shutting down socket: " << e.what();
53 }
54 }
55 try {
56 m_socket.close();
57 } catch (const std::exception&) {
58 warning() << "Error when closing socket.";
59 }
60 }
61}
62
63template<typename ProtocolT>
64typename ProtocolT::socket& AsioStreamSocket<ProtocolT>::getAsioSocket()
65{
66 return m_socket;
67}
68
69template<typename ProtocolT>
70ResolvableAsioStreamSocket<ProtocolT>::ResolvableAsioStreamSocket(
71 boost::asio::io_service& io_service, const std::string& client_name,
72 Atlas::Bridge& bridge, StreamSocket::Callbacks callbacks) :
73 AsioStreamSocket<ProtocolT>(io_service, client_name, bridge, std::move(callbacks)),
74 m_resolver(io_service)
75{
76}
77
78
79
80template<typename ProtocolT>
81void ResolvableAsioStreamSocket<ProtocolT>::connectWithQuery(
82 const typename ProtocolT::resolver::query& query)
83{
84 auto self(this->shared_from_this());
85 m_resolver.async_resolve(query,
86 [&, self](const boost::system::error_code& ec, typename ProtocolT::resolver::iterator iterator) {
87 if (this->_callbacks.stateChanged) {
88 if (!ec && iterator != typename ProtocolT::resolver::iterator()) {
89 this->connect(*iterator);
90 } else {
91 this->_callbacks.stateChanged(StreamSocket::CONNECTING_FAILED);
92 }
93 }
94 });
95}
96
97template<typename ProtocolT>
98void AsioStreamSocket<ProtocolT>::connect(
99 const typename ProtocolT::endpoint& endpoint)
100{
101 _connectTimer.expires_from_now(
102 std::chrono::seconds(CONNECT_TIMEOUT_SECONDS));
103 auto self(this->shared_from_this());
104 _connectTimer.async_wait([&, self](boost::system::error_code ec) {
105 if (!ec) {
106 if (_callbacks.stateChanged) {
107 _callbacks.stateChanged(CONNECTING_TIMEOUT);
108 }
109 }
110 });
111
112 m_socket.async_connect(endpoint,
113 [this, self](boost::system::error_code ec) {
114 if (_callbacks.stateChanged) {
115 if (!ec) {
116 this->_connectTimer.cancel();
117 m_is_connected = true;
118 this->startNegotiation();
119 } else {
120 _callbacks.stateChanged(CONNECTING_FAILED);
121 }
122 }
123 });
124}
125
126template<typename ProtocolT>
127void AsioStreamSocket<ProtocolT>::negotiate_read()
128{
129 auto self(this->shared_from_this());
130 m_socket.async_read_some(mReadBuffer.prepare(read_buffer_size),
131 [this, self](boost::system::error_code ec, std::size_t length)
132 {
133 if (_callbacks.stateChanged) {
134 if (!ec)
135 {
136 mReadBuffer.commit(length);
137 if (length > 0) {
138 auto negotiateResult = this->negotiate();
139 if (negotiateResult == Atlas::Negotiate::FAILED) {
140 m_socket.close();
141 _callbacks.stateChanged(NEGOTIATE_FAILED);
142 return;
143 }
144 }
145
146 //If the _sc instance is removed we're done with negotiation and should start the main loop.
147 if (_sc == nullptr) {
148 this->write();
149 this->do_read();
150 } else {
151 this->negotiate_write();
152 this->negotiate_read();
153 }
154 } else {
155 if (ec != boost::asio::error::operation_aborted) {
156 _callbacks.stateChanged(CONNECTION_FAILED);
157 } else {
158 warning() << "Error when reading from socket while negotiating: (" << ec << ") " << ec.message();
159 }
160 }
161 }
162 });
163}
164
165template<typename ProtocolT>
166void AsioStreamSocket<ProtocolT>::do_read()
167{
168 auto self(this->shared_from_this());
169 m_socket.async_read_some(mReadBuffer.prepare(read_buffer_size),
170 [this, self](boost::system::error_code ec, std::size_t length)
171 {
172 if (_callbacks.stateChanged) {
173 if (!ec)
174 {
175 mReadBuffer.commit(length);
176 m_codec->poll();
177 _callbacks.dispatch();
178 this->do_read();
179 } else {
180 if (ec != boost::asio::error::operation_aborted) {
181 _callbacks.stateChanged(CONNECTION_FAILED);
182 } else {
183 warning() << "Error when reading from socket: (" << ec << ") " << ec.message();
184 }
185 }
186 }
187 });
188}
189
190template<typename ProtocolT>
192{
193 if (mWriteBuffer->size() != 0) {
194 if (mIsSending) {
195 //We're already sending in the background.
196 //Make that we should send again once we've completed sending.
197 mShouldSend = true;
198 return;
199 }
200
201 mShouldSend = false;
202
203 //We'll use a self reference to make sure that the client isn't deleted while sending.
204 auto self(this->shared_from_this());
205 //Swap places between writing buffer and sending buffer, and attach new write buffer to the out stream.
206 std::swap(mWriteBuffer, mSendBuffer);
207 mOutStream.rdbuf(mWriteBuffer.get());
208 mIsSending = true;
209
210 async_write(m_socket, mSendBuffer->data(),
211 [this, self](boost::system::error_code ec, std::size_t length)
212 {
213 mSendBuffer->consume(length);
214 mIsSending = false;
215 if (!ec) {
216 //Is there data queued for transmission which we should send right away?
217 if (mShouldSend) {
218 this->write();
219 }
220 } else {
221 if (ec != boost::asio::error::operation_aborted) {
222 if (_callbacks.stateChanged) {
223 _callbacks.stateChanged(CONNECTION_FAILED);
224 }
225 } else {
226 warning() << "Error when writing to socket: (" << ec << ") " << ec.message();
227 }
228 }
229 });
230 }
231
232}
233
234template<typename ProtocolT>
235void AsioStreamSocket<ProtocolT>::negotiate_write()
236{
237
238 if (mWriteBuffer->size() != 0) {
239 auto self(this->shared_from_this());
240 boost::asio::async_write(m_socket, mWriteBuffer->data(),
241 [this, self](boost::system::error_code ec, std::size_t length)
242 {
243 if (!ec)
244 {
245 this->mWriteBuffer->consume(length);
246 } else {
247 warning() << "Error when writing to socket while negotiating: (" << ec << ") " << ec.message();
248 }
249 });
250 }
251}
252
253}
254
255#endif /* STREAMSOCKET_IMPL_H_ */
256
Template specialization which uses boost::asio sockets.
Definition: StreamSocket.h:185
Definition: Account.cpp:33