implement UDP synchronization receivers
Stefan Schuermans

Stefan Schuermans commited on 2014-01-03 16:19:05
Showing 15 changed files, with 390 additions and 3 deletions.

... ...
@@ -107,7 +107,7 @@ protected:
107 107
   AddrFile      m_fileBind;      ///< bind address file
108 108
   AddrFile      m_fileSrc;       ///< source address file
109 109
   ProtocolFile  m_fileProtocol;  ///< protocol file
110
-  SOCK          *m_pSock;        ///< socket to use for sending streams
110
+  SOCK          *m_pSock;        ///< socket to use for receiving stream
111 111
   Time          m_timeout;       ///< timeout of network stream
112 112
   bool          m_needTimeout;   ///< if a timeout is needed
113 113
   Time          m_nextReq;       ///< when to send next request
... ...
@@ -0,0 +1,92 @@
1
+/* Blinker
2
+   Copyright 2011-2014 Stefan Schuermans <stefan@blinkenarea.org>
3
+   Copyleft GNU public license - http://www.gnu.org/copyleft/gpl.html
4
+   a blinkenarea.org project */
5
+
6
+#ifndef BLINKER_SYNCRECEIVER_H
7
+#define BLINKER_SYNCRECEIVER_H
8
+
9
+#include <string>
10
+
11
+#include "Directory.h"
12
+#include "File.h"
13
+#include "IoCallee.h"
14
+#include "Mgrs.h"
15
+#include "Module.h"
16
+#include "OutSyncFile.h"
17
+#include "Protocol.h"
18
+#include "ProtocolFile.h"
19
+#include "SettingFile.h"
20
+
21
+namespace Blinker {
22
+
23
+/// stream receiver
24
+template<typename ADDR, typename SOCK>
25
+class SyncReceiver: public IoCallee, public Module
26
+{
27
+protected:
28
+  /// type for address setting file
29
+  typedef SettingFile<ADDR> AddrFile;
30
+
31
+public:
32
+  /**
33
+   * @brief constructor
34
+   * @param[in] name module name
35
+   * @param[in] mgrs managers
36
+   * @param[in] dirBase base directory
37
+   */
38
+  SyncReceiver(const std::string &name, Mgrs &mgrs, const Directory &dirBase);
39
+
40
+  /// virtual destructor
41
+  virtual ~SyncReceiver();
42
+
43
+private:
44
+  /// copy constructor disabled
45
+  SyncReceiver(const SyncReceiver &that);
46
+
47
+  /// assignment operator disabled
48
+  const SyncReceiver & operator=(const SyncReceiver &that);
49
+
50
+public:
51
+  /// check for update of configuration
52
+  virtual void updateConfig();
53
+
54
+  /**
55
+   * @brief callback when I/O object is readable
56
+   * @param[in] io I/O object that is readable
57
+   */
58
+  virtual void ioReadCall(Io *io);
59
+
60
+  /**
61
+   * @brief callback when I/O object is writable
62
+   * @param[in] io I/O object that is writable
63
+   */
64
+  virtual void ioWriteCall(Io *io);
65
+
66
+protected:
67
+  /// create socket and bind it
68
+  void createSock();
69
+
70
+  /// destroy socket
71
+  void destroySock();
72
+
73
+  /// receive data from socket
74
+  void receiveFromSock();
75
+
76
+  /**
77
+   * @brief process synchronization information
78
+   * @param[in] data received synchronization data
79
+   */
80
+  void procInfo(const std::string &data);
81
+
82
+protected:
83
+  OutSyncFile m_fileOutSync; ///< output sync stream name file
84
+  AddrFile    m_fileBind;    ///< bind address file
85
+  AddrFile    m_fileSrc;     ///< source address file
86
+  SOCK        *m_pSock;      ///< socket to use for receiving sync stream
87
+}; // class SyncReceiver
88
+
89
+} // namespace Blinker
90
+
91
+#endif // #ifndef BLINKER_SYNCRECEIVER_H
92
+
... ...
@@ -0,0 +1,206 @@
1
+/* Blinker
2
+   Copyright 2011-2014 Stefan Schuermans <stefan@blinkenarea.org>
3
+   Copyleft GNU public license - http://www.gnu.org/copyleft/gpl.html
4
+   a blinkenarea.org project */
5
+
6
+#ifndef BLINKER_SYNCRECEIVER_IMPL_H
7
+#define BLINKER_SYNCRECEIVER_IMPL_H
8
+
9
+#include <stdint.h>
10
+#include <string>
11
+#include <string.h>
12
+
13
+#include "Directory.h"
14
+#include "File.h"
15
+#include "IoCallee.h"
16
+#include "Mgrs.h"
17
+#include "Module.h"
18
+#include "NetHost.h"
19
+#include "OutSyncFile.h"
20
+#include "Protocol.h"
21
+#include "ProtocolFile.h"
22
+#include "SettingFile.h"
23
+#include "SyncRecv.h"
24
+#include "SyncReceiver.h"
25
+
26
+namespace Blinker {
27
+
28
+/**
29
+ * @brief constructor
30
+ * @param[in] name module name
31
+ * @param[in] mgrs managers
32
+ * @param[in] dirBase base directory
33
+ */
34
+template<typename ADDR, typename SOCK>
35
+SyncReceiver<ADDR, SOCK>::SyncReceiver(const std::string &name, Mgrs &mgrs,
36
+                                       const Directory &dirBase):
37
+  Module(name, mgrs, dirBase),
38
+  m_fileOutSync(dirBase.getFile("outsync"), mgrs.m_syncMgr),
39
+  m_fileBind(dirBase.getFile("bind")),
40
+  m_fileSrc(dirBase.getFile("source")),
41
+  m_pSock(NULL)
42
+{
43
+  // read source address
44
+  m_fileSrc.update();
45
+
46
+  // create and bind socket
47
+  createSock();
48
+}
49
+
50
+/// virtual destructor
51
+template<typename ADDR, typename SOCK>
52
+SyncReceiver<ADDR, SOCK>::~SyncReceiver()
53
+{
54
+  // destroy socket
55
+  destroySock();
56
+}
57
+
58
+/// check for update of configuration
59
+template<typename ADDR, typename SOCK>
60
+void SyncReceiver<ADDR, SOCK>::updateConfig()
61
+{
62
+  // output sync stream name file was modified -> re-get output sync stream
63
+  if (m_fileOutSync.checkModified())
64
+    m_fileOutSync.update();
65
+
66
+  // bind address file was modified -> re-create socket
67
+  if (m_fileBind.checkModified()) {
68
+    destroySock();
69
+    createSock();
70
+  }
71
+
72
+  // source address file was modified -> re-read source address
73
+  if (m_fileSrc.checkModified())
74
+    m_fileSrc.update();
75
+}
76
+
77
+/**
78
+ * @brief callback when I/O object is readable
79
+ * @param[in] io I/O object that is readable
80
+ */
81
+template<typename ADDR, typename SOCK>
82
+void SyncReceiver<ADDR, SOCK>::ioReadCall(Io *io)
83
+{
84
+  // reception on socket
85
+  if (io == m_pSock)
86
+    receiveFromSock();
87
+}
88
+
89
+/**
90
+ * @brief callback when I/O object is writable
91
+ * @param[in] io I/O object that is writable
92
+ */
93
+template<typename ADDR, typename SOCK>
94
+void SyncReceiver<ADDR, SOCK>::ioWriteCall(Io *io)
95
+{
96
+  (void)io; // unused
97
+}
98
+
99
+/// create socket and bind it
100
+template<typename ADDR, typename SOCK>
101
+void SyncReceiver<ADDR, SOCK>::createSock()
102
+{
103
+  // create socket
104
+  if (!m_pSock) {
105
+    m_pSock = new SOCK();
106
+    if (!m_pSock)
107
+      return;
108
+  }
109
+
110
+  // get bind address from bind address setting file
111
+  m_fileBind.update();
112
+  if (!m_fileBind.m_valid) {
113
+    delete m_pSock;
114
+    m_pSock = NULL;
115
+    return;
116
+  }
117
+
118
+  // bind socket
119
+  if (!m_pSock->bind(m_fileBind.m_obj)) {
120
+    delete m_pSock;
121
+    m_pSock = NULL;
122
+    return;
123
+  }
124
+
125
+  // request callback on recepetion
126
+  m_mgrs.m_callMgr.requestIoReadCall(this, m_pSock);
127
+}
128
+
129
+/// destroy socket
130
+template<typename ADDR, typename SOCK>
131
+void SyncReceiver<ADDR, SOCK>::destroySock()
132
+{
133
+  // cancel callback request
134
+  m_mgrs.m_callMgr.cancelIoReadCall(this, m_pSock);
135
+
136
+  // destroy socket
137
+  if (m_pSock) {
138
+    delete m_pSock;
139
+    m_pSock = NULL;
140
+  }
141
+}
142
+
143
+/// receive data from socket
144
+template<typename ADDR, typename SOCK>
145
+void SyncReceiver<ADDR, SOCK>::receiveFromSock()
146
+{
147
+  std::string data;
148
+  ADDR addr;
149
+
150
+  // make sure socket exists
151
+  if (!m_pSock)
152
+    return;
153
+
154
+  // receive (leave if no reception)
155
+  if (!m_pSock->recv(data, addr))
156
+    return;
157
+
158
+  // check source address (if configured)
159
+  if (m_fileSrc.m_valid && addr != m_fileSrc.m_obj)
160
+    return; // mismatch
161
+
162
+  // process received synchronization information
163
+  procInfo(data);
164
+}
165
+
166
+/**
167
+ * @brief process synchronization information
168
+ * @param[in] data received synchronization data
169
+ */
170
+template<typename ADDR, typename SOCK>
171
+void SyncReceiver<ADDR, SOCK>::procInfo(const std::string &data)
172
+{
173
+  /// Po.W.E.R. (stage play by BBM) synchronization protocol
174
+  struct PoSyPacket {
175
+    char     magic[4]; ///< fixed magic, always "PoSy"
176
+    uint32_t flags;    ///< flags, see PosSyFlags below
177
+    char     name[64]; ///< name of current piece in playlist
178
+    uint32_t pos_ms;   ///< position within current piece in milliseconds
179
+  };
180
+
181
+  /// flags for PoSyPacket
182
+  enum PoSyFlags {
183
+    PoSyPause = 0x00000001, ///< pause mode active
184
+  };
185
+
186
+  PoSyPacket     packet;
187
+  SyncRecv::Info info;
188
+
189
+  // parse frame, leave if invalid
190
+  if (data.size() < sizeof(packet))
191
+    return;
192
+  memcpy(&packet, data.c_str(), sizeof(packet));
193
+  if (memcmp(packet.magic, "PoSy", sizeof(packet.magic)) != 0)
194
+    return;
195
+  info.pause = (Net2Host32(packet.flags) & PoSyPause) != 0;
196
+  info.name = packet.name;
197
+  info.pos_ms = Net2Host32(packet.pos_ms);
198
+
199
+  // pass on sync information
200
+  m_fileOutSync.sendInfo(info);
201
+}
202
+
203
+} // namespace Blinker
204
+
205
+#endif // #ifndef BLINKER_SYNCRECEIVER_IMPL_H
206
+
... ...
@@ -12,7 +12,7 @@
12 12
 
