eris  1.4.0
A WorldForge client library.
Metaserver.cpp
1 #include "MetaQuery.h"
2 
3 #ifdef HAVE_CONFIG_H
4 #include "config.h"
5 #endif
6 
7 #include "Metaserver.h"
8 
9 #include "ServerInfo.h"
10 #include "Log.h"
11 #include "EventService.h"
12 #include "Exceptions.h"
13 
14 #include <Atlas/Objects/Operation.h>
15 #include <Atlas/Objects/RootEntity.h>
16 #include <sigc++/slot.h>
17 
18 #include <algorithm>
19 
20 #include <cassert>
21 #include <cstdio>
22 #include <cstring>
23 #include <memory>
24 #include <utility>
25 
26 #ifdef _WIN32
27 
28 #ifndef snprintf
29 #define snprintf _snprintf
30 #endif
31 
32 #endif // _WIN32
33 
34 using namespace Atlas::Objects::Operation;
35 using Atlas::Objects::smart_dynamic_cast;
36 using Atlas::Objects::Root;
37 using Atlas::Objects::Entity::RootEntity;
38 
39 namespace Eris {
40 
// Helpers that (de)serialise a single 32-bit word; both are defined further down in this file.
char* pack_uint32(uint32_t data, char* buffer, unsigned int& size);
char* unpack_uint32(uint32_t& dest, char* buffer);

// Service/port string handed to the resolver when looking up the metaserver.
const char* META_SERVER_PORT = "8453";

// meta-server protocol commands
const uint32_t CKEEP_ALIVE = 2;
const uint32_t HANDSHAKE = 3;
const uint32_t CLIENTSHAKE = 5;
const uint32_t LIST_REQ = 7;
const uint32_t LIST_RESP = 8;
const uint32_t PROTO_ERANGE = 9;
const uint32_t LAST = 10;

// special command value to track LIST_RESP processing
const uint32_t LIST_RESP2 = 999;
58 
59 struct MetaDecoder : Atlas::Objects::ObjectsDecoder {
60  Meta& m_meta;
61 
62  MetaDecoder(Meta& meta, const Atlas::Objects::Factories& factories) :
63  ObjectsDecoder(factories), m_meta(meta) {
64  }
65 
66  void objectArrived(Root obj) override {
67  m_meta.objectArrived(std::move(obj));
68  }
69 };
70 
71 Meta::Meta(boost::asio::io_service& io_service,
72  EventService& eventService,
73  std::string metaServer,
74  unsigned int maxQueries) :
75  m_factories(new Atlas::Objects::Factories()),
76  m_io_service(io_service),
77  m_event_service(eventService),
78  m_decoder(new MetaDecoder(*this, *m_factories)),
79  m_status(INVALID),
80  m_metaHost(std::move(metaServer)),
81  m_maxActiveQueries(maxQueries),
82  m_nextQuery(0),
83  m_resolver(io_service),
84  m_socket(io_service),
85  m_metaTimer(io_service),
86  m_receive_stream(&m_receive_buffer),
87  m_send_buffer(new boost::asio::streambuf()),
88  m_send_stream(m_send_buffer.get()),
89  m_dataPtr(nullptr),
90  m_bytesToRecv(0),
91  m_totalServers(0),
92  m_packed(0),
93  m_recvCmd(false),
94  m_gotCmd(0) {
95  unsigned int max_half_open = FD_SETSIZE;
96  if (m_maxActiveQueries > (max_half_open - 2)) {
97  m_maxActiveQueries = max_half_open - 2;
98  }
99 }
100 
101 Meta::~Meta() {
102  disconnect();
103 }
104 
105 /*
106 void Meta::queryServer(const std::string &ip)
107 {
108  m_status = QUERYING;
109 
110  if (m_activeQueries.size() < m_maxActiveQueries)
111  {
112  MetaQuery *q = new MetaQuery(this, ip);
113  if (q->isComplete())
114  {
115  // indicated a failure occurred, so we'll kill it now and say no more
116  delete q;
117  } else
118  m_activeQueries.insert(q);
119  }
120 }
121 */
122 
123 void Meta::queryServerByIndex(size_t index) {
124  if (m_status == INVALID) {
125  error() << "called queryServerByIndex with invalid server list";
126  return;
127  }
128 
129  if (index >= m_gameServers.size()) {
130  error() << "called queryServerByIndex with bad server index " << index;
131  return;
132  }
133 
134  if (m_gameServers[index].status == ServerInfo::QUERYING) {
135  warning() << "called queryServerByIndex on server already being queried";
136  return;
137  }
138 
139  internalQuery(index);
140 }
141 
143  if (!m_activeQueries.empty()) {
144  warning() << "called meta::refresh() while doing another query, ignoring";
145  return;
146  }
147 
148  if (m_status == VALID) {
149  // save the current list in case we fail
150  m_lastValidList = m_gameServers;
151  }
152 
153  m_gameServers.clear();
154  m_nextQuery = 0;
155  disconnect();
156  connect();
157 }
158 
159 void Meta::cancel() {
160  m_activeQueries.clear();
161 
162  disconnect();
163 
164  // revert to the last valid list if possible
165  if (!m_lastValidList.empty()) {
166  m_gameServers = m_lastValidList;
167  m_status = VALID;
168  } else {
169  m_status = INVALID;
170  m_gameServers.clear();
171  }
172  m_nextQuery = m_gameServers.size();
173 }
174 
175 const ServerInfo& Meta::getInfoForServer(size_t index) const {
176  if (index >= m_gameServers.size()) {
177  error() << "passed out-of-range index " << index <<
178  " to getInfoForServer";
179  throw BaseException("Out of bounds exception when getting server info.");
180  } else {
181  return m_gameServers[index];
182  }
183 }
184 
185 size_t Meta::getGameServerCount() const {
186  return m_gameServers.size();
187 }
188 
190  boost::asio::ip::udp::resolver::query query(m_metaHost, META_SERVER_PORT);
191  m_resolver.async_resolve(query,
192  [&](const boost::system::error_code& ec, boost::asio::ip::udp::resolver::iterator iterator) {
193  if (!ec && iterator != boost::asio::ip::udp::resolver::iterator()) {
194  this->connect(*iterator);
195  } else {
196  this->disconnect();
197  }
198  });
199 }
200 
201 void Meta::connect(const boost::asio::ip::udp::endpoint& endpoint) {
202  m_socket.open(boost::asio::ip::udp::v4());
203  m_socket.async_connect(endpoint, [&](boost::system::error_code ec) {
204  if (!ec) {
205  do_read();
206 
207  // build the initial 'ping' and send
208  unsigned int dsz = 0;
209  pack_uint32(CKEEP_ALIVE, m_data.data(), dsz);
210  this->m_send_stream << std::string(m_data.data(), dsz) << std::flush;
211  this->write();
212  this->setupRecvCmd();
213 
214  this->m_status = GETTING_LIST;
215  this->startTimeout();
216  } else {
217  this->doFailure("Couldn't open connection to metaserver " + this->m_metaHost);
218  }
219  });
220 }
221 
223  if (m_socket.is_open()) {
224  m_socket.close();
225  }
226  m_metaTimer.cancel();
227 }
228 
229 void Meta::startTimeout() {
230  m_metaTimer.cancel();
231  m_metaTimer.expires_from_now(std::chrono::seconds(8));
232  m_metaTimer.async_wait([&](boost::system::error_code ec) {
233  if (!ec) {
234  this->metaTimeout();
235  }
236  });
237 }
238 
239 
240 void Meta::do_read() {
241  if (m_socket.is_open()) {
242  m_socket.async_receive(m_receive_buffer.prepare(DATA_BUFFER_SIZE),
243  [this](boost::system::error_code ec, std::size_t length) {
244  if (!ec) {
245  m_receive_buffer.commit(length);
246  if (length > 0) {
247  this->gotData();
248  }
249  this->write();
250  this->do_read();
251  } else {
252  if (ec != boost::asio::error::operation_aborted) {
253  this->doFailure(std::string("Connection to the meta-server failed: ") + ec.message());
254  }
255  }
256  });
257  }
258 }
259 
260 void Meta::write() {
261  if (m_socket.is_open()) {
262  if (m_send_buffer->size() != 0) {
263  std::shared_ptr<boost::asio::streambuf> send_buffer(std::move(m_send_buffer));
264  m_send_buffer = std::make_unique<boost::asio::streambuf>();
265  m_send_stream.rdbuf(m_send_buffer.get());
266  m_socket.async_send(send_buffer->data(),
267  [&, send_buffer](boost::system::error_code ec, std::size_t length) {
268  if (!ec) {
269  send_buffer->consume(length);
270  } else {
271  if (ec != boost::asio::error::operation_aborted) {
272  this->doFailure(std::string("Connection to the meta-server failed: ") + ec.message());
273  }
274  }
275  });
276  }
277  }
278 }
279 
280 void Meta::gotData() {
281  recv();
282 }
283 
284 void Meta::deleteQuery(MetaQuery* query) {
285  auto I = std::find_if(m_activeQueries.begin(), m_activeQueries.end(), [&](const std::unique_ptr<MetaQuery>& entry){return entry.get() == query;});
286 
287  if (I != m_activeQueries.end()) {
288  auto containedQuery = I->release();
289  m_activeQueries.erase(I);
290 
291  //Delay destruction.
292  m_event_service.runOnMainThread([containedQuery]() {
293  delete containedQuery;
294  });
295 
296  if (m_activeQueries.empty() && m_nextQuery == m_gameServers.size()) {
297  m_status = VALID;
298  // we're all done, emit the signal
299  AllQueriesDone.emit();
300  }
301  } else {
302  error() << "Tried to delete meta server query which wasn't "
303  "among the active queries. This indicates an error "
304  "with the flow in Metaserver.";
305  }
306 }
307 
308 void Meta::recv() {
309  if (m_bytesToRecv == 0) {
310  error() << "No bytes to receive when calling recv().";
311  return;
312  }
313 
314  m_receive_stream.peek();
315  std::streambuf* iobuf = m_receive_stream.rdbuf();
316  std::streamsize len = std::min(m_bytesToRecv, iobuf->in_avail());
317  if (len > 0) {
318  iobuf->sgetn(m_dataPtr, len);
319  m_bytesToRecv -= len;
320  m_dataPtr += len;
321  }
322 // do {
323 // int d = m_stream.get();
324 // *(m_dataPtr++) = static_cast<char>(d);
325 // m_bytesToRecv--;
326 // } while (iobuf->in_avail() && m_bytesToRecv);
327 
328  if (m_bytesToRecv > 0) {
329  error() << "Fragment data received by Meta::recv";
330  return; // can't do anything till we get more data
331  }
332 
333  if (m_recvCmd) {
334  uint32_t op;
335  unpack_uint32(op, m_data.data());
336  recvCmd(op);
337  } else {
338  processCmd();
339  }
340 
341  // try and read more
342  if (m_bytesToRecv && m_receive_stream.rdbuf()->in_avail())
343  recv();
344 }
345 
346 void Meta::recvCmd(uint32_t op) {
347  switch (op) {
348  case HANDSHAKE:
349  setupRecvData(1, HANDSHAKE);
350  break;
351 
352  case PROTO_ERANGE:
353  doFailure("Got list range error from Metaserver");
354  break;
355 
356  case LIST_RESP:
357  setupRecvData(2, LIST_RESP);
358  break;
359 
360  default:
361  doFailure("Unknown Meta server command");
362  break;
363  }
364 }
365 
366 void Meta::processCmd() {
367  if (m_status != GETTING_LIST) {
368  error() << "Command received when not expecting any. It will be ignored. The command was: " << m_gotCmd;
369  return;
370  }
371 
372  switch (m_gotCmd) {
373  case HANDSHAKE: {
374  uint32_t stamp;
375  unpack_uint32(stamp, m_data.data());
376 
377  unsigned int dsz = 0;
378  m_dataPtr = pack_uint32(CLIENTSHAKE, m_data.data(), dsz);
379  pack_uint32(stamp, m_dataPtr, dsz);
380 
381  m_send_stream << std::string(m_data.data(), dsz) << std::flush;
382  write();
383 
384  m_metaTimer.cancel();
385  // send the initial list request
386  listReq(0);
387  }
388  break;
389 
390  case LIST_RESP: {
391  //uint32_t m_totalServers, m_packed;
392  uint32_t total_servers;
393  m_dataPtr = unpack_uint32(total_servers, m_data.data());
394  if (!m_gameServers.empty()) {
395  if (total_servers != m_totalServers) {
396  warning() << "Server total in new packet has changed. " << total_servers << ":" << m_totalServers;
397  }
398  } else {
399  m_totalServers = total_servers;
400  }
401  unpack_uint32(m_packed, m_dataPtr);
402  // FIXME This assumes that the data received so far is all the servers, which
403  // in the case of fragmented server list it is not. Currently this code is generally
404  // of the size of packet receieved. As there should only ever be one packet incoming
405  // we should be able to make assumptions based on the amount of data in the buffer.
406  // The buffer should also contain a complete packet if it contains any, so retrieving
407  // data one byte at a time is less efficient than it might be.
408  setupRecvData(m_packed, LIST_RESP2);
409 
410  // If this is the first response, allocate the space
411  if (m_gameServers.empty()) {
412 
413  assert(m_nextQuery == 0);
414  m_gameServers.reserve(m_totalServers);
415  }
416  }
417  break;
418 
419  case LIST_RESP2: {
420  m_dataPtr = m_data.data();
421  while (m_packed--) {
422  uint32_t ip;
423  m_dataPtr = unpack_uint32(ip, m_dataPtr);
424 
425  char buf[32];
426  snprintf(buf, 32, "%u.%u.%u.%u",
427  (ip & 0x000000FF),
428  (ip & 0x0000FF00) >> 8u,
429  (ip & 0x00FF0000) >> 16u,
430  (ip & 0xFF000000) >> 24u
431  );
432 
433  // FIXME - decide whether a reverse name lookup is necessary here or not
434  m_gameServers.push_back(ServerInfo{buf});
435  }
436 
437  if (m_gameServers.size() < m_totalServers) {
438  // request some more
439  listReq((unsigned int) m_gameServers.size());
440  } else {
441  // allow progress bars to setup, etc, etc
442  CompletedServerList.emit(m_totalServers);
443  m_status = QUERYING;
444  // all done, clean up
445  disconnect();
446  }
447  query();
448 
449  }
450  break;
451 
452  default:
453  std::stringstream ss;
454  ss << "Unknown Meta server command: " << m_gotCmd;
455  doFailure(ss.str());
456  break;
457  }
458 }
459 
460 void Meta::listReq(unsigned int base) {
461  unsigned int dsz = 0;
462  char* _dataPtr = pack_uint32(LIST_REQ, m_data.data(), dsz);
463  pack_uint32(base, _dataPtr, dsz);
464 
465  m_send_stream << std::string(m_data.data(), dsz) << std::flush;
466  write();
467  setupRecvCmd();
468 
469  startTimeout();
470 }
471 
472 void Meta::setupRecvCmd() {
473  m_recvCmd = true;
474  m_bytesToRecv = sizeof(uint32_t);
475  m_dataPtr = m_data.data();
476 }
477 
478 void Meta::setupRecvData(int words, uint32_t got) {
479  m_recvCmd = false;
480  m_bytesToRecv = words * sizeof(uint32_t);
481  m_dataPtr = m_data.data();
482  m_gotCmd = got;
483 }
484 
/* Write `data` to `buffer` in network byte order, add the number of bytes
written to `size`, and return the position just past the written word. */

