Radio: Add state PubSub, generalize PubSub

This commit is contained in:
Dominic Höglinger 2025-09-26 17:41:34 +02:00
parent f57868b3fd
commit 4625f56c6e
6 changed files with 102 additions and 21 deletions

View File

@ -0,0 +1,49 @@
#pragma once
#include <functional>
#include <memory>
namespace tt::hal::radio {
// Proposed PubSub class, can be moved elsewhere
template<class... Ts>
class PubSub
{
public:
typedef int SubscriptionId;
using Notifier = std::function<void(Ts...)>;
protected:
struct Subscription {
SubscriptionId id;
std::shared_ptr<Notifier> notifier;
};
SubscriptionId lastSubscriptionId = 0;
std::vector<Subscription> subscriptions;
public:
PubSub() {}
virtual ~PubSub() = default;
SubscriptionId subscribe(Notifier onPublish) {
subscriptions.push_back({
.id = ++lastSubscriptionId,
.notifier = std::make_shared<Notifier>(onPublish)
});
return lastSubscriptionId;
}
void unsubscribe(SubscriptionId subscriptionId) {
std::erase_if(subscriptions, [subscriptionId](auto& subscription) { return subscription.id == subscriptionId; });
}
void publish(Ts... pargs) {
for (auto& subscription : subscriptions) {
(*subscription.notifier)(pargs...);
}
}
};
}

View File

@ -1,6 +1,7 @@
#pragma once #pragma once
#include "../Device.h" #include "../Device.h"
#include "PubSub.h"
#include "Unit.h" #include "Unit.h"
#include <Tactility/Mutex.h> #include <Tactility/Mutex.h>
@ -52,10 +53,6 @@ public:
Success Success
}; };
typedef int RxSubscriptionId;
typedef int TxId;
enum class State { enum class State {
PendingOn, PendingOn,
On, On,
@ -72,6 +69,15 @@ public:
Error Error
}; };
typedef int TxId;
using StatePubSub = PubSub<Device::Id, State>;
using RxPubSub = PubSub<Device::Id, const RxPacket&>;
using StateSubscriptionId = StatePubSub::SubscriptionId;
using RxSubscriptionId = RxPubSub::SubscriptionId;
using StateCallback = StatePubSub::Notifier;
using RxCallback = RxPubSub::Notifier;
using TxStateCallback = std::function<void(TxId id, TransmissionState state)>; using TxStateCallback = std::function<void(TxId id, TransmissionState state)>;
protected: protected:
@ -82,18 +88,13 @@ protected:
}; };
private: private:
struct RxSubscription {
RxSubscriptionId id;
std::shared_ptr<std::function<void(Device::Id id, const RxPacket&)>> onData;
};
State state; State state;
Modulation modulation; Modulation modulation;
Mutex mutex = Mutex(Mutex::Type::Recursive); Mutex mutex = Mutex(Mutex::Type::Recursive);
std::vector<RxSubscription> rxSubscriptions; StatePubSub statePubSub;
RxPubSub rxPubSub;
std::deque<TxItem> txQueue; std::deque<TxItem> txQueue;
TxId lastTxId = 0; TxId lastTxId = 0;
RxSubscriptionId lastRxSubscriptionId = 0;
protected: protected:
const Mutex &getMutex() const { return mutex; } const Mutex &getMutex() const { return mutex; }
@ -150,20 +151,28 @@ public:
return txId; return txId;
} }
RxSubscriptionId subscribeRx(const std::function<void(Device::Id id, const RxPacket&)>& onData) { StateSubscriptionId subscribeStateChange(StateCallback onChange) {
auto lock = mutex.asScopedLock(); auto lock = mutex.asScopedLock();
lock.lock(); lock.lock();
rxSubscriptions.push_back({ return statePubSub.subscribe(onChange);
.id = ++lastRxSubscriptionId, }
.onData = std::make_shared<std::function<void(Device::Id, const RxPacket&)>>(onData)
}); void unsubscribeStateChange(StateSubscriptionId subscriptionId) {
return lastRxSubscriptionId; auto lock = mutex.asScopedLock();
lock.lock();
return rxPubSub.unsubscribe(subscriptionId);
}
RxSubscriptionId subscribeRx(const RxCallback& onData) {
auto lock = mutex.asScopedLock();
lock.lock();
return rxPubSub.subscribe(onData);
} }
void unsubscribeRx(RxSubscriptionId subscriptionId) { void unsubscribeRx(RxSubscriptionId subscriptionId) {
auto lock = mutex.asScopedLock(); auto lock = mutex.asScopedLock();
lock.lock(); lock.lock();
std::erase_if(rxSubscriptions, [subscriptionId](auto& subscription) { return subscription.id == subscriptionId; }); return rxPubSub.unsubscribe(subscriptionId);
} }
State getState() const; State getState() const;

View File

