1 /*
2 * Copyright (C) 2015-2020 Alibaba Group Holding Limited
3 */
4
5 #include <stdio.h>
6 #include <stdint.h>
7
8 #include <aos/kernel.h>
9
10 #if AOS_COMP_CLI
11 #include "aos/cli.h"
12 #endif
13
14 /**
15 * 该示例使用消息队列实现任务间数据同步,具体场景为创建任务A和认为B,以及一消息队列。\n\r
16 * 任务A作为生产者循环向消息队列发送消息,任务B作为消费者循环从消息队列接收消息,
17 * 一般情况下,消费者处理数据是要花费很长时间,所以会导致消息产生的速度大于消息处理的速度,使得消息队列溢出。
18 * 所以可以通过调整任务B优先级大于任务A来避免这种情况,或者使用信号量来控制数据收发同步。
19 * 示例说明如下:
20 * 1. t0时刻,任务T调用aos_queue_new()创建一互斥量。任务T然后调用aos_task_create()创建任务A和任务B,任务A优先级设置为31,任务B优先级设置为30。任务B因消息队列无消息而阻塞,任务A得到运行向消息队列发送消息。
21 * 2. t1时刻,任务B因从消息队列读取到消息而解除阻塞,任务B对消息进行处理后继续等待新的消息到来。
22 * 3. t2时刻,任务A向消息队列发送消息。
23 * 4. t3时刻,重复t1时刻的操作。
24 */
25
26 /* module name used by log print */
27 #define MODULE_NAME "aos_queue_example"
28
29 /* taskA parameters */
30 #define TASKA_NAME "taskA"
31 #define TASKA_PRIO 31
32 #define TASKA_STACKSIZE 1024
33
34 /* taskB parameters */
35 #define TASKB_NAME "taskB"
36 #define TASKB_PRIO 30
37 #define TASKB_STACKSIZE 1024
38
39 /* queue resource */
40 #define MESSAGE_MAX_LENGTH 10 /* maximum message length */
41
42 /* Static memory for static creation */
43 static aos_queue_t queue_handle; /* queue handle */
44 static char queue_buffer[MESSAGE_MAX_LENGTH * 10]; /* for the internal buffer of the queue */
45
46 /* task entry for taskA*/
taskA_entry(void * arg)47 static void taskA_entry(void *arg)
48 {
49 uint32_t i;
50 aos_status_t status;
51
52 char message_buf[MESSAGE_MAX_LENGTH]; /* buffer used to send message */
53 uint8_t message_id = 0;
54
55 while (1) {
56 /**
57 * generate message. The sequence of messages is as follows:
58 * 0123456789
59 * 1234567890
60 * 2345678901
61 * ......
62 */
63 for (i = 0; i < sizeof(message_buf); i++) {
64 message_buf[i] = (message_id + i) % 10;
65 }
66 message_id++;
67
68 /* send message. The message length must not exceed the maximum message length */
69 status = aos_queue_send(&queue_handle, (void *)message_buf, sizeof(message_buf));
70 if (status != 0) {
71 printf("[%s]send buf queue error\n", MODULE_NAME);
72 }
73
74 aos_msleep(1000); /* sleep 1000ms */
75 }
76 }
77
78 /* task entry for taskB*/
taskB_entry(void * arg)79 static void taskB_entry(void *arg)
80 {
81 uint32_t i;
82 aos_status_t status;
83 /* The buffer must be greater than or equal to the maximum message length */
84 char message_buf[MESSAGE_MAX_LENGTH];
85 size_t rev_size = 0;
86
87 while (1) {
88 /**
89 * receive message. The task will wait until it receives the message.
90 * rev_size is set to the actual length of the received message.
91 */
92 status = aos_queue_recv(&queue_handle, AOS_WAIT_FOREVER, (void *)message_buf, &rev_size);
93 if (status == 0) {
94 /* show message data */
95 printf("[%s]%d recv message : ", MODULE_NAME, rev_size);
96 for (i = 0; i < rev_size; i++) {
97 printf("%d", message_buf[i]);
98 }
99 printf("\r\n");
100 } else {
101 printf("[%s]recv buf queue error\n", MODULE_NAME);
102 }
103 }
104 }
105
aos_queue_example(int argc,char ** argv)106 static void aos_queue_example(int argc, char **argv)
107 {
108 aos_status_t status;
109 aos_task_t taskA_handle;
110 aos_task_t taskB_handle;
111
112 /**
113 * create a queue.
114 * queue: queue_handle(aos_queue_t struct variable)
115 * buf: queue_buffer(for the internal buffer of the queue)
116 * size: sizeof(queue_buffer) is the length of buf
117 * max_msg: MESSAGE_MAX_LENGTH(maximum message length, here is 10 byte)
118 */
119 status = aos_queue_new(&queue_handle, (void *)queue_buffer, sizeof(queue_buffer),
120 MESSAGE_MAX_LENGTH);
121 if (status != 0) {
122 printf("[%s]create queue error\n", MODULE_NAME);
123 return;
124 }
125
126 /* TaskA is a producer that produces a set of data every second. */
127 status = aos_task_create(&taskA_handle, TASKA_NAME, taskA_entry, NULL, NULL, TASKA_STACKSIZE, TASKA_PRIO, AOS_TASK_AUTORUN);
128 if (status != 0) {
129 aos_queue_free(&queue_handle);
130 printf("[%s]create %s error\n", MODULE_NAME, TASKA_NAME);
131 return;
132 }
133
134 /* TaskB is the consumer that processes the data sent by taskA. */
135 status = aos_task_create(&taskB_handle, TASKB_NAME, taskB_entry, NULL, NULL, TASKB_STACKSIZE, TASKB_PRIO, AOS_TASK_AUTORUN);
136 if (status != 0) {
137 aos_queue_free(&queue_handle);
138 printf("[%s]create %s error\n", MODULE_NAME, TASKB_NAME);
139 return;
140 }
141 }
142
143 #if AOS_COMP_CLI
144 /* reg args: fun, cmd, description*/
145 ALIOS_CLI_CMD_REGISTER(aos_queue_example, queue_example, aos queue example)
146 #endif