implemented dynamic destinations in sender module
Stefan Schuermans

Stefan Schuermans commited on 2011-11-15 23:54:16
Showing 3 changed files, with 236 additions and 20 deletions.

... ...
@@ -28,6 +28,24 @@ Udp4Addr::~Udp4Addr()
28 28
 {
29 29
 }
30 30
 
31
+/// less-than operator (to allow usage as map key)
32
+bool Udp4Addr::operator<(const Udp4Addr &that) const
33
+{
34
+  if (m_addr.sin_family < that.m_addr.sin_family)
35
+    return true;
36
+  if (m_addr.sin_family > that.m_addr.sin_family)
37
+    return false;
38
+  if (ntohl(m_addr.sin_addr.s_addr) < ntohl(that.m_addr.sin_addr.s_addr))
39
+    return true;
40
+  if (ntohl(m_addr.sin_addr.s_addr) > ntohl(that.m_addr.sin_addr.s_addr))
41
+    return false;
42
+  if (ntohs(m_addr.sin_port) < ntohs(that.m_addr.sin_port))
43
+    return true;
44
+  if (ntohs(m_addr.sin_port) > ntohs(that.m_addr.sin_port))
45
+    return false;
46
+  return false;
47
+}
48
+
31 49
 /// return address family
32 50
 int Udp4Addr::getFamily() const
