1 /*
2  * Copyright (C) 2015-2020 Alibaba Group Holding Limited
3  */
4 
5 #include <stdio.h>
6 #include <stdint.h>
7 
8 #include <aos/kernel.h>
9 
10 #if AOS_COMP_CLI
11 #include "aos/cli.h"
12 #endif
13 
14 /**
15  * 该示例使用消息队列实现任务间数据同步,具体场景为创建任务A和认为B,以及一消息队列。\n\r
16  * 任务A作为生产者循环向消息队列发送消息,任务B作为消费者循环从消息队列接收消息,
17  * 一般情况下,消费者处理数据是要花费很长时间,所以会导致消息产生的速度大于消息处理的速度,使得消息队列溢出。
18  * 所以可以通过调整任务B优先级大于任务A来避免这种情况,或者使用信号量来控制数据收发同步。
19  * 示例说明如下:
20  * 1. t0时刻,任务T调用aos_queue_new()创建一互斥量。任务T然后调用aos_task_create()创建任务A和任务B,任务A优先级设置为31,任务B优先级设置为30。任务B因消息队列无消息而阻塞,任务A得到运行向消息队列发送消息。
21  * 2. t1时刻,任务B因从消息队列读取到消息而解除阻塞,任务B对消息进行处理后继续等待新的消息到来。
22  * 3. t2时刻,任务A向消息队列发送消息。
23  * 4. t3时刻,重复t1时刻的操作。
24  */
25 
26 /* module name used by log print */
27 #define MODULE_NAME "aos_queue_example"
28 
29 /* taskA parameters */
30 #define TASKA_NAME      "taskA"
31 #define TASKA_PRIO       31
32 #define TASKA_STACKSIZE 1024
33 
34 /* taskB parameters */
35 #define TASKB_NAME      "taskB"
36 #define TASKB_PRIO       30
37 #define TASKB_STACKSIZE 1024
38 
39 /* queue resource */
40 #define MESSAGE_MAX_LENGTH 10 /* maximum message length */
41 
42 /* Static memory for static creation */
43 static aos_queue_t queue_handle;                          /* queue handle */
44 static char        queue_buffer[MESSAGE_MAX_LENGTH * 10]; /* for the internal buffer of the queue */
45 
46 /* task entry for taskA*/
taskA_entry(void * arg)47 static void taskA_entry(void *arg)
48 {
49     uint32_t      i;
50     aos_status_t  status;
51 
52     char     message_buf[MESSAGE_MAX_LENGTH]; /* buffer used to send message */
53     uint8_t  message_id = 0;
54 
55     while (1) {
56         /**
57          * generate message. The sequence of messages is as follows:
58          * 0123456789
59          * 1234567890
60          * 2345678901
61          * ......
62          */
63         for (i = 0; i < sizeof(message_buf); i++) {
64             message_buf[i] = (message_id + i) % 10;
65         }
66         message_id++;
67 
68         /* send message. The message length must not exceed the maximum message length */
69         status = aos_queue_send(&queue_handle, (void *)message_buf, sizeof(message_buf));
70         if (status != 0) {
71             printf("[%s]send buf queue error\n", MODULE_NAME);
72         }
73 
74         aos_msleep(1000); /* sleep 1000ms */
75     }
76 }
77 
78 /* task entry for taskB*/
taskB_entry(void * arg)79 static void taskB_entry(void *arg)
80 {
81     uint32_t      i;
82     aos_status_t  status;
83     /* The buffer must be greater than or equal to the maximum message length */
84     char          message_buf[MESSAGE_MAX_LENGTH];
85     size_t        rev_size = 0;
86 
87     while (1) {
88         /**
89          * receive message. The task will wait until it receives the message.
90          * rev_size is set to the actual length of the received message.
91          */
92         status = aos_queue_recv(&queue_handle, AOS_WAIT_FOREVER, (void *)message_buf, &rev_size);
93         if (status == 0) {
94             /* show message data */
95             printf("[%s]%d recv message : ", MODULE_NAME, rev_size);
96             for (i = 0; i < rev_size; i++) {
97                 printf("%d", message_buf[i]);
98             }
99             printf("\r\n");
100         } else {
101             printf("[%s]recv buf queue error\n", MODULE_NAME);
102         }
103     }
104 }
105 
aos_queue_example(int argc,char ** argv)106 static void aos_queue_example(int argc, char **argv)
107 {
108     aos_status_t  status;
109     aos_task_t    taskA_handle;
110     aos_task_t    taskB_handle;
111 
112     /**
113      * create a queue.
114      * queue:   queue_handle(aos_queue_t struct variable)
115      * buf:     queue_buffer(for the internal buffer of the queue)
116      * size:    sizeof(queue_buffer) is the length of buf
117      * max_msg: MESSAGE_MAX_LENGTH(maximum message length, here is 10 byte)
118      */
119     status = aos_queue_new(&queue_handle, (void *)queue_buffer, sizeof(queue_buffer),
120                            MESSAGE_MAX_LENGTH);
121     if (status != 0) {
122         printf("[%s]create queue error\n", MODULE_NAME);
123         return;
124     }
125 
126     /* TaskA is a producer that produces a set of data every second. */
127     status = aos_task_create(&taskA_handle, TASKA_NAME, taskA_entry, NULL, NULL, TASKA_STACKSIZE, TASKA_PRIO, AOS_TASK_AUTORUN);
128     if (status != 0) {
129         aos_queue_free(&queue_handle);
130         printf("[%s]create %s error\n", MODULE_NAME, TASKA_NAME);
131         return;
132     }
133 
134     /* TaskB is the consumer that processes the data sent by taskA. */
135     status = aos_task_create(&taskB_handle, TASKB_NAME, taskB_entry, NULL, NULL, TASKB_STACKSIZE, TASKB_PRIO, AOS_TASK_AUTORUN);
136     if (status != 0) {
137         aos_queue_free(&queue_handle);
138         printf("[%s]create %s error\n", MODULE_NAME, TASKB_NAME);
139         return;
140     }
141 }
142 
143 #if AOS_COMP_CLI
144 /* reg args: fun, cmd, description*/
145 ALIOS_CLI_CMD_REGISTER(aos_queue_example, queue_example, aos queue example)
146 #endif