13 13
 namespace Blinker {
14 14
 
15
-/// UDP v4 stream sender
15
+/// UDP v4 stream receiver
16 16
 typedef Receiver<Udp4Addr, Udp4Sock> Udp4Receiver;
17 17
 
18 18
 } // namespace Blinker
... ...
@@ -0,0 +1,21 @@
1
+/* Blinker
2
+   Copyright 2011-2014 Stefan Schuermans <stefan@blinkenarea.org>
3
+   Copyleft GNU public license - http://www.gnu.org/copyleft/gpl.html
4
+   a blinkenarea.org project */
5
+
6
+#ifndef UDP4SYNCRECEIVER_H
7
+#define UDP4SYNCRECEIVER_H
8
+
9
+#include "SyncReceiver_impl.h"
10
+#include "Udp4Addr.h"
11
+#include "Udp4Sock.h"
12
+
13
+namespace Blinker {
14
+
15
+/// UDP v4 synchronization stream receiver
16
+typedef SyncReceiver<Udp4Addr, Udp4Sock> Udp4SyncReceiver;
17
+
18
+} // namespace Blinker
19
+
20
+#endif // #ifndef UDP4SYNCRECEIVER_H
21
+
... ...
@@ -12,7 +12,7 @@
12 12
 
13 13
 namespace Blinker {
14 14
 
15
-/// UDP v6 stream sender
15
+/// UDP v6 stream receiver
16 16
 typedef Receiver<Udp6Addr, Udp6Sock> Udp6Receiver;
17 17
 
18 18
 } // namespace Blinker
