Refactored PubSub

Stronger typing
Added tests
This commit is contained in:
Ken Van Hoeylandt 2025-09-01 00:02:06 +02:00
parent c00ffa4fcf
commit 1aa5daf1b0
19 changed files with 207 additions and 212 deletions

View File

@ -2,12 +2,9 @@
## Higher Priority
- Move Display settings from flash to `/data/apps/display/display.properties`
- Expose app::Paths to TactilityC
- Call tt::lvgl::isSyncSet after HAL init and show an error (and crash?) when it is not set.
- External app loading: Check the version of Tactility and check ESP target hardware to check for compatibility.
- App packaging
- Create more unit tests for `tactility-core`
- Make a URL handler. Use it for handling local files. Match file types with apps.
- Fix Development service: when no SD card is present, the app fails to install. Consider installing to `/data`
- Refactor `PropertiesFile.cpp` to use `tt::file::readLines()` (see TODO in code)

View File

@ -11,8 +11,6 @@ namespace tt::service::gps {
class GpsService final : public Service {
private:
struct GpsDeviceRecord {
std::shared_ptr<hal::gps::GpsDevice> device = nullptr;
hal::gps::GpsDevice::GgaSubscriptionId satelliteSubscriptionId = -1;
@ -25,7 +23,7 @@ private:
Mutex mutex = Mutex(Mutex::Type::Recursive);
Mutex stateMutex;
std::vector<GpsDeviceRecord> deviceRecords;
std::shared_ptr<PubSub> statePubSub = std::make_shared<PubSub>();
std::shared_ptr<PubSub<State>> statePubSub = std::make_shared<PubSub<State>>();
std::unique_ptr<Paths> paths;
State state = State::Off;
@ -46,8 +44,8 @@ private:
public:
void onStart(tt::service::ServiceContext &serviceContext) final;
void onStop(tt::service::ServiceContext &serviceContext) final;
void onStart(ServiceContext &serviceContext) override;
void onStop(ServiceContext &serviceContext) override;
bool addGpsConfiguration(hal::gps::GpsConfiguration configuration);
bool removeGpsConfiguration(hal::gps::GpsConfiguration configuration);
@ -61,7 +59,7 @@ public:
bool getCoordinates(minmea_sentence_rmc& rmc) const;
/** @return GPS service pubsub that broadcasts State* objects */
std::shared_ptr<PubSub> getStatePubsub() const { return statePubSub; }
std::shared_ptr<PubSub<State>> getStatePubsub() const { return statePubSub; }
};
std::shared_ptr<GpsService> findGpsService();

View File

@ -11,15 +11,11 @@ namespace tt::service::loader {
// region LoaderEvent for PubSub
typedef enum {
LoaderEventTypeApplicationStarted,
LoaderEventTypeApplicationShowing,
LoaderEventTypeApplicationHiding,
LoaderEventTypeApplicationStopped
} LoaderEventType;
struct LoaderEvent {
LoaderEventType type;
enum class LoaderEvent{
ApplicationStarted,
ApplicationShowing,
ApplicationHiding,
ApplicationStopped
};
// endregion LoaderEvent for PubSub
@ -43,6 +39,6 @@ std::shared_ptr<app::App> _Nullable getCurrentApp();
/**
* @brief PubSub for LoaderEvent
*/
std::shared_ptr<PubSub> getPubsub();
std::shared_ptr<PubSub<LoaderEvent>> getPubsub();
} // namespace

View File

@ -33,7 +33,7 @@ typedef enum {
namespace tt::service::wifi {
enum class EventType {
enum class WifiEvent {
/** Radio was turned on */
RadioStateOn,
/** Radio is turning on. */
@ -61,10 +61,6 @@ enum class RadioState {
Off,
};
struct Event {
EventType type;
};
struct ApRecord {
std::string ssid;
int8_t rssi;
@ -76,7 +72,7 @@ struct ApRecord {
* @brief Get wifi pubsub that broadcasts Event objects
* @return PubSub
*/
std::shared_ptr<PubSub> getPubsub();
std::shared_ptr<PubSub<WifiEvent>> getPubsub();
/** @return Get the current radio state */
RadioState getRadioState();

View File

@ -19,9 +19,11 @@ class WifiConnect : public App {
.onConnectSsidContext = nullptr
};
View view = View(&bindings, &state);
PubSub::SubscriptionHandle wifiSubscription;
PubSub<service::wifi::WifiEvent>::SubscriptionHandle wifiSubscription;
bool view_enabled = false;
void onWifiEvent(service::wifi::WifiEvent event);
public:
WifiConnect();

View File

@ -13,15 +13,15 @@ namespace tt::app::wifimanage {
class WifiManage : public App {
private:
PubSub::SubscriptionHandle wifiSubscription = nullptr;
PubSub<service::wifi::WifiEvent>::SubscriptionHandle wifiSubscription = nullptr;
Mutex mutex;
Bindings bindings = { };
State state;
View view = View(&bindings, &state);
bool isViewEnabled = false;
void onWifiEvent(service::wifi::WifiEvent event);
public:
WifiManage();

View File

@ -1,14 +1,13 @@
#pragma once
#include <Tactility/app/AppContext.h>
#include <Tactility/MessageQueue.h>
#include <Tactility/Mutex.h>
#include <Tactility/PubSub.h>
#include <Tactility/service/Service.h>
#include "Tactility/app/AppContext.h"
#include <Tactility/service/loader/Loader.h>
#include <cstdio>
#include <lvgl.h>
namespace tt::service::gui {
@ -23,7 +22,7 @@ class GuiService : public Service {
// Thread and lock
Thread* thread = nullptr;
Mutex mutex = Mutex(Mutex::Type::Recursive);
PubSub::SubscriptionHandle loader_pubsub_subscription = nullptr;
PubSub<loader::LoaderEvent>::SubscriptionHandle loader_pubsub_subscription = nullptr;
// Layers and Canvas
lv_obj_t* appRootWidget = nullptr;
@ -37,10 +36,10 @@ class GuiService : public Service {
bool isStarted = false;
static void onLoaderMessage(const void* message, TT_UNUSED void* context);
static int32_t guiMain();
void onLoaderEvent(loader::LoaderEvent event);
lv_obj_t* createAppViews(lv_obj_t* parent);
void redraw();

View File

@ -5,8 +5,9 @@
#include "Tactility/lvgl/LvglSync.h"
#include "Tactility/lvgl/Toolbar.h"
#include "Tactility/service/gps/GpsUtil.h"
#include "Tactility/service/loader/Loader.h"
#include <Tactility/service/gps/GpsService.h>
#include <Tactility/service/gps/GpsState.h>
#include "Tactility/service/loader/Loader.h"
#include <cstring>
#include <format>
@ -34,14 +35,9 @@ class GpsSettingsApp final : public App {
lv_obj_t* gpsConfigWrapper = nullptr;
lv_obj_t* addGpsWrapper = nullptr;
bool hasSetInfo = false;
PubSub::SubscriptionHandle serviceStateSubscription = nullptr;
PubSub<service::gps::State>::SubscriptionHandle serviceStateSubscription = nullptr;
std::shared_ptr<service::gps::GpsService> service;
static void onServiceStateChangedCallback(const void* data, void* context) {
auto* app = (GpsSettingsApp*)context;
app->onServiceStateChanged();
}
void onServiceStateChanged() {
auto lock = lvgl::getSyncLock()->asScopedLock();
if (lock.lock(100 / portTICK_PERIOD_MS)) {
@ -313,7 +309,9 @@ public:
lv_obj_set_style_pad_all(infoContainerWidget, 0, 0);
hasSetInfo = false;
serviceStateSubscription = service->getStatePubsub()->subscribe(onServiceStateChangedCallback, this);
serviceStateSubscription = service->getStatePubsub()->subscribe([this](auto) {
onServiceStateChanged();
});
gpsConfigWrapper = lv_obj_create(main_wrapper);
lv_obj_set_size(gpsConfigWrapper, LV_PCT(100), LV_SIZE_CONTENT);

View File

@ -6,36 +6,13 @@
#include "Tactility/lvgl/LvglSync.h"
namespace tt::app::wificonnect {
#define TAG "wifi_connect"
#define WIFI_CONNECT_PARAM_SSID "ssid" // String
#define WIFI_CONNECT_PARAM_PASSWORD "password" // String
constexpr auto TAG = "WifiConnect";
constexpr auto WIFI_CONNECT_PARAM_SSID = "ssid"; // String
constexpr auto WIFI_CONNECT_PARAM_PASSWORD = "password"; // String
extern const AppManifest manifest;
static void eventCallback(const void* message, void* context) {
auto* event = static_cast<const service::wifi::Event*>(message);
auto* wifi = static_cast<WifiConnect*>(context);
State& state = wifi->getState();
switch (event->type) {
case service::wifi::EventType::ConnectionFailed:
if (state.isConnecting()) {
state.setConnecting(false);
state.setConnectionError(true);
wifi->requestViewUpdate();
}
break;
case service::wifi::EventType::ConnectionSuccess:
if (wifi->getState().isConnecting()) {
state.setConnecting(false);
service::loader::stopApp();
}
break;
default:
break;
}
wifi->requestViewUpdate();
}
static void onConnect(const service::wifi::settings::WifiApSettings& ap_settings, bool remember, TT_UNUSED void* parameter) {
auto* wifi = static_cast<WifiConnect*>(parameter);
wifi->getState().setApSettings(ap_settings);
@ -43,8 +20,33 @@ static void onConnect(const service::wifi::settings::WifiApSettings& ap_settings
service::wifi::connect(ap_settings, remember);
}
void WifiConnect::onWifiEvent(service::wifi::WifiEvent event) {
State& state = getState();
switch (event) {
case service::wifi::WifiEvent::ConnectionFailed:
if (state.isConnecting()) {
state.setConnecting(false);
state.setConnectionError(true);
requestViewUpdate();
}
break;
case service::wifi::WifiEvent::ConnectionSuccess:
if (getState().isConnecting()) {
state.setConnecting(false);
service::loader::stopApp();
}
break;
default:
break;
}
requestViewUpdate();
}
WifiConnect::WifiConnect() {
wifiSubscription = service::wifi::getPubsub()->subscribe(&eventCallback, this);
wifiSubscription = service::wifi::getPubsub()->subscribe([this](auto event) {
onWifiEvent(event);
});
bindings = (Bindings) {
.onConnectSsid = onConnect,
.onConnectSsidContext = this,

View File

@ -10,7 +10,7 @@
namespace tt::app::wifimanage {
#define TAG "wifi_manage"
constexpr auto TAG = "WifiManage";
extern const AppManifest manifest;
@ -72,20 +72,18 @@ void WifiManage::requestViewUpdate() {
unlock();
}
static void wifiManageEventCallback(const void* message, void* context) {
auto* event = (service::wifi::Event*)message;
auto* wifi = (WifiManage*)context;
void WifiManage::onWifiEvent(service::wifi::WifiEvent event) {
auto radio_state = service::wifi::getRadioState();
TT_LOG_I(TAG, "Update with state %s", service::wifi::radioStateToString(radio_state));
wifi->getState().setRadioState(radio_state);
switch (event->type) {
using enum tt::service::wifi::EventType;
getState().setRadioState(radio_state);
switch (event) {
using enum service::wifi::WifiEvent;
case ScanStarted:
wifi->getState().setScanning(true);
getState().setScanning(true);
break;
case ScanFinished:
wifi->getState().setScanning(false);
wifi->getState().updateApRecords();
getState().setScanning(false);
getState().updateApRecords();
break;
case RadioStateOn:
if (!service::wifi::isScanning()) {
@ -96,11 +94,13 @@ static void wifiManageEventCallback(const void* message, void* context) {
break;
}
wifi->requestViewUpdate();
requestViewUpdate();
}
void WifiManage::onShow(AppContext& app, lv_obj_t* parent) {
wifiSubscription = service::wifi::getPubsub()->subscribe(&wifiManageEventCallback, this);
wifiSubscription = service::wifi::getPubsub()->subscribe([this](auto event) {
onWifiEvent(event);
});
// State update (it has its own locking)
state.setRadioState(service::wifi::getRadioState());

View File

@ -28,9 +28,9 @@ struct StatusbarIcon {
struct StatusbarData {
Mutex mutex = Mutex(Mutex::Type::Recursive);
std::shared_ptr<PubSub> pubsub = std::make_shared<PubSub>();
std::shared_ptr<PubSub<void*>> pubsub = std::make_shared<PubSub<void*>>();
StatusbarIcon icons[STATUSBAR_ICON_LIMIT] = {};
Timer* time_update_timer = new Timer(Timer::Type::Once, []() { onUpdateTime(); });
Timer* time_update_timer = new Timer(Timer::Type::Once, [] { onUpdateTime(); });
uint8_t time_hours = 0;
uint8_t time_minutes = 0;
bool time_set = false;
@ -44,7 +44,7 @@ typedef struct {
lv_obj_t* time;
lv_obj_t* icons[STATUSBAR_ICON_LIMIT];
lv_obj_t* battery_icon;
PubSub::SubscriptionHandle pubsub_subscription;
PubSub<void*>::SubscriptionHandle pubsub_subscription;
} Statusbar;
static bool statusbar_lock(TickType_t timeoutTicks = portMAX_DELAY) {
@ -108,9 +108,8 @@ static const lv_obj_class_t statusbar_class = {
.theme_inheritable = false
};
static void statusbar_pubsub_event(TT_UNUSED const void* message, void* obj) {
TT_LOG_D(TAG, "event");
auto* statusbar = static_cast<Statusbar*>(obj);
static void statusbar_pubsub_event(Statusbar* statusbar) {
TT_LOG_D(TAG, "Update event");
if (lock(portMAX_DELAY)) {
update_main(statusbar);
lv_obj_invalidate(&statusbar->obj);
@ -133,7 +132,9 @@ static void statusbar_constructor(const lv_obj_class_t* class_p, lv_obj_t* obj)
lv_obj_remove_flag(obj, LV_OBJ_FLAG_SCROLLABLE);
LV_TRACE_OBJ_CREATE("finished");
auto* statusbar = (Statusbar*)obj;
statusbar->pubsub_subscription = statusbar_data.pubsub->subscribe(&statusbar_pubsub_event, statusbar);
statusbar->pubsub_subscription = statusbar_data.pubsub->subscribe([statusbar](auto) {
statusbar_pubsub_event(statusbar);
});
if (!statusbar_data.time_update_timer->isRunning()) {
statusbar_data.time_update_timer->start(200 / portTICK_PERIOD_MS);

View File

@ -208,7 +208,7 @@ void GpsService::setState(State newState) {
lock.lock();
state = newState;
lock.unlock();
statePubSub->publish(&state);
statePubSub->publish(state);
}
bool GpsService::hasCoordinates() const {

View File

@ -12,22 +12,16 @@ namespace tt::service::gui {
extern const ServiceManifest manifest;
constexpr const char* TAG = "gui";
constexpr auto* TAG = "GuiService";
// region AppManifest
void GuiService::onLoaderMessage(const void* message, TT_UNUSED void* context) {
auto service = findService();
if (service == nullptr) {
return;
}
auto* event = static_cast<const loader::LoaderEvent*>(message);
if (event->type == loader::LoaderEventTypeApplicationShowing) {
void GuiService::onLoaderEvent(loader::LoaderEvent event) {
if (event == loader::LoaderEvent::ApplicationShowing) {
auto app_instance = app::getCurrentAppContext();
service->showApp(app_instance);
} else if (event->type == loader::LoaderEventTypeApplicationHiding) {
service->hideApp();
showApp(app_instance);
} else if (event == loader::LoaderEvent::ApplicationHiding) {
hideApp();
}
}
@ -124,7 +118,12 @@ void GuiService::onStart(TT_UNUSED ServiceContext& service) {
4096, // Last known minimum was 2800 for launching desktop
[]() { return guiMain(); }
);
loader_pubsub_subscription = loader::getPubsub()->subscribe(&onLoaderMessage, nullptr);
loader_pubsub_subscription = loader::getPubsub()->subscribe([this](auto event) {
onLoaderEvent(event);
});
tt_check(lvgl::lock(1000 / portTICK_PERIOD_MS));
keyboardGroup = lv_group_create();
auto* screen_root = lv_screen_active();

View File

@ -47,7 +47,7 @@ static const char* appStateToString(app::State state) {
class LoaderService final : public Service {
std::shared_ptr<PubSub> pubsubExternal = std::make_shared<PubSub>();
std::shared_ptr<PubSub<LoaderEvent>> pubsubExternal = std::make_shared<PubSub<LoaderEvent>>();
Mutex mutex = Mutex(Mutex::Type::Recursive);
std::stack<std::shared_ptr<app::AppInstance>> appStack;
app::LaunchId nextLaunchId = 0;
@ -64,13 +64,13 @@ class LoaderService final : public Service {
public:
void onStart(TT_UNUSED ServiceContext& service) final {
void onStart(TT_UNUSED ServiceContext& service) override {
dispatcherThread->start();
}
void onStop(TT_UNUSED ServiceContext& service) final {
void onStop(TT_UNUSED ServiceContext& service) override {
// Send stop signal to thread and wait for thread to finish
mutex.withLock([this]() {
mutex.withLock([this] {
dispatcherThread->stop();
});
}
@ -79,7 +79,7 @@ public:
void stopApp();
std::shared_ptr<app::AppContext> _Nullable getCurrentAppContext();
std::shared_ptr<PubSub> getPubsub() const { return pubsubExternal; }
std::shared_ptr<PubSub<LoaderEvent>> getPubsub() const { return pubsubExternal; }
};
std::shared_ptr<LoaderService> _Nullable optScreenshotService() {
@ -117,8 +117,7 @@ void LoaderService::onStartAppMessage(const std::string& id, app::LaunchId launc
transitionAppToState(new_app, app::State::Showing);
LoaderEvent event_external = { .type = LoaderEventTypeApplicationStarted };
pubsubExternal->publish(&event_external);
pubsubExternal->publish(LoaderEvent::ApplicationStarted);
}
void LoaderService::onStopAppMessage(const std::string& id) {
@ -188,8 +187,7 @@ void LoaderService::onStopAppMessage(const std::string& id) {
lock.unlock();
// WARNING: After this point we cannot change the app states from this method directly anymore as we don't have a lock!
LoaderEvent event_external = { .type = LoaderEventTypeApplicationStopped };
pubsubExternal->publish(&event_external);
pubsubExternal->publish(LoaderEvent::ApplicationStopped);
if (instance_to_resume != nullptr) {
if (result_set) {
@ -240,13 +238,11 @@ void LoaderService::transitionAppToState(const std::shared_ptr<app::AppInstance>
app->getApp()->onCreate(*app);
break;
case Showing: {
LoaderEvent event_showing = { .type = LoaderEventTypeApplicationShowing };
pubsubExternal->publish(&event_showing);
pubsubExternal->publish(LoaderEvent::ApplicationShowing);
break;
}
case Hiding: {
LoaderEvent event_hiding = { .type = LoaderEventTypeApplicationHiding };
pubsubExternal->publish(&event_hiding);
pubsubExternal->publish(LoaderEvent::ApplicationHiding);
break;
}
case Stopped:
@ -307,7 +303,7 @@ std::shared_ptr<app::App> _Nullable getCurrentApp() {
return app_context != nullptr ? app_context->getApp() : nullptr;
}
std::shared_ptr<PubSub> getPubsub() {
std::shared_ptr<PubSub<LoaderEvent>> getPubsub() {
auto service = optScreenshotService();
assert(service);
return service->getPubsub();

View File

@ -48,7 +48,7 @@ public:
Mutex dataMutex = Mutex(Mutex::Type::Recursive);
std::unique_ptr<Timer> autoConnectTimer;
/** @brief The public event bus */
std::shared_ptr<PubSub> pubsub = std::make_shared<PubSub>();
std::shared_ptr<PubSub<WifiEvent>> pubsub = std::make_shared<PubSub<WifiEvent>>();
// TODO: Deal with messages that come in while an action is ongoing
// for example: when scanning and you turn off the radio, the scan should probably stop or turning off
// the radio should disable the on/off button in the app as it is pending.
@ -129,7 +129,7 @@ static std::shared_ptr<Wifi> wifi_singleton;
// region Public functions
std::shared_ptr<PubSub> getPubsub() {
std::shared_ptr<PubSub<WifiEvent>> getPubsub() {
auto wifi = wifi_singleton;
if (wifi == nullptr) {
tt_crash("Service not running");
@ -371,11 +371,10 @@ static void scan_list_free_safely(std::shared_ptr<Wifi> wifi) {
}
}
static void publish_event_simple(std::shared_ptr<Wifi> wifi, EventType type) {
static void publish_event(std::shared_ptr<Wifi> wifi, WifiEvent event) {
auto lock = wifi->dataMutex.asScopedLock();
if (lock.lock()) {
Event turning_on_event = {.type = type};
wifi->pubsub->publish(&turning_on_event);
wifi->pubsub->publish(event);
}
}
@ -484,7 +483,7 @@ static void eventHandler(TT_UNUSED void* arg, esp_event_base_t event_base, int32
break;
}
wifi->setRadioState(RadioState::On);
publish_event_simple(wifi, EventType::Disconnected);
publish_event(wifi, WifiEvent::Disconnected);
kernel::publishSystemEvent(kernel::SystemEvent::NetworkDisconnected);
} else if (event_base == IP_EVENT && event_id == IP_EVENT_STA_GOT_IP) {
auto* event = static_cast<ip_event_got_ip_t*>(event_data);
@ -511,7 +510,7 @@ static void eventHandler(TT_UNUSED void* arg, esp_event_base_t event_base, int32
esp_wifi_scan_stop();
}
publish_event_simple(wifi_singleton, EventType::ScanFinished);
publish_event(wifi_singleton, WifiEvent::ScanFinished);
TT_LOG_I(TAG, "eventHandler: Finished scan");
if (copied_list && wifi_singleton->getRadioState() == RadioState::On && !wifi->pause_auto_connect) {
@ -537,7 +536,7 @@ static void dispatchEnable(std::shared_ptr<Wifi> wifi) {
if (lock.lock(50 / portTICK_PERIOD_MS)) {
TT_LOG_I(TAG, "Enabling");
wifi->setRadioState(RadioState::OnPending);
publish_event_simple(wifi, EventType::RadioStateOnPending);
publish_event(wifi, WifiEvent::RadioStateOnPending);
if (wifi->netif != nullptr) {
esp_netif_destroy(wifi->netif);
@ -554,7 +553,7 @@ static void dispatchEnable(std::shared_ptr<Wifi> wifi) {
TT_LOG_E(TAG, "Insufficient memory");
}
wifi->setRadioState(RadioState::Off);
publish_event_simple(wifi, EventType::RadioStateOff);
publish_event(wifi, WifiEvent::RadioStateOff);
return;
}
@ -582,7 +581,7 @@ static void dispatchEnable(std::shared_ptr<Wifi> wifi) {
TT_LOG_E(TAG, "Wifi mode setting failed");
wifi->setRadioState(RadioState::Off);
esp_wifi_deinit();
publish_event_simple(wifi, EventType::RadioStateOff);
publish_event(wifi, WifiEvent::RadioStateOff);
return;
}
@ -595,12 +594,12 @@ static void dispatchEnable(std::shared_ptr<Wifi> wifi) {
wifi->setRadioState(RadioState::Off);
esp_wifi_set_mode(WIFI_MODE_NULL);
esp_wifi_deinit();
publish_event_simple(wifi, EventType::RadioStateOff);
publish_event(wifi, WifiEvent::RadioStateOff);
return;
}
wifi->setRadioState(RadioState::On);
publish_event_simple(wifi, EventType::RadioStateOn);
publish_event(wifi, WifiEvent::RadioStateOn);
wifi->pause_auto_connect = false;
@ -631,7 +630,7 @@ static void dispatchDisable(std::shared_ptr<Wifi> wifi) {
TT_LOG_I(TAG, "Disabling");
wifi->setRadioState(RadioState::OffPending);
publish_event_simple(wifi, EventType::RadioStateOffPending);
publish_event(wifi, WifiEvent::RadioStateOffPending);
// Free up scan list memory
scan_list_free_safely(wifi_singleton);
@ -639,7 +638,7 @@ static void dispatchDisable(std::shared_ptr<Wifi> wifi) {
if (esp_wifi_stop() != ESP_OK) {
TT_LOG_E(TAG, "Failed to stop radio");
wifi->setRadioState(RadioState::On);
publish_event_simple(wifi, EventType::RadioStateOn);
publish_event(wifi, WifiEvent::RadioStateOn);
return;
}
@ -672,7 +671,7 @@ static void dispatchDisable(std::shared_ptr<Wifi> wifi) {
wifi->netif = nullptr;
wifi->setScanActive(false);
wifi->setRadioState(RadioState::Off);
publish_event_simple(wifi, EventType::RadioStateOff);
publish_event(wifi, WifiEvent::RadioStateOff);
TT_LOG_I(TAG, "Disabled");
}
@ -706,7 +705,7 @@ static void dispatchScan(std::shared_ptr<Wifi> wifi) {
TT_LOG_I(TAG, "Starting scan");
wifi->setScanActive(true);
publish_event_simple(wifi, EventType::ScanStarted);
publish_event(wifi, WifiEvent::ScanStarted);
}
static void dispatchConnect(std::shared_ptr<Wifi> wifi) {
@ -740,7 +739,7 @@ static void dispatchConnect(std::shared_ptr<Wifi> wifi) {
wifi->setRadioState(RadioState::ConnectionPending);
publish_event_simple(wifi, EventType::ConnectionPending);
publish_event(wifi, WifiEvent::ConnectionPending);
wifi_config_t config;
memset(&config, 0, sizeof(wifi_config_t));
@ -762,7 +761,7 @@ static void dispatchConnect(std::shared_ptr<Wifi> wifi) {
if (set_config_result != ESP_OK) {
wifi->setRadioState(RadioState::On);
TT_LOG_E(TAG, "Failed to set wifi config (%s)", esp_err_to_name(set_config_result));
publish_event_simple(wifi, EventType::ConnectionFailed);
publish_event(wifi, WifiEvent::ConnectionFailed);
return;
}
@ -771,7 +770,7 @@ static void dispatchConnect(std::shared_ptr<Wifi> wifi) {
if (wifi_start_result != ESP_OK) {
wifi->setRadioState(RadioState::On);
TT_LOG_E(TAG, "Failed to start wifi to begin connecting (%s)", esp_err_to_name(wifi_start_result));
publish_event_simple(wifi, EventType::ConnectionFailed);
publish_event(wifi, WifiEvent::ConnectionFailed);
return;
}
@ -784,7 +783,7 @@ static void dispatchConnect(std::shared_ptr<Wifi> wifi) {
if (bits & WIFI_CONNECTED_BIT) {
wifi->setSecureConnection(config.sta.password[0] != 0x00U);
wifi->setRadioState(RadioState::ConnectionActive);
publish_event_simple(wifi, EventType::ConnectionSuccess);
publish_event(wifi, WifiEvent::ConnectionSuccess);
TT_LOG_I(TAG, "Connected to %s", wifi->connection_target.ssid.c_str());
if (wifi->connection_target_remember) {
if (!settings::save(wifi->connection_target)) {
@ -795,11 +794,11 @@ static void dispatchConnect(std::shared_ptr<Wifi> wifi) {
}
} else if (bits & WIFI_FAIL_BIT) {
wifi->setRadioState(RadioState::On);
publish_event_simple(wifi, EventType::ConnectionFailed);
publish_event(wifi, WifiEvent::ConnectionFailed);
TT_LOG_I(TAG, "Failed to connect to %s", wifi->connection_target.ssid.c_str());
} else {
wifi->setRadioState(RadioState::On);
publish_event_simple(wifi, EventType::ConnectionFailed);
publish_event(wifi, WifiEvent::ConnectionFailed);
TT_LOG_E(TAG, "UNEXPECTED EVENT");
}
@ -834,7 +833,7 @@ static void dispatchDisconnectButKeepActive(std::shared_ptr<Wifi> wifi) {
// TODO: disable radio, because radio state is in limbo between off and on
wifi->setRadioState(RadioState::Off);
TT_LOG_E(TAG, "failed to set wifi config (%s)", esp_err_to_name(set_config_result));
publish_event_simple(wifi, EventType::RadioStateOff);
publish_event(wifi, WifiEvent::RadioStateOff);
return;
}
@ -843,12 +842,12 @@ static void dispatchDisconnectButKeepActive(std::shared_ptr<Wifi> wifi) {
// TODO: disable radio, because radio state is in limbo between off and on
wifi->setRadioState(RadioState::Off);
TT_LOG_E(TAG, "failed to start wifi to begin connecting (%s)", esp_err_to_name(wifi_start_result));
publish_event_simple(wifi, EventType::RadioStateOff);
publish_event(wifi, WifiEvent::RadioStateOff);
return;
}
wifi->setRadioState(RadioState::On);
publish_event_simple(wifi, EventType::Disconnected);
publish_event(wifi, WifiEvent::Disconnected);
TT_LOG_I(TAG, "Disconnected");
}

View File

@ -22,7 +22,7 @@ struct Wifi {
/** @brief Locking mechanism for modifying the Wifi instance */
Mutex mutex = Mutex(Mutex::Type::Recursive);
/** @brief The public event bus */
std::shared_ptr<PubSub> pubsub = std::make_shared<PubSub>();
std::shared_ptr<PubSub<WifiEvent>> pubsub = std::make_shared<PubSub<WifiEvent>>();
/** @brief The internal message queue */
bool scan_active = false;
bool secure_connection = false;
@ -34,16 +34,15 @@ static Wifi* wifi = nullptr;
// region Static
static void publish_event_simple(Wifi* wifi, EventType type) {
Event turning_on_event = { .type = type };
wifi->pubsub->publish(&turning_on_event);
static void publish_event(WifiEvent event) {
wifi->pubsub->publish(event);
}
// endregion Static
// region Public functions
std::shared_ptr<PubSub> getPubsub() {
std::shared_ptr<PubSub<WifiEvent>> getPubsub() {
assert(wifi);
return wifi->pubsub;
}

View File

@ -1,7 +1,3 @@
/**
* @file pubsub.h
* PubSub
*/
#pragma once
#include "Mutex.h"
@ -9,18 +5,13 @@
namespace tt {
/** PubSub Callback type */
typedef void (*PubSubCallback)(const void* message, void* context);
/** Publish and subscribe to messages in a thread-safe manner. */
template<typename DataType>
class PubSub {
private:
struct Subscription {
uint64_t id;
PubSubCallback callback;
void* callbackParameter;
std::function<void(DataType)> callback;
};
typedef std::list<Subscription> Subscriptions;
@ -42,21 +33,55 @@ public:
/** Start receiving messages at the specified handle (Threadsafe, Re-entrable)
* @param[in] callback
* @param[in] callbackParameter the data to pass to the callback
* @return subscription instance
*/
SubscriptionHandle subscribe(PubSubCallback callback, void* callbackParameter);
SubscriptionHandle subscribe(std::function<void(DataType)> callback) {
mutex.lock();
items.push_back({
.id = (++lastId),
.callback = std::move(callback)
});
mutex.unlock();
return reinterpret_cast<SubscriptionHandle>(lastId);
}
/** Stop receiving messages at the specified handle (Threadsafe, Re-entrable.)
* No use of `tt_pubsub_subscription` allowed after call of this method
* @param[in] subscription
*/
void unsubscribe(SubscriptionHandle subscription);
void unsubscribe(SubscriptionHandle subscription) {
assert(subscription);
/** Publish message to all subscribers (Threadsafe, Re-entrable.)
* @param[in] message message pointer to publish - it is passed as-is to the callback
mutex.lock();
bool result = false;
auto id = reinterpret_cast<uint64_t>(subscription);
for (auto it = items.begin(); it != items.end(); ++it) {
if (it->id == id) {
items.erase(it);
result = true;
break;
}
}
mutex.unlock();
tt_check(result);
}
/** Publish something to all subscribers (Threadsafe, Re-entrable.)
* @param[in] data the data to publish
*/
void publish(void* message);
void publish(DataType data) {
mutex.lock();
// Iterate over subscribers
for (auto& it : items) {
it.callback(data);
}
mutex.unlock();
}
};

View File

@ -1,47 +0,0 @@
#include "Tactility/PubSub.h"
#include "Tactility/Check.h"
namespace tt {
PubSub::SubscriptionHandle PubSub::subscribe(PubSubCallback callback, void* callbackParameter) {
mutex.lock();
items.push_back({
.id = (++lastId),
.callback = callback,
.callbackParameter = callbackParameter});
mutex.unlock();
return (Subscription*)lastId;
}
void PubSub::unsubscribe(SubscriptionHandle subscription) {
assert(subscription);
mutex.lock();
bool result = false;
auto id = (uint64_t)subscription;
for (auto it = items.begin(); it != items.end(); it++) {
if (it->id == id) {
items.erase(it);
result = true;
break;
}
}
mutex.unlock();
tt_check(result);
}
void PubSub::publish(void* message) {
mutex.lock();
// Iterate over subscribers
for (auto& it : items) {
it.callback(message, it.callbackParameter);
}
mutex.unlock();
}
} // namespace

View File

@ -0,0 +1,35 @@
#include "doctest.h"
#include <Tactility/TactilityCore.h>
#include <Tactility/PubSub.h>
using namespace tt;
TEST_CASE("PubSub publishing with no subscriptions should not crash") {
PubSub<int> pubsub;
pubsub.publish(1);
}
TEST_CASE("PubSub subscription receives published data") {
PubSub<int> pubsub;
int value = 0;
auto subscription = pubsub.subscribe([&value](auto newValue) {
value = newValue;
});
pubsub.publish(1);
CHECK_EQ(value, 1);
}
TEST_CASE("PubSub unsubscribed subscription does not receive published data") {
PubSub<int> pubsub;
int value = 0;
auto subscription = pubsub.subscribe([&value](auto newValue) {
value = newValue;
});
pubsub.unsubscribe(subscription);
pubsub.publish(1);
CHECK_EQ(value, 0);
}