1 /*
2  * Copyright (c) 2006-2025 RT-Thread Development Team
3  *
4  * SPDX-License-Identifier: Apache-2.0
5  *
6  * Change Logs:
7  * Date           Author       Notes
8  * 2012-09-30     Bernard      first version.
9  * 2025-03-04     wumingzi     add doxygen comments.
10  */
11 
12 #include <rthw.h>
13 #include <rtdevice.h>
14 #include "dev_audio_pipe.h"
15 
_rt_audio_pipe_resume_writer(struct rt_audio_pipe * pipe)16 static void _rt_audio_pipe_resume_writer(struct rt_audio_pipe *pipe)
17 {
18     if (!rt_list_isempty(&pipe->suspended_write_list))
19     {
20         rt_thread_t thread;
21 
22         RT_ASSERT(pipe->flag & RT_PIPE_FLAG_BLOCK_WR);
23 
24         /* get suspended thread */
25         thread = RT_THREAD_LIST_NODE_ENTRY(pipe->suspended_write_list.next);
26 
27         /* resume the write thread */
28         rt_thread_resume(thread);
29 
30         rt_schedule();
31     }
32 }
33 
34 /**
35  * @brief Read audio pipe
36  *
37  * @param[in] dev pointer to audio device will be read
38  *
39  * @param[in] pos useless param
40  *
41  * @param[in] buffer pointer to ringbuffer of audio pipe to be read
42  *
43  * @param[in] size number of bytes will be read
44  *
45  * @return number of read bytes
46  *
47  * @note This function will execute time-consuming or affecting the
48  *       system operations like memcpy and disable interrupt.
49  */
rt_audio_pipe_read(rt_device_t dev,rt_off_t pos,void * buffer,rt_size_t size)50 static rt_ssize_t rt_audio_pipe_read(rt_device_t dev,
51                               rt_off_t    pos,
52                               void       *buffer,
53                               rt_size_t   size)
54 {
55     rt_base_t level;
56     rt_thread_t thread;
57     struct rt_audio_pipe *pipe;
58     rt_size_t read_nbytes;
59 
60     pipe = (struct rt_audio_pipe *)dev;
61     RT_ASSERT(pipe != RT_NULL);
62 
63     if (!(pipe->flag & RT_PIPE_FLAG_BLOCK_RD))
64     {
65         level = rt_hw_interrupt_disable();
66         read_nbytes = rt_ringbuffer_get(&(pipe->ringbuffer), (rt_uint8_t *)buffer, size);
67 
68         /* if the ringbuffer is empty, there won't be any writer waiting */
69         if (read_nbytes)
70             _rt_audio_pipe_resume_writer(pipe);
71 
72         rt_hw_interrupt_enable(level);
73 
74         return read_nbytes;
75     }
76 
77     thread = rt_thread_self();
78 
79     /* current context checking */
80     RT_DEBUG_NOT_IN_INTERRUPT;
81 
82     do
83     {
84         level = rt_hw_interrupt_disable();
85         read_nbytes = rt_ringbuffer_get(&(pipe->ringbuffer), (rt_uint8_t *)buffer, size);
86         if (read_nbytes == 0)
87         {
88             rt_thread_suspend(thread);
89             /* waiting on suspended read list */
90             rt_list_insert_before(&(pipe->suspended_read_list),
91                                   &RT_THREAD_LIST_NODE(thread));
92             rt_hw_interrupt_enable(level);
93 
94             rt_schedule();
95         }
96         else
97         {
98             _rt_audio_pipe_resume_writer(pipe);
99             rt_hw_interrupt_enable(level);
100             break;
101         }
102     }
103     while (read_nbytes == 0);
104 
105     return read_nbytes;
106 }
107 
108 /**
109  * @brief Resume audio pipe reader thread
110  *
111  * @param[in] pipe pointer to suspended audio pipe thread
112  */
_rt_audio_pipe_resume_reader(struct rt_audio_pipe * pipe)113 static void _rt_audio_pipe_resume_reader(struct rt_audio_pipe *pipe)
114 {
115     if (pipe->parent.rx_indicate)
116         pipe->parent.rx_indicate(&pipe->parent,
117                                  rt_ringbuffer_data_len(&pipe->ringbuffer));
118 
119     if (!rt_list_isempty(&pipe->suspended_read_list))
120     {
121         rt_thread_t thread;
122 
123         RT_ASSERT(pipe->flag & RT_PIPE_FLAG_BLOCK_RD);
124 
125         /* get suspended thread */
126         thread = RT_THREAD_LIST_NODE_ENTRY(pipe->suspended_read_list.next);
127 
128         /* resume the read thread */
129         rt_thread_resume(thread);
130 
131         rt_schedule();
132     }
133 }
134 
135 /**
136  * @brief Write data into audio pipe
137  *
138  * @param[in] dev pointer to audio pipe that has been configured
139  *
140  * @param[in] pos useless param
141  *
142  * @param[in] buffer pointer to buffer of ringbuffer
143  *
144  * @param[in] size size of data will be written
145  *
146  * @return number of written bytes
147  *
148  * @note The function will disable interrupt and may suspend current thread
149  */
rt_audio_pipe_write(rt_device_t dev,rt_off_t pos,const void * buffer,rt_size_t size)150 static rt_ssize_t rt_audio_pipe_write(rt_device_t dev,
151                                rt_off_t    pos,
152                                const void *buffer,
153                                rt_size_t   size)
154 {
155     rt_base_t level;
156     rt_thread_t thread;
157     struct rt_audio_pipe *pipe;
158     rt_size_t write_nbytes;
159 
160     pipe = (struct rt_audio_pipe *)dev;
161     RT_ASSERT(pipe != RT_NULL);
162 
163     if ((pipe->flag & RT_PIPE_FLAG_FORCE_WR) ||
164             !(pipe->flag & RT_PIPE_FLAG_BLOCK_WR))
165     {
166         level = rt_hw_interrupt_disable();
167 
168         if (pipe->flag & RT_PIPE_FLAG_FORCE_WR)
169             write_nbytes = rt_ringbuffer_put_force(&(pipe->ringbuffer),
170                                                    (const rt_uint8_t *)buffer, size);
171         else
172             write_nbytes = rt_ringbuffer_put(&(pipe->ringbuffer),
173                                              (const rt_uint8_t *)buffer, size);
174 
175         _rt_audio_pipe_resume_reader(pipe);
176 
177         rt_hw_interrupt_enable(level);
178 
179         return write_nbytes;
180     }
181 
182     thread = rt_thread_self();
183 
184     /* current context checking */
185     RT_DEBUG_NOT_IN_INTERRUPT;
186 
187     do
188     {
189         level = rt_hw_interrupt_disable();
190         write_nbytes = rt_ringbuffer_put(&(pipe->ringbuffer), (const rt_uint8_t *)buffer, size);
191         if (write_nbytes == 0)
192         {
193             /* pipe full, waiting on suspended write list */
194             rt_thread_suspend(thread);
195             /* waiting on suspended read list */
196             rt_list_insert_before(&(pipe->suspended_write_list),
197                                   &RT_THREAD_LIST_NODE(thread));
198             rt_hw_interrupt_enable(level);
199 
200             rt_schedule();
201         }
202         else
203         {
204             _rt_audio_pipe_resume_reader(pipe);
205             rt_hw_interrupt_enable(level);
206             break;
207         }
208     }
209     while (write_nbytes == 0);
210 
211     return write_nbytes;
212 }
213 
214 /**
215  * @brief Control audio pipe
216  *
217  * @param[in] dev pointer to pipe
218  *
219  * @param[in] cmd control command
220  *
221  * @param[in] args control argument
222  *
223  * @return error code, RT_EOK is successful otherwise means failure
224  */
rt_audio_pipe_control(rt_device_t dev,int cmd,void * args)225 static rt_err_t rt_audio_pipe_control(rt_device_t dev, int cmd, void *args)
226 {
227     struct rt_audio_pipe *pipe;
228 
229     pipe = (struct rt_audio_pipe *)dev;
230 
231     if (cmd == PIPE_CTRL_GET_SPACE && args)
232         *(rt_size_t *)args = rt_ringbuffer_space_len(&pipe->ringbuffer);
233     return RT_EOK;
234 }
235 
236 #ifdef RT_USING_DEVICE_OPS
237 const static struct rt_device_ops audio_pipe_ops =
238 {
239     RT_NULL,
240     RT_NULL,
241     RT_NULL,
242     rt_audio_pipe_read,
243     rt_audio_pipe_write,
244     rt_audio_pipe_control
245 };
246 #endif
247 
248 /**
249  * @brief Init audio pipe
250  *
251  * This function will initialize a pipe device and put it under control of
252  * resource management.
253  *
254  * @param pipe the pipe device
255  *
256  * @param name the name of pipe device
257  *
258  * @param flag the attribute of the pipe device
259  *
260  * @param buf  the buffer of pipe device
261  *
262  * @param size the size of pipe device buffer
263  *
264  * @return the operation status, RT_EOK on successful
265  */
rt_audio_pipe_init(struct rt_audio_pipe * pipe,const char * name,rt_int32_t flag,rt_uint8_t * buf,rt_size_t size)266 rt_err_t rt_audio_pipe_init(struct rt_audio_pipe *pipe,
267                             const char *name,
268                             rt_int32_t flag,
269                             rt_uint8_t *buf,
270                             rt_size_t size)
271 {
272     RT_ASSERT(pipe);
273     RT_ASSERT(buf);
274 
275     /* initialize suspended list */
276     rt_list_init(&pipe->suspended_read_list);
277     rt_list_init(&pipe->suspended_write_list);
278 
279     /* initialize ring buffer */
280     rt_ringbuffer_init(&pipe->ringbuffer, buf, size);
281 
282     pipe->flag = flag;
283 
284     /* create pipe */
285     pipe->parent.type    = RT_Device_Class_Pipe;
286 #ifdef RT_USING_DEVICE_OPS
287     pipe->parent.ops     = &audio_pipe_ops;
288 #else
289     pipe->parent.init    = RT_NULL;
290     pipe->parent.open    = RT_NULL;
291     pipe->parent.close   = RT_NULL;
292     pipe->parent.read    = rt_audio_pipe_read;
293     pipe->parent.write   = rt_audio_pipe_write;
294     pipe->parent.control = rt_audio_pipe_control;
295 #endif
296 
297     return rt_device_register(&(pipe->parent), name, RT_DEVICE_FLAG_RDWR);
298 }
299 
300 /**
301  * @brief This function will detach a pipe device from resource management
302  *
303  * @param pipe the pipe device
304  *
305  * @return the operation status, RT_EOK on successful
306  */
rt_audio_pipe_detach(struct rt_audio_pipe * pipe)307 rt_err_t rt_audio_pipe_detach(struct rt_audio_pipe *pipe)
308 {
309     return rt_device_unregister(&pipe->parent);
310 }
311 
312 /**
313  * @brief Creat audio pipe
314  *
315  * @param[in] name pipe name
316  *
317  * @param[in] flag pipe flags, it can be one of enum rt_audio_pipe_flag items
318  *
319  * @param[in] size ringbuffer size
320  *
321  * @return error code, RT_EOK on initialization successfully
322  *
323  * @note depend on RT_USING_HEAP
324  */
325 #ifdef RT_USING_HEAP
rt_audio_pipe_create(const char * name,rt_int32_t flag,rt_size_t size)326 rt_err_t rt_audio_pipe_create(const char *name, rt_int32_t flag, rt_size_t size)
327 {
328     rt_uint8_t *rb_memptr = RT_NULL;
329     struct rt_audio_pipe *pipe = RT_NULL;
330 
331     /* get aligned size */
332     size = RT_ALIGN(size, RT_ALIGN_SIZE);
333     pipe = (struct rt_audio_pipe *)rt_calloc(1, sizeof(struct rt_audio_pipe));
334     if (pipe == RT_NULL)
335         return -RT_ENOMEM;
336 
337     /* create ring buffer of pipe */
338     rb_memptr = (rt_uint8_t *)rt_malloc(size);
339     if (rb_memptr == RT_NULL)
340     {
341         rt_free(pipe);
342         return -RT_ENOMEM;
343     }
344 
345     return rt_audio_pipe_init(pipe, name, flag, rb_memptr, size);
346 }
347 
348 /**
349  * @brief Detachaudio pipe and free its ringbuffer
350  *
351  * @param[in] pipe pointer to the pipe will be destory
352  *
353  * @note depend on RT_USING_HEAP
354  */
rt_audio_pipe_destroy(struct rt_audio_pipe * pipe)355 void rt_audio_pipe_destroy(struct rt_audio_pipe *pipe)
356 {
357     if (pipe == RT_NULL)
358         return;
359 
360     /* un-register pipe device */
361     rt_audio_pipe_detach(pipe);
362 
363     /* release memory */
364     rt_free(pipe->ringbuffer.buffer_ptr);
365     rt_free(pipe);
366 
367     return;
368 }
369 
370 #endif /* RT_USING_HEAP */