... ...
@@ -0,0 +1,21 @@
1
+/* Blinker
2
+   Copyright 2011-2014 Stefan Schuermans <stefan@blinkenarea.org>
3
+   Copyleft GNU public license - http://www.gnu.org/copyleft/gpl.html
4
+   a blinkenarea.org project */
5
+
6
+#ifndef UDP6SYNCRECEIVER_H
7
+#define UDP6SYNCRECEIVER_H
8
+
9
+#include "SyncReceiver_impl.h"
10
+#include "Udp6Addr.h"
11
+#include "Udp6Sock.h"
12
+
13
+namespace Blinker {
14
+
15
+/// UDP v6 synchronization stream receiver
16
+typedef SyncReceiver<Udp6Addr, Udp6Sock> Udp6SyncReceiver;
17
+
18
+} // namespace Blinker
19
+
20
+#endif // #ifndef UDP6SYNCRECEIVER_H
21
+
... ...
@@ -27,9 +27,11 @@
27 27
 #include "Udp4Phone.h"
28 28
 #include "Udp4Receiver.h"
29 29
 #include "Udp4Sender.h"
30
+#include "Udp4SyncReceiver.h"
30 31
 #include "Udp6Phone.h"
31 32
 #include "Udp6Receiver.h"
32 33
 #include "Udp6Sender.h"
