1 /*
2  * Copyright (c) 2006-2023, RT-Thread Development Team
3  *
4  * SPDX-License-Identifier: Apache-2.0
5  *
6  * Change Logs:
7  * Date           Author       Notes
8  * 2012-09-30     Bernard      first version.
9  * 2016-10-31     armink       fix some resume push and pop thread bugs
10  * 2023-09-15     xqyjlj       perf rt_hw_interrupt_disable/enable
11  * 2024-01-25     Shell        porting to susp_list API
12  */
13 
14 #include <rthw.h>
15 #include <rtdevice.h>
16 
/* Stamp written to queue->magic by rt_data_queue_init() and checked by the
 * other entry points in this file; rt_data_queue_deinit() clears it. */
#define DATAQUEUE_MAGIC  0xbead0e0e

/* One slot of the queue's item ring. The queue stores only this descriptor:
 * push records the caller's pointer and size, pop/peek hand them back. */
struct rt_data_item
{
    const void *data_ptr;   /* payload pointer exactly as passed to push */
    rt_size_t data_size;    /* payload size exactly as passed to push */
};
24 
25 /**
26  * @brief    This function will initialize the data queue. Calling this function will
27  *           initialize the data queue control block and set the notification callback function.
28  *
29  * @param    queue is a pointer to the data queue object.
30  *
31  * @param    size is the maximum number of data in the data queue.
32  *
33  * @param    lwm is low water mark.
34  *           When the number of data in the data queue is less than this value, this function will
35  *           wake up the thread waiting for write data.
36  *
37  * @param    evt_notify is the notification callback function.
38  *
39  * @return   Return the operation status. When the return value is RT_EOK, the initialization is successful.
40  *           When the return value is -RT_ENOMEM, it means insufficient memory allocation failed.
41  */
42 rt_err_t
rt_data_queue_init(struct rt_data_queue * queue,rt_uint16_t size,rt_uint16_t lwm,void (* evt_notify)(struct rt_data_queue * queue,rt_uint32_t event))43 rt_data_queue_init(struct rt_data_queue *queue,
44                    rt_uint16_t size,
45                    rt_uint16_t lwm,
46                    void (*evt_notify)(struct rt_data_queue *queue, rt_uint32_t event))
47 {
48     RT_ASSERT(queue != RT_NULL);
49     RT_ASSERT(size > 0);
50 
51     queue->evt_notify = evt_notify;
52 
53     queue->magic = DATAQUEUE_MAGIC;
54     queue->size = size;
55     queue->lwm = lwm;
56 
57     queue->get_index = 0;
58     queue->put_index = 0;
59     queue->is_empty = 1;
60     queue->is_full = 0;
61 
62     rt_spin_lock_init(&(queue->spinlock));
63 
64     rt_list_init(&(queue->suspended_push_list));
65     rt_list_init(&(queue->suspended_pop_list));
66 
67     queue->queue = (struct rt_data_item *)rt_malloc(sizeof(struct rt_data_item) * size);
68     if (queue->queue == RT_NULL)
69     {
70         return -RT_ENOMEM;
71     }
72 
73     return RT_EOK;
74 }
75 RTM_EXPORT(rt_data_queue_init);
76 
77 /**
78  * @brief    This function will write data to the data queue. If the data queue is full,
79  *           the thread will suspend for the specified amount of time.
80  *
81  * @param    queue is a pointer to the data queue object.
82  * .
83  * @param    data_ptr is the buffer pointer of the data to be written.
84  *
85  * @param    size is the size in bytes of the data to be written.
86  *
87  * @param    timeout is the waiting time.
88  *
89  * @return   Return the operation status. When the return value is RT_EOK, the operation is successful.
90  *           When the return value is -RT_ETIMEOUT, it means the specified time out.
91  */
rt_data_queue_push(struct rt_data_queue * queue,const void * data_ptr,rt_size_t data_size,rt_int32_t timeout)92 rt_err_t rt_data_queue_push(struct rt_data_queue *queue,
93                             const void *data_ptr,
94                             rt_size_t data_size,
95                             rt_int32_t timeout)
96 {
97     rt_base_t level;
98     rt_thread_t thread;
99     rt_err_t    result;
100 
101     RT_ASSERT(queue != RT_NULL);
102     RT_ASSERT(queue->magic == DATAQUEUE_MAGIC);
103 
104     /* current context checking */
105     RT_DEBUG_SCHEDULER_AVAILABLE(timeout != 0);
106 
107     result = RT_EOK;
108     thread = rt_thread_self();
109 
110     level = rt_spin_lock_irqsave(&(queue->spinlock));
111     while (queue->is_full)
112     {
113         /* queue is full */
114         if (timeout == 0)
115         {
116             result = -RT_ETIMEOUT;
117 
118             goto __exit;
119         }
120 
121         /* reset thread error number */
122         thread->error = RT_EOK;
123 
124         /* suspend thread on the push list */
125         result = rt_thread_suspend_to_list(thread, &queue->suspended_push_list,
126                                            RT_IPC_FLAG_FIFO, RT_UNINTERRUPTIBLE);
127         if (result == RT_EOK)
128         {
129             /* start timer */
130             if (timeout > 0)
131             {
132                 /* reset the timeout of thread timer and start it */
133                 rt_timer_control(&(thread->thread_timer),
134                                 RT_TIMER_CTRL_SET_TIME,
135                                 &timeout);
136                 rt_timer_start(&(thread->thread_timer));
137             }
138 
139             /* enable interrupt */
140             rt_spin_unlock_irqrestore(&(queue->spinlock), level);
141 
142             /* do schedule */
143             rt_schedule();
144 
145             /* thread is waked up */
146             level = rt_spin_lock_irqsave(&(queue->spinlock));
147 
148             /* error may be modified by waker, so take the lock before accessing it */
149             result = thread->error;
150         }
151         if (result != RT_EOK) goto __exit;
152     }
153 
154     queue->queue[queue->put_index].data_ptr  = data_ptr;
155     queue->queue[queue->put_index].data_size = data_size;
156     queue->put_index += 1;
157     if (queue->put_index == queue->size)
158     {
159         queue->put_index = 0;
160     }
161     queue->is_empty = 0;
162     if (queue->put_index == queue->get_index)
163     {
164         queue->is_full = 1;
165     }
166 
167     /* there is at least one thread in suspended list */
168     if (rt_susp_list_dequeue(&queue->suspended_pop_list,
169                              RT_THREAD_RESUME_RES_THR_ERR))
170     {
171         /* unlock and perform a schedule */
172         rt_spin_unlock_irqrestore(&(queue->spinlock), level);
173 
174         /* perform a schedule */
175         rt_schedule();
176 
177         return result;
178     }
179 
180 __exit:
181     rt_spin_unlock_irqrestore(&(queue->spinlock), level);
182     if ((result == RT_EOK) && queue->evt_notify != RT_NULL)
183     {
184         queue->evt_notify(queue, RT_DATAQUEUE_EVENT_PUSH);
185     }
186 
187     return result;
188 }
189 RTM_EXPORT(rt_data_queue_push);
190 
191 /**
192  * @brief    This function will pop data from the data queue. If the data queue is empty,the thread
193  *           will suspend for the specified amount of time.
194  *
195  * @note     When the number of data in the data queue is less than lwm(low water mark), will
196  *           wake up the thread waiting for write data.
197  *
198  * @param    queue is a pointer to the data queue object.
199  *
200  * @param    data_ptr is the buffer pointer of the data to be fetched.
201  *
202  * @param    size is the size in bytes of the data to be fetched.
203  *
204  * @param    timeout is the waiting time.
205  *
206  * @return   Return the operation status. When the return value is RT_EOK, the operation is successful.
207  *           When the return value is -RT_ETIMEOUT, it means the specified time out.
208  */
rt_data_queue_pop(struct rt_data_queue * queue,const void ** data_ptr,rt_size_t * size,rt_int32_t timeout)209 rt_err_t rt_data_queue_pop(struct rt_data_queue *queue,
210                            const void **data_ptr,
211                            rt_size_t *size,
212                            rt_int32_t timeout)
213 {
214     rt_base_t level;
215     rt_thread_t thread;
216     rt_err_t    result;
217 
218     RT_ASSERT(queue != RT_NULL);
219     RT_ASSERT(queue->magic == DATAQUEUE_MAGIC);
220     RT_ASSERT(data_ptr != RT_NULL);
221     RT_ASSERT(size != RT_NULL);
222 
223     /* current context checking */
224     RT_DEBUG_SCHEDULER_AVAILABLE(timeout != 0);
225 
226     result = RT_EOK;
227     thread = rt_thread_self();
228 
229     level = rt_spin_lock_irqsave(&(queue->spinlock));
230     while (queue->is_empty)
231     {
232         /* queue is empty */
233         if (timeout == 0)
234         {
235             result = -RT_ETIMEOUT;
236             goto __exit;
237         }
238 
239         /* reset thread error number */
240         thread->error = RT_EOK;
241 
242         /* suspend thread on the pop list */
243         result = rt_thread_suspend_to_list(thread, &queue->suspended_pop_list,
244                                            RT_IPC_FLAG_FIFO, RT_UNINTERRUPTIBLE);
245         if (result == RT_EOK)
246         {
247             /* start timer */
248             if (timeout > 0)
249             {
250                 /* reset the timeout of thread timer and start it */
251                 rt_timer_control(&(thread->thread_timer),
252                                 RT_TIMER_CTRL_SET_TIME,
253                                 &timeout);
254                 rt_timer_start(&(thread->thread_timer));
255             }
256 
257             /* enable interrupt */
258             rt_spin_unlock_irqrestore(&(queue->spinlock), level);
259 
260             /* do schedule */
261             rt_schedule();
262 
263             /* thread is waked up */
264             level  = rt_spin_lock_irqsave(&(queue->spinlock));
265             result = thread->error;
266             if (result != RT_EOK)
267                 goto __exit;
268         }
269     }
270 
271     *data_ptr = queue->queue[queue->get_index].data_ptr;
272     *size     = queue->queue[queue->get_index].data_size;
273     queue->get_index += 1;
274     if (queue->get_index == queue->size)
275     {
276         queue->get_index = 0;
277     }
278     queue->is_full = 0;
279     if (queue->put_index == queue->get_index)
280     {
281         queue->is_empty = 1;
282     }
283 
284     if (rt_data_queue_len(queue) <= queue->lwm)
285     {
286         /* there is at least one thread in suspended list */
287         if (rt_susp_list_dequeue(&queue->suspended_push_list,
288                                        RT_THREAD_RESUME_RES_THR_ERR))
289         {
290             /* unlock and perform a schedule */
291             rt_spin_unlock_irqrestore(&(queue->spinlock), level);
292 
293             /* perform a schedule */
294             rt_schedule();
295         }
296         else
297         {
298             rt_spin_unlock_irqrestore(&(queue->spinlock), level);
299         }
300 
301         if (queue->evt_notify != RT_NULL)
302             queue->evt_notify(queue, RT_DATAQUEUE_EVENT_LWM);
303 
304         return result;
305     }
306 
307 __exit:
308     rt_spin_unlock_irqrestore(&(queue->spinlock), level);
309     if ((result == RT_EOK) && (queue->evt_notify != RT_NULL))
310     {
311         queue->evt_notify(queue, RT_DATAQUEUE_EVENT_POP);
312     }
313 
314     return result;
315 }
316 RTM_EXPORT(rt_data_queue_pop);
317 
318 /**
319  * @brief    This function will fetch but retaining data in the data queue.
320  *
321  * @param    queue is a pointer to the data queue object.
322  *
323  * @param    data_ptr is the buffer pointer of the data to be fetched.
324  *
325  * @param    size is the size in bytes of the data to be fetched.
326  *
327  * @return   Return the operation status. When the return value is RT_EOK, the operation is successful.
328  *           When the return value is -RT_EEMPTY, it means the data queue is empty.
329  */
rt_data_queue_peek(struct rt_data_queue * queue,const void ** data_ptr,rt_size_t * size)330 rt_err_t rt_data_queue_peek(struct rt_data_queue *queue,
331                             const void **data_ptr,
332                             rt_size_t *size)
333 {
334     rt_base_t level;
335 
336     RT_ASSERT(queue != RT_NULL);
337     RT_ASSERT(queue->magic == DATAQUEUE_MAGIC);
338 
339     if (queue->is_empty)
340     {
341         return -RT_EEMPTY;
342     }
343 
344     level = rt_spin_lock_irqsave(&(queue->spinlock));
345 
346     *data_ptr = queue->queue[queue->get_index].data_ptr;
347     *size     = queue->queue[queue->get_index].data_size;
348 
349     rt_spin_unlock_irqrestore(&(queue->spinlock), level);
350 
351     return RT_EOK;
352 }
353 RTM_EXPORT(rt_data_queue_peek);
354 
355 /**
356  * @brief    This function will reset the data queue.
357  *
358  * @note     Calling this function will wake up all threads on the data queue
359  *           that are hanging and waiting.
360  *
361  * @param    queue is a pointer to the data queue object.
362  */
rt_data_queue_reset(struct rt_data_queue * queue)363 void rt_data_queue_reset(struct rt_data_queue *queue)
364 {
365     rt_base_t level;
366 
367     RT_ASSERT(queue != RT_NULL);
368     RT_ASSERT(queue->magic == DATAQUEUE_MAGIC);
369 
370     level = rt_spin_lock_irqsave(&(queue->spinlock));
371 
372     queue->get_index = 0;
373     queue->put_index = 0;
374     queue->is_empty = 1;
375     queue->is_full = 0;
376 
377     rt_spin_unlock_irqrestore(&(queue->spinlock), level);
378 
379     rt_enter_critical();
380     /* wakeup all suspend threads */
381 
382     /* resume on pop list */
383     rt_susp_list_resume_all_irq(&queue->suspended_pop_list, RT_ERROR,
384                                 &(queue->spinlock));
385 
386     /* resume on push list */
387     rt_susp_list_resume_all_irq(&queue->suspended_push_list, RT_ERROR,
388                                 &(queue->spinlock));
389 
390     rt_exit_critical();
391 
392     rt_schedule();
393 }
394 RTM_EXPORT(rt_data_queue_reset);
395 
396 /**
397  * @brief    This function will deinit the data queue.
398  *
399  * @param    queue is a pointer to the data queue object.
400  *
401  * @return   Return the operation status. When the return value is RT_EOK, the operation is successful.
402  */
rt_data_queue_deinit(struct rt_data_queue * queue)403 rt_err_t rt_data_queue_deinit(struct rt_data_queue *queue)
404 {
405     rt_base_t level;
406 
407     RT_ASSERT(queue != RT_NULL);
408     RT_ASSERT(queue->magic == DATAQUEUE_MAGIC);
409 
410     /* wakeup all suspend threads */
411     rt_data_queue_reset(queue);
412 
413     level = rt_spin_lock_irqsave(&(queue->spinlock));
414     queue->magic = 0;
415     rt_spin_unlock_irqrestore(&(queue->spinlock), level);
416 
417     rt_free(queue->queue);
418 
419     return RT_EOK;
420 }
421 RTM_EXPORT(rt_data_queue_deinit);
422 
423 /**
424  * @brief    This function will get the number of data in the data queue.
425  *
426  * @param    queue is a pointer to the data queue object.
427  *
428  * @return   Return the number of data in the data queue.
429  */
rt_data_queue_len(struct rt_data_queue * queue)430 rt_uint16_t rt_data_queue_len(struct rt_data_queue *queue)
431 {
432     rt_base_t level;
433     rt_int16_t len;
434 
435     RT_ASSERT(queue != RT_NULL);
436     RT_ASSERT(queue->magic == DATAQUEUE_MAGIC);
437 
438     if (queue->is_empty)
439     {
440         return 0;
441     }
442 
443     level = rt_spin_lock_irqsave(&(queue->spinlock));
444 
445     if (queue->put_index > queue->get_index)
446     {
447         len = queue->put_index - queue->get_index;
448     }
449     else
450     {
451         len = queue->size + queue->put_index - queue->get_index;
452     }
453 
454     rt_spin_unlock_irqrestore(&(queue->spinlock), level);
455 
456     return len;
457 }
458 RTM_EXPORT(rt_data_queue_len);
459 
460