@ -39,14 +39,17 @@ RadioDevice::State RadioDevice::getState() const {
void RadioDevice::setState(State newState) { void RadioDevice::setState(State newState) {
auto lock = mutex.asScopedLock(); auto lock = mutex.asScopedLock();
lock.lock(); lock.lock();
if (state != newState) {
statePubSub.publish(getId(), newState);
}
state = newState; state = newState;
} }
void RadioDevice::publishRx(const RxPacket& packet) { void RadioDevice::publishRx(const RxPacket& packet) {
mutex.lock(); mutex.lock();
for (auto& subscription : rxSubscriptions) { rxPubSub.publish(getId(), packet);
(*subscription.onData)(getId(), packet);
}
mutex.unlock(); mutex.unlock();
} }

View File

@ -52,6 +52,7 @@ enum RadioTxState {
}; };
typedef int32_t RadioRxSubscriptionId; typedef int32_t RadioRxSubscriptionId;
typedef int32_t RadioStateSubscriptionId;
typedef int32_t RadioTxId; typedef int32_t RadioTxId;
struct RadioRxPacket { struct RadioRxPacket {
@ -67,6 +68,7 @@ struct RadioTxPacket {
uint32_t address; uint32_t address;
}; };
typedef void (*RadioStateCallback)(DeviceId id, RadioState state);
typedef void (*RadioTxStateCallback)(RadioTxId id, RadioTxState state); typedef void (*RadioTxStateCallback)(RadioTxId id, RadioTxState state);
typedef void (*RadioOnReceiveCallback)(DeviceId id, const RadioRxPacket* packet); typedef void (*RadioOnReceiveCallback)(DeviceId id, const RadioRxPacket* packet);
@ -203,6 +205,14 @@ RadioTxId tt_hal_radio_transmit(RadioHandle handle, RadioTxPacket packet, RadioT
*/ */
RadioRxSubscriptionId tt_hal_radio_subscribe_receive(RadioHandle handle, RadioOnReceiveCallback callback); RadioRxSubscriptionId tt_hal_radio_subscribe_receive(RadioHandle handle, RadioOnReceiveCallback callback);
/**
* Subscribe for any state change of the radio driver object.
* @param[in] handle the radio driver handle
* @param[in] callback function to call when the state of the radio changes
* @return the identifier for the subscription
*/
RadioStateSubscriptionId tt_hal_radio_subscribe_state(RadioHandle handle, RadioStateCallback callback);
/** /**
* Unsubscribe for any received packet that the radio driver object receives. * Unsubscribe for any received packet that the radio driver object receives.
* @param[in] handle the radio driver handle * @param[in] handle the radio driver handle

View File

@ -129,6 +129,15 @@ extern "C" {
}); });
} }
RadioStateSubscriptionId tt_hal_radio_subscribe_state(RadioHandle handle, RadioStateCallback callback) {
auto wrapper = static_cast<DeviceWrapper*>(handle);
return wrapper->device->subscribeStateChange([callback](tt::hal::Device::Id id, tt::hal::radio::RadioDevice::State state) {
if (callback) {
callback(id, fromCpp(state));
}
});
}
RadioRxSubscriptionId tt_hal_radio_subscribe_receive(RadioHandle handle, RadioOnReceiveCallback callback) { RadioRxSubscriptionId tt_hal_radio_subscribe_receive(RadioHandle handle, RadioOnReceiveCallback callback) {
auto wrapper = static_cast<DeviceWrapper*>(handle); auto wrapper = static_cast<DeviceWrapper*>(handle);
return wrapper->device->subscribeRx([callback](tt::hal::Device::Id id, const tt::hal::radio::RxPacket& ttPacket) { return wrapper->device->subscribeRx([callback](tt::hal::Device::Id id, const tt::hal::radio::RxPacket& ttPacket) {

View File

@ -282,6 +282,7 @@ const esp_elfsym elf_symbols[] {
ESP_ELFSYM_EXPORT(tt_hal_radio_start), ESP_ELFSYM_EXPORT(tt_hal_radio_start),
ESP_ELFSYM_EXPORT(tt_hal_radio_stop), ESP_ELFSYM_EXPORT(tt_hal_radio_stop),
ESP_ELFSYM_EXPORT(tt_hal_radio_transmit), ESP_ELFSYM_EXPORT(tt_hal_radio_transmit),
ESP_ELFSYM_EXPORT(tt_hal_radio_subscribe_state),
ESP_ELFSYM_EXPORT(tt_hal_radio_subscribe_receive), ESP_ELFSYM_EXPORT(tt_hal_radio_subscribe_receive),
ESP_ELFSYM_EXPORT(tt_hal_radio_unsubscribe_receive), ESP_ELFSYM_EXPORT(tt_hal_radio_unsubscribe_receive),
ESP_ELFSYM_EXPORT(tt_hal_touch_driver_get_touched_points), ESP_ELFSYM_EXPORT(tt_hal_touch_driver_get_touched_points),