/* * Copyright (c) 2023 Rodrigo Peixoto * SPDX-License-Identifier: Apache-2.0 */ #include #include #include LOG_MODULE_DECLARE(zbus, CONFIG_ZBUS_LOG_LEVEL); static atomic_t sub_count = ATOMIC_INIT(0); struct confirmed_msg { uint32_t payload; }; ZBUS_CHAN_DEFINE(confirmed_chan, /* Name */ struct confirmed_msg, /* Message type */ NULL, /* Validator */ &sub_count, /* User data */ ZBUS_OBSERVERS(foo_lis, bar_sub1, bar_sub2, bar_sub3), /* observers */ ZBUS_MSG_INIT(.payload = 0) /* Initial value */ ); static void listener_callback_example(const struct zbus_channel *chan) { const struct confirmed_msg *cm = zbus_chan_const_msg(chan); LOG_INF("From listener -> Confirmed message payload = %u", cm->payload); } ZBUS_LISTENER_DEFINE(foo_lis, listener_callback_example); ZBUS_SUBSCRIBER_DEFINE(bar_sub1, 4); static void bar_sub1_task(void) { const struct zbus_channel *chan; while (!zbus_sub_wait(&bar_sub1, &chan, K_FOREVER)) { struct confirmed_msg cm; if (&confirmed_chan != chan) { continue; } zbus_chan_read(&confirmed_chan, &cm, K_MSEC(500)); k_msleep(2500); atomic_dec(zbus_chan_user_data(&confirmed_chan)); LOG_INF("From bar_sub1 subscriber -> Confirmed " "message payload = " "%u", cm.payload); } } K_THREAD_DEFINE(bar_sub1_task_id, CONFIG_MAIN_STACK_SIZE, bar_sub1_task, NULL, NULL, NULL, 3, 0, 0); ZBUS_SUBSCRIBER_DEFINE(bar_sub2, 4); static void bar_sub2_task(void) { const struct zbus_channel *chan; while (!zbus_sub_wait(&bar_sub2, &chan, K_FOREVER)) { struct confirmed_msg cm; if (&confirmed_chan != chan) { continue; } zbus_chan_read(&confirmed_chan, &cm, K_MSEC(500)); k_msleep(1000); atomic_dec(zbus_chan_user_data(&confirmed_chan)); LOG_INF("From bar_sub2 subscriber -> Confirmed " "message payload = " "%u", cm.payload); } } K_THREAD_DEFINE(bar_sub2_task_id, CONFIG_MAIN_STACK_SIZE, bar_sub2_task, NULL, NULL, NULL, 3, 0, 0); ZBUS_SUBSCRIBER_DEFINE(bar_sub3, 4); static void bar_sub3_task(void) { const struct zbus_channel *chan; while (!zbus_sub_wait(&bar_sub3, &chan, K_FOREVER)) { struct confirmed_msg cm; if (&confirmed_chan != chan) { continue; } zbus_chan_read(&confirmed_chan, &cm, K_MSEC(500)); k_msleep(5000); atomic_dec(zbus_chan_user_data(&confirmed_chan)); LOG_INF("From bar_sub3 subscriber -> Confirmed " "message payload = " "%u", cm.payload); } } K_THREAD_DEFINE(bar_sub3_task_id, CONFIG_MAIN_STACK_SIZE, bar_sub3_task, NULL, NULL, NULL, 3, 0, 0); static void pub_to_confirmed_channel(struct confirmed_msg *cm) { /* Wait for channel be consumed */ while (atomic_get(zbus_chan_user_data(&confirmed_chan)) > 0) { k_msleep(100); } /* Set the number of subscribers to consume the channel */ atomic_set(zbus_chan_user_data(&confirmed_chan), 3); zbus_chan_pub(&confirmed_chan, cm, K_MSEC(500)); } int main(void) { struct confirmed_msg cm = {0}; while (1) { pub_to_confirmed_channel(&cm); ++cm.payload; } return 0; }