// Includes for this translation unit, followed by NEGOTIATE_TIMEOUT_SECONDS: the
// number of seconds handed to std::chrono::seconds when startNegotiation() arms
// the negotiation timer.
23 #include "StreamSocket.h" 26 #include <Atlas/Codec.h> 27 #include <Atlas/Net/Stream.h> 28 #include <Atlas/Objects/Encoder.h> 32 static const int NEGOTIATE_TIMEOUT_SECONDS = 5;
// Constructor.
//
// Stores the io_service, takes ownership of the callbacks by moving them into
// _callbacks, allocates the write and send stream buffers, constructs mInStream
// from &mReadBuffer and mOutStream from the write buffer, creates the
// Atlas::Net::StreamConnect negotiator with the client name and both streams,
// and constructs both timers with io_service. m_is_connected starts as false.
//
// Parameters:
//   io_service  - stored in m_io_service and passed to both timers.
//   client_name - passed to the StreamConnect negotiator.
//   bridge      - not referenced in the visible lines (see note below).
//   callbacks   - taken by value and moved into _callbacks.
//
// NOTE(review): this listing is a lossy extraction. The embedded numbering skips
// entries (e.g. 40 -> 42, 46 -> 49, 50 -> 53) and the closing brace of the body
// is not visible. Presumably one of the elided initialisers stores `bridge` in
// `_bridge`, which negotiate() passes to getCodec() - confirm against the
// original file before editing this initialiser list.
36 StreamSocket::StreamSocket(io_service& io_service,
37 const std::string& client_name,
38 Atlas::Bridge& bridge,
39 Callbacks callbacks) :
40 m_io_service(io_service),
42 _callbacks(
std::move(callbacks)),
43 mWriteBuffer(new boost::asio::streambuf()),
44 mSendBuffer(new boost::asio::streambuf()),
45 mInStream(&mReadBuffer),
46 mOutStream(mWriteBuffer.get()),
// The negotiator is handed the same two streams that were set up just above.
49 _sc(new
Atlas::Net::StreamConnect(client_name, mInStream, mOutStream)),
50 _negotiateTimer(io_service), _connectTimer(io_service),
53 m_is_connected(false) {
// Destructor, explicitly defaulted out of line: no custom teardown code is run
// here beyond what the members' own destructors do.
56 StreamSocket::~StreamSocket() =
default;
// Arms the negotiation timeout and reports the NEGOTIATE state.
//
// The negotiation timer is set to expire NEGOTIATE_TIMEOUT_SECONDS from now.
// Its wait handler, when stateChanged is set, logs a debug message and reports
// DISCONNECTING through _callbacks.stateChanged.
//
// NOTE(review): this listing is a lossy extraction. The embedded numbering skips
// 66 -> 70, 73 -> 78 and 78 -> 86, so any guard inside the handler (on `ec` or
// on negotiation state), the handler's closing braces, and whatever follows the
// stateChanged(NEGOTIATE) call are not visible here. Confirm against the
// original file before changing the handler logic.
62 void StreamSocket::startNegotiation() {
// `self` is captured by value in the handler below, so the handler holds a
// shared reference to this object for as long as the handler itself exists.
63 auto self(this->shared_from_this());
64 _negotiateTimer.expires_from_now(
65 std::chrono::seconds(NEGOTIATE_TIMEOUT_SECONDS));
66 _negotiateTimer.async_wait([
this,
self](
const boost::system::error_code& ec) {
// Only report the timeout if a stateChanged callback has been set.
70 if (_callbacks.stateChanged) {
71 debug() <<
"Client disconnected because of negotiation timeout.";
73 _callbacks.stateChanged(DISCONNECTING);
// Presumably issued by startNegotiation itself after the timer has been armed
// (the handler's closing braces are among the elided lines) - confirm.
78 _callbacks.stateChanged(NEGOTIATE);
// Checks the negotiator's state and, once negotiation is neither in progress
// nor failed, sets up the codec and encoder for the connection.
//
// Returns:
//   - the negotiator's current state while it is IN_PROGRESS or FAILED;
//   - Atlas::Negotiate::FAILED if getCodec() yields a null codec (an error is
//     logged in that case);
//   - Atlas::Negotiate::SUCCEEDED after the encoder has been created,
//     streamBegin() has been called on the codec and CONNECTED has been
//     reported through _callbacks.stateChanged.
//
// NOTE(review): this listing is a lossy extraction. The embedded numbering skips
// entries (e.g. 86 -> 90, 103 -> 108) and the closing braces are not visible, so
// whether the negotiator is polled before the first state check, and whether it
// is released after getCodec(), cannot be determined from here - confirm
// against the original file.
86 Atlas::Negotiate::State StreamSocket::negotiate() {
// Still negotiating: hand the current state back to the caller unchanged.
90 if (_sc->getState() == Atlas::Negotiate::IN_PROGRESS) {
91 return _sc->getState();
// Negotiation failed: likewise report the negotiator's state as-is.
95 if (_sc->getState() == Atlas::Negotiate::FAILED) {
96 return _sc->getState();
// Neither in progress nor failed: cancel the negotiation timeout timer.
100 _negotiateTimer.cancel();
// Obtain the codec from the negotiator, passing it _bridge.
103 m_codec = _sc->getCodec(_bridge);
108 if (m_codec ==
nullptr) {
109 error() <<
"Could not create codec during negotiation.";
110 return Atlas::Negotiate::FAILED;
// The encoder is constructed on top of the codec obtained above.
113 m_encoder = std::make_unique<Atlas::Objects::ObjectsEncoder>(*m_codec);
116 m_codec->streamBegin();
// NOTE(review): no check that stateChanged is set is visible on this call,
// unlike the timeout handler in startNegotiation() - confirm whether an elided
// line guards it.
118 _callbacks.stateChanged(CONNECTED);
120 return Atlas::Negotiate::SUCCEEDED;
void detach()
Detaches the callbacks.
Methods that are used as callbacks.
Atlas::Codec & getCodec()
Gets the codec object.
Atlas::Objects::ObjectsEncoder & getEncoder()
Gets the encoder object.