OctopOS  0.6.0
Data communication bus for SPACE HAUC
subscriber.h
Go to the documentation of this file.
1 // Copyright 2017 Space HAUC Command and Data Handling
2 // This file is part of octopOS which is released under AGPLv3.
3 // See file LICENSE.txt or go to <http://www.gnu.org/licenses/> for full
4 // license details.
5 
9 #ifndef INCLUDE_SUBSCRIBER_H_
10 #define INCLUDE_SUBSCRIBER_H_
11 
12 #include <pthread.h>
13 
14 #include <unordered_map>
15 #include <sstream>
16 #include <vector>
17 #include <algorithm>
18 #include <iterator>
19 #include <string>
20 #include <queue>
21 #include <functional>
22 #include <tuple>
23 
24 #include "tentacle.h" // NOLINT
25 
27 template< typename T >
28 class Optional {
29  public:
30  explicit Optional( T data ) : data_(data), isDefined_( true ) {}
31  Optional() : isDefined_( false ) {}
32  inline bool isDefined() { return isDefined_; }
33  inline bool data() { return data_; }
34  private:
35  bool isDefined_;
36  T data_;
37 };
38 
40 typedef std::function<
41  void(std::tuple<sem_id_t, shm_object*, generic_t*>&)> callback;
42 
47 class subscriber_manager : protected tentacle {
48  public:
57  static void* wait_for_data(void* data);
58 
59  protected:
61 
62  static std::mutex topic_ids_lock,
63  topic_memory_lock,
64  registered_callbacks_lock;
66 
78  static bool register_cb(callback cb, std::string topic, uint size,
79  subscriber_manager* sub);
80 
85  static std::unordered_map<std::string, std::tuple<sem_id_t, shm_object*,
87 
92  static std::unordered_map<std::string, octopOS_id_t> topic_ids;
93 
97  static std::unordered_map<std::string, std::vector<callback>>
99 
105  explicit subscriber_manager(key_t shared_queue) : tentacle(shared_queue) {}
106 };
107 
114 template <typename T>
116  public:
124  subscriber(std::string topic_name, key_t shared_queue)
125  : subscriber_manager(shared_queue),
126  topic(topic_name) {
127  pthread_mutex_init(&data_queue_lock, NULL);
128  pthread_cond_init(&data_queue_condition, NULL);
129 
133  [this](std::tuple<sem_id_t, shm_object*, generic_t*> &data) {
134  sem_id_t sem_id = std::get<0>(data);
135  shm_object* rw = std::get<1>(data);
136 
137  if (p(sem_id, 3) < 0)
138  exit(1);
139  if (p(sem_id, 0) < 0)
140  exit(1);
141  rw->rw_array[0] += 1;
142  if (rw->rw_array[0] == 1)
143  if (p(sem_id, 2) < 0)
144  exit(1);
145  v(sem_id, 0);
146  v(sem_id, 3);
147 
148  pthread_mutex_lock(&data_queue_lock);
149  data_queue.push(*reinterpret_cast<T*>(std::get<2>(data)));
150  pthread_mutex_unlock(&data_queue_lock);
151 
152  if (p(sem_id, 0) < 0)
153  exit(1);
154  rw->rw_array[0] -= 1;
155  if (rw->rw_array[0] == 0)
156  if (v(sem_id, 2) < 0)
157  exit(1);
158  v(sem_id, 0);
159  pthread_cond_signal(&data_queue_condition);
160  }, topic_name, sizeof(T), this))
161  throw std::runtime_error("Failed to register topic");
162  }
163 
168  bool data_available() {
169  return !data_queue.empty();
170  }
171 
177  T get_data() {
178  T return_value;
179  pthread_mutex_lock(&data_queue_lock);
180 
181  while (data_queue.empty())
182  pthread_cond_wait(&data_queue_condition, &data_queue_lock);
183 
184  return_value = data_queue.front();
185  data_queue.pop();
186 
187  pthread_mutex_unlock(&data_queue_lock);
188  return return_value;
189  }
190 
198  pthread_mutex_lock(&data_queue_lock);
199  if (data_queue.empty()) {
200  pthread_mutex_unlock(&data_queue_lock);
201  return Optional<T>();
202  } else {
203  Optional<T> return_value( data_queue.front() );
204  data_queue.pop();
205  pthread_mutex_unlock(&data_queue_lock);
206  return return_value;
207  }
208  }
209 
214  pthread_mutex_destroy(&data_queue_lock);
215  pthread_cond_destroy(&data_queue_condition);
216  }
217 
218  private:
220  std::string topic;
221 
223  pthread_mutex_t data_queue_lock;
224 
226  pthread_cond_t data_queue_condition;
227 
230  std::queue<T> data_queue;
231 
233  long id; // NOLINT
234 };
235 #endif // INCLUDE_SUBSCRIBER_H_
Definition: utility.h:52
intptr_t generic_t
Definition: utility.h:45
bool data_available()
Definition: subscriber.h:168
Definition: subscriber.h:47
static std::unordered_map< std::string, std::vector< callback > > registered_callbacks
Definition: subscriber.h:98
T get_data()
Definition: subscriber.h:177
Definition: tentacle.h:31
static std::unordered_map< std::string, octopOS_id_t > topic_ids
Definition: subscriber.h:92
subscriber(std::string topic_name, key_t shared_queue)
Definition: subscriber.h:124
std::function< void(std::tuple< sem_id_t, shm_object *, generic_t * > &)> callback
Definition: subscriber.h:41
Definition: subscriber.h:28
int v(int sem, int counter)
Definition: utility.cpp:54
~subscriber()
Definition: subscriber.h:213
static std::unordered_map< std::string, std::tuple< sem_id_t, shm_object *, generic_t * > > topic_memory
Definition: subscriber.h:86
Optional< T > get_data_async()
Definition: subscriber.h:197
Definition: subscriber.h:115
static bool register_cb(callback cb, std::string topic, uint size, subscriber_manager *sub)
Definition: subscriber.cpp:16
static std::mutex topic_ids_lock
Definition: subscriber.h:62
uint sem_id_t
Definition: utility.h:49
int p(int sem, int counter)
Definition: utility.cpp:39
subscriber_manager(key_t shared_queue)
Definition: subscriber.h:105
unsigned rw_array[2]
Definition: utility.h:54
static void * wait_for_data(void *data)
Definition: subscriber.cpp:60