implemented stream receiver
Stefan Schuermans authored 12 years ago
|
1) /* Blinker
2) Copyright 2011 Stefan Schuermans <stefan@blinkenarea.org>
3) Copyleft GNU public license - http://www.gnu.org/copyleft/gpl.html
4) a blinkenarea.org project */
5)
6) #ifndef BLINKER_RECEIVER_IMPL_H
7) #define BLINKER_RECEIVER_IMPL_H
8)
9) #include <list>
10) #include <map>
11) #include <string>
12)
13) #include <BlinkenLib/BlinkenProto.h>
14) #include <BlinkenLib/BlinkenFrame.h>
15)
16) #include "Directory.h"
17) #include "File.h"
18) #include "IoCallee.h"
|
put all managers in one str...
Stefan Schuermans authored 12 years ago
|
19) #include "Mgrs.h"
|
implemented stream receiver
Stefan Schuermans authored 12 years ago
|
20) #include "Module.h"
21) #include "OutStreamFile.h"
22) #include "Protocol.h"
23) #include "ProtocolFile.h"
24) #include "Receiver.h"
25) #include "SettingFile.h"
26) #include "Time.h"
27) #include "TimeCallee.h"
28)
29) namespace Blinker {
30)
31) /**
32) * @brief constructor
|
put all managers in one str...
Stefan Schuermans authored 12 years ago
|
33) * @param[in] mgrs managers
|
implemented stream receiver
Stefan Schuermans authored 12 years ago
|
34) * @param[in] dirBase base directory
35) */
36) template<typename ADDR, typename SOCK>
|
put all managers in one str...
Stefan Schuermans authored 12 years ago
|
37) Receiver<ADDR, SOCK>::Receiver(Mgrs &mgrs, const Directory &dirBase):
38) Module(mgrs, dirBase),
39) m_fileOutStream(dirBase.getFile("outstream"), mgrs.m_streamMgr),
|
implemented stream receiver
Stefan Schuermans authored 12 years ago
|
40) m_fileBind(dirBase.getFile("bind")),
41) m_fileSrc(dirBase.getFile("source")),
42) m_fileProtocol(dirBase.getFile("protocol")),
43) m_pSock(NULL),
44) m_needTimeout(false),
45) m_needNextReq(false)
46) {
47) // read protocol and source address
48) readProto();
49) readSrc();
50)
51) // create and bind socket
52) createSock();
53) }
54)
55) /// virtual destructor
56) template<typename ADDR, typename SOCK>
57) Receiver<ADDR, SOCK>::~Receiver()
58) {
59) // cancel stream request
60) request(false);
61)
62) // destroy socket
63) destroySock();
64) }
65)
66) /// check for update of configuration
67) template<typename ADDR, typename SOCK>
68) void Receiver<ADDR, SOCK>::updateConfig()
69) {
70) // output stream name file was modified -> re-get output stream
71) if (m_fileOutStream.checkModified())
72) m_fileOutStream.update();
73)
74) // bind address file was modified -> re-create socket
75) if (m_fileBind.checkModified()) {
76) destroySock();
77) createSock();
78) }
79)
80) // source address file was modified -> re-read source address
81) if (m_fileSrc.checkModified())
82) readSrc();
83)
84) // protocol file was modified -> re-read protocol
85) if (m_fileProtocol.checkModified())
86) readProto();
87) }
88)
89) /// callback when requested time reached
90) template<typename ADDR, typename SOCK>
91) void Receiver<ADDR, SOCK>::timeCall()
92) {
93) Time now = Time::now();
94)
95) // stream timeout
96) if (m_needTimeout && m_timeout <= now) {
97) m_needTimeout = false;
98) m_fileOutStream.setFrame(NULL);
99) }
100)
101) // re-request stream
102) if (m_needNextReq && m_nextReq <= now)
103) request(true);
104)
105) updateTimeCallback();
106) }
107)
108) /**
109) * @brief callback when I/O object is readable
110) * @param[in] io I/O object that is readable
111) */
112) template<typename ADDR, typename SOCK>
113) void Receiver<ADDR, SOCK>::ioReadCall(Io *io)
114) {
115) // reception on socket
116) if (io == m_pSock)
117) receiveFromSock();
118) }
119)
120) /**
121) * @brief callback when I/O object is writable
122) * @param[in] io I/O object that is writable
123) */
124) template<typename ADDR, typename SOCK>
125) void Receiver<ADDR, SOCK>::ioWriteCall(Io *io)
126) {
127) (void)io; // unused
128) }
129)
130) /// (re-)read source address
131) template<typename ADDR, typename SOCK>
132) void Receiver<ADDR, SOCK>::readSrc()
133) {
134) // cancel old stream request
135) request(false);
136) // set "no frame", because stream is going to change
137) m_fileOutStream.setFrame(NULL);
138)
139) m_fileSrc.update();
140)
141) // send new stream request
142) request(true);
143) }
144)
145) /// (re-)read protocol
146) template<typename ADDR, typename SOCK>
147) void Receiver<ADDR, SOCK>::readProto()
148) {
149) // cancel old stream request
150) request(false);
151) // set "no frame", because stream is going to change
152) m_fileOutStream.setFrame(NULL);
153)
154) // read new protocol from file
155) m_fileProtocol.update();
156)
157) // send new stream request
158) request(true);
159) }
160)
161) /// create socket and bind it
162) template<typename ADDR, typename SOCK>
163) void Receiver<ADDR, SOCK>::createSock()
164) {
165) // create socket
166) if (!m_pSock) {
167) m_pSock = new SOCK();
168) if (!m_pSock)
169) return;
170) }
171)
172) // get bind address from bind address setting file
173) m_fileBind.update();
174) if (!m_fileBind.m_valid) {
175) delete m_pSock;
176) m_pSock = NULL;
177) return;
178) }
179)
180) // bind socket
181) if (!m_pSock->bind(m_fileBind.m_obj)) {
182) delete m_pSock;
183) m_pSock = NULL;
184) return;
185) }
186)
187) // request callback on recepetion
|
put all managers in one str...
Stefan Schuermans authored 12 years ago
|
188) m_mgrs.m_callMgr.requestIoReadCall(this, m_pSock);
|
implemented stream receiver
Stefan Schuermans authored 12 years ago
|
189)
190) // send new stream request
191) request(true);
192) }
193)
194) /// destroy socket
195) template<typename ADDR, typename SOCK>
196) void Receiver<ADDR, SOCK>::destroySock()
197) {
198) // cancel old stream request
199) request(false);
200) // set "no frame", because stream is going to change
201) m_fileOutStream.setFrame(NULL);
202)
203) // cancel callback request
|
put all managers in one str...
Stefan Schuermans authored 12 years ago
|
204) m_mgrs.m_callMgr.cancelIoReadCall(this, m_pSock);
|
implemented stream receiver
Stefan Schuermans authored 12 years ago
|
205)
206) // destroy socket
207) if (m_pSock) {
208) delete m_pSock;
209) m_pSock = NULL;
210) }
211) }
212)
213) /**
214) * @brief request stream / cancel stream request
215) * @param req if to send a request (otherwise: cancel request)
216) */
217) template<typename ADDR, typename SOCK>
218) void Receiver<ADDR, SOCK>::request(bool req)
219) {
220) etBlinkenPacket type;
221) char buf[65536];
222) int len;
223) std::string data;
224)
225) // check that there is a socket, a source address and a protocol
226) if (!m_pSock || !m_fileSrc.m_valid || !m_fileProtocol.m_valid)
227) return;
228)
229) // assemble request / cancel request
230) type = req ? BlinkenPacketRequest: BlinkenPacketEndRequest;
231) len = BlinkenProtoMakePacket(m_fileProtocol.m_obj.m_proto, type,
232) buf, sizeof(buf));
233) if (len < 0)
234) len = 0;
235) data.assign(buf, len);
236)
237) // send request / cancel request
238) m_pSock->send(data, m_fileSrc.m_obj);
239)
240) // set time for next request
241) if (req)
242) m_nextReq = Time::now() + Time(10);
243) m_needNextReq = req;
244) updateTimeCallback();
245) }
246)
247) /// receive data from socket
248) template<typename ADDR, typename SOCK>
249) void Receiver<ADDR, SOCK>::receiveFromSock()
250) {
251) etBlinkenProto proto;
252) etBlinkenPacket packet;
253) std::string data;
254) ADDR addr;
255)
256) // make sure socket exists
257) if (!m_pSock)
258) return;
259)
260) // receive (leave if no reception)
261) if (!m_pSock->recv(data, addr))
262) return;
263)
264) // detect packet type and protocol
265) BlinkenProtoDetectPacket(data.c_str(), data.size(), &proto, &packet);
266)
267) // check source address (if configured)
268) if (m_fileSrc.m_valid && addr != m_fileSrc.m_obj)
269) return; // mismatch
270)
271) // check protocol (if configured)
272) if (m_fileProtocol.m_valid && proto != m_fileProtocol.m_obj.m_proto)
273) return; // mismatch
274)
275) switch (packet) {
276)
277) // frame -> process it
278) case BlinkenPacketFrame:
279) procFrame(data);
280) break;
281)
282) // end of stream -> pass on "no frame"
283) case BlinkenPacketStreamEnd:
284) m_fileOutStream.setFrame(NULL);
285) break;
286)
287) default:
288) break;
289)
290) } // switch (packet)
291) }
292)
293) /**
294) * @brief process frame
295) * @param[in] data received frame protocol data
296) */
297) template<typename ADDR, typename SOCK>
298) void Receiver<ADDR, SOCK>::procFrame(const std::string &data)
299) {
300) stBlinkenFrame *pFrame;
301)
302) // parse frame
303) pFrame = BlinkenFrameFromNetwork(data.c_str(), data.size(), NULL);
304) if (!pFrame)
305) return;
306)
307) // pass on frame
308) m_fileOutStream.setFrame(pFrame);
309)
310) // set new stream timeout
311) m_timeout = Time::now() + Time(5);
312) m_needTimeout = true;
313) updateTimeCallback();
314) }
315)
316) /// update time callback
317) template<typename ADDR, typename SOCK>
318) void Receiver<ADDR, SOCK>::updateTimeCallback()
319) {
320) if (m_needTimeout)
321) if (m_needNextReq)
|
put all managers in one str...
Stefan Schuermans authored 12 years ago
|
322) m_mgrs.m_callMgr.requestTimeCall(this, m_timeout < m_nextReq ? m_timeout
|
implemented stream receiver
Stefan Schuermans authored 12 years ago
|
323) : m_nextReq);
324) else
|
put all managers in one str...
Stefan Schuermans authored 12 years ago
|
325) m_mgrs.m_callMgr.requestTimeCall(this, m_timeout);
|
implemented stream receiver
Stefan Schuermans authored 12 years ago
|
326) else
327) if (m_needNextReq)
|
put all managers in one str...
Stefan Schuermans authored 12 years ago
|
328) m_mgrs.m_callMgr.requestTimeCall(this, m_nextReq);
|
implemented stream receiver
Stefan Schuermans authored 12 years ago
|
329) else
|
put all managers in one str...
Stefan Schuermans authored 12 years ago
|
330) m_mgrs.m_callMgr.cancelTimeCall(this);
|