BlinkenArea - GitList
Repositories
Blog
Wiki
Blinker
Code
Commits
Branches
Tags
Search
Tree:
a4e709d
Branches
Tags
master
Blinker
src
noarch
Sender_impl.h
converted sender to use list tracker
Stefan Schuermans
committed
a4e709d
at 2011-12-11 01:03:08
Sender_impl.h
Blame
History
Raw
/* Blinker Copyright 2011 Stefan Schuermans <stefan@blinkenarea.org> Copyleft GNU public license - http://www.gnu.org/copyleft/gpl.html a blinkenarea.org project */ #ifndef SENDER_IMPL_H #define SENDER_IMPL_H #include <list> #include <map> #include <string> #include <BlinkenLib/BlinkenProto.h> #include <BlinkenLib/BlinkenFrame.h> #include "CallMgr.h" #include "Directory.h" #include "File.h" #include "InStreamFile.h" #include "IoCallee.h" #include "ListTracker.h" #include "ListTracker_impl.h" #include "Module.h" #include "Protocol.h" #include "ProtocolFile.h" #include "Sender.h" #include "SenderDest.h" #include "SenderDest_impl.h" #include "SettingFile.h" #include "StreamMgr.h" #include "StreamRecv.h" #include "Time.h" #include "TimeCallee.h" namespace Blinker { /** * @brief constructor * @param[in] callMgr callback manager * @param[in] streamMgr stream manager * @param[in] dirBase base directory */ template<typename ADDR, typename SOCK> Sender<ADDR, SOCK>::Sender(CallMgr &callMgr, StreamMgr &streamMgr, const Directory &dirBase): Module(callMgr, streamMgr, dirBase), m_fileInStream(dirBase.getFile("instream"), streamMgr), m_fileBind(dirBase.getFile("bind")), m_fileProtocol(dirBase.getFile("protocol")), m_pSock(NULL), m_destListTracker(*this, dirBase.getSubdir("destinations")) { // read protocol readProto(); // attac to input stream m_fileInStream.setStreamRecv(this); // create and bind socket createSock(); // load static destinations m_destListTracker.init(); } /// virtual destructor template<typename ADDR, typename SOCK> Sender<ADDR, SOCK>::~Sender() { // send "no frame" to all destinations sendAllNoFrame(); // free static destinations m_destListTracker.clear(); // destroy socket destroySock(); // detach from input stream m_fileInStream.setStreamRecv(NULL); // cancel time callback m_callMgr.cancelTimeCall(this); } /// check for update of configuration template<typename ADDR, typename SOCK> void Sender<ADDR, SOCK>::updateConfig() { // input stream name file was 
modified -> re-get input stream if (m_fileInStream.checkModified()) m_fileInStream.update(); // bind address file was modified -> re-create socket if (m_fileBind.checkModified()) { destroySock(); createSock(); } // protocol file was modified -> re-read protocol if (m_fileProtocol.checkModified()) readProto(); // static destinations update m_destListTracker.updateConfig(); } /** * @brief set current frame * @param[in] stream stream name * @param[in] pFrame current frame (NULL for none) */ template<typename ADDR, typename SOCK> void Sender<ADDR, SOCK>::setFrame(const std::string &stream, stBlinkenFrame *pFrame) { // convert new frame to protocol data frame2data(pFrame, m_data); // send new protocol data to all destinations sendAllProto(); (void)stream; // unused } /// callback when requested time reached template<typename ADDR, typename SOCK> void Sender<ADDR, SOCK>::timeCall() { // repeat current protocol data to all destinations sendAllProto(); } /** * @brief callback when I/O object is readable * @param[in] io I/O object that is readable */ template<typename ADDR, typename SOCK> void Sender<ADDR, SOCK>::ioReadCall(Io *io) { // reception on socket if (io == m_pSock) receiveFromSock(); } /** * @brief callback when I/O object is writable * @param[in] io I/O object that is writable */ template<typename ADDR, typename SOCK> void Sender<ADDR, SOCK>::ioWriteCall(Io *io) { (void)io; // unused } /// (re-)read protocol template<typename ADDR, typename SOCK> void Sender<ADDR, SOCK>::readProto() { // send "no frame" to all destinations // (stream with old protocol will stop now) sendAllNoFrame(); // clear dynamic destinations // (they registered with old protocol, which is out of service now) m_dynDests.clear(); // clear old frame data and old no frame data m_noFrameData.clear(); m_data.clear(); // read new protocol from file m_fileProtocol.update(); // create new no frame protocol data and new protocol data frame2data(NULL, m_noFrameData); 
frame2data(m_fileInStream.getCurFrame(), m_data); // send current protocol data to all destinations sendAllProto(); } /// create socket and bind it template<typename ADDR, typename SOCK> void Sender<ADDR, SOCK>::createSock() { // create socket if (!m_pSock) { m_pSock = new SOCK(); if (!m_pSock) return; } // get bind address from bind address setting file m_fileBind.update(); if (!m_fileBind.m_valid) { delete m_pSock; m_pSock = NULL; return; } // bind socket if (!m_pSock->bind(m_fileBind.m_obj)) { delete m_pSock; m_pSock = NULL; return; } // request callback on recepetion m_callMgr.requestIoReadCall(this, m_pSock); // send current protocol data to all destinations sendAllProto(); } /// destroy socket template<typename ADDR, typename SOCK> void Sender<ADDR, SOCK>::destroySock() { // send "no frame" to all destinations // (stream from this socket will stop now) sendAllNoFrame(); // clear dynamic destinations // (they registered with this socket and this socket is gone) m_dynDests.clear(); // cancel callback request m_callMgr.cancelIoReadCall(this, m_pSock); // destroy socket if (m_pSock) { delete m_pSock; m_pSock = NULL; } } /// remove timed-out dynamic destinations template<typename ADDR, typename SOCK> void Sender<ADDR, SOCK>::removeTimedOutDynDests() { Time now, timeout; typename DynDests::iterator itDyn; now = Time::now(); timeout = Time(30); for (itDyn = m_dynDests.begin(); itDyn != m_dynDests.end(); ) if (itDyn->second + timeout < now) m_dynDests.erase(itDyn++); else ++itDyn; } /// send current protocol data to all destinations template<typename ADDR, typename SOCK> void Sender<ADDR, SOCK>::sendAllProto() { // remove timed-out dynamic destinations removeTimedOutDynDests(); // send current protocol data to all static/dynamic destinations sendDests(&m_data); // request time callback in one second m_callMgr.requestTimeCall(this, Time::now() + Time(1)); } /// send "no frame" to all destinations template<typename ADDR, typename SOCK> void Sender<ADDR, 
SOCK>::sendAllNoFrame() { // remove timed-out dynamic destinations removeTimedOutDynDests(); // get "no frame" protocol data and send to all static/dynamic destinations sendDests(&m_noFrameData); // request time callback in one second m_callMgr.requestTimeCall(this, Time::now() + Time(1)); } /** * @brief send data to static/dynamic destinations * @param[in] data *pData protocol data to send */ template<typename ADDR, typename SOCK> void Sender<ADDR, SOCK>::sendDests(const std::string *pData) { // send data to static destinations typename DestListTracker::ListIt itDest; for (itDest = m_destListTracker.m_list.begin(); itDest != m_destListTracker.m_list.end(); ++itDest) itDest->m_pObj->setProtoData(pData); // send data to all dynamic destinations typename DynDests::const_iterator itDyn; for (itDyn = m_dynDests.begin(); itDyn != m_dynDests.end(); ++itDyn) sendProto(*pData, itDyn->first); } /** * @brief send protocol data to address * @param[in] data protocol data of frame (empty if unknown) * @param[in] addr address to send to */ template<typename ADDR, typename SOCK> void Sender<ADDR, SOCK>::sendProto(const std::string &data, const ADDR &addr) const { if (m_pSock && !data.empty()) m_pSock->send(data, addr); } /** * @brief convert frame to protocol data * @param[in] pFrame frame (NULL for none) * @param[out] data protocol data */ template<typename ADDR, typename SOCK> void Sender<ADDR, SOCK>::frame2data(stBlinkenFrame *pFrame, std::string &data) const { etBlinkenProto proto; char buf[65536]; int len; // no protocol -> leave with empty data if (!m_fileProtocol.m_valid) { data.clear(); return; } proto = m_fileProtocol.m_obj.m_proto; // convert frame to protcol data if (pFrame) len = BlinkenFrameToNetwork(pFrame, proto, buf, sizeof(buf)); else len = BlinkenProtoMakePacket(proto, BlinkenPacketStreamEnd, buf, sizeof(buf)); if (len < 0) len = 0; data.assign(buf, len); } /// receive data from socket template<typename ADDR, typename SOCK> void Sender<ADDR, 
SOCK>::receiveFromSock() { etBlinkenProto proto; etBlinkenPacket packet; std::string data; ADDR addr; // make sure socket exists if (!m_pSock) return; // receive (leave if no reception) if (!m_pSock->recv(data, addr)) return; // detect packet type and protocol BlinkenProtoDetectPacket(data.c_str(), data.size(), &proto, &packet); if (m_fileProtocol.m_valid && proto == m_fileProtocol.m_obj.m_proto) { switch (packet) { // request -> add to dynamic destinations and send current frame case BlinkenPacketRequest: m_dynDests[addr] = Time::now(); sendProto(m_data, addr); break; // end request -> remove from dynamic destinations case BlinkenPacketEndRequest: m_dynDests.erase(addr); break; default: break; } // switch (packet) } // if (m_fileProtocol.m_valid ... } } // namespace Blinker #endif // #ifndef SENDER_H