implemented stream receiver
Stefan Schuermans

Stefan Schuermans commited on 2011-12-21 22:47:08
Showing 10 changed files, with 547 additions and 2 deletions.

... ...
@@ -92,6 +92,7 @@
92 92
         <li><a href="priority.html">Priority</a></li>
93 93
         <li><a href="resizer.html">Resizer</a></li>
94 94
         <li><a href="scaler.html">Scaler</a></li>
95
+        <li><a href="udp4receiver.html">UDP4 Receiver</a></li>
95 96
         <li><a href="udp4sender.html">UDP4 Sender</a></li>
96 97
       </ul>
97 98
     </p>
... ...
@@ -0,0 +1,54 @@
1
+<html>
2
+  <head>
3
+    <title>Blinker - UDPv4 Receiver</title>
4
+  </head>
5
+  <body>
6
+    <h1>Blinker - UDPv4 Receiver</h1>
7
+    <p>
8
+      The UDPv4 receiver module reeceives a stream using UDP version 4 across
9
+      a network.
10
+      It supports the static and dynamic variants of the BLP, EBLP and MCUF
11
+      protocols.
12
+    </p>
13
+    <h2>Configuration</h2>
14
+    <p>
15
+      The configuration of the UDPv4 receiver module with name
16
+      <code>NAME</code> is located in the <code>udp4receivers/NAME</code>
17
+      subdirectory.
18
+    </p>
19
+    <h3>Output Stream</h3>
20
+    <p>
21
+      The file <code>outstream</code> contains the name of the stream to
22
+      write the received network stream to.
23
+    </p>
24
+    <h3>Protocol</h3>
25
+    <p>
26
+      The protocol to use is configured in the file <code>protocol</code>.
27
+      It can contain the string <code>blp</code>, <code>eblp</code> or
28
+      <code>mcuf</code> to select the BLP, EBLP or MCUF protocol respectively.
29
+      If the file is not present, all protocols are accepted.
30
+    </p>
31
+    <h3>Bind Address</h3>
32
+    <p>
33
+      The file <code>bind</code> contains the local address to bind to.
34
+      It must contain the IP address and the port, i.e. a string
35
+      <code>&lt;IP&gt;:&lt;port&gt;</code> (hostnames are not supported).
36
+      If the local address should be determined automatically, the file
37
+      can contain <code>0.0.0.0:0</code>.
38
+    </p>
39
+    <h3>Source address</h3>
40
+    <p>
41
+      The file <code>source</code> contains the source address
42
+      as a string <code>&lt;IP&gt;:&lt;port&gt;</code>
43
+      (hostnames are not supported).
44
+      <br>
45
+      If the source address file exists, only a stream from the specified
46
+      address is accepted.
47
+      Otherwise, the stream can be sent from any address.
48
+      <br>
49
+      If both the source address and the protocol are given, dynamic
50
+      stream request packets are sent to the source address.
51
+    </p>
52
+  </body>
53
+</html>
54
+
... ...
@@ -0,0 +1 @@
1
+127.0.0.1:23232
... ...
@@ -0,0 +1 @@
1
+127.0.0.1:23235
... ...
@@ -0,0 +1,123 @@
1
+/* Blinker
2
+   Copyright 2011 Stefan Schuermans <stefan@blinkenarea.org>
3
+   Copyleft GNU public license - http://www.gnu.org/copyleft/gpl.html
4
+   a blinkenarea.org project */
5
+
6
+#ifndef BLINKER_RECEIVER_H
7
+#define BLINKER_RECEVIER_H
8
+
9
+#include <list>
10
+#include <map>
11
+#include <string>
12
+
13
+#include <BlinkenLib/BlinkenProto.h>
14
+#include <BlinkenLib/BlinkenFrame.h>
15
+
16
+#include "CallMgr.h"
17
+#include "Directory.h"
18
+#include "File.h"
19
+#include "IoCallee.h"
20
+#include "Module.h"
21
+#include "OutStreamFile.h"
22
+#include "Protocol.h"
23
+#include "ProtocolFile.h"
24
+#include "SettingFile.h"
25
+#include "StreamMgr.h"
26
+#include "Time.h"
27
+#include "TimeCallee.h"
28
+
29
+namespace Blinker {
30
+
31
+/// stream receiver
32
+template<typename ADDR, typename SOCK>
33
+class Receiver: public IoCallee, public Module, public TimeCallee
34
+{
35
+protected:
36
+  /// type for address setting file
37
+  typedef SettingFile<ADDR> AddrFile;
38
+
39
+public:
40
+  /**
41
+   * @brief constructor
42
+   * @param[in] callMgr callback manager
43
+   * @param[in] streamMgr stream manager
44
+   * @param[in] dirBase base directory
45
+   */
46
+  Receiver(CallMgr &callMgr, StreamMgr &streamMgr, const Directory &dirBase);
47
+
48
+  /// virtual destructor
49
+  virtual ~Receiver();
50
+
51
+private:
52
+  /// copy constructor disabled
53
+  Receiver(const Receiver &that);
54
+
55
+  /// assignment operator disabled
56
+  const Receiver & operator=(const Receiver &that);
57
+
58
+public:
59
+  /// check for update of configuration
60
+  virtual void updateConfig();
61
+
62
+  /// callback when requested time reached
63
+  virtual void timeCall();
64
+
65
+  /**
66
+   * @brief callback when I/O object is readable
67
+   * @param[in] io I/O object that is readable
68
+   */
69
+  virtual void ioReadCall(Io *io);
70
+
71
+  /**
72
+   * @brief callback when I/O object is writable
73
+   * @param[in] io I/O object that is writable
74
+   */
75
+  virtual void ioWriteCall(Io *io);
76
+
77
+protected:
78
+  /// (re-)read protocol
79
+  void readProto();
80
+
81
+  /// (re-)read source address
82
+  void readSrc();
83
+
84
+  /// create socket and bind it
85
+  void createSock();
86
+
87
+  /// destroy socket
88
+  void destroySock();
89
+
90
+  /**
91
+   * @brief request stream / cancel stream request
92
+   * @param req if to send a request (otherwise: cancel request)
93
+   */
94
+  void request(bool req);
95
+
96
+  /// receive data from socket
97
+  void receiveFromSock();
98
+
99
+  /**
100
+   * @brief process frame
101
+   * @param[in] data received frame protocol data
102
+   */
103
+  void procFrame(const std::string &data);
104
+
105
+  /// update time callback
106
+  void updateTimeCallback();
107
+
108
+protected:
109
+  OutStreamFile m_fileOutStream; ///< output stream name file
110
+  AddrFile      m_fileBind;      ///< bind address file
111
+  AddrFile      m_fileSrc;       ///< source address file
112
+  ProtocolFile  m_fileProtocol;  ///< protocol file
113
+  SOCK          *m_pSock;        ///< socket to use for sending streams
114
+  Time          m_timeout;       ///< timeout of network stream
115
+  bool          m_needTimeout;   ///< if a timeout is needed
116
+  Time          m_nextReq;       ///< when to send next request
117
+  bool          m_needNextReq;   ///< if a next request is needed
118
+}; // class Receiver
119
+
120
+} // namespace Blinker
121
+
122
+#endif // #ifndef BLINKER_RECEIVER_H
123
+
... ...
@@ -0,0 +1,339 @@
1
+/* Blinker
2
+   Copyright 2011 Stefan Schuermans <stefan@blinkenarea.org>
3
+   Copyleft GNU public license - http://www.gnu.org/copyleft/gpl.html
4
+   a blinkenarea.org project */
5
+
6
+#ifndef BLINKER_RECEIVER_IMPL_H
7
+#define BLINKER_RECEIVER_IMPL_H
8
+
9
+#include <list>
10
+#include <map>
11
+#include <string>
12
+
13
+#include <BlinkenLib/BlinkenProto.h>
14
+#include <BlinkenLib/BlinkenFrame.h>
15
+
16
+#include "CallMgr.h"
17
+#include "Directory.h"
18
+#include "File.h"
19
+#include "IoCallee.h"
20
+#include "Module.h"
21
+#include "OutStreamFile.h"
22
+#include "Protocol.h"
23
+#include "ProtocolFile.h"
24
+#include "Receiver.h"
25
+#include "SettingFile.h"
26
+#include "StreamMgr.h"
27
+#include "Time.h"
28
+#include "TimeCallee.h"
29
+
30
+namespace Blinker {
31
+
32
+/**
33
+ * @brief constructor
34
+ * @param[in] callMgr callback manager
35
+ * @param[in] streamMgr stream manager
36
+ * @param[in] dirBase base directory
37
+ */
38
+template<typename ADDR, typename SOCK>
39
+Receiver<ADDR, SOCK>::Receiver(CallMgr &callMgr, StreamMgr &streamMgr,
40
+                           const Directory &dirBase):
41
+  Module(callMgr, streamMgr, dirBase),
42
+  m_fileOutStream(dirBase.getFile("outstream"), streamMgr),
43
+  m_fileBind(dirBase.getFile("bind")),
44
+  m_fileSrc(dirBase.getFile("source")),
45
+  m_fileProtocol(dirBase.getFile("protocol")),
46
+  m_pSock(NULL),
47
+  m_needTimeout(false),
48
+  m_needNextReq(false)
49
+{
50
+  // read protocol and source address
51
+  readProto();
52
+  readSrc();
53
+
54
+  // create and bind socket
55
+  createSock();
56
+}
57
+
58
+/// virtual destructor
59
+template<typename ADDR, typename SOCK>
60
+Receiver<ADDR, SOCK>::~Receiver()
61
+{
62
+  // cancel stream request
63
+  request(false);
64
+
65
+  // destroy socket
66
+  destroySock();
67
+}
68
+
69
+/// check for update of configuration
70
+template<typename ADDR, typename SOCK>
71
+void Receiver<ADDR, SOCK>::updateConfig()
72
+{
73
+  // output stream name file was modified -> re-get output stream
74
+  if (m_fileOutStream.checkModified())
75
+    m_fileOutStream.update();
76
+
77
+  // bind address file was modified -> re-create socket
78
+  if (m_fileBind.checkModified()) {
79
+    destroySock();
80
+    createSock();
81
+  }
82
+
83
+  // source address file was modified -> re-read source address
84
+  if (m_fileSrc.checkModified())
85
+    readSrc();
86
+
87
+  // protocol file was modified -> re-read protocol
88
+  if (m_fileProtocol.checkModified())
89
+    readProto();
90
+}
91
+
92
+/// callback when requested time reached
93
+template<typename ADDR, typename SOCK>
94
+void Receiver<ADDR, SOCK>::timeCall()
95
+{
96
+  Time now = Time::now();
97
+
98
+  // stream timeout
99
+  if (m_needTimeout && m_timeout <= now) {
100
+    m_needTimeout = false;
101
+    m_fileOutStream.setFrame(NULL);
102
+  }
103
+
104
+  // re-request stream
105
+  if (m_needNextReq && m_nextReq <= now)
106
+    request(true);
107
+
108
+  updateTimeCallback();
109
+}
110
+
111
+/**
112
+ * @brief callback when I/O object is readable
113
+ * @param[in] io I/O object that is readable
114
+ */
115
+template<typename ADDR, typename SOCK>
116
+void Receiver<ADDR, SOCK>::ioReadCall(Io *io)
117
+{
118
+  // reception on socket
119
+  if (io == m_pSock)
120
+    receiveFromSock();
121
+}
122
+
123
+/**
124
+ * @brief callback when I/O object is writable
125
+ * @param[in] io I/O object that is writable
126
+ */
127
+template<typename ADDR, typename SOCK>
128
+void Receiver<ADDR, SOCK>::ioWriteCall(Io *io)
129
+{
130
+  (void)io; // unused
131
+}
132
+
133
+/// (re-)read source address
134
+template<typename ADDR, typename SOCK>
135
+void Receiver<ADDR, SOCK>::readSrc()
136
+{
137
+  // cancel old stream request
138
+  request(false);
139
+  // set "no frame", because stream is going to change
140
+  m_fileOutStream.setFrame(NULL);
141
+
142
+  m_fileSrc.update();
143
+
144
+  // send new stream request
145
+  request(true);
146
+}
147
+
148
+/// (re-)read protocol
149
+template<typename ADDR, typename SOCK>
150
+void Receiver<ADDR, SOCK>::readProto()
151
+{
152
+  // cancel old stream request
153
+  request(false);
154
+  // set "no frame", because stream is going to change
155
+  m_fileOutStream.setFrame(NULL);
156
+
157
+  // read new protocol from file
158
+  m_fileProtocol.update();
159
+
160
+  // send new stream request
161
+  request(true);
162
+}
163
+
164
+/// create socket and bind it
165
+template<typename ADDR, typename SOCK>
166
+void Receiver<ADDR, SOCK>::createSock()
167
+{
168
+  // create socket
169
+  if (!m_pSock) {
170
+    m_pSock = new SOCK();
171
+    if (!m_pSock)
172
+      return;
173
+  }
174
+
175
+  // get bind address from bind address setting file
176
+  m_fileBind.update();
177
+  if (!m_fileBind.m_valid) {
178
+    delete m_pSock;
179
+    m_pSock = NULL;
180
+    return;
181
+  }
182
+
183
+  // bind socket
184
+  if (!m_pSock->bind(m_fileBind.m_obj)) {
185
+    delete m_pSock;
186
+    m_pSock = NULL;
187
+    return;
188
+  }
189
+
190
+  // request callback on recepetion
191
+  m_callMgr.requestIoReadCall(this, m_pSock);
192
+
193
+  // send new stream request
194
+  request(true);
195
+}
196
+
197
+/// destroy socket
198
+template<typename ADDR, typename SOCK>
199
+void Receiver<ADDR, SOCK>::destroySock()
200
+{
201
+  // cancel old stream request
202
+  request(false);
203
+  // set "no frame", because stream is going to change
204
+  m_fileOutStream.setFrame(NULL);
205
+
206
+  // cancel callback request
207
+  m_callMgr.cancelIoReadCall(this, m_pSock);
208
+
209
+  // destroy socket
210
+  if (m_pSock) {
211
+    delete m_pSock;
212
+    m_pSock = NULL;
213
+  }
214
+}
215
+
216
+/**
217
+ * @brief request stream / cancel stream request
218
+ * @param req if to send a request (otherwise: cancel request)
219
+ */
220
+template<typename ADDR, typename SOCK>
221
+void Receiver<ADDR, SOCK>::request(bool req)
222
+{
223
+  etBlinkenPacket type;
224
+  char buf[65536];
225
+  int len;
226
+  std::string data;
227
+
228
+  // check that there is a socket, a source address and a protocol
229
+  if (!m_pSock || !m_fileSrc.m_valid || !m_fileProtocol.m_valid)
230
+    return;
231
+
232
+  // assemble request / cancel request
233
+  type = req ? BlinkenPacketRequest: BlinkenPacketEndRequest;
234
+  len = BlinkenProtoMakePacket(m_fileProtocol.m_obj.m_proto, type,
235
+                               buf, sizeof(buf));
236
+  if (len < 0)
237
+    len = 0;
238
+  data.assign(buf, len);
239
+
240
+  // send request / cancel request
241
+  m_pSock->send(data, m_fileSrc.m_obj);
242
+
243
+  // set time for next request
244
+  if (req)
245
+    m_nextReq = Time::now() + Time(10);
246
+  m_needNextReq = req;
247
+  updateTimeCallback();
248
+}
249
+
250
+/// receive data from socket
251
+template<typename ADDR, typename SOCK>
252
+void Receiver<ADDR, SOCK>::receiveFromSock()
253
+{
254
+  etBlinkenProto proto;
255
+  etBlinkenPacket packet;
256
+  std::string data;
257
+  ADDR addr;
258
+
259
+  // make sure socket exists
260
+  if (!m_pSock)
261
+    return;
262
+
263
+  // receive (leave if no reception)
264
+  if (!m_pSock->recv(data, addr))
265
+    return;
266
+
267
+  // detect packet type and protocol
268
+  BlinkenProtoDetectPacket(data.c_str(), data.size(), &proto, &packet);
269
+
270
+  // check source address (if configured)
271
+  if (m_fileSrc.m_valid && addr != m_fileSrc.m_obj)
272
+    return; // mismatch
273
+
274
+  // check protocol (if configured)
275
+  if (m_fileProtocol.m_valid && proto != m_fileProtocol.m_obj.m_proto)
276
+    return; // mismatch
277
+
278
+  switch (packet) {
279
+
280
+    // frame -> process it
281
+    case BlinkenPacketFrame:
282
+      procFrame(data);
283
+      break;
284
+
285
+    // end of stream -> pass on "no frame"
286
+    case BlinkenPacketStreamEnd:
287
+      m_fileOutStream.setFrame(NULL);
288
+      break;
289
+
290
+    default:
291
+      break;
292
+
293
+  } // switch (packet)
294
+}
295
+
296
+/**
297
+ * @brief process frame
298
+ * @param[in] data received frame protocol data
299
+ */
300
+template<typename ADDR, typename SOCK>
301
+void Receiver<ADDR, SOCK>::procFrame(const std::string &data)
302
+{
303
+  stBlinkenFrame *pFrame;
304
+
305
+  // parse frame
306
+  pFrame = BlinkenFrameFromNetwork(data.c_str(), data.size(), NULL);
307
+  if (!pFrame)
308
+    return;
309
+
310
+  // pass on frame
311
+  m_fileOutStream.setFrame(pFrame);
312
+
313
+  // set new stream timeout
314
+  m_timeout = Time::now() + Time(5);
315
+  m_needTimeout = true;
316
+  updateTimeCallback();
317
+}
318
+
319
+/// update time callback
320
+template<typename ADDR, typename SOCK>
321
+void Receiver<ADDR, SOCK>::updateTimeCallback()
322
+{
323
+  if (m_needTimeout)
324
+    if (m_needNextReq)
325
+      m_callMgr.requestTimeCall(this, m_timeout < m_nextReq ? m_timeout
326
+                                                            : m_nextReq);
327
+    else
328
+      m_callMgr.requestTimeCall(this, m_timeout);
329
+  else
330
+    if (m_needNextReq)
331
+      m_callMgr.requestTimeCall(this, m_nextReq);
332
+    else
333
+      m_callMgr.cancelTimeCall(this);
334
+}
335
+
336
+} // namespace Blinker
337
+
338
+#endif // #ifndef BLINKER_RECEIVER_H
339
+
... ...
@@ -0,0 +1,21 @@
1
+/* Blinker
2
+   Copyright 2011 Stefan Schuermans <stefan@blinkenarea.org>
3
+   Copyleft GNU public license - http://www.gnu.org/copyleft/gpl.html
4
+   a blinkenarea.org project */
5
+
6
+#ifndef UDP4RECEIVER_H
7
+#define UDP4RECEIVER_H
8
+
9
+#include "Receiver_impl.h"
10
+#include "Udp4Addr.h"
11
+#include "Udp4Sock.h"
12
+
13
+namespace Blinker {
14
+
15
+/// UDP v4 stream sender
16
+typedef Receiver<Udp4Addr, Udp4Sock> Udp4Receiver;
17
+
18
+} // namespace Blinker
19
+
20
+#endif // #ifndef UDP4RECEIVER_H
21
+
... ...
@@ -19,6 +19,7 @@
19 19
 #include "Resizer.h"
