implemented stream receiver
Stefan Schuermans authored 12 years ago
|
1) /* Blinker
2) Copyright 2011 Stefan Schuermans <stefan@blinkenarea.org>
3) Copyleft GNU public license - http://www.gnu.org/copyleft/gpl.html
4) a blinkenarea.org project */
5)
6) #ifndef BLINKER_RECEIVER_IMPL_H
7) #define BLINKER_RECEIVER_IMPL_H
8)
9) #include <string>
10)
11) #include <BlinkenLib/BlinkenProto.h>
12) #include <BlinkenLib/BlinkenFrame.h>
13)
14) #include "Directory.h"
15) #include "File.h"
16) #include "IoCallee.h"
|
put all managers in one str...
Stefan Schuermans authored 12 years ago
|
17) #include "Mgrs.h"
|
implemented stream receiver
Stefan Schuermans authored 12 years ago
|
18) #include "Module.h"
19) #include "OutStreamFile.h"
20) #include "Protocol.h"
21) #include "ProtocolFile.h"
22) #include "Receiver.h"
23) #include "SettingFile.h"
24) #include "Time.h"
25) #include "TimeCallee.h"
26)
27) namespace Blinker {
28)
29) /**
30) * @brief constructor
|
put all managers in one str...
Stefan Schuermans authored 12 years ago
|
31) * @param[in] mgrs managers
|
implemented stream receiver
Stefan Schuermans authored 12 years ago
|
32) * @param[in] dirBase base directory
33) */
34) template<typename ADDR, typename SOCK>
|
put all managers in one str...
Stefan Schuermans authored 12 years ago
|
35) Receiver<ADDR, SOCK>::Receiver(Mgrs &mgrs, const Directory &dirBase):
36) Module(mgrs, dirBase),
37) m_fileOutStream(dirBase.getFile("outstream"), mgrs.m_streamMgr),
|
implemented stream receiver
Stefan Schuermans authored 12 years ago
|
38) m_fileBind(dirBase.getFile("bind")),
39) m_fileSrc(dirBase.getFile("source")),
40) m_fileProtocol(dirBase.getFile("protocol")),
41) m_pSock(NULL),
42) m_needTimeout(false),
43) m_needNextReq(false)
44) {
45) // read protocol and source address
46) readProto();
47) readSrc();
48)
49) // create and bind socket
50) createSock();
51) }
52)
53) /// virtual destructor
54) template<typename ADDR, typename SOCK>
55) Receiver<ADDR, SOCK>::~Receiver()
56) {
57) // cancel stream request
58) request(false);
59)
60) // destroy socket
61) destroySock();
62) }
63)
64) /// check for update of configuration
65) template<typename ADDR, typename SOCK>
66) void Receiver<ADDR, SOCK>::updateConfig()
67) {
68) // output stream name file was modified -> re-get output stream
69) if (m_fileOutStream.checkModified())
70) m_fileOutStream.update();
71)
72) // bind address file was modified -> re-create socket
73) if (m_fileBind.checkModified()) {
74) destroySock();
75) createSock();
76) }
77)
78) // source address file was modified -> re-read source address
79) if (m_fileSrc.checkModified())
80) readSrc();
81)
82) // protocol file was modified -> re-read protocol
83) if (m_fileProtocol.checkModified())
84) readProto();
85) }
86)
87) /// callback when requested time reached
88) template<typename ADDR, typename SOCK>
89) void Receiver<ADDR, SOCK>::timeCall()
90) {
91) Time now = Time::now();
92)
93) // stream timeout
94) if (m_needTimeout && m_timeout <= now) {
95) m_needTimeout = false;
96) m_fileOutStream.setFrame(NULL);
97) }
98)
99) // re-request stream
100) if (m_needNextReq && m_nextReq <= now)
101) request(true);
102)
103) updateTimeCallback();
104) }
105)
106) /**
107) * @brief callback when I/O object is readable
108) * @param[in] io I/O object that is readable
109) */
110) template<typename ADDR, typename SOCK>
111) void Receiver<ADDR, SOCK>::ioReadCall(Io *io)
112) {
113) // reception on socket
114) if (io == m_pSock)
115) receiveFromSock();
116) }
117)
118) /**
119) * @brief callback when I/O object is writable
120) * @param[in] io I/O object that is writable
121) */
122) template<typename ADDR, typename SOCK>
123) void Receiver<ADDR, SOCK>::ioWriteCall(Io *io)
124) {
125) (void)io; // unused
126) }
127)
128) /// (re-)read source address
129) template<typename ADDR, typename SOCK>
130) void Receiver<ADDR, SOCK>::readSrc()
131) {
132) // cancel old stream request
133) request(false);
134) // set "no frame", because stream is going to change
135) m_fileOutStream.setFrame(NULL);
136)
137) m_fileSrc.update();
138)
139) // send new stream request
140) request(true);
141) }
142)
143) /// (re-)read protocol
144) template<typename ADDR, typename SOCK>
145) void Receiver<ADDR, SOCK>::readProto()
146) {
147) // cancel old stream request
148) request(false);
149) // set "no frame", because stream is going to change
150) m_fileOutStream.setFrame(NULL);
151)
152) // read new protocol from file
153) m_fileProtocol.update();
154)
155) // send new stream request
156) request(true);
157) }
158)
159) /// create socket and bind it
160) template<typename ADDR, typename SOCK>
161) void Receiver<ADDR, SOCK>::createSock()
162) {
163) // create socket
164) if (!m_pSock) {
165) m_pSock = new SOCK();
166) if (!m_pSock)
167) return;
168) }
169)
170) // get bind address from bind address setting file
171) m_fileBind.update();
172) if (!m_fileBind.m_valid) {
173) delete m_pSock;
174) m_pSock = NULL;
175) return;
176) }
177)
178) // bind socket
179) if (!m_pSock->bind(m_fileBind.m_obj)) {
180) delete m_pSock;
181) m_pSock = NULL;
182) return;
183) }
184)
185) // request callback on recepetion
|
put all managers in one str...
Stefan Schuermans authored 12 years ago
|
186) m_mgrs.m_callMgr.requestIoReadCall(this, m_pSock);
|
implemented stream receiver
Stefan Schuermans authored 12 years ago
|
187)
188) // send new stream request
189) request(true);
190) }
191)
192) /// destroy socket
193) template<typename ADDR, typename SOCK>
194) void Receiver<ADDR, SOCK>::destroySock()
195) {
196) // cancel old stream request
197) request(false);
198) // set "no frame", because stream is going to change
199) m_fileOutStream.setFrame(NULL);
200)
201) // cancel callback request
|
put all managers in one str...
Stefan Schuermans authored 12 years ago
|
202) m_mgrs.m_callMgr.cancelIoReadCall(this, m_pSock);
|
implemented stream receiver
Stefan Schuermans authored 12 years ago
|
203)
204) // destroy socket
205) if (m_pSock) {
206) delete m_pSock;
207) m_pSock = NULL;
208) }
209) }
210)
211) /**
212) * @brief request stream / cancel stream request
213) * @param req if to send a request (otherwise: cancel request)
214) */
215) template<typename ADDR, typename SOCK>
216) void Receiver<ADDR, SOCK>::request(bool req)
217) {
218) etBlinkenPacket type;
219) char buf[65536];
220) int len;
221) std::string data;
222)
223) // check that there is a socket, a source address and a protocol
224) if (!m_pSock || !m_fileSrc.m_valid || !m_fileProtocol.m_valid)
225) return;
226)
227) // assemble request / cancel request
228) type = req ? BlinkenPacketRequest: BlinkenPacketEndRequest;
229) len = BlinkenProtoMakePacket(m_fileProtocol.m_obj.m_proto, type,
230) buf, sizeof(buf));
231) if (len < 0)
232) len = 0;
233) data.assign(buf, len);
234)
235) // send request / cancel request
236) m_pSock->send(data, m_fileSrc.m_obj);
237)
238) // set time for next request
239) if (req)
240) m_nextReq = Time::now() + Time(10);
241) m_needNextReq = req;
242) updateTimeCallback();
243) }
244)
245) /// receive data from socket
246) template<typename ADDR, typename SOCK>
247) void Receiver<ADDR, SOCK>::receiveFromSock()
248) {
249) etBlinkenProto proto;
250) etBlinkenPacket packet;
251) std::string data;
252) ADDR addr;
253)
254) // make sure socket exists
255) if (!m_pSock)
256) return;
257)
258) // receive (leave if no reception)
259) if (!m_pSock->recv(data, addr))
260) return;
261)
262) // detect packet type and protocol
263) BlinkenProtoDetectPacket(data.c_str(), data.size(), &proto, &packet);
264)
265) // check source address (if configured)
266) if (m_fileSrc.m_valid && addr != m_fileSrc.m_obj)
267) return; // mismatch
268)
269) // check protocol (if configured)
270) if (m_fileProtocol.m_valid && proto != m_fileProtocol.m_obj.m_proto)
271) return; // mismatch
272)
273) switch (packet) {
274)
275) // frame -> process it
276) case BlinkenPacketFrame:
277) procFrame(data);
278) break;
279)
280) // end of stream -> pass on "no frame"
281) case BlinkenPacketStreamEnd:
282) m_fileOutStream.setFrame(NULL);
283) break;
284)
285) default:
286) break;
287)
288) } // switch (packet)
289) }
290)
291) /**
292) * @brief process frame
293) * @param[in] data received frame protocol data
294) */
295) template<typename ADDR, typename SOCK>
296) void Receiver<ADDR, SOCK>::procFrame(const std::string &data)
297) {
298) stBlinkenFrame *pFrame;
299)
300) // parse frame
301) pFrame = BlinkenFrameFromNetwork(data.c_str(), data.size(), NULL);
302) if (!pFrame)
303) return;
304)
305) // pass on frame
306) m_fileOutStream.setFrame(pFrame);
307)
308) // set new stream timeout
309) m_timeout = Time::now() + Time(5);
310) m_needTimeout = true;
311) updateTimeCallback();
312) }
313)
314) /// update time callback
315) template<typename ADDR, typename SOCK>
316) void Receiver<ADDR, SOCK>::updateTimeCallback()
317) {
318) if (m_needTimeout)
319) if (m_needNextReq)
|
put all managers in one str...
Stefan Schuermans authored 12 years ago
|
320) m_mgrs.m_callMgr.requestTimeCall(this, m_timeout < m_nextReq ? m_timeout
|
implemented stream receiver
Stefan Schuermans authored 12 years ago
|
321) : m_nextReq);
322) else
|
put all managers in one str...
Stefan Schuermans authored 12 years ago
|
323) m_mgrs.m_callMgr.requestTimeCall(this, m_timeout);
|
implemented stream receiver
Stefan Schuermans authored 12 years ago
|
324) else
325) if (m_needNextReq)
|
put all managers in one str...
Stefan Schuermans authored 12 years ago
|
326) m_mgrs.m_callMgr.requestTimeCall(this, m_nextReq);
|
implemented stream receiver
Stefan Schuermans authored 12 years ago
|
327) else
|
put all managers in one str...
Stefan Schuermans authored 12 years ago
|
328) m_mgrs.m_callMgr.cancelTimeCall(this);
|