BlinkenArea - GitList
Repositories
Blog
Wiki
Blinker
Code
Commits
Branches
Tags
Search
Tree:
be034cc
Branches
Tags
master
Blinker
src
noarch
Sender_impl.h
sender static destination now own class
Stefan Schuermans
committed
be034cc
at 2011-11-22 22:13:20
Sender_impl.h
Blame
History
Raw
/* Blinker Copyright 2011 Stefan Schuermans <stefan@blinkenarea.org> Copyleft GNU public license - http://www.gnu.org/copyleft/gpl.html a blinkenarea.org project */ #ifndef SENDER_IMPL_H #define SENDER_IMPL_H #include <list> #include <map> #include <string> #include <BlinkenLib/BlinkenProto.h> #include <BlinkenLib/BlinkenFrame.h> #include "CallMgr.h" #include "Directory.h" #include "File.h" #include "IoCallee.h" #include "Module.h" #include "Sender.h" #include "SenderDest.h" #include "SenderDest_impl.h" #include "SettingFile.h" #include "StreamMgr.h" #include "StreamRecv.h" #include "Time.h" #include "TimeCallee.h" namespace Blinker { /** * @brief constructor * @param[in] callMgr callback manager * @param[in] streamMgr stream manager * @param[in] dirBase base directory */ template<typename ADDR, typename SOCK> Sender<ADDR, SOCK>::Sender(CallMgr &callMgr, StreamMgr &streamMgr, const Directory &dirBase): Module(callMgr, streamMgr, dirBase), m_fileInStream(dirBase.getFile("instream")), m_fileBind(dirBase.getFile("bind")), m_dirDestsBlp(dirBase.getSubdir("blp")), m_dirDestsEblp(dirBase.getSubdir("eblp")), m_dirDestsMcuf(dirBase.getSubdir("mcuf")), m_pInStream(NULL), m_pSock(NULL) { // initialize protocol data buffers noFrame2data(BlinkenProtoBlp, m_noFrameDataBlp); noFrame2data(BlinkenProtoEblp, m_noFrameDataEblp); noFrame2data(BlinkenProtoMcuf, m_noFrameDataMcuf); m_dataBlp = m_noFrameDataBlp; m_dataEblp = m_noFrameDataEblp; m_dataMcuf = m_noFrameDataMcuf; // get input stream and attach to it getInStream(); // create and bind socket createSock(); // load static destinations updateDestsFull(m_dirDestsBlp, m_destListBlp, &m_noFrameDataBlp); updateDestsFull(m_dirDestsEblp, m_destListEblp, &m_noFrameDataEblp); updateDestsFull(m_dirDestsMcuf, m_destListMcuf, &m_noFrameDataMcuf); } /// virtual destructor template<typename ADDR, typename SOCK> Sender<ADDR, SOCK>::~Sender() { // send "no frame" to all destinations sendAllNoFrame(); // free static destination lists 
freeDestList(m_destListBlp); freeDestList(m_destListEblp); freeDestList(m_destListMcuf); // destroy socket destroySock(); // detach from input stream and release it releaseInStream(); // cancel time callback m_callMgr.cancelTimeCall(this); } /// check for update of configuration template<typename ADDR, typename SOCK> void Sender<ADDR, SOCK>::updateConfig() { // input stream name file was modified -> re-get input stream if (m_fileInStream.checkModified()) { releaseInStream(); getInStream(); } // bind address file was modified -> re-create socket if (m_fileBind.checkModified()) { destroySock(); createSock(); } // static destinations update // (directory modified -> full, otherwise -> light) if (m_dirDestsBlp.checkModified()) updateDestsFull(m_dirDestsBlp, m_destListBlp, &m_noFrameDataBlp); else updateDestsLight(m_destListBlp); if (m_dirDestsEblp.checkModified()) updateDestsFull(m_dirDestsEblp, m_destListEblp, &m_noFrameDataEblp); else updateDestsLight(m_destListEblp); if (m_dirDestsMcuf.checkModified()) updateDestsFull(m_dirDestsMcuf, m_destListMcuf, &m_noFrameDataMcuf); else updateDestsLight(m_destListMcuf); } /** * @brief set current frame * @param[in] stream stream name * @param[in] pFrame current frame */ template<typename ADDR, typename SOCK> void Sender<ADDR, SOCK>::setFrame(const std::string &stream, stBlinkenFrame *pFrame) { // send frame to all destinations sendAllFrame(pFrame); (void)stream; // unused } /** * @brief set current frame to none * @param[in] stream stream name */ template<typename ADDR, typename SOCK> void Sender<ADDR, SOCK>::setNoFrame(const std::string &stream) { // send "no frame" to all destinations sendAllNoFrame(); (void)stream; // unused } /// callback when requsted time reached template<typename ADDR, typename SOCK> void Sender<ADDR, SOCK>::timeCall() { stBlinkenFrame *pFrame; std::string data; // get current frame from stream if (m_pInStream && m_pInStream->getCurFrame(pFrame)) // repeat frame to all destinations sendAllFrame(pFrame); 
// no stream of no current frame else // repeat "no frame" to all destinations sendAllNoFrame(); } /** * @brief callback when I/O object is readable * @param[in] io I/O object that is readable */ template<typename ADDR, typename SOCK> void Sender<ADDR, SOCK>::ioReadCall(Io *io) { // reception on socket if (io == m_pSock) receiveFromSock(); } /** * @brief callback when I/O object is writable * @param[in] io I/O object that is writable */ template<typename ADDR, typename SOCK> void Sender<ADDR, SOCK>::ioWriteCall(Io *io) { (void)io; // unused } /** * @brief free static destiation list * @param[in] destList static destination list to free */ template<typename ADDR, typename SOCK> void Sender<ADDR, SOCK>::freeDestList(DestList &destList) { while (!destList.empty()) { delete destList.back().m_pDest; destList.pop_back(); } } /// get input stream and attach to it template<typename ADDR, typename SOCK> void Sender<ADDR, SOCK>::getInStream() { // get input stream m_fileInStream.getStr(m_nameInStream); m_pInStream = &m_streamMgr.refStream(m_nameInStream); // attach to input stream if (m_pInStream) m_pInStream->attach(this); } /// detach from input stream and release it template<typename ADDR, typename SOCK> void Sender<ADDR, SOCK>::releaseInStream() { // detach from input stream if (m_pInStream) m_pInStream->detach(this); // unreference stream m_pInStream = NULL; m_streamMgr.unrefStream(m_nameInStream); } /// create socket and bind it template<typename ADDR, typename SOCK> void Sender<ADDR, SOCK>::createSock() { std::string strAddr; ADDR addr; // create socket if (!m_pSock) { m_pSock = new SOCK(); if (!m_pSock) return; } // get bind address from bind address setting file if (!m_fileBind.getStr(strAddr) || !addr.fromStr(strAddr)) { delete m_pSock; m_pSock = NULL; return; } // bind socket if (!m_pSock->bind(addr)) { delete m_pSock; m_pSock = NULL; return; } // request callback on recpetion m_callMgr.requestIoReadCall(this, m_pSock); } /// destroy socket template<typename ADDR, 
typename SOCK> void Sender<ADDR, SOCK>::destroySock() { // send "no frame" to all destinations // (stream from this socket will stop now) sendAllNoFrame(); // clear dynamic destinations // (they registered with this socket adn this socket is gone) m_dynDestsBlp.clear(); m_dynDestsEblp.clear(); m_dynDestsMcuf.clear(); // cancel callback request m_callMgr.cancelIoReadCall(this, m_pSock); // destroy socket if (m_pSock) { delete m_pSock; m_pSock = NULL; } } /** * @brief light update of static destinations, * i.e. stat all files in current static destination directory * @param[in] destList static destinations for one protocol */ template<typename ADDR, typename SOCK> void Sender<ADDR, SOCK>::updateDestsLight(DestList &destList) { // walk through all files in static dest dir and check for modification typename DestList::iterator itDest; for (itDest = destList.begin(); itDest != destList.end(); ++itDest) itDest->m_pDest->updateConfig(); } /** * @brief full update of static destinations, * i.e. 
scan files in playlist directory * @param[in] dirDests static destinations directory for protocol * @param[in] destList static destinations for protocol * @param[in] pNoFrameData "no frame" protocaol data */ template<typename ADDR, typename SOCK> void Sender<ADDR, SOCK>::updateDestsFull(Directory &dirDests, DestList &destList, const std::string *pNoFrameData) { // get list of subdirs in input directory typedef std::list<std::string> Subdirlist; Subdirlist curSubdirs; dirDests.getEntries(Directory::TypeSubdir, curSubdirs); // walk through current static destinations and subdir list simultaneously Subdirlist::const_iterator itSubdir = curSubdirs.begin(); typename DestList::iterator itDest = destList.begin(); while (itSubdir != curSubdirs.end() || itDest != destList.end()) { // new static destination inserted if (itDest == destList.end() || (itSubdir != curSubdirs.end() && *itSubdir < itDest->m_name)) { // create static destination object DestEntry destEntry(*itSubdir); destEntry.m_pDest = new Dest(*this, dirDests.getSubdir(*itSubdir), pNoFrameData); if (destEntry.m_pDest) // insert static destination entry destList.insert(itDest, destEntry); // advance to next subdir ++itSubdir; } // static destination removed else if (itSubdir == curSubdirs.end() || *itSubdir > itDest->m_name) { // remove static destination delete itDest->m_pDest; itDest = destList.erase(itDest); // do not advance to next subdir } // static sestination stayed in input list else { // check for update itDest->m_pDest->updateConfig(); // advance to next file and next entry ++itSubdir; ++itDest; } } // while itSubdir itDest } /// remove timed-out dynamic destinations template<typename ADDR, typename SOCK> void Sender<ADDR, SOCK>::removeTimedOutDynDests() { Time now, timeout; typename DynDests::iterator itDyn; now = Time::now(); timeout = Time(30); for (itDyn = m_dynDestsBlp.begin(); itDyn != m_dynDestsBlp.end(); ) if (itDyn->second + timeout < now) m_dynDestsBlp.erase(itDyn++); else ++itDyn; for (itDyn 
= m_dynDestsEblp.begin(); itDyn != m_dynDestsEblp.end(); ) if (itDyn->second + timeout < now) m_dynDestsEblp.erase(itDyn++); else ++itDyn; for (itDyn = m_dynDestsMcuf.begin(); itDyn != m_dynDestsMcuf.end(); ) if (itDyn->second + timeout < now) m_dynDestsMcuf.erase(itDyn++); else ++itDyn; } /** * @brief send frame to all destinations * @param[in] pFrame frame to send */ template<typename ADDR, typename SOCK> void Sender<ADDR, SOCK>::sendAllFrame(stBlinkenFrame *pFrame) { // remove timed-out dynamic destinations removeTimedOutDynDests(); // convert frame to protocol data and send to all static/dynamic destinations if (!m_destListBlp.empty() || !m_dynDestsBlp.empty()) { frame2data(pFrame, BlinkenProtoBlp, m_dataBlp); sendDests(&m_dataBlp, m_destListBlp, m_dynDestsBlp); } if (!m_destListEblp.empty() || !m_dynDestsEblp.empty()) { frame2data(pFrame, BlinkenProtoEblp, m_dataEblp); sendDests(&m_dataEblp, m_destListEblp, m_dynDestsEblp); } if (!m_destListMcuf.empty() || !m_dynDestsMcuf.empty()) { frame2data(pFrame, BlinkenProtoMcuf, m_dataMcuf); sendDests(&m_dataMcuf, m_destListMcuf, m_dynDestsMcuf); } // request time callback in one second m_callMgr.requestTimeCall(this, Time::now() + Time(1)); } /// send "no frame" to all destinations template<typename ADDR, typename SOCK> void Sender<ADDR, SOCK>::sendAllNoFrame() { // remove timed-out dynamic destinations removeTimedOutDynDests(); // get "no frame" protocol data and send to all static/dynamic destinations if (!m_destListBlp.empty() || !m_dynDestsBlp.empty()) { noFrame2data(BlinkenProtoBlp, m_dataBlp); sendDests(&m_dataBlp, m_destListBlp, m_dynDestsBlp); } if (!m_destListEblp.empty() || !m_dynDestsEblp.empty()) { noFrame2data(BlinkenProtoEblp, m_dataEblp); sendDests(&m_dataEblp, m_destListEblp, m_dynDestsEblp); } if (!m_destListMcuf.empty() || !m_dynDestsMcuf.empty()) { noFrame2data(BlinkenProtoMcuf, m_dataMcuf); sendDests(&m_dataMcuf, m_destListMcuf, m_dynDestsMcuf); } // request time callback in one second 
m_callMgr.requestTimeCall(this, Time::now() + Time(1)); } /** * @brief send data to static/dynamic destinations * @param[in] data *pData protocol data to send * @param[in] destList static destinations * @param[in] dynDests dynamic destinations */ template<typename ADDR, typename SOCK> void Sender<ADDR, SOCK>::sendDests(const std::string *pData, const DestList destList, const DynDests dynDests) { // send data to static destinations typename DestList::const_iterator itDest; for (itDest = destList.begin(); itDest != destList.end(); ++itDest) itDest->m_pDest->setProtoData(pData); // send data to all dynamic destinations typename DynDests::const_iterator itDyn; for (itDyn = dynDests.begin(); itDyn != dynDests.end(); ++itDyn) sendFrame(*pData, itDyn->first); } /** * @brief send current frame to address * @param[in] addr address to send to * @param[in] proto Blinken protocol identifier */ template<typename ADDR, typename SOCK> void Sender<ADDR, SOCK>::sendCurFrame(const ADDR &addr, etBlinkenProto proto) const { stBlinkenFrame *pFrame; std::string data; // get current frame from stream if (m_pInStream && m_pInStream->getCurFrame(pFrame)) { // convert frame to protocal data frame2data(pFrame, proto, data); // send frame to address sendFrame(data, addr); } // no stream of no current frame else { // get "no frame" ad protocal data noFrame2data(proto, data); // send "no frame" to address sendFrame(data, addr); } } /** * @brief send "no frame" to address * @param[in] addr address to send to * @param[in] proto Blinken protocol identifier */ template<typename ADDR, typename SOCK> void Sender<ADDR, SOCK>::sendNoFrame(const ADDR &addr, etBlinkenProto proto) const { std::string data; // get "no frame" ad protocal data noFrame2data(proto, data); // send "no frame" to address sendFrame(data, addr); } /** * @brief send frame to address * @param[in] data protocol data of frame * @param[in] addr address to send to */ template<typename ADDR, typename SOCK> void Sender<ADDR, 
SOCK>::sendFrame(const std::string &data, const ADDR &addr) const { if (m_pSock) { m_pSock->send(data, addr); } } /** * @brief convert frame to protocol data * @param[in] pFrame frame * @param[in] proto Blinken protocol identifier * @param[out] data protocol data */ template<typename ADDR, typename SOCK> void Sender<ADDR, SOCK>::frame2data(stBlinkenFrame *pFrame, etBlinkenProto proto, std::string &data) { char buf[65536]; int len; // convert frame to protcol data len = BlinkenFrameToNetwork(pFrame, proto, buf, sizeof(buf)); if (len < 0) len = 0; data.assign(buf, len); } /** * @brief get "no frame" protocol data * @param[in] proto Blinken protocol identifier * @param[out] data protcol data */ template<typename ADDR, typename SOCK> void Sender<ADDR, SOCK>::noFrame2data(etBlinkenProto proto, std::string &data) { char buf[16]; int len; // obtain "no frame" protcol data len = BlinkenProtoMakePacket(proto, BlinkenPacketStreamEnd, buf, sizeof(buf)); if (len < 0) len = 0; data.assign(buf, len); } /// receive data from socket template<typename ADDR, typename SOCK> void Sender<ADDR, SOCK>::receiveFromSock() { etBlinkenProto proto; etBlinkenPacket packet; std::string data; ADDR addr; // make sure socket exists if (!m_pSock) return; // receive (leave if no reception) if (!m_pSock->recv(data, addr)) return; // detect packet type and protocol BlinkenProtoDetectPacket(data.c_str(), data.size(), &proto, &packet); switch (packet) { // request -> add to dynamic destinations and send current frame case BlinkenPacketRequest: switch (proto) { case BlinkenProtoBlp: m_dynDestsBlp[addr] = Time::now(); sendCurFrame(addr, BlinkenProtoBlp); break; case BlinkenProtoEblp: m_dynDestsEblp[addr] = Time::now(); sendCurFrame(addr, BlinkenProtoEblp); break; case BlinkenProtoMcuf: m_dynDestsMcuf[addr] = Time::now(); sendCurFrame(addr, BlinkenProtoMcuf); break; default: break; } break; // end request -> remove from dynamic destinations case BlinkenPacketEndRequest: switch (proto) { case 
BlinkenProtoBlp: m_dynDestsBlp.erase(addr); break; case BlinkenProtoEblp: m_dynDestsEblp.erase(addr); break; case BlinkenProtoMcuf: m_dynDestsMcuf.erase(addr); break; default: break; } break; default: break; } // switch (packet) } /* ##################### # Sender::DestEntry # ##################### */ /// constructor template<typename ADDR, typename SOCK> Sender<ADDR, SOCK>::DestEntry::DestEntry(const std::string &name): m_name(name), m_pDest(NULL) { } } // namespace Blinker #endif // #ifndef SENDER_H