34
+#include "Udp6SyncReceiver.h"
33 35
 
34 36
 using namespace Blinker;
35 37
 
... ...
@@ -59,9 +61,11 @@ void run(const std::string &dirConfig)
59 61
   MODULEMGR(Udp4Phone,        udp4phones);
60 62
   MODULEMGR(Udp4Receiver,     udp4receivers);
61 63
   MODULEMGR(Udp4Sender,       udp4senders);
64
+  MODULEMGR(Udp4SyncReceiver, udp4syncreceivers);
62 65
   MODULEMGR(Udp6Phone,        udp6phones);
63 66
   MODULEMGR(Udp6Receiver,     udp6receivers);
64 67
   MODULEMGR(Udp6Sender,       udp6senders);
68
+  MODULEMGR(Udp6SyncReceiver, udp6syncreceivers);
65 69
 
66 70
 #undef MODULEMGR
67 71
 
... ...
@@ -0,0 +1,20 @@
1
+/* Blinker
2
+   Copyright 2011-2014 Stefan Schuermans <stefan@blinkenarea.org>
3
+   Copyleft GNU public license - http://www.gnu.org/copyleft/gpl.html
4
+   a blinkenarea.org project */
5
+
6
+#include <arpa/inet.h>
7
+#include <stdint.h>
8
+
9
+#include "NetHost.h"
10
+
11
+namespace Blinker {
12
+
13
+/// convert 32 bit unsigned integer from network to host byte order
14
+uint32_t Net2Host32(uint32_t val)
15
+{
16
+  return ntohl(val);
17
+}
18
+
19
+} // namespace Blinker
20
+
... ...
@@ -0,0 +1,19 @@
1
+/* Blinker
2
+   Copyright 2011-2014 Stefan Schuermans <stefan@blinkenarea.org>
3
+   Copyleft GNU public license - http://www.gnu.org/copyleft/gpl.html
4
+   a blinkenarea.org project */
5
+
6
+#ifndef BLINKER_NETHOST_H
7
+#define BLINKER_NETHOST_H
8
+
9
+#include <stdint.h>
10
+
11
+namespace Blinker {
12
+
13
+/// convert 32 bit unsigned integer from network to host byte order
14
+uint32_t Net2Host32(uint32_t val);
15
+
16
+} // namespace Blinker
17
+
18
+#endif // #ifndef BLINKER_NETHOST_H
19
+
0 20