eris  1.4.0
A WorldForge client library.
Connection.cpp
1 #ifdef HAVE_CONFIG_H
2 #include "config.h"
3 #endif
4 
5 #include "Connection.h"
6 
7 #include "TypeInfo.h"
8 #include "Log.h"
9 #include "Exceptions.h"
10 #include "Router.h"
11 #include "Redispatch.h"
12 #include "Response.h"
13 #include "EventService.h"
14 #include "TypeService.h"
15 
16 #include <Atlas/Objects/Encoder.h>
17 #include <Atlas/Objects/Operation.h>
18 #include <Atlas/Objects/Entity.h>
19 #include <sigc++/bind.h>
20 
21 #include <Atlas/Codecs/Bach.h>
22 
23 #include <cassert>
24 #include <algorithm>
25 
26 #define ATLAS_LOG 0
27 
28 using namespace Atlas::Objects::Operation;
29 using Atlas::Objects::Root;
30 using Atlas::Objects::Entity::RootEntity;
31 using Atlas::Objects::smart_dynamic_cast;
32 
33 namespace Eris {
34 
35 
36 struct ConnectionDecoder : Atlas::Objects::ObjectsDecoder {
37  Connection& m_connection;
38 
39  ConnectionDecoder(Connection& connection, const Atlas::Objects::Factories& factories) :
40  ObjectsDecoder(factories), m_connection(connection) {
41  }
42 
43  void objectArrived(Root obj) override {
44  m_connection.objectArrived(std::move(obj));
45  }
46 };
47 
48 Connection::Connection(boost::asio::io_service& io_service,
49  EventService& eventService,
50  std::string clientName,
51  const std::string& host,
52  short port) :
53  BaseConnection(io_service, std::move(clientName), "game_"),
54  m_decoder(new ConnectionDecoder(*this, *_factories)),
55  _eventService(eventService),
56  _host(host),
57  _port(port),
58  m_typeService(new TypeService(*this)),
59  m_defaultRouter(nullptr),
60  m_lock(0),
61  m_info{host},
62  m_responder(new ResponseTracker) {
63  _bridge = m_decoder.get();
64 }
65 
66 Connection::Connection(boost::asio::io_service& io_service,
67  EventService& eventService,
68  std::string clientName,
69  std::string socket) :
70  BaseConnection(io_service, std::move(clientName), "game_"),
71  m_decoder(new ConnectionDecoder(*this, *_factories)),
72  _eventService(eventService),
73  _host("local"),
74  _port(0),
75  _localSocket(std::move(socket)),
76  m_typeService(new TypeService(*this)),
77  m_defaultRouter(nullptr),
78  m_lock(0),
79  m_info{_host},
80  m_responder(new ResponseTracker) {
81  _bridge = m_decoder.get();
82 }
83 
84 
85 Connection::~Connection() {
86  // ensure we emit this before our vtable goes down, since we are the
87  // Bridge on the underlying Atlas codec, and otherwise we might get
88  // a pure virtual method call
89  hardDisconnect(true);
90 }
91 
92 EventService& Connection::getEventService() {
93  return _eventService;
94 }
95 
97  if (!_localSocket.empty()) {
98  return BaseConnection::connectLocal(_localSocket);
99  }
100 
101  return BaseConnection::connectRemote(_host, _port);
102 
103 }
104 
106  if (_status == DISCONNECTING) {
107  warning() << "duplicate disconnect on Connection that's already disconnecting";
108  return -1;
109  }
110 
111  if (_status == DISCONNECTED) {
112  warning() << "called disconnect on already disconnected Connection";
113  return -1;
114  }
115 
116  // This assert means that this function will always return early below
117  // where m_lock is checked. m_lock seems to be used by Account to prevent
118  // disconnecting when something is pending.
119  // FIXME Look into this.
120  assert(m_lock == 0);
121 
122  if (_socket && _status == CONNECTED) {
123  // Be nice and send a Logout op to the server before disconnecting.
124  _socket->getEncoder().streamObjectsMessage(Logout());
125  _socket->write();
126  }
127 
128  // this is a soft disconnect; it will give people a chance to do tear down and so on
129  // in response, people who need to hold the disconnect will lock() the
130  // connection, and unlock when their work is done. A timeout stops
131  // locks from preventing disconnection
133  Disconnecting.emit();
134 
135  if (m_lock == 0) {
136  hardDisconnect(true);
137  return 0;
138  }
139 
140  // fell through, so someone has locked =>
141  // start a disconnect timeout
142 // _timeout = new Timeout(5000);
143 // _timeout->Expired.connect(sigc::mem_fun(this, &Connection::onDisconnectTimeout));
144  return 0;
145 }
146 
147 void Connection::dispatch() {
148 
149  // now dispatch received ops
150  while (!m_opDeque.empty()) {
151  RootOperation op = std::move(m_opDeque.front());
152  m_opDeque.pop_front();
153  dispatchOp(op);
154  }
155 
156  // finally, clean up any redispatches that fired (aka 'deleteLater')
157  m_finishedRedispatches.clear();
158 }
159 
160 void Connection::send(const Atlas::Objects::Root& obj) {
161  if ((_status != CONNECTED) && (_status != DISCONNECTING)) {
162  error() << "called send on closed connection";
163  return;
164  }
165 
166  if (!_socket) {
167  handleFailure("Connection::send: stream failed");
168  hardDisconnect(true);
169  return;
170  }
171 
172 #if ATLAS_LOG == 1
173  std::stringstream debugStream;
174 
175  Atlas::Codecs::Bach debugCodec(debugStream, debugStream, *this /*dummy*/);
176  Atlas::Objects::ObjectsEncoder debugEncoder(debugCodec);
177  debugEncoder.streamObjectsMessage(obj);
178  debugStream << std::flush;
179 
180  debug() << "sending:" << debugStream.str();
181 #endif
182 
183  _socket->getEncoder().streamObjectsMessage(obj);
184  _socket->write();
185 }
186 
187 void Connection::registerRouterForTo(Router* router, const std::string& toId) {
188  m_toRouters[toId] = router;
189 }
190 
191 void Connection::unregisterRouterForTo(Router* router, const std::string& toId) {
192  assert(m_toRouters[toId] == router);
193  m_toRouters.erase(toId);
194 }
195 
196 void Connection::registerRouterForFrom(Router* router, const std::string& fromId) {
197  m_fromRouters[fromId] = router;
198 }
199 
200 void Connection::unregisterRouterForFrom(const std::string& fromId) {
201  m_fromRouters.erase(fromId);
202 }
203 
204 void Connection::setDefaultRouter(Router* router) {
205  if (m_defaultRouter || !router) {
206  error() << "setDefaultRouter duplicate set or null argument";
207  return;
208  }
209 
210  m_defaultRouter = router;
211 }
212 
213 void Connection::clearDefaultRouter() {
214  m_defaultRouter = nullptr;
215 }
216 
218  ++m_lock;
219 }
220 
222  if (m_lock < 1) {
223  throw InvalidOperation("Imbalanced lock/unlock calls on Connection");
224  }
225 
226  if (--m_lock == 0) {
227  switch (_status) {
228  case DISCONNECTING:
229  debug() << "Connection unlocked in DISCONNECTING, closing socket";
230  debug() << "have " << m_opDeque.size() << " ops waiting";
231  m_opDeque.clear();
232  hardDisconnect(true);
233  break;
234 
235  default:
236  warning() << "Connection unlocked in spurious state : this may cause a failure later";
237  break;
238  }
239  }
240 }
241 
243  si = m_info;
244 }
245 
247  if (_status != CONNECTED) {
248  warning() << "called refreshServerInfo while not connected, ignoring";
249  return;
250  }
251 
252  m_info.status = ServerInfo::QUERYING;
253  Get gt;
254  gt->setSerialno(getNewSerialno());
255  send(gt);
256 }
257 
258 void Connection::objectArrived(Root obj) {
259 #if ATLAS_LOG == 1
260  std::stringstream debugStream;
261  Atlas::Codecs::Bach debugCodec(debugStream, debugStream, *this /* dummy */);
262  Atlas::Objects::ObjectsEncoder debugEncoder(debugCodec);
263  debugEncoder.streamObjectsMessage(obj);
264  debugStream << std::flush;
265 
266  debug() << "received:" << debugStream.str();
267 #else
268  debug() << "received op:" << obj->getParent();
269 #endif
270  RootOperation op = smart_dynamic_cast<RootOperation>(obj);
271  if (op.isValid()) {
272  m_opDeque.push_back(std::move(op));
273  } else {
274  error() << "Con::objectArrived got non-op";
275  }
276 }
277 
278 void Connection::dispatchOp(const RootOperation& op) {
279  try {
280  bool anonymous = op->isDefaultTo();
281 
282  Router::RouterResult rr = m_responder->handleOp(op);
283  if (rr == Router::HANDLED) {
284  return;
285  }
286 
287  // locate a router based on from
288  if (!op->isDefaultFrom()) {
289  auto R = m_fromRouters.find(op->getFrom());
290  if (R != m_fromRouters.end()) {
291  rr = R->second->handleOperation(op);
292  if (rr == Router::HANDLED) {
293  return;
294  }
295  }
296  }
297 
298  // locate a router based on the op's TO value
299  if (!anonymous) {
300  auto R = m_toRouters.find(op->getTo());
301  if (R != m_toRouters.end()) {
302  rr = R->second->handleOperation(op);
303  if (rr == Router::HANDLED) {
304  return;
305  }
306  } else if (!m_toRouters.empty()) {
307  warning() << "received op with TO=" << op->getTo() << ", but no router is registered for that id";
308  }
309  }
310 
311  // special-case, server info refreshes are handled here directly
312  if (op->getClassNo() == INFO_NO && anonymous) {
313  handleServerInfo(op);
314  return;
315  }
316 
317  // go to the default router
318  if (m_defaultRouter) {
319  rr = m_defaultRouter->handleOperation(op);
320  }
321  if (rr != Router::HANDLED) {
322  warning() << "no-one handled op:" << op;
323  }
324  } catch (const Atlas::Exception& ae) {
325  error() << "caught Atlas exception: '" << ae.getDescription() <<
326  "' while dispatching op:\n" << op;
327  }
328 }
329 
330 
332  if (_status != ns) StatusChanged.emit(ns);
333  _status = ns;
334 }
335 
336 void Connection::handleFailure(const std::string& msg) {
337  Failure.emit(msg);
338  // FIXME - reset I think, but ensure this is safe
339  m_lock = 0;
340 }
341 
342 void Connection::handleTimeout(const std::string& msg) {
343  handleFailure(msg); // all the same in the end
344 }
345 
346 void Connection::handleServerInfo(const RootOperation& op) {
347  if (!op->getArgs().empty()) {
348  RootEntity svr = smart_dynamic_cast<RootEntity>(op->getArgs().front());
349  if (!svr.isValid()) {
350  error() << "server INFO argument object is broken";
351  return;
352  }
353 
354  m_info.processServer(svr);
355  GotServerInfo.emit();
356  }
357 }
358 
361  m_typeService->init();
362  m_info = ServerInfo{_host};
363 }
364 
365 void Connection::onDisconnectTimeout() {
366  handleTimeout("timed out waiting for disconnection");
367  hardDisconnect(true);
368 }
369 
370 void Connection::postForDispatch(const Root& obj) {
371  RootOperation op = smart_dynamic_cast<RootOperation>(obj);
372  assert(op.isValid());
373  m_opDeque.push_back(op);
374 
375 #if ATLAS_LOG == 1
376  std::stringstream debugStream;
377  Atlas::Codecs::Bach debugCodec(debugStream, debugStream, *this /* dummy */);
378  Atlas::Objects::ObjectsEncoder debugEncoder(debugCodec);
379  debugEncoder.streamObjectsMessage(obj);
380  debugStream << std::flush;
381 
382  debug() << "posted for re-dispatch:" << debugStream.str();
383 #endif
384 }
385 
386 void Connection::cleanupRedispatch(Redispatch* r) {
387  m_finishedRedispatches.push_back(std::unique_ptr<Redispatch>(r));
388 }
389 
/**
 * Operation serial number sequencing.
 *
 * @return The next serial number; the first call yields 1001 and every
 *         later call yields one more than the call before it.
 */
