Drake Designer
 All Classes Namespaces Files Functions Variables Enumerations Enumerator Macros Pages
ddLCMSubscriber.h
Go to the documentation of this file.
1 #ifndef __ddLCMSubscriber_h
2 #define __ddLCMSubscriber_h
3 
4 #include <QObject>
5 #include <ddMacros.h>
6 
7 #include <string>
8 
9 #include <PythonQt.h>
10 #include <QMutex>
11 #include <QMutexLocker>
12 #include <QWaitCondition>
13 #include <QTime>
14 
15 #include <lcm/lcm-cpp.hpp>
16 
17 #include "ddFPSCounter.h"
18 #include "ddAppConfigure.h"
19 
20 
21 namespace lcm
22 {
23  class LCM;
24 }
25 
26 class DD_APP_EXPORT ddLCMSubscriber : public QObject
27  {
28  Q_OBJECT
29 
30 public:
31 
32  ddLCMSubscriber(const QString& channel, QObject* parent=NULL) : QObject(parent)
33  {
34  mChannel = channel;
35  this->mEmitMessages = true;
36  this->mNotifyAllMessages = false;
37  this->mRequiredElapsedMilliseconds = 0;
38  this->connect(this, SIGNAL(messageReceivedInQueue(const QString&)), SLOT(onMessageInQueue(const QString&)));
39  }
40 
41  virtual ~ddLCMSubscriber()
42  {
43  }
44 
45  virtual void subscribe(lcm::LCM* lcmHandle)
46  {
47  mSubscription = lcmHandle->subscribe(mChannel.toAscii().data(), &ddLCMSubscriber::messageHandler, this);
48  }
49 
50  virtual void unsubscribe(lcm::LCM* lcmHandle)
51  {
52  lcmHandle->unsubscribe(mSubscription);
53  mSubscription = 0;
54  }
55 
56  const QString& channel() const
57  {
58  return mChannel;
59  }
60 
61  void setCallbackEnabled(bool enabled)
62  {
63  this->mEmitMessages = enabled;
64  }
65 
66  bool callbackIsEnabled() const
67  {
68  return this->mEmitMessages;
69  }
70 
71  // See notifyAllMessagesIsEnabled()
72  void setNotifyAllMessagesEnabled(bool enabled)
73  {
74  this->mNotifyAllMessages = enabled;
75  }
76 
77  // If the main thread is busy while several LCM messages are received by this
78  // subscriber on the LCM thread, then this flag determines which messages the
79  // main thread will see when it becomes ready to process messages. If this
80  // flag is true, then the main thread will be notified, via the messageReceived()
81  // signal, for each message. If this flag is false, then the main thread will
82  // be notified only once with the most recently received message. Set this
83  // flag to true if it is important to never miss a message. The default is
84  // false, meaning that messages will be dropped if the main thread is not
85  // available to process them before a new message is received.
87  {
88  return this->mNotifyAllMessages;
89  }
90 
91  void setSpeedLimit(double hertz)
92  {
93  if (hertz <= 0.0)
94  {
95  this->mRequiredElapsedMilliseconds = 0;
96  }
97  else
98  {
99  this->mRequiredElapsedMilliseconds = static_cast<int>(1000.0 / hertz);
100  }
101  }
102 
103  double getMessageRate()
104  {
105  return this->mFPSCounter.averageFPS();
106  }
107 
108  QByteArray getNextMessage(int timeout)
109  {
110 
111  QMutexLocker locker(&this->mMutex);
112 
113  QByteArray msg = this->mLastMessage;
114  this->mLastMessage.clear();
115 
116 
117  if (msg.size())
118  {
119  return msg;
120  }
121 
122  bool haveNewMessage = this->mWaitCondition.wait(&this->mMutex, timeout);
123 
124  if (!haveNewMessage)
125  {
126  return QByteArray();
127  }
128 
129  msg = this->mLastMessage;
130  this->mLastMessage.clear();
131 
132  return msg;
133  }
134 
135 signals:
136 
137  void messageReceived(const QByteArray& messageData, const QString& channel);
138  void messageReceivedInQueue(const QString& channel);
139 
140 protected slots:
141 
142  void onMessageInQueue(const QString& channel)
143  {
144  QByteArray msg = this->getNextMessage(0);
145  emit this->messageReceived(msg, channel);
146  }
147 
148 
149 protected:
150 
151 
152  void messageHandler(const lcm::ReceiveBuffer* rbuf, const std::string& channel)
153  {
154  ddNotUsed(channel);
155 
156  QByteArray messageBytes = QByteArray((char*)rbuf->data, rbuf->data_size);
157 
158  mFPSCounter.update();
159 
160  if (this->mEmitMessages)
161  {
162  if (this->mRequiredElapsedMilliseconds == 0 || mTimer.elapsed() > this->mRequiredElapsedMilliseconds)
163  {
164  this->mTimer.restart();
165 
166  if (this->mNotifyAllMessages)
167  {
168  emit this->messageReceived(messageBytes, QString(channel.c_str()));
169  }
170  else
171  {
172  this->mMutex.lock();
173  bool doEmit = !this->mLastMessage.size();
174  this->mLastMessage = messageBytes;
175  this->mMutex.unlock();
176 
177  if (doEmit)
178  {
179  emit this->messageReceivedInQueue(QString(channel.c_str()));
180  }
181  }
182 
183  }
184  }
185  else
186  {
187  this->mMutex.lock();
188  this->mLastMessage = messageBytes;
189  this->mMutex.unlock();
190  this->mWaitCondition.wakeAll();
191  }
192 
193  }
194 
198  mutable QMutex mMutex;
199  QWaitCondition mWaitCondition;
200  QByteArray mLastMessage;
202  QTime mTimer;
203  QString mChannel;
204  lcm::Subscription* mSubscription;
205 
206 };
207 
208 #endif
QByteArray getNextMessage(int timeout)
void messageHandler(const lcm::ReceiveBuffer *rbuf, const std::string &channel)
void setSpeedLimit(double hertz)
void setNotifyAllMessagesEnabled(bool enabled)
virtual void unsubscribe(lcm::LCM *lcmHandle)
bool callbackIsEnabled() const
const QString & channel() const
virtual void subscribe(lcm::LCM *lcmHandle)
lcm::Subscription * mSubscription
void setCallbackEnabled(bool enabled)
ddFPSCounter mFPSCounter
QWaitCondition mWaitCondition
virtual ~ddLCMSubscriber()
bool notifyAllMessagesIsEnabled() const
ddLCMSubscriber(const QString &channel, QObject *parent=NULL)
int mRequiredElapsedMilliseconds
#define DD_APP_EXPORT
double getMessageRate()
void onMessageInQueue(const QString &channel)
QByteArray mLastMessage