bfeee829460dc1047e69af19e7a0bf4338d58e5a
Stefan Schuermans implemented stream receiver

Stefan Schuermans authored 12 years ago

1) /* Blinker
2)    Copyright 2011 Stefan Schuermans <stefan@blinkenarea.org>
3)    Copyleft GNU public license - http://www.gnu.org/copyleft/gpl.html
4)    a blinkenarea.org project */
5) 
6) #ifndef BLINKER_RECEIVER_IMPL_H
7) #define BLINKER_RECEIVER_IMPL_H
8) 
9) #include <string>
10) 
11) #include <BlinkenLib/BlinkenProto.h>
12) #include <BlinkenLib/BlinkenFrame.h>
13) 
14) #include "Directory.h"
15) #include "File.h"
16) #include "IoCallee.h"
Stefan Schuermans put all managers in one str...

Stefan Schuermans authored 12 years ago

17) #include "Mgrs.h"
Stefan Schuermans implemented stream receiver

Stefan Schuermans authored 12 years ago

18) #include "Module.h"
19) #include "OutStreamFile.h"
20) #include "Protocol.h"
21) #include "ProtocolFile.h"
22) #include "Receiver.h"
23) #include "SettingFile.h"
24) #include "Time.h"
25) #include "TimeCallee.h"
26) 
27) namespace Blinker {
28) 
29) /**
30)  * @brief constructor
Stefan Schuermans make modules know their name

Stefan Schuermans authored 12 years ago

31)  * @param[in] name module name
Stefan Schuermans put all managers in one str...

Stefan Schuermans authored 12 years ago

32)  * @param[in] mgrs managers
Stefan Schuermans implemented stream receiver

Stefan Schuermans authored 12 years ago

33)  * @param[in] dirBase base directory
34)  */
35) template<typename ADDR, typename SOCK>
Stefan Schuermans make modules know their name

Stefan Schuermans authored 12 years ago

36) Receiver<ADDR, SOCK>::Receiver(const std::string &name, Mgrs &mgrs,
37)                                const Directory &dirBase):
38)   Module(name, mgrs, dirBase),
Stefan Schuermans put all managers in one str...

Stefan Schuermans authored 12 years ago

39)   m_fileOutStream(dirBase.getFile("outstream"), mgrs.m_streamMgr),
Stefan Schuermans implemented stream receiver

Stefan Schuermans authored 12 years ago

