BlinkenArea - GitList
Repositories
Blog
Wiki
Blinker
Code
Commits
Branches
Tags
Search
Tree:
98f2233
Branches
Tags
master
Blinker
src
noarch
Receiver_impl.h
implemented stream receiver
Stefan Schuermans
committed
98f2233
at 2011-12-21 22:47:08
Receiver_impl.h
Blame
History
Raw
/* Blinker Copyright 2011 Stefan Schuermans <stefan@blinkenarea.org> Copyleft GNU public license - http://www.gnu.org/copyleft/gpl.html a blinkenarea.org project */ #ifndef BLINKER_RECEIVER_IMPL_H #define BLINKER_RECEIVER_IMPL_H #include <list> #include <map> #include <string> #include <BlinkenLib/BlinkenProto.h> #include <BlinkenLib/BlinkenFrame.h> #include "CallMgr.h" #include "Directory.h" #include "File.h" #include "IoCallee.h" #include "Module.h" #include "OutStreamFile.h" #include "Protocol.h" #include "ProtocolFile.h" #include "Receiver.h" #include "SettingFile.h" #include "StreamMgr.h" #include "Time.h" #include "TimeCallee.h" namespace Blinker { /** * @brief constructor * @param[in] callMgr callback manager * @param[in] streamMgr stream manager * @param[in] dirBase base directory */ template<typename ADDR, typename SOCK> Receiver<ADDR, SOCK>::Receiver(CallMgr &callMgr, StreamMgr &streamMgr, const Directory &dirBase): Module(callMgr, streamMgr, dirBase), m_fileOutStream(dirBase.getFile("outstream"), streamMgr), m_fileBind(dirBase.getFile("bind")), m_fileSrc(dirBase.getFile("source")), m_fileProtocol(dirBase.getFile("protocol")), m_pSock(NULL), m_needTimeout(false), m_needNextReq(false) { // read protocol and source address readProto(); readSrc(); // create and bind socket createSock(); } /// virtual destructor template<typename ADDR, typename SOCK> Receiver<ADDR, SOCK>::~Receiver() { // cancel stream request request(false); // destroy socket destroySock(); } /// check for update of configuration template<typename ADDR, typename SOCK> void Receiver<ADDR, SOCK>::updateConfig() { // output stream name file was modified -> re-get output stream if (m_fileOutStream.checkModified()) m_fileOutStream.update(); // bind address file was modified -> re-create socket if (m_fileBind.checkModified()) { destroySock(); createSock(); } // source address file was modified -> re-read source address if (m_fileSrc.checkModified()) readSrc(); // protocol file was modified -> 
re-read protocol if (m_fileProtocol.checkModified()) readProto(); } /// callback when requested time reached template<typename ADDR, typename SOCK> void Receiver<ADDR, SOCK>::timeCall() { Time now = Time::now(); // stream timeout if (m_needTimeout && m_timeout <= now) { m_needTimeout = false; m_fileOutStream.setFrame(NULL); } // re-request stream if (m_needNextReq && m_nextReq <= now) request(true); updateTimeCallback(); } /** * @brief callback when I/O object is readable * @param[in] io I/O object that is readable */ template<typename ADDR, typename SOCK> void Receiver<ADDR, SOCK>::ioReadCall(Io *io) { // reception on socket if (io == m_pSock) receiveFromSock(); } /** * @brief callback when I/O object is writable * @param[in] io I/O object that is writable */ template<typename ADDR, typename SOCK> void Receiver<ADDR, SOCK>::ioWriteCall(Io *io) { (void)io; // unused } /// (re-)read source address template<typename ADDR, typename SOCK> void Receiver<ADDR, SOCK>::readSrc() { // cancel old stream request request(false); // set "no frame", because stream is going to change m_fileOutStream.setFrame(NULL); m_fileSrc.update(); // send new stream request request(true); } /// (re-)read protocol template<typename ADDR, typename SOCK> void Receiver<ADDR, SOCK>::readProto() { // cancel old stream request request(false); // set "no frame", because stream is going to change m_fileOutStream.setFrame(NULL); // read new protocol from file m_fileProtocol.update(); // send new stream request request(true); } /// create socket and bind it template<typename ADDR, typename SOCK> void Receiver<ADDR, SOCK>::createSock() { // create socket if (!m_pSock) { m_pSock = new SOCK(); if (!m_pSock) return; } // get bind address from bind address setting file m_fileBind.update(); if (!m_fileBind.m_valid) { delete m_pSock; m_pSock = NULL; return; } // bind socket if (!m_pSock->bind(m_fileBind.m_obj)) { delete m_pSock; m_pSock = NULL; return; } // request callback on recepetion 
m_callMgr.requestIoReadCall(this, m_pSock); // send new stream request request(true); } /// destroy socket template<typename ADDR, typename SOCK> void Receiver<ADDR, SOCK>::destroySock() { // cancel old stream request request(false); // set "no frame", because stream is going to change m_fileOutStream.setFrame(NULL); // cancel callback request m_callMgr.cancelIoReadCall(this, m_pSock); // destroy socket if (m_pSock) { delete m_pSock; m_pSock = NULL; } } /** * @brief request stream / cancel stream request * @param req if to send a request (otherwise: cancel request) */ template<typename ADDR, typename SOCK> void Receiver<ADDR, SOCK>::request(bool req) { etBlinkenPacket type; char buf[65536]; int len; std::string data; // check that there is a socket, a source address and a protocol if (!m_pSock || !m_fileSrc.m_valid || !m_fileProtocol.m_valid) return; // assemble request / cancel request type = req ? BlinkenPacketRequest: BlinkenPacketEndRequest; len = BlinkenProtoMakePacket(m_fileProtocol.m_obj.m_proto, type, buf, sizeof(buf)); if (len < 0) len = 0; data.assign(buf, len); // send request / cancel request m_pSock->send(data, m_fileSrc.m_obj); // set time for next request if (req) m_nextReq = Time::now() + Time(10); m_needNextReq = req; updateTimeCallback(); } /// receive data from socket template<typename ADDR, typename SOCK> void Receiver<ADDR, SOCK>::receiveFromSock() { etBlinkenProto proto; etBlinkenPacket packet; std::string data; ADDR addr; // make sure socket exists if (!m_pSock) return; // receive (leave if no reception) if (!m_pSock->recv(data, addr)) return; // detect packet type and protocol BlinkenProtoDetectPacket(data.c_str(), data.size(), &proto, &packet); // check source address (if configured) if (m_fileSrc.m_valid && addr != m_fileSrc.m_obj) return; // mismatch // check protocol (if configured) if (m_fileProtocol.m_valid && proto != m_fileProtocol.m_obj.m_proto) return; // mismatch switch (packet) { // frame -> process it case BlinkenPacketFrame: 
procFrame(data); break; // end of stream -> pass on "no frame" case BlinkenPacketStreamEnd: m_fileOutStream.setFrame(NULL); break; default: break; } // switch (packet) } /** * @brief process frame * @param[in] data received frame protocol data */ template<typename ADDR, typename SOCK> void Receiver<ADDR, SOCK>::procFrame(const std::string &data) { stBlinkenFrame *pFrame; // parse frame pFrame = BlinkenFrameFromNetwork(data.c_str(), data.size(), NULL); if (!pFrame) return; // pass on frame m_fileOutStream.setFrame(pFrame); // set new stream timeout m_timeout = Time::now() + Time(5); m_needTimeout = true; updateTimeCallback(); } /// update time callback template<typename ADDR, typename SOCK> void Receiver<ADDR, SOCK>::updateTimeCallback() { if (m_needTimeout) if (m_needNextReq) m_callMgr.requestTimeCall(this, m_timeout < m_nextReq ? m_timeout : m_nextReq); else m_callMgr.requestTimeCall(this, m_timeout); else if (m_needNextReq) m_callMgr.requestTimeCall(this, m_nextReq); else m_callMgr.cancelTimeCall(this); } } // namespace Blinker #endif // #ifndef BLINKER_RECEIVER_H