std::int64_t getNewSerialno() {
    // In theory this will eventually loop, but that's okay.
    // FIXME - using the same initial starting offset is problematic
    // if the client dies, and quickly reconnects
    static std::int64_t nextSerial = 1001;

    const std::int64_t issued = nextSerial;
    ++nextSerial;
    return issued;
}
397 
398 } // of namespace
Eris::Connection::StatusChanged
sigc::signal< void, Status > StatusChanged
indicates a status change on the connection
Definition: Connection.h:148
Eris::Connection::setStatus
void setStatus(Status sc) override
Definition: Connection.cpp:331
Eris::BaseConnection::hardDisconnect
void hardDisconnect(bool emit)
Definition: BaseConnection.cpp:162
Eris::EventService
Handles polling of the IO system as well as making sure that registered handlers are run on the main ...
Definition: EventService.h:43
Eris::BaseConnection::connectRemote
virtual int connectRemote(const std::string &host, short port)
Definition: BaseConnection.cpp:65
Eris::Connection::Failure
sigc::signal< void, const std::string & > Failure
Definition: Connection.h:142
Eris::BaseConnection::onConnect
virtual void onConnect()
derived-class notification when connection and negotiation is completed
Definition: BaseConnection.cpp:178
Eris::Connection::postForDispatch
void postForDispatch(const Atlas::Objects::Root &obj)
Definition: Connection.cpp:370
Eris::Connection::handleFailure
void handleFailure(const std::string &msg) override
Process failures (to track when reconnection should be permitted)
Definition: Connection.cpp:336
Eris::ConnectionDecoder
Definition: Connection.cpp:36
Eris::BaseConnection::_status
Status _status
current status of the connection
Definition: BaseConnection.h:132
Eris::Connection::_port
const short _port
port of the server
Definition: Connection.h:169
Eris::TypeService
Definition: TypeService.h:24
Eris::BaseConnection::DISCONNECTED
@ DISCONNECTED
finished disconnection
Definition: BaseConnection.h:60
Eris::Connection::Connection
Connection(boost::asio::io_service &io_service, EventService &eventService, std::string clientName, const std::string &host, short port)
Create an unconnected instance.
Definition: Connection.cpp:48
Eris::Router
Definition: Router.h:11
Eris::error
Definition: LogStream.h:66
Eris::Connection::m_opDeque
OpDeque m_opDeque
store of all the received ops waiting to be dispatched
Definition: Connection.h:191
Eris::ServerInfo::processServer
void processServer(const Atlas::Objects::Entity::RootEntity &svr)
Definition: ServerInfo.cpp:19
Eris::Connection::connect
int connect()
Definition: Connection.cpp:96
Eris::getNewSerialno
std::int64_t getNewSerialno()
operation serial number sequencing
Definition: Connection.cpp:390
Eris
Definition: Account.cpp:33
Eris::Connection::getServerInfo
void getServerInfo(ServerInfo &) const
Definition: Connection.cpp:242
Eris::Redispatch
Definition: Redispatch.h:16
Eris::InvalidOperation
Definition: Exceptions.h:26
Eris::ServerInfo
Definition: ServerInfo.h:23
Eris::Connection::unlock
void unlock()
Definition: Connection.cpp:221
Eris::BaseConnection::DISCONNECTING
@ DISCONNECTING
clean disconnection in progress
Definition: BaseConnection.h:61
Eris::Connection::Disconnecting
sigc::signal< bool > Disconnecting
Definition: Connection.h:134
Eris::Connection::onConnect
void onConnect() override
derived-class notification when connection and negotiation is completed
Definition: Connection.cpp:359
Eris::BaseConnection::CONNECTED
@ CONNECTED
connection fully established
Definition: BaseConnection.h:59
Eris::BaseConnection
Underlying Atlas connection, providing a send interface, and receive (dispatch) system.
Definition: BaseConnection.h:40
Eris::Connection::send
virtual void send(const Atlas::Objects::Root &obj)
Transmit an Atlas::Objects instance to the server.
Definition: Connection.cpp:160
Eris::Connection::disconnect
int disconnect()
Initiate disconnection from the server.
Definition: Connection.cpp:105
Eris::debug
Definition: LogStream.h:46
Eris::warning
Definition: LogStream.h:56
Eris::Connection
Definition: Connection.h:45
Eris::Connection::refreshServerInfo
void refreshServerInfo()
Definition: Connection.cpp:246
Eris::Connection::lock
void lock()
Definition: Connection.cpp:217
Eris::BaseConnection::connectLocal
virtual int connectLocal(const std::string &socket)
Definition: BaseConnection.cpp:98
Eris::BaseConnection::Status
Status
possible states for the connection
Definition: BaseConnection.h:55
Eris::ResponseTracker
Definition: Response.h:65