char* pack_uint32(uint32_t data, char* buffer, unsigned int& size) {
    const uint32_t wire = htonl(data);
    std::memcpy(buffer, &wire, sizeof wire);
    size += sizeof wire;
    return buffer + sizeof wire;
}
496 
/* Read one network-byte-order word from `buffer` into `dest` (converted to
host order) and return the position just past it. */

char* unpack_uint32(uint32_t& dest, char* buffer) {
    uint32_t wire;
    std::memcpy(&wire, buffer, sizeof wire);
    dest = ntohl(wire);
    return buffer + sizeof wire;
}
506 
507 void Meta::internalQuery(size_t index) {
508  assert(index < m_gameServers.size());
509 
510  ServerInfo& sv = m_gameServers[index];
511  auto q = std::make_unique<MetaQuery>(m_io_service, *m_decoder, *this, sv.host, index);
512  if (q->getStatus() != BaseConnection::CONNECTING &&
513  q->getStatus() != BaseConnection::NEGOTIATE) {
514  // indicates a failure occurred, so we'll kill it now and say no more
515  sv.status = ServerInfo::INVALID;
516  } else {
517  m_activeQueries.emplace_back(std::move(q));
518  sv.status = ServerInfo::QUERYING;
519  }
520 }
521 
522 void Meta::objectArrived(Root obj) {
523  Info info = smart_dynamic_cast<Info>(obj);
524  if (!info.isValid()) {
525  error() << "Meta::objectArrived, failed to convert object to INFO op";
526  return;
527  }
528 
529 // work out which query this is
530  auto refno = info->getRefno();
531  QuerySet::iterator Q;
532 
533  for (Q = m_activeQueries.begin(); Q != m_activeQueries.end(); ++Q)
534  if ((*Q)->getQueryNo() == refno) break;
535 
536  if (Q == m_activeQueries.end()) {
537  error() << "Couldn't locate query for meta-query reply";
538  } else {
539  (*Q)->setComplete();
540 
541  RootEntity svr = smart_dynamic_cast<RootEntity>(info->getArgs().front());
542  if (!svr.isValid()) {
543  error() << "Query INFO argument object is broken";
544  } else {
545  if ((*Q)->getServerIndex() >= m_gameServers.size()) {
546  error() << "Got server info with out of bounds index.";
547  } else {
548  ServerInfo& sv = m_gameServers[(*Q)->getServerIndex()];
549 
550  sv.processServer(svr);
551  sv.ping = (int) (*Q)->getElapsed();
552 
553  // emit the signal
554  ReceivedServerInfo.emit(sv);
555  }
556  }
557  deleteQuery(Q->get());
558  }
559  query();
560 }
561 
562 void Meta::doFailure(const std::string& msg) {
563  Failure.emit(msg);
564  cancel();
565 }
566 
567 void Meta::dispatch() {
568 
569 }
570 
571 void Meta::metaTimeout() {
572  // cancel calls disconnect, which will kill upfront without this
573  m_metaTimer.cancel();
574 
575  // might want different behaviour in the future, I suppose
576  doFailure("Connection to the meta-server timed out");
577 }
578 
579 void Meta::queryFailure(MetaQuery* q, const std::string& msg) {
580  // we do NOT emit a failure signal here (because that would probably cause the
581  // host app to pop up a dialog or something) since query failures are likely to
582  // be very frequent.
583  m_gameServers[q->getServerIndex()].status = ServerInfo::INVALID;
584  q->setComplete();
585  deleteQuery(q);
586  query();
587 }
588 
589 void Meta::query() {
590  while ((m_activeQueries.size() < m_maxActiveQueries) && (m_nextQuery < m_gameServers.size())) {
591  internalQuery(m_nextQuery++);
592  }
593 }
594 
595 void Meta::queryTimeout(MetaQuery* q) {
596  m_gameServers[q->getServerIndex()].status = ServerInfo::TIMEOUT;
597  deleteQuery(q);
598  query();
599 }
600 
601 } // of Eris namespace
602 
sigc::signal< void > AllQueriesDone
Emitted when the entire server list has been refreshed.
Definition: Metaserver.h:111
server query timed out
Definition: ServerInfo.h:29
void connect()
Definition: Metaserver.cpp:189
STL namespace.
Querying game servers for information.
Definition: Metaserver.h:46
const ServerInfo & getInfoForServer(size_t index) const
Definition: Metaserver.cpp:175
void disconnect()
Definition: Metaserver.cpp:222
stream / socket connection in progress
The server list is not valid.
Definition: Metaserver.h:43
void queryServerByIndex(size_t index)
Query a specific game server; emits a signal when complete.
Definition: Metaserver.cpp:123
Definition: Account.cpp:33
Handles polling of the IO system as well as making sure that registered handlers are run on the main ...
Definition: EventService.h:42
size_t getGameServerCount() const
Definition: Metaserver.cpp:185
Atlas negotiation in progress.
Retrieving the list of game servers from the metaserver.
Definition: Metaserver.h:45
The list is valid and completed.
Definition: Metaserver.h:44
void cancel()
Definition: Metaserver.cpp:159
void refresh()
Definition: Metaserver.cpp:142
sigc::signal< void, const std::string & > Failure
Definition: Metaserver.h:117
Meta encapsulates the meta-game system, including the meta-server protocol and queries.
Definition: Metaserver.h:39
void processServer(const Atlas::Objects::Entity::RootEntity &svr)
Definition: ServerInfo.cpp:19
sigc::signal< void, const ServerInfo & > ReceivedServerInfo
Emitted when information about a server is received.
Definition: Metaserver.h:102
sigc::signal< void, int > CompletedServerList
Definition: Metaserver.h:108
void runOnMainThread(const std::function< void()> &handler, std::shared_ptr< bool > activeMarker=std::make_shared< bool >(true))
Adds a handler which will be run on the main thread.