20 20
 #include "Scaler.h"
21 21
 #include "StreamMgr.h"
22
+#include "Udp4Receiver.h"
22 23
 #include "Udp4Sender.h"
23 24
 
24 25
 using namespace Blinker;
... ...
@@ -44,9 +45,11 @@ void run(const std::string &dirConfig)
44 45
                                   dirCfg.getSubdir("priorities"));
45 46
   ModuleMgr<Resizer> resizerMgr(callMgr, streamMgr,
46 47
                                 dirCfg.getSubdir("resizers"));
47
-  ModuleMgr<Scaler> scalersMgr(callMgr, streamMgr,
48
+  ModuleMgr<Scaler> scalerMgr(callMgr, streamMgr,
48 49
                               dirCfg.getSubdir("scalers"));
49
-  ModuleMgr<Udp4Sender> udp4Mgr(callMgr, streamMgr,
50
+  ModuleMgr<Udp4Receiver> udp4ReceiverMgr(callMgr, streamMgr,
51
+                                          dirCfg.getSubdir("udp4receivers"));
52
+  ModuleMgr<Udp4Sender> udp4SenderMgr(callMgr, streamMgr,
50 53
                                       dirCfg.getSubdir("udp4senders"));
51 54
 
52 55
   callMgr.run();
53 56