eris 1.4.0
A WorldForge client library.
Connection.cpp
1#ifdef HAVE_CONFIG_H
2#include "config.h"
3#endif
4
5#include "Connection.h"
6
7#include "TypeInfo.h"
8#include "Log.h"
9#include "Exceptions.h"
10#include "Router.h"
11#include "Redispatch.h"
12#include "Response.h"
13#include "EventService.h"
14#include "TypeService.h"
15
16#include <Atlas/Objects/Encoder.h>
17#include <Atlas/Objects/Operation.h>
18#include <Atlas/Objects/Entity.h>
19#include <sigc++/bind.h>
20
21#include <Atlas/Codecs/Bach.h>
22
23#include <cassert>
24#include <algorithm>
25
26#define ATLAS_LOG 0
27
28using namespace Atlas::Objects::Operation;
29using Atlas::Objects::Root;
30using Atlas::Objects::Entity::RootEntity;
31using Atlas::Objects::smart_dynamic_cast;
32
33namespace Eris {
34
35
36struct ConnectionDecoder : Atlas::Objects::ObjectsDecoder {
37 Connection& m_connection;
38
39 ConnectionDecoder(Connection& connection, const Atlas::Objects::Factories& factories) :
40 ObjectsDecoder(factories), m_connection(connection) {
41 }
42
43 void objectArrived(Root obj) override {
44 m_connection.objectArrived(std::move(obj));
45 }
46};
47
48Connection::Connection(boost::asio::io_service& io_service,
49 EventService& eventService,
50 std::string clientName,
51 const std::string& host,
52 short port) :
53 BaseConnection(io_service, std::move(clientName), "game_"),
54 m_decoder(new ConnectionDecoder(*this, *_factories)),
55 _eventService(eventService),
56 _host(host),
57 _port(port),
58 m_typeService(new TypeService(*this)),
59 m_defaultRouter(nullptr),
60 m_lock(0),
61 m_info{host},
62 m_responder(new ResponseTracker) {
63 _bridge = m_decoder.get();
64}
65
66Connection::Connection(boost::asio::io_service& io_service,
67 EventService& eventService,
68 std::string clientName,
69 std::string socket) :
70 BaseConnection(io_service, std::move(clientName), "game_"),
71 m_decoder(new ConnectionDecoder(*this, *_factories)),
72 _eventService(eventService),
73 _host("local"),
74 _port(0),
75 _localSocket(std::move(socket)),
76 m_typeService(new TypeService(*this)),
77 m_defaultRouter(nullptr),
78 m_lock(0),
79 m_info{_host},
80 m_responder(new ResponseTracker) {
81 _bridge = m_decoder.get();
82}
83
84
85Connection::~Connection() {
86 // ensure we emit this before our vtable goes down, since we are the
87 // Bridge on the underlying Atlas codec, and otherwise we might get
88 // a pure virtual method call
89 hardDisconnect(true);
90}
91
92EventService& Connection::getEventService() {
93 return _eventService;
94}
95
97 if (!_localSocket.empty()) {
98 return BaseConnection::connectLocal(_localSocket);
99 }
100
102
103}
104
106 if (_status == DISCONNECTING) {
107 warning() << "duplicate disconnect on Connection that's already disconnecting";
108 return -1;
109 }
110
111 if (_status == DISCONNECTED) {
112 warning() << "called disconnect on already disconnected Connection";
113 return -1;
114 }
115
116 // This assert means that this function will always return early below
117 // where m_lock is checked. m_lock seems to be used by Account to prevent
118 // disconnecting when something is pending.
119 // FIXME Look into this.
120 assert(m_lock == 0);
121
122 if (_socket && _status == CONNECTED) {
123 //Be nice and send a Logout op to the connection when disconnecting down.
124 _socket->getEncoder().streamObjectsMessage(Logout());
125 _socket->write();
126 }
127
128 // this is a soft disconnect; it will give people a chance to do tear down and so on
129 // in response, people who need to hold the disconnect will lock() the
130 // connection, and unlock when their work is done. A timeout stops
131 // locks from preventing disconnection
133 Disconnecting.emit();
134
135 if (m_lock == 0) {
136 hardDisconnect(true);
137 return 0;
138 }
139
140 // fell through, so someone has locked =>
141 // start a disconnect timeout
142// _timeout = new Timeout(5000);
143// _timeout->Expired.connect(sigc::mem_fun(this, &Connection::onDisconnectTimeout));
144 return 0;
145}
146
147void Connection::dispatch() {
148
149 // now dispatch received ops
150 while (!m_opDeque.empty()) {
151 RootOperation op = std::move(m_opDeque.front());
152 m_opDeque.pop_front();
153 dispatchOp(op);
154 }
155
156 // finally, clean up any redispatches that fired (aka 'deleteLater')
157 m_finishedRedispatches.clear();
158}
159
160void Connection::send(const Atlas::Objects::Root& obj) {
161 if ((_status != CONNECTED) && (_status != DISCONNECTING)) {
162 error() << "called send on closed connection";
163 return;
164 }
165
166 if (!_socket) {
167 handleFailure("Connection::send: stream failed");
168 hardDisconnect(true);
169 return;
170 }
171
172#if ATLAS_LOG == 1
173 std::stringstream debugStream;
174
175 Atlas::Codecs::Bach debugCodec(debugStream, debugStream, *this /*dummy*/);
176 Atlas::Objects::ObjectsEncoder debugEncoder(debugCodec);
177 debugEncoder.streamObjectsMessage(obj);
178 debugStream << std::flush;
179
180 debug() << "sending:" << debugStream.str();
181#endif
182
183 _socket->getEncoder().streamObjectsMessage(obj);
184 _socket->write();
185}
186
187void Connection::registerRouterForTo(Router* router, const std::string& toId) {
188 m_toRouters[toId] = router;
189}
190
191void Connection::unregisterRouterForTo(Router* router, const std::string& toId) {
192 assert(m_toRouters[toId] == router);
193 m_toRouters.erase(toId);
194}
195
196void Connection::registerRouterForFrom(Router* router, const std::string& fromId) {
197 m_fromRouters[fromId] = router;
198}
199
200void Connection::unregisterRouterForFrom(const std::string& fromId) {
201 m_fromRouters.erase(fromId);
202}
203
204void Connection::setDefaultRouter(Router* router) {
205 if (m_defaultRouter || !router) {
206 error() << "setDefaultRouter duplicate set or null argument";
207 return;
208 }
209
210 m_defaultRouter = router;
211}
212
213void Connection::clearDefaultRouter() {
214 m_defaultRouter = nullptr;
215}
216
218 ++m_lock;
219}
220
222 if (m_lock < 1) {
223 throw InvalidOperation("Imbalanced lock/unlock calls on Connection");
224 }
225
226 if (--m_lock == 0) {
227 switch (_status) {
228 case DISCONNECTING:
229 debug() << "Connection unlocked in DISCONNECTING, closing socket";
230 debug() << "have " << m_opDeque.size() << " ops waiting";
231 m_opDeque.clear();
232 hardDisconnect(true);
233 break;
234
235 default:
236 warning() << "Connection unlocked in spurious state : this may cause a failure later";
237 break;
238 }
239 }
240}
241
243 si = m_info;
244}
245
247 if (_status != CONNECTED) {
248 warning() << "called refreshServerInfo while not connected, ignoring";
249 return;
250 }
251
252 m_info.status = ServerInfo::QUERYING;
253 Get gt;
254 gt->setSerialno(getNewSerialno());
255 send(gt);
256}
257
258void Connection::objectArrived(Root obj) {
259#if ATLAS_LOG == 1
260 std::stringstream debugStream;
261 Atlas::Codecs::Bach debugCodec(debugStream, debugStream, *this /* dummy */);
262 Atlas::Objects::ObjectsEncoder debugEncoder(debugCodec);
263 debugEncoder.streamObjectsMessage(obj);
264 debugStream << std::flush;
265
266 debug() << "received:" << debugStream.str();
267#else
268 debug() << "received op:" << obj->getParent();
269#endif
270 RootOperation op = smart_dynamic_cast<RootOperation>(obj);
271 if (op.isValid()) {
272 m_opDeque.push_back(std::move(op));
273 } else {
274 error() << "Con::objectArrived got non-op";
275 }
276}
277
278void Connection::dispatchOp(const RootOperation& op) {
279 try {
280 bool anonymous = op->isDefaultTo();
281
282 Router::RouterResult rr = m_responder->handleOp(op);
283 if (rr == Router::HANDLED) {
284 return;
285 }
286
287 // locate a router based on from
288 if (!op->isDefaultFrom()) {
289 auto R = m_fromRouters.find(op->getFrom());
290 if (R != m_fromRouters.end()) {
291 rr = R->second->handleOperation(op);
292 if (rr == Router::HANDLED) {
293 return;
294 }
295 }
296 }
297
298 // locate a router based on the op's TO value
299 if (!anonymous) {
300 auto R = m_toRouters.find(op->getTo());
301 if (R != m_toRouters.end()) {
302 rr = R->second->handleOperation(op);
303 if (rr == Router::HANDLED) {
304 return;
305 }
306 } else if (!m_toRouters.empty()) {
307 warning() << "received op with TO=" << op->getTo() << ", but no router is registered for that id";
308 }
309 }
310
311 // special-case, server info refreshes are handled here directly
312 if (op->getClassNo() == INFO_NO && anonymous) {
313 handleServerInfo(op);
314 return;
315 }
316
317 // go to the default router
318 if (m_defaultRouter) {
319 rr = m_defaultRouter->handleOperation(op);
320 }
321 if (rr != Router::HANDLED) {
322 warning() << "no-one handled op:" << op;
323 }
324 } catch (const Atlas::Exception& ae) {
325 error() << "caught Atlas exception: '" << ae.getDescription() <<
326 "' while dispatching op:\n" << op;
327 }
328}
329
330
332 if (_status != ns) StatusChanged.emit(ns);
333 _status = ns;
334}
335
336void Connection::handleFailure(const std::string& msg) {
337 Failure.emit(msg);
338 // FIXME - reset I think, but ensure this is safe
339 m_lock = 0;
340}
341
342void Connection::handleTimeout(const std::string& msg) {
343 handleFailure(msg); // all the same in the end
344}
345
346void Connection::handleServerInfo(const RootOperation& op) {
347 if (!op->getArgs().empty()) {
348 RootEntity svr = smart_dynamic_cast<RootEntity>(op->getArgs().front());
349 if (!svr.isValid()) {
350 error() << "server INFO argument object is broken";
351 return;
352 }
353
354 m_info.processServer(svr);
355 GotServerInfo.emit();
356 }
357}
358
361 m_typeService->init();
362 m_info = ServerInfo{_host};
363}
364
365void Connection::onDisconnectTimeout() {
366 handleTimeout("timed out waiting for disconnection");
367 hardDisconnect(true);
368}
369
370void Connection::postForDispatch(const Root& obj) {
371 RootOperation op = smart_dynamic_cast<RootOperation>(obj);
372 assert(op.isValid());
373 m_opDeque.push_back(op);
374
375#if ATLAS_LOG == 1
376 std::stringstream debugStream;
377 Atlas::Codecs::Bach debugCodec(debugStream, debugStream, *this /* dummy */);
378 Atlas::Objects::ObjectsEncoder debugEncoder(debugCodec);
379 debugEncoder.streamObjectsMessage(obj);
380 debugStream << std::flush;
381
382 debug() << "posted for re-dispatch:" << debugStream.str();
383#endif
384}
385
386void Connection::cleanupRedispatch(Redispatch* r) {
387 m_finishedRedispatches.push_back(std::unique_ptr<Redispatch>(r));
388}
389
// Hands out operation serial numbers: 1001 on the first call, then one higher
// on each subsequent call.
std::int64_t getNewSerialno() {
    // In theory the counter eventually wraps around, which is acceptable.
    // FIXME - always starting from the same initial offset is problematic
    // if the client dies and quickly reconnects.
    static std::int64_t s_upcomingSerial = 1001;

    const std::int64_t issued = s_upcomingSerial;
    ++s_upcomingSerial;
    return issued;
}
397
398} // of namespace
Underlying Atlas connection, providing a send interface, and receive (dispatch) system.
Status _status
current status of the connection
void hardDisconnect(bool emit)
virtual int connectLocal(const std::string &socket)
Atlas::Bridge * _bridge
virtual void onConnect()
derived-class notification when connection and negotiation is completed
virtual int connectRemote(const std::string &host, short port)
Status
possible states for the connection
@ DISCONNECTING
clean disconnection in progress
@ DISCONNECTED
finished disconnection
@ CONNECTED
connection fully established
sigc::signal< bool > Disconnecting
Definition: Connection.h:134
int disconnect()
Initiate disconnection from the server.
Definition: Connection.cpp:105
void refreshServerInfo()
Definition: Connection.cpp:246
void handleFailure(const std::string &msg) override
Process failures (to track when reconnection should be permitted)
Definition: Connection.cpp:336
sigc::signal< void, Status > StatusChanged
indicates a status change on the connection
Definition: Connection.h:148
virtual void send(const Atlas::Objects::Root &obj)
Transmit an Atlas::Objects instance to the server.
Definition: Connection.cpp:160
void postForDispatch(const Atlas::Objects::Root &obj)
Definition: Connection.cpp:370
void onConnect() override
derived-class notification when connection and negotiation is completed
Definition: Connection.cpp:359
void setStatus(Status sc) override
Definition: Connection.cpp:331
Connection(boost::asio::io_service &io_service, EventService &eventService, std::string clientName, const std::string &host, short port)
Create an unconnected instance.
Definition: Connection.cpp:48
sigc::signal< void, const std::string & > Failure
Definition: Connection.h:142
void getServerInfo(ServerInfo &) const
Definition: Connection.cpp:242
const short _port
port of the server
Definition: Connection.h:169
OpDeque m_opDeque
store of all the received ops waiting to be dispatched
Definition: Connection.h:191
Handles polling of the IO system as well as making sure that registered handlers are run on the main ...
Definition: EventService.h:43
Definition: Account.cpp:33
std::int64_t getNewSerialno()
operation serial number sequencing
Definition: Connection.cpp:390
void processServer(const Atlas::Objects::Entity::RootEntity &svr)
Definition: ServerInfo.cpp:19