11#include "Redispatch.h"
13#include "EventService.h"
14#include "TypeService.h"
16#include <Atlas/Objects/Encoder.h>
17#include <Atlas/Objects/Operation.h>
18#include <Atlas/Objects/Entity.h>
19#include <sigc++/bind.h>
21#include <Atlas/Codecs/Bach.h>
28using namespace Atlas::Objects::Operation;
29using Atlas::Objects::Root;
30using Atlas::Objects::Entity::RootEntity;
31using Atlas::Objects::smart_dynamic_cast;
// Tail of the decoder helper's constructor initialiser list: `factories` is
// handed to the ObjectsDecoder base and `connection` is stored in m_connection.
// NOTE(review): the constructor header and class declaration are not part of
// this listing.
40 ObjectsDecoder(factories), m_connection(connection) {
// Override of the decoder's object-arrival hook (presumably invoked once per
// decoded Atlas object - confirm against ObjectsDecoder). The object is moved
// on to the connection's own objectArrived(), so the helper keeps no copy.
43 void objectArrived(Root obj)
override {
44 m_connection.objectArrived(std::move(obj));
50 std::string clientName,
51 const std::string& host,
55 _eventService(eventService),
59 m_defaultRouter(nullptr),
68 std::string clientName,
72 _eventService(eventService),
75 _localSocket(std::move(socket)),
77 m_defaultRouter(nullptr),
85Connection::~Connection() {
92EventService& Connection::getEventService() {
97 if (!_localSocket.empty()) {
107 warning() <<
"duplicate disconnect on Connection that's already disconnecting";
112 warning() <<
"called disconnect on already disconnected Connection";
124 _socket->getEncoder().streamObjectsMessage(Logout());
// Processes operations that were queued in m_opDeque.
// NOTE(review): original lines 148-150 and 152-156 are absent from this
// listing; presumably they form a loop that pops each op and hands it to
// dispatchOp() - confirm against the real source.
147void Connection::dispatch() {
// Move the oldest queued operation out of the deque rather than copying it.
151 RootOperation op = std::move(
m_opDeque.front());
// Destroys every Redispatch that cleanupRedispatch() parked in this list
// (the list holds std::unique_ptr<Redispatch>, so clear() deletes them).
157 m_finishedRedispatches.clear();
// Fragment of the send path. NOTE(review): the function header and the guard
// around this error are not in the listing; the log text and the use of `obj`
// suggest this is Connection::send(const Atlas::Objects::Root& obj) - confirm.
162 error() <<
"called send on closed connection";
// Debug trace: encode `obj` with a Bach codec into an in-memory stream and log
// the resulting text. *this is passed as the codec's third constructor
// argument. NOTE(review): whatever condition enables this trace (original
// lines 163-172) is not visible here.
173 std::stringstream debugStream;
175 Atlas::Codecs::Bach debugCodec(debugStream, debugStream, *
this );
176 Atlas::Objects::ObjectsEncoder debugEncoder(debugCodec);
177 debugEncoder.streamObjectsMessage(obj);
// Flush so debugStream.str() below contains the complete encoded message.
178 debugStream << std::flush;
180 debug() <<
"sending:" << debugStream.str();
// The actual transmission: stream the object through the socket's encoder.
183 _socket->getEncoder().streamObjectsMessage(obj);
// Registers `router` as the handler for operations whose TO field equals
// `toId`; dispatchOp() looks routers up in m_toRouters via op->getTo().
// operator[] is used, so an existing registration for the same id is silently
// replaced. Only the raw pointer is stored.
// NOTE(review): presumably non-owning, so the caller should unregister before
// destroying the router - confirm.
187void Connection::registerRouterForTo(
Router* router,
const std::string& toId) {
188 m_toRouters[toId] = router;
// Removes the TO-router registration for `toId`.
// `router` is only used by the assert, which checks that it is the router
// currently registered under that id; in builds where assert is compiled out
// no check happens at all.
191void Connection::unregisterRouterForTo(Router* router,
const std::string& toId) {
// Note: operator[] inserts a default entry when `toId` is not registered, so
// the assert itself can mutate the map; the erase below removes it again.
192 assert(m_toRouters[toId] == router);
193 m_toRouters.erase(toId);
// Registers `router` as the handler for operations whose FROM field equals
// `fromId`; dispatchOp() looks routers up in m_fromRouters via op->getFrom().
// operator[] is used, so an existing registration for the same id is silently
// replaced. Only the raw pointer is stored.
// NOTE(review): presumably non-owning - confirm.
196void Connection::registerRouterForFrom(Router* router,
const std::string& fromId) {
197 m_fromRouters[fromId] = router;
// Removes whatever router is registered for FROM id `fromId`.
// Unlike unregisterRouterForTo(), no router pointer is passed and nothing is
// verified before the erase. NOTE(review): assuming m_fromRouters is a
// standard associative container, erasing an unknown id is a no-op.
200void Connection::unregisterRouterForFrom(
const std::string& fromId) {
201 m_fromRouters.erase(fromId);
// Installs the fallback router that dispatchOp() consults when
// m_defaultRouter is non-null.
// An error is logged when a default router is already set or when `router`
// is null.
// NOTE(review): original lines 207-209 are absent from this listing;
// presumably the error branch returns before the assignment - confirm.
204void Connection::setDefaultRouter(Router* router) {
205 if (m_defaultRouter || !router) {
206 error() <<
"setDefaultRouter duplicate set or null argument";
210 m_defaultRouter = router;
// Drops the fallback router by resetting m_defaultRouter to nullptr; this
// also makes a later setDefaultRouter() call pass its "already set" check.
// The previously installed router is not deleted here.
213void Connection::clearDefaultRouter() {
214 m_defaultRouter =
nullptr;
229 debug() <<
"Connection unlocked in DISCONNECTING, closing socket";
236 warning() <<
"Connection unlocked in spurious state : this may cause a failure later";
248 warning() <<
"called refreshServerInfo while not connected, ignoring";
252 m_info.status = ServerInfo::QUERYING;
// Entry point for an Atlas object received from the peer (the decoder helper
// forwards decoded objects here).
// Visible behaviour: optionally traces the object, then casts it to a
// RootOperation; a non-operation is reported as an error.
// NOTE(review): original lines 259, 265, 267, 269, 271-273 and 275-277 are
// absent from this listing, so the conditions selecting between the two debug
// traces, and what is done with a valid op (presumably queued in m_opDeque),
// cannot be seen here - confirm.
258void Connection::objectArrived(Root obj) {
// Verbose trace: encode `obj` with a Bach codec into an in-memory stream.
260 std::stringstream debugStream;
261 Atlas::Codecs::Bach debugCodec(debugStream, debugStream, *
this );
262 Atlas::Objects::ObjectsEncoder debugEncoder(debugCodec);
263 debugEncoder.streamObjectsMessage(obj);
// Flush so debugStream.str() below contains the complete encoded message.
264 debugStream << std::flush;
266 debug() <<
"received:" << debugStream.str();
// Short trace: only the object's parent (type name) is logged.
268 debug() <<
"received op:" << obj->getParent();
// Only operations are expected from here on.
270 RootOperation op = smart_dynamic_cast<RootOperation>(obj);
274 error() <<
"Con::objectArrived got non-op";
// Routes one received operation to the first interested handler.
// Order of the visible attempts:
//   1. m_responder->handleOp(op)
//   2. the router registered for the op's FROM id (if FROM is set)
//   3. the router registered for the op's TO id
//   4. handleServerInfo() for an INFO op that has no TO
//   5. m_defaultRouter, if one is installed
// If nothing reports Router::HANDLED a warning is logged. Atlas exceptions
// raised while dispatching are caught and logged.
// NOTE(review): the opening `try`, the bodies of the `== HANDLED` checks
// (presumably early returns) and some guards are on original lines that are
// absent from this listing - confirm against the real source.
278void Connection::dispatchOp(
const RootOperation& op) {
// "anonymous" means the op carries no explicit TO field.
280 bool anonymous = op->isDefaultTo();
282 Router::RouterResult rr = m_responder->handleOp(op);
283 if (rr == Router::HANDLED) {
// FROM-based routing, only attempted when the op has an explicit FROM.
288 if (!op->isDefaultFrom()) {
289 auto R = m_fromRouters.find(op->getFrom());
290 if (R != m_fromRouters.end()) {
291 rr = R->second->handleOperation(op);
292 if (rr == Router::HANDLED) {
// TO-based routing. NOTE(review): presumably guarded by `!anonymous` on a
// missing line - confirm.
300 auto R = m_toRouters.find(op->getTo());
301 if (R != m_toRouters.end()) {
302 rr = R->second->handleOperation(op);
303 if (rr == Router::HANDLED) {
306 }
// No router for this TO id: warn only if some TO routers exist at all.
else if (!m_toRouters.empty()) {
307 warning() <<
"received op with TO=" << op->getTo() <<
", but no router is registered for that id";
// An INFO op without a TO is treated as a server-info reply.
312 if (op->getClassNo() == INFO_NO && anonymous) {
313 handleServerInfo(op);
// Last resort: the default router, when one has been installed.
318 if (m_defaultRouter) {
319 rr = m_defaultRouter->handleOperation(op);
321 if (rr != Router::HANDLED) {
322 warning() <<
"no-one handled op:" << op;
324 }
catch (
const Atlas::Exception& ae) {
325 error() <<
"caught Atlas exception: '" << ae.getDescription() <<
326 "' while dispatching op:\n" << op;
342void Connection::handleTimeout(
const std::string& msg) {
// Handles an INFO operation that dispatchOp() classified as a server-info
// reply.
// Visible behaviour: if the op has arguments, the first one is cast to a
// RootEntity and an error is logged when that cast yields an invalid object;
// later the GotServerInfo signal is emitted and the type service is
// initialised.
// NOTE(review): original lines 351-354 and 356-360 are absent from this
// listing, so what is done with a valid `svr` and the conditions guarding the
// emit() and init() calls cannot be seen here - confirm.
346void Connection::handleServerInfo(
const RootOperation& op) {
347 if (!op->getArgs().empty()) {
348 RootEntity svr = smart_dynamic_cast<RootEntity>(op->getArgs().front());
349 if (!svr.isValid()) {
350 error() <<
"server INFO argument object is broken";
355 GotServerInfo.emit();
361 m_typeService->init();
// Timeout hook for a disconnect that did not complete: delegates to
// handleTimeout() with a fixed explanatory message.
// NOTE(review): presumably scheduled when a disconnect is started - the call
// site is not visible in this listing.
365void Connection::onDisconnectTimeout() {
366 handleTimeout(
"timed out waiting for disconnection");
// Fragment of the re-dispatch path. NOTE(review): the function header is not
// in the listing; the log text and the use of `obj` suggest this is
// Connection::postForDispatch(const Atlas::Objects::Root& obj) - confirm.
// The object must be an operation; this is only enforced by an assert, so
// builds with assert compiled out do not check it here.
371 RootOperation op = smart_dynamic_cast<RootOperation>(obj);
372 assert(op.isValid());
// Debug trace: encode `obj` with a Bach codec into an in-memory stream and log
// it. NOTE(review): what happens to `op` afterwards (presumably it is queued
// in m_opDeque) is on lines absent from this listing.
376 std::stringstream debugStream;
377 Atlas::Codecs::Bach debugCodec(debugStream, debugStream, *
this );
378 Atlas::Objects::ObjectsEncoder debugEncoder(debugCodec);
379 debugEncoder.streamObjectsMessage(obj);
// Flush so debugStream.str() below contains the complete encoded message.
380 debugStream << std::flush;
382 debug() <<
"posted for re-dispatch:" << debugStream.str();
// Takes ownership of `r` and defers its destruction: the raw pointer is
// wrapped in a std::unique_ptr and appended to m_finishedRedispatches, which
// dispatch() clears later. The caller must not delete `r` itself afterwards.
// NOTE(review): presumably deferred because `r` may still be executing when it
// asks to be cleaned up - confirm.
386void Connection::cleanupRedispatch(
Redispatch* r) {
387 m_finishedRedispatches.push_back(std::unique_ptr<Redispatch>(r));
// Fragment of the serial-number generator. NOTE(review): the function header
// is not in the listing - presumably std::int64_t getNewSerialno(); confirm.
// The counter is a function-local static, so it is shared by all callers and
// starts at 1001.
391 static std::int64_t _nextSerial = 1001;
// Post-increment: returns the current value, then advances the counter.
// No locking is visible on these lines.
395 return _nextSerial++;
Underlying Atlas connection, providing a send interface, and receive (dispatch) system.
Status _status
current status of the connection
void hardDisconnect(bool emit)
virtual int connectLocal(const std::string &socket)
virtual void onConnect()
derived-class notification when connection and negotiation is completed
virtual int connectRemote(const std::string &host, short port)
Status
possible states for the connection
@ DISCONNECTING
clean disconnection in progress
@ DISCONNECTED
finished disconnection
@ CONNECTED
connection fully established
sigc::signal< bool > Disconnecting
int disconnect()
Initiate disconnection from the server.
void handleFailure(const std::string &msg) override
Process failures (to track when reconnection should be permitted)
sigc::signal< void, Status > StatusChanged
indicates a status change on the connection
virtual void send(const Atlas::Objects::Root &obj)
Transmit an Atlas::Objects instance to the server.
void postForDispatch(const Atlas::Objects::Root &obj)
void onConnect() override
derived-class notification when connection and negotiation is completed
void setStatus(Status sc) override
Connection(boost::asio::io_service &io_service, EventService &eventService, std::string clientName, const std::string &host, short port)
Create an unconnected instance.
sigc::signal< void, const std::string & > Failure
void getServerInfo(ServerInfo &) const
const short _port
port of the server
OpDeque m_opDeque
store of all the received ops waiting to be dispatched
Handles polling of the IO system as well as making sure that registered handlers are run on the main ...
std::int64_t getNewSerialno()
operation serial number sequencing
void processServer(const Atlas::Objects::Entity::RootEntity &svr)