40)   m_fileBind(dirBase.getFile("bind")),
41)   m_fileSrc(dirBase.getFile("source")),
42)   m_fileProtocol(dirBase.getFile("protocol")),
43)   m_pSock(NULL),
44)   m_needTimeout(false),
45)   m_needNextReq(false)
46) {
47)   // read protocol and source address
48)   readProto();
49)   readSrc();
50) 
51)   // create and bind socket
52)   createSock();
53) }
54) 
55) /// virtual destructor
56) template<typename ADDR, typename SOCK>
57) Receiver<ADDR, SOCK>::~Receiver()
58) {
59)   // cancel stream request
60)   request(false);
61) 
62)   // destroy socket
63)   destroySock();
64) }
65) 
66) /// check for update of configuration
67) template<typename ADDR, typename SOCK>
68) void Receiver<ADDR, SOCK>::updateConfig()
69) {
70)   // output stream name file was modified -> re-get output stream
71)   if (m_fileOutStream.checkModified())
72)     m_fileOutStream.update();
73) 
74)   // bind address file was modified -> re-create socket
75)   if (m_fileBind.checkModified()) {
76)     destroySock();
77)     createSock();
78)   }
79) 
80)   // source address file was modified -> re-read source address
81)   if (m_fileSrc.checkModified())
82)     readSrc();
83) 
84)   // protocol file was modified -> re-read protocol
85)   if (m_fileProtocol.checkModified())
86)     readProto();
87) }
88) 
89) /// callback when requested time reached
90) template<typename ADDR, typename SOCK>
91) void Receiver<ADDR, SOCK>::timeCall()
92) {
93)   Time now = Time::now();
94) 
95)   // stream timeout
96)   if (m_needTimeout && m_timeout <= now) {
97)     m_needTimeout = false;
98)     m_fileOutStream.setFrame(NULL);
99)   }
100) 
101)   // re-request stream
102)   if (m_needNextReq && m_nextReq <= now)
103)     request(true);
104) 
105)   updateTimeCallback();
106) }
107) 
108) /**
109)  * @brief callback when I/O object is readable
110)  * @param[in] io I/O object that is readable
111)  */
112) template<typename ADDR, typename SOCK>
113) void Receiver<ADDR, SOCK>::ioReadCall(Io *io)
114) {
115)   // reception on socket
116)   if (io == m_pSock)
117)     receiveFromSock();
118) }
119) 
120) /**
121)  * @brief callback when I/O object is writable
122)  * @param[in] io I/O object that is writable
123)  */
124) template<typename ADDR, typename SOCK>
125) void Receiver<ADDR, SOCK>::ioWriteCall(Io *io)
126) {
127)   (void)io; // unused
128) }
129) 
130) /// (re-)read source address
131) template<typename ADDR, typename SOCK>
132) void Receiver<ADDR, SOCK>::readSrc()
133) {
134)   // cancel old stream request
135)   request(false);
136)   // set "no frame", because stream is going to change
137)   m_fileOutStream.setFrame(NULL);
138) 
139)   m_fileSrc.update();
140) 
141)   // send new stream request
142)   request(true);
143) }
144) 
145) /// (re-)read protocol
146) template<typename ADDR, typename SOCK>
147) void Receiver<ADDR, SOCK>::readProto()
148) {
149)   // cancel old stream request
150)   request(false);
151)   // set "no frame", because stream is going to change
152)   m_fileOutStream.setFrame(NULL);
153) 
154)   // read new protocol from file
155)   m_fileProtocol.update();
156) 
157)   // send new stream request
158)   request(true);
159) }
160) 
161) /// create socket and bind it
162) template<typename ADDR, typename SOCK>
163) void Receiver<ADDR, SOCK>::createSock()
164) {
165)   // create socket
166)   if (!m_pSock) {
167)     m_pSock = new SOCK();
168)     if (!m_pSock)
169)       return;
170)   }
171) 
172)   // get bind address from bind address setting file
173)   m_fileBind.update();
174)   if (!m_fileBind.m_valid) {
175)     delete m_pSock;
176)     m_pSock = NULL;
177)     return;
178)   }
179) 
180)   // bind socket
181)   if (!m_pSock->bind(m_fileBind.m_obj)) {
182)     delete m_pSock;
183)     m_pSock = NULL;
184)     return;
185)   }
186) 
187)   // request callback on recepetion
Stefan Schuermans put all managers in one str...

Stefan Schuermans authored 12 years ago

188)   m_mgrs.m_callMgr.requestIoReadCall(this, m_pSock);
Stefan Schuermans implemented stream receiver

Stefan Schuermans authored 12 years ago

189) 
190)   // send new stream request
191)   request(true);
192) }
193) 
194) /// destroy socket
195) template<typename ADDR, typename SOCK>
196) void Receiver<ADDR, SOCK>::destroySock()
197) {
198)   // cancel old stream request
199)   request(false);
200)   // set "no frame", because stream is going to change
201)   m_fileOutStream.setFrame(NULL);
202) 
203)   // cancel callback request
Stefan Schuermans put all managers in one str...

Stefan Schuermans authored 12 years ago

204)   m_mgrs.m_callMgr.cancelIoReadCall(this, m_pSock);
Stefan Schuermans implemented stream receiver

Stefan Schuermans authored 12 years ago

205) 
206)   // destroy socket
207)   if (m_pSock) {
208)     delete m_pSock;
209)     m_pSock = NULL;
210)   }
211) }
212) 
213) /**
214)  * @brief request stream / cancel stream request
215)  * @param req if to send a request (otherwise: cancel request)
216)  */
217) template<typename ADDR, typename SOCK>
218) void Receiver<ADDR, SOCK>::request(bool req)
219) {
220)   etBlinkenPacket type;
221)   char buf[65536];
222)   int len;
223)   std::string data;
224) 
225)   // check that there is a socket, a source address and a protocol
226)   if (!m_pSock || !m_fileSrc.m_valid || !m_fileProtocol.m_valid)
227)     return;
228) 
229)   // assemble request / cancel request
230)   type = req ? BlinkenPacketRequest: BlinkenPacketEndRequest;
231)   len = BlinkenProtoMakePacket(m_fileProtocol.m_obj.m_proto, type,
232)                                buf, sizeof(buf));
233)   if (len < 0)
234)     len = 0;
235)   data.assign(buf, len);
236) 
237)   // send request / cancel request
238)   m_pSock->send(data, m_fileSrc.m_obj);
239) 
240)   // set time for next request
241)   if (req)
242)     m_nextReq = Time::now() + Time(10);
243)   m_needNextReq = req;
244)   updateTimeCallback();
245) }
246) 
247) /// receive data from socket
248) template<typename ADDR, typename SOCK>
249) void Receiver<ADDR, SOCK>::receiveFromSock()
250) {
251)   etBlinkenProto proto;
252)   etBlinkenPacket packet;
253)   std::string data;
254)   ADDR addr;
255) 
256)   // make sure socket exists
257)   if (!m_pSock)
258)     return;
259) 
260)   // receive (leave if no reception)
261)   if (!m_pSock->recv(data, addr))
262)     return;
263) 
264)   // check source address (if configured)
265)   if (m_fileSrc.m_valid && addr != m_fileSrc.m_obj)
266)     return; // mismatch
267) 
Stefan Schuermans first check address and the...

