a1794928efa4fae6d572a405fb4830ce030d58c8
Stefan Schuermans implemented stream receiver

Stefan Schuermans authored 12 years ago

1) /* Blinker
2)    Copyright 2011 Stefan Schuermans <stefan@blinkenarea.org>
3)    Copyleft GNU public license - http://www.gnu.org/copyleft/gpl.html
4)    a blinkenarea.org project */
5) 
6) #ifndef BLINKER_RECEIVER_IMPL_H
7) #define BLINKER_RECEIVER_IMPL_H
8) 
9) #include <string>
10) 
11) #include <BlinkenLib/BlinkenProto.h>
12) #include <BlinkenLib/BlinkenFrame.h>
13) 
14) #include "Directory.h"
15) #include "File.h"
16) #include "IoCallee.h"
Stefan Schuermans put all managers in one str...

Stefan Schuermans authored 12 years ago

17) #include "Mgrs.h"
Stefan Schuermans implemented stream receiver

Stefan Schuermans authored 12 years ago

18) #include "Module.h"
19) #include "OutStreamFile.h"
20) #include "Protocol.h"
21) #include "ProtocolFile.h"
22) #include "Receiver.h"
23) #include "SettingFile.h"
24) #include "Time.h"
25) #include "TimeCallee.h"
26) 
27) namespace Blinker {
28) 
29) /**
30)  * @brief constructor
Stefan Schuermans put all managers in one str...

Stefan Schuermans authored 12 years ago

31)  * @param[in] mgrs managers
Stefan Schuermans implemented stream receiver

Stefan Schuermans authored 12 years ago

32)  * @param[in] dirBase base directory
33)  */
34) template<typename ADDR, typename SOCK>
Stefan Schuermans put all managers in one str...

Stefan Schuermans authored 12 years ago

35) Receiver<ADDR, SOCK>::Receiver(Mgrs &mgrs, const Directory &dirBase):
36)   Module(mgrs, dirBase),
37)   m_fileOutStream(dirBase.getFile("outstream"), mgrs.m_streamMgr),
Stefan Schuermans implemented stream receiver

Stefan Schuermans authored 12 years ago