33 51
 {
... ...
@@ -26,6 +26,9 @@ public:
26 26
   virtual ~Udp4Addr();
27 27
 
28 28
 public:
29
+  /// less-than operator (to allow usage as map key)
30
+  virtual bool operator<(const Udp4Addr &that) const;
31
+
29 32
   /// return address family
30 33
   virtual int getFamily() const;
31 34
 
... ...
@@ -7,24 +7,29 @@
7 7
 #define SENDER_H
8 8
 
9 9
 #include <list>
10
+#include <map>
10 11
 #include <string>
11 12
 
13
+#include <BlinkenLib/BlinkenProto.h>
12 14
 #include <BlinkenLib/BlinkenFrame.h>
13 15
 
14 16
 #include "CallMgr.h"
15 17
 #include "Directory.h"
16 18
 #include "File.h"
19
+#include "IoCallee.h"
17 20
 #include "Module.h"
18 21
 #include "SettingFile.h"
19 22
 #include "StreamMgr.h"
20 23
 #include "StreamRecv.h"
24
+#include "Time.h"
21 25
 #include "TimeCallee.h"
22 26
 
23 27
 namespace Blinker {
24 28
 
25 29
 /// stream sender
26 30
 template<typename ADDR, typename SOCK>
27
-class Sender: public Module, public StreamRecv, public TimeCallee
31
+class Sender: public IoCallee, public Module, public StreamRecv,
32
+              public TimeCallee
28 33
 {
29 34
 protected:
30 35
   /// static destination
... ...
@@ -39,6 +44,9 @@ protected:
39 44
   /// static destinations
40 45
   typedef std::list<Dest> Dests;
41 46
 
47
+  /// dynamic destinations: address -> time of last request
48
+  typedef std::map<ADDR, Time> DynDests;
49
+
42 50
 public:
43 51
   /**
44 52
    * @brief constructor
... ...
@@ -74,6 +82,18 @@ public:
74 82
   /// callback when requsted time reached
75 83
   virtual void timeCall();
76 84
 
85
+  /**
86
+   * @brief callback when I/O object is readable
87
+   * @param[in] io I/O object that is readable
88
+   */
89
+  virtual void ioReadCall(Io *io);
90
+
91
+  /**
92
+   * @brief callback when I/O object is writable
93
+   * @param[in] io I/O object that is writable
94
+   */
95
+  virtual void ioWriteCall(Io *io);
96
+
77 97
 protected:
78 98
   /// get input stream and attach to it
79 99
   void getInStream();
... ...
@@ -105,26 +125,29 @@ protected:
105 125
   void updateDestsFull(Directory &dirDests, Dests &dests,
106 126
                        etBlinkenProto proto);
107 127
 
128
+  /// remove timed-out dynamic destinations
129
+  void removeTimedOutDynDests();
130
+
108 131
   /**
109 132
    * @brief send current frame to address
110 133
    * @param[in] addr address to send to
111 134
    * @param[in] proto Blinken protocol identifier
112 135
    */
113
-  void sendCurFrame(const ADDR &addr, etBlinkenProto proto);
136
+  void sendCurFrame(const ADDR &addr, etBlinkenProto proto) const;
114 137
 
115 138
   /**
116 139
    * @brief send "no frame" to address
117 140
    * @param[in] addr address to send to
118 141
    * @param[in] proto Blinken protocol identifier
119 142
    */
120
-  void sendNoFrame(const ADDR &addr, etBlinkenProto proto);
143
+  void sendNoFrame(const ADDR &addr, etBlinkenProto proto) const;
121 144
 
122 145
   /**
123 146
    * @brief send frame to address
124 147
    * @param[in] data protcol data of frame
125 148
    * @param[in] addr address to send to
126 149
    */
127
-  void sendFrame(const std::string &data, const ADDR &addr);
150
+  void sendFrame(const std::string &data, const ADDR &addr) const;
128 151
 
129 152
   /**
130 153
    * @brief convert frame to protocol data
... ...
@@ -132,7 +155,7 @@ protected:
132 155
    * @param[in] proto Blinken protocol identifier
133 156
    * @param[out] data protcol data
134 157
    */
135
-  void frame2data(stBlinkenFrame *pFrame, etBlinkenProto proto,
158
+  static void frame2data(stBlinkenFrame *pFrame, etBlinkenProto proto,
136 159
                          std::string &data);
137 160
 
138 161
   /**
... ...
@@ -140,7 +163,10 @@ protected:
140 163
    * @param[in] proto Blinken protocol identifier
141 164
    * @param[out] data protcol data
142 165
    */
143
-  void noFrame2data(etBlinkenProto proto, std::string &data);
166
+  static void noFrame2data(etBlinkenProto proto, std::string &data);
167
+
168
+  /// receive data from socket
169
+  void receiveFromSock();
144 170
 
145 171
 protected:
146 172
   SettingFile m_fileInStream; ///< input stream name file
... ...
@@ -151,9 +177,12 @@ protected:
151 177
   std::string m_nameInStream; ///< name of input stream
152 178
   Stream      *m_pInStream;   ///< input stream
153 179
   SOCK        *m_pSock;       ///< socket to use for sending streams
154
-  Dests        m_destsBlp;    ///< current static BLP destinations
155
-  Dests        m_destsEblp;   ///< current static EBLP destinations
156
-  Dests        m_destsMcuf;   ///< current static MCUF destinations
180
+  Dests       m_destsBlp;     ///< static BLP destinations
181
+  Dests       m_destsEblp;    ///< static EBLP destinations
182
+  Dests       m_destsMcuf;    ///< static MCUF destinations
183
+  DynDests    m_dynDestsBlp;  ///< dynamic BLP destinations
184
+  DynDests    m_dynDestsEblp; ///< dynamic EBLP destinations
185
+  DynDests    m_dynDestsMcuf; ///< dynamic MCUF destinations
157 186
 }; // class Sender
158 187
 
159 188
 /* ##########
... ...
@@ -245,12 +274,15 @@ void Sender<ADDR, SOCK>::setFrame(stBlinkenFrame *pFrame)
245 274
 {
246 275
   std::string blp, eblp, mcuf;
247 276
 
277
+  // remove timed-out dynamic destinations
278
+  removeTimedOutDynDests();
279
+
248 280
   // convert frame to protocol data
249
-  if (!m_destsBlp.empty())
281
+  if (!m_destsBlp.empty() || !m_dynDestsBlp.empty())
250 282
     frame2data(pFrame, BlinkenProtoBlp, blp);
251
-  if (!m_destsEblp.empty())
283
+  if (!m_destsEblp.empty() || !m_dynDestsEblp.empty())
252 284
     frame2data(pFrame, BlinkenProtoEblp, eblp);
253
-  if (!m_destsMcuf.empty())
285
+  if (!m_destsMcuf.empty() || !m_dynDestsMcuf.empty())
254 286
     frame2data(pFrame, BlinkenProtoMcuf, mcuf);
255 287
 
256 288
   // send frame to all static destinations
... ...
@@ -262,6 +294,15 @@ void Sender<ADDR, SOCK>::setFrame(stBlinkenFrame *pFrame)
262 294
   for (itDest = m_destsMcuf.begin(); itDest != m_destsMcuf.end(); ++itDest)
263 295
     sendFrame(mcuf, itDest->m_addr);
264 296
 
297
+  // send frame to all dynamic destinations
298
+  typename DynDests::const_iterator itDyn;
299
+  for (itDyn = m_dynDestsBlp.begin(); itDyn != m_dynDestsBlp.end(); ++itDyn)
300
+    sendFrame(blp, itDyn->first);
301
+  for (itDyn = m_dynDestsEblp.begin(); itDyn != m_dynDestsEblp.end(); ++itDyn)
302
+    sendFrame(eblp, itDyn->first);
303
+  for (itDyn = m_dynDestsMcuf.begin(); itDyn != m_dynDestsMcuf.end(); ++itDyn)
304
+    sendFrame(mcuf, itDyn->first);
305
+
265 306
   // request time callback in one second
266 307
   m_callMgr.requestTimeCall(this, Time::now() + Time(1));
267 308
 }
... ...
@@ -272,12 +313,15 @@ void Sender<ADDR, SOCK>::setNoFrame()
272 313
 {
273 314
   std::string blp, eblp, mcuf;
274 315
 
316
+  // remove timed-out dynamic destinations
317
+  removeTimedOutDynDests();
318
+
275 319
   // get "no frame" protocol data
276
-  if (!m_destsBlp.empty())
320
+  if (!m_destsBlp.empty() || !m_dynDestsBlp.empty())
277 321
     noFrame2data(BlinkenProtoBlp, blp);
278
-  if (!m_destsEblp.empty())
322
+  if (!m_destsEblp.empty() || !m_dynDestsEblp.empty())
279 323
     noFrame2data(BlinkenProtoEblp, eblp);
280
-  if (!m_destsMcuf.empty())
324
+  if (!m_destsMcuf.empty() || !m_dynDestsMcuf.empty())
281 325
     noFrame2data(BlinkenProtoMcuf, mcuf);
282 326
 
283 327
   // send "no frame" to all staticdestinations
... ...
@@ -289,6 +333,15 @@ void Sender<ADDR, SOCK>::setNoFrame()
289 333
   for (itDest = m_destsMcuf.begin(); itDest != m_destsMcuf.end(); ++itDest)
290 334
     sendFrame(mcuf, itDest->m_addr);
291 335
 
336
+  // send frame to all dynamic destinations
337
+  typename DynDests::const_iterator itDyn;
338
+  for (itDyn = m_dynDestsBlp.begin(); itDyn != m_dynDestsBlp.end(); ++itDyn)
339
+    sendFrame(blp, itDyn->first);
340
+  for (itDyn = m_dynDestsEblp.begin(); itDyn != m_dynDestsEblp.end(); ++itDyn)
341
+    sendFrame(eblp, itDyn->first);
342
+  for (itDyn = m_dynDestsMcuf.begin(); itDyn != m_dynDestsMcuf.end(); ++itDyn)
343
+    sendFrame(mcuf, itDyn->first);
344
+
292 345
   // request time callback in one second
293 346
   m_callMgr.requestTimeCall(this, Time::now() + Time(1));
294 347
 }
... ...
@@ -311,6 +364,28 @@ void Sender<ADDR, SOCK>::timeCall()
311 364
     setNoFrame();
312 365
 }
313 366
 
367
+/**
368
+ * @brief callback when I/O object is readable
369
+ * @param[in] io I/O object that is readable
370
+ */
371
+template<typename ADDR, typename SOCK>
372
+void Sender<ADDR, SOCK>::ioReadCall(Io *io)
373
+{
374
+  // reception on socket
375
+  if (io == m_pSock)
376
+    receiveFromSock();
377
+}
378
+
379
+/**
380
+ * @brief callback when I/O object is writable
381
+ * @param[in] io I/O object that is writable
382
+ */
383
+template<typename ADDR, typename SOCK>
384
+void Sender<ADDR, SOCK>::ioWriteCall(Io *io)
385
+{
386
+  (void)io; // unused
387
+}
388
+
314 389
 /// get input stream and attach to it
315 390
 template<typename ADDR, typename SOCK>
316 391
 void Sender<ADDR, SOCK>::getInStream()
... ...
@@ -364,12 +439,28 @@ void Sender<ADDR, SOCK>::createSock()
364 439
     m_pSock = NULL;
365 440
     return;
366 441
   }
442
+
443
+  // request callback on recpetion
444
+  m_callMgr.requestIoReadCall(this, m_pSock);
367 445
 }
368 446
 
369 447
 /// destroy socket
370 448
 template<typename ADDR, typename SOCK>
371 449
 void Sender<ADDR, SOCK>::destroySock()
372 450
 {
451
+  // send no frame to all destinations
452
+  // (stream from this socket will stop now)
453
+  setNoFrame();
454
+
455
+  // clear dynamic destinations
456
+  // (they registered with this socket adn this socket is gone)
457
+  m_dynDestsBlp.clear();
458
+  m_dynDestsEblp.clear();
459
+  m_dynDestsMcuf.clear();
460
+
461
+  // cancel callback request
462
+  m_callMgr.cancelIoReadCall(this, m_pSock);
463
+
373 464
   // destroy socket
374 465
   if (m_pSock) {
375 466
     delete m_pSock;
... ...
@@ -468,13 +559,41 @@ void Sender<ADDR, SOCK>::updateDestsFull(Directory &dirDests, Dests &dests,
468 559
   } // while itFile itDest
469 560
 }
470 561
 
562
+/// remove timed-out dynamic destinations
563
+template<typename ADDR, typename SOCK>
564
+void Sender<ADDR, SOCK>::removeTimedOutDynDests()
565
+{
566
+  Time now, timeout;
567
+  typename DynDests::iterator itDyn;
568
+
569
+  now = Time::now();
570
+  timeout = Time(30);
571
+
572
+  for (itDyn = m_dynDestsBlp.begin(); itDyn != m_dynDestsBlp.end(); )
573
+    if (itDyn->second + timeout < now)
574
+      m_dynDestsBlp.erase(itDyn++);
575
+    else
576
+      ++itDyn;
577
+  for (itDyn = m_dynDestsEblp.begin(); itDyn != m_dynDestsEblp.end(); )
578
+    if (itDyn->second + timeout < now)
579
+      m_dynDestsEblp.erase(itDyn++);
580
+    else
581
+      ++itDyn;
582
+  for (itDyn = m_dynDestsMcuf.begin(); itDyn != m_dynDestsMcuf.end(); )
583
+    if (itDyn->second + timeout < now)
584
+      m_dynDestsMcuf.erase(itDyn++);
585
+    else
586
+      ++itDyn;
587
+}
588
+
471 589
 /**
472 590
  * @brief send current frame to address
473 591
  * @param[in] addr address to send to
474 592
  * @param[in] proto Blinken protocol identifier
475 593
  */
476 594
 template<typename ADDR, typename SOCK>
477
-void Sender<ADDR, SOCK>::sendCurFrame(const ADDR &addr, etBlinkenProto proto)
595
+void Sender<ADDR, SOCK>::sendCurFrame(const ADDR &addr,
596
+                                      etBlinkenProto proto) const
478 597
 {
479 598
   stBlinkenFrame *pFrame;
480 599
   std::string data;
... ...
@@ -502,7 +621,8 @@ void Sender<ADDR, SOCK>::sendCurFrame(const ADDR &addr, etBlinkenProto proto)
502 621
  * @param[in] proto Blinken protocol identifier
503 622
  */
504 623
 template<typename ADDR, typename SOCK>
505
-void Sender<ADDR, SOCK>::sendNoFrame(const ADDR &addr, etBlinkenProto proto)
624
+void Sender<ADDR, SOCK>::sendNoFrame(const ADDR &addr,
625
+                                     etBlinkenProto proto) const
506 626
 {
507 627
   std::string data;
508 628
 
... ...
@@ -519,7 +639,8 @@ void Sender<ADDR, SOCK>::sendNoFrame(const ADDR &addr, etBlinkenProto proto)
519 639
  * @param[in] addr address to send to
520 640
  */
521 641
 template<typename ADDR, typename SOCK>
522
-void Sender<ADDR, SOCK>::sendFrame(const std::string &data, const ADDR &addr)
642
+void Sender<ADDR, SOCK>::sendFrame(const std::string &data,
643
+                                   const ADDR &addr) const
523 644
 {
524 645
   if (m_pSock) {
525 646
     m_pSock->send(data, addr);
... ...
@@ -541,6 +662,8 @@ void Sender<ADDR, SOCK>::frame2data(stBlinkenFrame *pFrame,
541 662
 
542 663
   // convert frame to protcol data
543 664
   len = BlinkenFrameToNetwork(pFrame, proto, buf, sizeof(buf));
665
+  if (len < 0)
666
+    len = 0;
544 667
   data.assign(buf, len);
545 668
 }
546 669
 
... ...
@@ -552,9 +675,81 @@ void Sender<ADDR, SOCK>::frame2data(stBlinkenFrame *pFrame,
552 675
 template<typename ADDR, typename SOCK>
553 676
 void Sender<ADDR, SOCK>::noFrame2data(etBlinkenProto proto, std::string &data)
554 677
 {
678
+  char buf[16];
679
+  int len;
680
+
555 681
   // obtain "no frame" protcol data
556
-  (void)proto; // FIXME
557
-  data.assign("", 1); // FIXME
682
+  len = BlinkenProtoMakePacket(proto, BlinkenPacketStreamEnd,
683
+        buf, sizeof(buf));
684
+  if (len < 0)
685
+    len = 0;
686
+  data.assign(buf, len);
687
+}
688
+
689
+/// receive data from socket
690
+template<typename ADDR, typename SOCK>
691
+void Sender<ADDR, SOCK>::receiveFromSock()
692
+{
693
+  etBlinkenProto proto;
694
+  etBlinkenPacket packet;
695
+  std::string data;
696
+  ADDR addr;
697
+
698
+  // make sure socket exists
699
+  if (!m_pSock)
700
+    return;
701
+
702
+  // receive (leave if no reception)
703
+  if (!m_pSock->recv(data, addr))
704
+    return;
705
+
706
+  // detect packet type and protocol
707
+  BlinkenProtoDetectPacket(data.c_str(), data.size(), &proto, &packet);
708
+
709
+  switch (packet) {
710
+
711
+    // request -> add to dynamic destinations and send current frame
712
+    case BlinkenPacketRequest:
713
+      switch (proto) {
714
+        case BlinkenProtoBlp:
715
+          m_dynDestsBlp[addr] = Time::now();
716
+          sendCurFrame(addr, BlinkenProtoBlp);
717
+          break;
718
+        case BlinkenProtoEblp:
719
+          m_dynDestsEblp[addr] = Time::now();
720
+          sendCurFrame(addr, BlinkenProtoEblp);
721
+          break;
722
+        case BlinkenProtoMcuf:
723
+          m_dynDestsMcuf[addr] = Time::now();
724
+          sendCurFrame(addr, BlinkenProtoMcuf);
725
+          break;
726
+        default:
727
+          break;
728
+      }
729
+      break;
730
+
731
+    // end request -> remove from dynamic destinations
732
+    case BlinkenPacketEndRequest:
733
+      switch (proto) {
734
+        case BlinkenProtoBlp:
735
+          m_dynDestsBlp.erase(addr);
736
+          break;
737
+        case BlinkenProtoEblp:
738
+          m_dynDestsEblp.erase(addr);
739
+          break;
740
+        case BlinkenProtoMcuf:
741
+          m_dynDestsMcuf.erase(addr);
742
+          break;
743
+        default:
744
+          break;
745
+      }
746
+      break;
747
+
748
+    default:
749
+      break;
750
+
751
+  } // switch (packet)
752
+
558 753
 }
559 754
 
560 755
 /* ################
561 756