Stefan Schuermans authored 12 years ago

268)   // detect packet type and protocol
269)   BlinkenProtoDetectPacket(data.c_str(), data.size(), &proto, &packet);
270) 
Stefan Schuermans implemented stream receiver

Stefan Schuermans authored 12 years ago

271)   // check protocol (if configured)
272)   if (m_fileProtocol.m_valid && proto != m_fileProtocol.m_obj.m_proto)
273)     return; // mismatch
274) 
275)   switch (packet) {
276) 
277)     // frame -> process it
278)     case BlinkenPacketFrame:
279)       procFrame(data);
280)       break;
281) 
282)     // end of stream -> pass on "no frame"
283)     case BlinkenPacketStreamEnd:
284)       m_fileOutStream.setFrame(NULL);
285)       break;
286) 
287)     default:
288)       break;
289) 
290)   } // switch (packet)
291) }
292) 
293) /**
294)  * @brief process frame
295)  * @param[in] data received frame protocol data
296)  */
297) template<typename ADDR, typename SOCK>
298) void Receiver<ADDR, SOCK>::procFrame(const std::string &data)
299) {
300)   stBlinkenFrame *pFrame;
301) 
302)   // parse frame
303)   pFrame = BlinkenFrameFromNetwork(data.c_str(), data.size(), NULL);
304)   if (!pFrame)
305)     return;
306) 
307)   // pass on frame
308)   m_fileOutStream.setFrame(pFrame);
309) 
Stefan Schuermans fix memory leak

Stefan Schuermans authored 12 years ago

310)   // frame not needed any more (stream creates copy for it use)
311)   BlinkenFrameFree(pFrame);
312) 
Stefan Schuermans implemented stream receiver

Stefan Schuermans authored 12 years ago

313)   // set new stream timeout
314)   m_timeout = Time::now() + Time(5);
315)   m_needTimeout = true;
316)   updateTimeCallback();
317) }
318) 
319) /// update time callback
320) template<typename ADDR, typename SOCK>
321) void Receiver<ADDR, SOCK>::updateTimeCallback()
322) {
323)   if (m_needTimeout)
324)     if (m_needNextReq)
Stefan Schuermans put all managers in one str...

Stefan Schuermans authored 12 years ago

325)       m_mgrs.m_callMgr.requestTimeCall(this, m_timeout < m_nextReq ? m_timeout
Stefan Schuermans implemented stream receiver

Stefan Schuermans authored 12 years ago

326)                                                             : m_nextReq);
327)     else
Stefan Schuermans put all managers in one str...

Stefan Schuermans authored 12 years ago

328)       m_mgrs.m_callMgr.requestTimeCall(this, m_timeout);
Stefan Schuermans implemented stream receiver

Stefan Schuermans authored 12 years ago

329)   else
330)     if (m_needNextReq)
Stefan Schuermans put all managers in one str...

Stefan Schuermans authored 12 years ago

331)       m_mgrs.m_callMgr.requestTimeCall(this, m_nextReq);
Stefan Schuermans implemented stream receiver

Stefan Schuermans authored 12 years ago

332)     else
Stefan Schuermans put all managers in one str...

Stefan Schuermans authored 12 years ago

333)       m_mgrs.m_callMgr.cancelTimeCall(this);
Stefan Schuermans implemented stream receiver

Stefan Schuermans authored 12 years ago

334) }
335) 
336) } // namespace Blinker
337) 
Stefan Schuermans fix comment

Stefan Schuermans authored 12 years ago

338) #endif // #ifndef BLINKER_RECEIVER_IMPL_H