38)   m_fileBind(dirBase.getFile("bind")),
39)   m_fileSrc(dirBase.getFile("source")),
40)   m_fileProtocol(dirBase.getFile("protocol")),
41)   m_pSock(NULL),
42)   m_needTimeout(false),
43)   m_needNextReq(false)
44) {
45)   // read protocol and source address
46)   readProto();
47)   readSrc();
48) 
49)   // create and bind socket
50)   createSock();
51) }
52) 
53) /// virtual destructor
54) template<typename ADDR, typename SOCK>
55) Receiver<ADDR, SOCK>::~Receiver()
56) {
57)   // cancel stream request
58)   request(false);
59) 
60)   // destroy socket
61)   destroySock();
62) }
63) 
64) /// check for update of configuration
65) template<typename ADDR, typename SOCK>
66) void Receiver<ADDR, SOCK>::updateConfig()
67) {
68)   // output stream name file was modified -> re-get output stream
69)   if (m_fileOutStream.checkModified())
70)     m_fileOutStream.update();
71) 
72)   // bind address file was modified -> re-create socket
73)   if (m_fileBind.checkModified()) {
74)     destroySock();
75)     createSock();
76)   }
77) 
78)   // source address file was modified -> re-read source address
79)   if (m_fileSrc.checkModified())
80)     readSrc();
81) 
82)   // protocol file was modified -> re-read protocol
83)   if (m_fileProtocol.checkModified())
84)     readProto();
85) }
86) 
87) /// callback when requested time reached
88) template<typename ADDR, typename SOCK>
89) void Receiver<ADDR, SOCK>::timeCall()
90) {
91)   Time now = Time::now();
92) 
93)   // stream timeout
94)   if (m_needTimeout && m_timeout <= now) {
95)     m_needTimeout = false;
96)     m_fileOutStream.setFrame(NULL);
97)   }
98) 
99)   // re-request stream
100)   if (m_needNextReq && m_nextReq <= now)
101)     request(true);
102) 
103)   updateTimeCallback();
104) }
105) 
106) /**
107)  * @brief callback when I/O object is readable
108)  * @param[in] io I/O object that is readable
109)  */
110) template<typename ADDR, typename SOCK>
111) void Receiver<ADDR, SOCK>::ioReadCall(Io *io)
112) {
113)   // reception on socket
114)   if (io == m_pSock)
115)     receiveFromSock();
116) }
117) 
118) /**
119)  * @brief callback when I/O object is writable
120)  * @param[in] io I/O object that is writable
121)  */
122) template<typename ADDR, typename SOCK>
123) void Receiver<ADDR, SOCK>::ioWriteCall(Io *io)
124) {
125)   (void)io; // unused
126) }
127) 
128) /// (re-)read source address
129) template<typename ADDR, typename SOCK>
130) void Receiver<ADDR, SOCK>::readSrc()
131) {
132)   // cancel old stream request
133)   request(false);
134)   // set "no frame", because stream is going to change
135)   m_fileOutStream.setFrame(NULL);
136) 
137)   m_fileSrc.update();
138) 
139)   // send new stream request
140)   request(true);
141) }
142) 
143) /// (re-)read protocol
144) template<typename ADDR, typename SOCK>
145) void Receiver<ADDR, SOCK>::readProto()
146) {
147)   // cancel old stream request
148)   request(false);
149)   // set "no frame", because stream is going to change
150)   m_fileOutStream.setFrame(NULL);
151) 
152)   // read new protocol from file
153)   m_fileProtocol.update();
154) 
155)   // send new stream request
156)   request(true);
157) }
158) 
159) /// create socket and bind it
160) template<typename ADDR, typename SOCK>
161) void Receiver<ADDR, SOCK>::createSock()
162) {
163)   // create socket
164)   if (!m_pSock) {
165)     m_pSock = new SOCK();
166)     if (!m_pSock)
167)       return;
168)   }
169) 
170)   // get bind address from bind address setting file
171)   m_fileBind.update();
172)   if (!m_fileBind.m_valid) {
173)     delete m_pSock;
174)     m_pSock = NULL;
175)     return;
176)   }
177) 
178)   // bind socket
179)   if (!m_pSock->bind(m_fileBind.m_obj)) {
180)     delete m_pSock;
181)     m_pSock = NULL;
182)     return;
183)   }
184) 
185)   // request callback on recepetion
Stefan Schuermans put all managers in one str...

Stefan Schuermans authored 12 years ago

186)   m_mgrs.m_callMgr.requestIoReadCall(this, m_pSock);
Stefan Schuermans implemented stream receiver

Stefan Schuermans authored 12 years ago

187) 
188)   // send new stream request
189)   request(true);
190) }
191) 
192) /// destroy socket
193) template<typename ADDR, typename SOCK>
194) void Receiver<ADDR, SOCK>::destroySock()
195) {
196)   // cancel old stream request
197)   request(false);
198)   // set "no frame", because stream is going to change
199)   m_fileOutStream.setFrame(NULL);
200) 
201)   // cancel callback request
Stefan Schuermans put all managers in one str...

Stefan Schuermans authored 12 years ago

202)   m_mgrs.m_callMgr.cancelIoReadCall(this, m_pSock);
Stefan Schuermans implemented stream receiver

Stefan Schuermans authored 12 years ago

