9 #ifndef INCLUDE_SUBSCRIBER_H_ 10 #define INCLUDE_SUBSCRIBER_H_ 14 #include <unordered_map> 27 template<
typename T >
30 explicit Optional( T data ) : data_(data), isDefined_(
true ) {}
32 inline bool isDefined() {
return isDefined_; }
33 inline bool data() {
return data_; }
40 typedef std::function<
41 void(std::tuple<sem_id_t, shm_object*, generic_t*>&)>
callback;
64 registered_callbacks_lock;
/// Static map from a std::string key to an octopOS_id_t.
/// NOTE(review): the name suggests the keys are topic names — confirm at the
/// use sites, which are not visible here.
92 static std::unordered_map<std::string, octopOS_id_t>
topic_ids;
97 static std::unordered_map<std::string, std::vector<callback>>
114 template <
typename T>
127 pthread_mutex_init(&data_queue_lock, NULL);
128 pthread_cond_init(&data_queue_condition, NULL);
133 [
this](std::tuple<sem_id_t, shm_object*, generic_t*> &data) {
134 sem_id_t sem_id = std::get<0>(data);
137 if (
p(sem_id, 3) < 0)
139 if (
p(sem_id, 0) < 0)
143 if (
p(sem_id, 2) < 0)
148 pthread_mutex_lock(&data_queue_lock);
149 data_queue.push(*reinterpret_cast<T*>(std::get<2>(data)));
150 pthread_mutex_unlock(&data_queue_lock);
152 if (
p(sem_id, 0) < 0)
156 if (
v(sem_id, 2) < 0)
159 pthread_cond_signal(&data_queue_condition);
160 }, topic_name,
sizeof(T),
this))
161 throw std::runtime_error(
"Failed to register topic");
169 return !data_queue.empty();
179 pthread_mutex_lock(&data_queue_lock);
181 while (data_queue.empty())
182 pthread_cond_wait(&data_queue_condition, &data_queue_lock);
184 return_value = data_queue.front();
187 pthread_mutex_unlock(&data_queue_lock);
198 pthread_mutex_lock(&data_queue_lock);
199 if (data_queue.empty()) {
200 pthread_mutex_unlock(&data_queue_lock);
205 pthread_mutex_unlock(&data_queue_lock);
214 pthread_mutex_destroy(&data_queue_lock);
215 pthread_cond_destroy(&data_queue_condition);
/// Mutex locked around the push onto data_queue in the registered callback
/// and around the empty()/front() accesses in the getters. Set up with
/// pthread_mutex_init and torn down with pthread_mutex_destroy.
223 pthread_mutex_t data_queue_lock;
/// Condition variable signaled after the callback pushes onto data_queue;
/// waited on, together with data_queue_lock, while the queue is empty.
226 pthread_cond_t data_queue_condition;
/// Queue of received values of type T: the callback pushes new items and
/// readers take the element at front().
230 std::queue<T> data_queue;
235 #endif // INCLUDE_SUBSCRIBER_H_
intptr_t generic_t
Definition: utility.h:45
bool data_available()
Definition: subscriber.h:168
Definition: subscriber.h:47
static std::unordered_map< std::string, std::vector< callback > > registered_callbacks
Definition: subscriber.h:98
T get_data()
Definition: subscriber.h:177
Definition: tentacle.h:31
static std::unordered_map< std::string, octopOS_id_t > topic_ids
Definition: subscriber.h:92
subscriber(std::string topic_name, key_t shared_queue)
Definition: subscriber.h:124
std::function< void(std::tuple< sem_id_t, shm_object *, generic_t * > &)> callback
Definition: subscriber.h:41
Definition: subscriber.h:28
int v(int sem, int counter)
Definition: utility.cpp:54
~subscriber()
Definition: subscriber.h:213
static std::unordered_map< std::string, std::tuple< sem_id_t, shm_object *, generic_t * > > topic_memory
Definition: subscriber.h:86
Optional< T > get_data_async()
Definition: subscriber.h:197
Definition: subscriber.h:115
static bool register_cb(callback cb, std::string topic, uint size, subscriber_manager *sub)
Definition: subscriber.cpp:16
static std::mutex topic_ids_lock
Definition: subscriber.h:62
uint sem_id_t
Definition: utility.h:49
int p(int sem, int counter)
Definition: utility.cpp:39
subscriber_manager(key_t shared_queue)
Definition: subscriber.h:105
unsigned rw_array[2]
Definition: utility.h:54
static void * wait_for_data(void *data)
Definition: subscriber.cpp:60