203) 
204)   // destroy socket
205)   if (m_pSock) {
206)     delete m_pSock;
207)     m_pSock = NULL;
208)   }
209) }
210) 
211) /**
212)  * @brief request stream / cancel stream request
213)  * @param req if to send a request (otherwise: cancel request)
214)  */
215) template<typename ADDR, typename SOCK>
216) void Receiver<ADDR, SOCK>::request(bool req)
217) {
218)   etBlinkenPacket type;
219)   char buf[65536];
220)   int len;
221)   std::string data;
222) 
223)   // check that there is a socket, a source address and a protocol
224)   if (!m_pSock || !m_fileSrc.m_valid || !m_fileProtocol.m_valid)
225)     return;
226) 
227)   // assemble request / cancel request
228)   type = req ? BlinkenPacketRequest: BlinkenPacketEndRequest;
229)   len = BlinkenProtoMakePacket(m_fileProtocol.m_obj.m_proto, type,
230)                                buf, sizeof(buf));
231)   if (len < 0)
232)     len = 0;
233)   data.assign(buf, len);
234) 
235)   // send request / cancel request
236)   m_pSock->send(data, m_fileSrc.m_obj);
237) 
238)   // set time for next request
239)   if (req)
240)     m_nextReq = Time::now() + Time(10);
241)   m_needNextReq = req;
242)   updateTimeCallback();
243) }
244) 
245) /// receive data from socket
246) template<typename ADDR, typename SOCK>
247) void Receiver<ADDR, SOCK>::receiveFromSock()
248) {
249)   etBlinkenProto proto;
250)   etBlinkenPacket packet;
251)   std::string data;
252)   ADDR addr;
253) 
254)   // make sure socket exists
255)   if (!m_pSock)
256)     return;
257) 
258)   // receive (leave if no reception)
259)   if (!m_pSock->recv(data, addr))
260)     return;
261) 
262)   // check source address (if configured)
263)   if (m_fileSrc.m_valid && addr != m_fileSrc.m_obj)
264)     return; // mismatch
265) 
Stefan Schuermans first check address and the...

Stefan Schuermans authored 12 years ago

266)   // detect packet type and protocol
267)   BlinkenProtoDetectPacket(data.c_str(), data.size(), &proto, &packet);
268) 
Stefan Schuermans implemented stream receiver

Stefan Schuermans authored 12 years ago

269)   // check protocol (if configured)
270)   if (m_fileProtocol.m_valid && proto != m_fileProtocol.m_obj.m_proto)
271)     return; // mismatch
272) 
273)   switch (packet) {
274) 
275)     // frame -> process it
276)     case BlinkenPacketFrame:
277)       procFrame(data);
278)       break;
279) 
280)     // end of stream -> pass on "no frame"
281)     case BlinkenPacketStreamEnd:
282)       m_fileOutStream.setFrame(NULL);
283)       break;
284) 
285)     default:
286)       break;
287) 
288)   } // switch (packet)
289) }
290) 
291) /**
292)  * @brief process frame
293)  * @param[in] data received frame protocol data
294)  */
295) template<typename ADDR, typename SOCK>
296) void Receiver<ADDR, SOCK>::procFrame(const std::string &data)
297) {
298)   stBlinkenFrame *pFrame;
299) 
300)   // parse frame
301)   pFrame = BlinkenFrameFromNetwork(data.c_str(), data.size(), NULL);
302)   if (!pFrame)
303)     return;
304) 
305)   // pass on frame
306)   m_fileOutStream.setFrame(pFrame);
307) 
308)   // set new stream timeout
309)   m_timeout = Time::now() + Time(5);
310)   m_needTimeout = true;
311)   updateTimeCallback();
312) }
313) 
314) /// update time callback
315) template<typename ADDR, typename SOCK>
316) void Receiver<ADDR, SOCK>::updateTimeCallback()
317) {
318)   if (m_needTimeout)
319)     if (m_needNextReq)
Stefan Schuermans put all managers in one str...

Stefan Schuermans authored 12 years ago

320)       m_mgrs.m_callMgr.requestTimeCall(this, m_timeout < m_nextReq ? m_timeout
Stefan Schuermans implemented stream receiver

Stefan Schuermans authored 12 years ago

321)                                                             : m_nextReq);
322)     else
Stefan Schuermans put all managers in one str...

Stefan Schuermans authored 12 years ago

323)       m_mgrs.m_callMgr.requestTimeCall(this, m_timeout);
Stefan Schuermans implemented stream receiver

Stefan Schuermans authored 12 years ago

324)   else
325)     if (m_needNextReq)
Stefan Schuermans put all managers in one str...

Stefan Schuermans authored 12 years ago

326)       m_mgrs.m_callMgr.requestTimeCall(this, m_nextReq);
Stefan Schuermans implemented stream receiver

Stefan Schuermans authored 12 years ago

327)     else
Stefan Schuermans put all managers in one str...

Stefan Schuermans authored 12 years ago

328)       m_mgrs.m_callMgr.cancelTimeCall(this);