1 /*
2 * Copyright (c) 2006-2023, RT-Thread Development Team
3 *
4 * SPDX-License-Identifier: Apache-2.0
5 *
6 * Change Logs:
7 * Date Author Notes
8 * 2012-09-30 Bernard first version.
9 * 2016-10-31 armink fix some resume push and pop thread bugs
10 * 2023-09-15 xqyjlj perf rt_hw_interrupt_disable/enable
11 * 2024-01-25 Shell porting to susp_list API
12 */
13
14 #include <rthw.h>
15 #include <rtdevice.h>
16
17 #define DATAQUEUE_MAGIC 0xbead0e0e
18
19 struct rt_data_item
20 {
21 const void *data_ptr;
22 rt_size_t data_size;
23 };
24
25 /**
26 * @brief This function will initialize the data queue. Calling this function will
27 * initialize the data queue control block and set the notification callback function.
28 *
29 * @param queue is a pointer to the data queue object.
30 *
31 * @param size is the maximum number of data in the data queue.
32 *
33 * @param lwm is low water mark.
34 * When the number of data in the data queue is less than this value, this function will
35 * wake up the thread waiting for write data.
36 *
37 * @param evt_notify is the notification callback function.
38 *
39 * @return Return the operation status. When the return value is RT_EOK, the initialization is successful.
40 * When the return value is -RT_ENOMEM, it means insufficient memory allocation failed.
41 */
42 rt_err_t
rt_data_queue_init(struct rt_data_queue * queue,rt_uint16_t size,rt_uint16_t lwm,void (* evt_notify)(struct rt_data_queue * queue,rt_uint32_t event))43 rt_data_queue_init(struct rt_data_queue *queue,
44 rt_uint16_t size,
45 rt_uint16_t lwm,
46 void (*evt_notify)(struct rt_data_queue *queue, rt_uint32_t event))
47 {
48 RT_ASSERT(queue != RT_NULL);
49 RT_ASSERT(size > 0);
50
51 queue->evt_notify = evt_notify;
52
53 queue->magic = DATAQUEUE_MAGIC;
54 queue->size = size;
55 queue->lwm = lwm;
56
57 queue->get_index = 0;
58 queue->put_index = 0;
59 queue->is_empty = 1;
60 queue->is_full = 0;
61
62 rt_spin_lock_init(&(queue->spinlock));
63
64 rt_list_init(&(queue->suspended_push_list));
65 rt_list_init(&(queue->suspended_pop_list));
66
67 queue->queue = (struct rt_data_item *)rt_malloc(sizeof(struct rt_data_item) * size);
68 if (queue->queue == RT_NULL)
69 {
70 return -RT_ENOMEM;
71 }
72
73 return RT_EOK;
74 }
75 RTM_EXPORT(rt_data_queue_init);
76
77 /**
78 * @brief This function will write data to the data queue. If the data queue is full,
79 * the thread will suspend for the specified amount of time.
80 *
81 * @param queue is a pointer to the data queue object.
82 * .
83 * @param data_ptr is the buffer pointer of the data to be written.
84 *
85 * @param size is the size in bytes of the data to be written.
86 *
87 * @param timeout is the waiting time.
88 *
89 * @return Return the operation status. When the return value is RT_EOK, the operation is successful.
90 * When the return value is -RT_ETIMEOUT, it means the specified time out.
91 */
rt_data_queue_push(struct rt_data_queue * queue,const void * data_ptr,rt_size_t data_size,rt_int32_t timeout)92 rt_err_t rt_data_queue_push(struct rt_data_queue *queue,
93 const void *data_ptr,
94 rt_size_t data_size,
95 rt_int32_t timeout)
96 {
97 rt_base_t level;
98 rt_thread_t thread;
99 rt_err_t result;
100
101 RT_ASSERT(queue != RT_NULL);
102 RT_ASSERT(queue->magic == DATAQUEUE_MAGIC);
103
104 /* current context checking */
105 RT_DEBUG_SCHEDULER_AVAILABLE(timeout != 0);
106
107 result = RT_EOK;
108 thread = rt_thread_self();
109
110 level = rt_spin_lock_irqsave(&(queue->spinlock));
111 while (queue->is_full)
112 {
113 /* queue is full */
114 if (timeout == 0)
115 {
116 result = -RT_ETIMEOUT;
117
118 goto __exit;
119 }
120
121 /* reset thread error number */
122 thread->error = RT_EOK;
123
124 /* suspend thread on the push list */
125 result = rt_thread_suspend_to_list(thread, &queue->suspended_push_list,
126 RT_IPC_FLAG_FIFO, RT_UNINTERRUPTIBLE);
127 if (result == RT_EOK)
128 {
129 /* start timer */
130 if (timeout > 0)
131 {
132 /* reset the timeout of thread timer and start it */
133 rt_timer_control(&(thread->thread_timer),
134 RT_TIMER_CTRL_SET_TIME,
135 &timeout);
136 rt_timer_start(&(thread->thread_timer));
137 }
138
139 /* enable interrupt */
140 rt_spin_unlock_irqrestore(&(queue->spinlock), level);
141
142 /* do schedule */
143 rt_schedule();
144
145 /* thread is waked up */
146 level = rt_spin_lock_irqsave(&(queue->spinlock));
147
148 /* error may be modified by waker, so take the lock before accessing it */
149 result = thread->error;
150 }
151 if (result != RT_EOK) goto __exit;
152 }
153
154 queue->queue[queue->put_index].data_ptr = data_ptr;
155 queue->queue[queue->put_index].data_size = data_size;
156 queue->put_index += 1;
157 if (queue->put_index == queue->size)
158 {
159 queue->put_index = 0;
160 }
161 queue->is_empty = 0;
162 if (queue->put_index == queue->get_index)
163 {
164 queue->is_full = 1;
165 }
166
167 /* there is at least one thread in suspended list */
168 if (rt_susp_list_dequeue(&queue->suspended_pop_list,
169 RT_THREAD_RESUME_RES_THR_ERR))
170 {
171 /* unlock and perform a schedule */
172 rt_spin_unlock_irqrestore(&(queue->spinlock), level);
173
174 /* perform a schedule */
175 rt_schedule();
176
177 return result;
178 }
179
180 __exit:
181 rt_spin_unlock_irqrestore(&(queue->spinlock), level);
182 if ((result == RT_EOK) && queue->evt_notify != RT_NULL)
183 {
184 queue->evt_notify(queue, RT_DATAQUEUE_EVENT_PUSH);
185 }
186
187 return result;
188 }
189 RTM_EXPORT(rt_data_queue_push);
190
191 /**
192 * @brief This function will pop data from the data queue. If the data queue is empty,the thread
193 * will suspend for the specified amount of time.
194 *
195 * @note When the number of data in the data queue is less than lwm(low water mark), will
196 * wake up the thread waiting for write data.
197 *
198 * @param queue is a pointer to the data queue object.
199 *
200 * @param data_ptr is the buffer pointer of the data to be fetched.
201 *
202 * @param size is the size in bytes of the data to be fetched.
203 *
204 * @param timeout is the waiting time.
205 *
206 * @return Return the operation status. When the return value is RT_EOK, the operation is successful.
207 * When the return value is -RT_ETIMEOUT, it means the specified time out.
208 */
rt_data_queue_pop(struct rt_data_queue * queue,const void ** data_ptr,rt_size_t * size,rt_int32_t timeout)209 rt_err_t rt_data_queue_pop(struct rt_data_queue *queue,
210 const void **data_ptr,
211 rt_size_t *size,
212 rt_int32_t timeout)
213 {
214 rt_base_t level;
215 rt_thread_t thread;
216 rt_err_t result;
217
218 RT_ASSERT(queue != RT_NULL);
219 RT_ASSERT(queue->magic == DATAQUEUE_MAGIC);
220 RT_ASSERT(data_ptr != RT_NULL);
221 RT_ASSERT(size != RT_NULL);
222
223 /* current context checking */
224 RT_DEBUG_SCHEDULER_AVAILABLE(timeout != 0);
225
226 result = RT_EOK;
227 thread = rt_thread_self();
228
229 level = rt_spin_lock_irqsave(&(queue->spinlock));
230 while (queue->is_empty)
231 {
232 /* queue is empty */
233 if (timeout == 0)
234 {
235 result = -RT_ETIMEOUT;
236 goto __exit;
237 }
238
239 /* reset thread error number */
240 thread->error = RT_EOK;
241
242 /* suspend thread on the pop list */
243 result = rt_thread_suspend_to_list(thread, &queue->suspended_pop_list,
244 RT_IPC_FLAG_FIFO, RT_UNINTERRUPTIBLE);
245 if (result == RT_EOK)
246 {
247 /* start timer */
248 if (timeout > 0)
249 {
250 /* reset the timeout of thread timer and start it */
251 rt_timer_control(&(thread->thread_timer),
252 RT_TIMER_CTRL_SET_TIME,
253 &timeout);
254 rt_timer_start(&(thread->thread_timer));
255 }
256
257 /* enable interrupt */
258 rt_spin_unlock_irqrestore(&(queue->spinlock), level);
259
260 /* do schedule */
261 rt_schedule();
262
263 /* thread is waked up */
264 level = rt_spin_lock_irqsave(&(queue->spinlock));
265 result = thread->error;
266 if (result != RT_EOK)
267 goto __exit;
268 }
269 }
270
271 *data_ptr = queue->queue[queue->get_index].data_ptr;
272 *size = queue->queue[queue->get_index].data_size;
273 queue->get_index += 1;
274 if (queue->get_index == queue->size)
275 {
276 queue->get_index = 0;
277 }
278 queue->is_full = 0;
279 if (queue->put_index == queue->get_index)
280 {
281 queue->is_empty = 1;
282 }
283
284 if (rt_data_queue_len(queue) <= queue->lwm)
285 {
286 /* there is at least one thread in suspended list */
287 if (rt_susp_list_dequeue(&queue->suspended_push_list,
288 RT_THREAD_RESUME_RES_THR_ERR))
289 {
290 /* unlock and perform a schedule */
291 rt_spin_unlock_irqrestore(&(queue->spinlock), level);
292
293 /* perform a schedule */
294 rt_schedule();
295 }
296 else
297 {
298 rt_spin_unlock_irqrestore(&(queue->spinlock), level);
299 }
300
301 if (queue->evt_notify != RT_NULL)
302 queue->evt_notify(queue, RT_DATAQUEUE_EVENT_LWM);
303
304 return result;
305 }
306
307 __exit:
308 rt_spin_unlock_irqrestore(&(queue->spinlock), level);
309 if ((result == RT_EOK) && (queue->evt_notify != RT_NULL))
310 {
311 queue->evt_notify(queue, RT_DATAQUEUE_EVENT_POP);
312 }
313
314 return result;
315 }
316 RTM_EXPORT(rt_data_queue_pop);
317
318 /**
319 * @brief This function will fetch but retaining data in the data queue.
320 *
321 * @param queue is a pointer to the data queue object.
322 *
323 * @param data_ptr is the buffer pointer of the data to be fetched.
324 *
325 * @param size is the size in bytes of the data to be fetched.
326 *
327 * @return Return the operation status. When the return value is RT_EOK, the operation is successful.
328 * When the return value is -RT_EEMPTY, it means the data queue is empty.
329 */
rt_data_queue_peek(struct rt_data_queue * queue,const void ** data_ptr,rt_size_t * size)330 rt_err_t rt_data_queue_peek(struct rt_data_queue *queue,
331 const void **data_ptr,
332 rt_size_t *size)
333 {
334 rt_base_t level;
335
336 RT_ASSERT(queue != RT_NULL);
337 RT_ASSERT(queue->magic == DATAQUEUE_MAGIC);
338
339 if (queue->is_empty)
340 {
341 return -RT_EEMPTY;
342 }
343
344 level = rt_spin_lock_irqsave(&(queue->spinlock));
345
346 *data_ptr = queue->queue[queue->get_index].data_ptr;
347 *size = queue->queue[queue->get_index].data_size;
348
349 rt_spin_unlock_irqrestore(&(queue->spinlock), level);
350
351 return RT_EOK;
352 }
353 RTM_EXPORT(rt_data_queue_peek);
354
355 /**
356 * @brief This function will reset the data queue.
357 *
358 * @note Calling this function will wake up all threads on the data queue
359 * that are hanging and waiting.
360 *
361 * @param queue is a pointer to the data queue object.
362 */
rt_data_queue_reset(struct rt_data_queue * queue)363 void rt_data_queue_reset(struct rt_data_queue *queue)
364 {
365 rt_base_t level;
366
367 RT_ASSERT(queue != RT_NULL);
368 RT_ASSERT(queue->magic == DATAQUEUE_MAGIC);
369
370 level = rt_spin_lock_irqsave(&(queue->spinlock));
371
372 queue->get_index = 0;
373 queue->put_index = 0;
374 queue->is_empty = 1;
375 queue->is_full = 0;
376
377 rt_spin_unlock_irqrestore(&(queue->spinlock), level);
378
379 rt_enter_critical();
380 /* wakeup all suspend threads */
381
382 /* resume on pop list */
383 rt_susp_list_resume_all_irq(&queue->suspended_pop_list, RT_ERROR,
384 &(queue->spinlock));
385
386 /* resume on push list */
387 rt_susp_list_resume_all_irq(&queue->suspended_push_list, RT_ERROR,
388 &(queue->spinlock));
389
390 rt_exit_critical();
391
392 rt_schedule();
393 }
394 RTM_EXPORT(rt_data_queue_reset);
395
396 /**
397 * @brief This function will deinit the data queue.
398 *
399 * @param queue is a pointer to the data queue object.
400 *
401 * @return Return the operation status. When the return value is RT_EOK, the operation is successful.
402 */
rt_data_queue_deinit(struct rt_data_queue * queue)403 rt_err_t rt_data_queue_deinit(struct rt_data_queue *queue)
404 {
405 rt_base_t level;
406
407 RT_ASSERT(queue != RT_NULL);
408 RT_ASSERT(queue->magic == DATAQUEUE_MAGIC);
409
410 /* wakeup all suspend threads */
411 rt_data_queue_reset(queue);
412
413 level = rt_spin_lock_irqsave(&(queue->spinlock));
414 queue->magic = 0;
415 rt_spin_unlock_irqrestore(&(queue->spinlock), level);
416
417 rt_free(queue->queue);
418
419 return RT_EOK;
420 }
421 RTM_EXPORT(rt_data_queue_deinit);
422
423 /**
424 * @brief This function will get the number of data in the data queue.
425 *
426 * @param queue is a pointer to the data queue object.
427 *
428 * @return Return the number of data in the data queue.
429 */
rt_data_queue_len(struct rt_data_queue * queue)430 rt_uint16_t rt_data_queue_len(struct rt_data_queue *queue)
431 {
432 rt_base_t level;
433 rt_int16_t len;
434
435 RT_ASSERT(queue != RT_NULL);
436 RT_ASSERT(queue->magic == DATAQUEUE_MAGIC);
437
438 if (queue->is_empty)
439 {
440 return 0;
441 }
442
443 level = rt_spin_lock_irqsave(&(queue->spinlock));
444
445 if (queue->put_index > queue->get_index)
446 {
447 len = queue->put_index - queue->get_index;
448 }
449 else
450 {
451 len = queue->size + queue->put_index - queue->get_index;
452 }
453
454 rt_spin_unlock_irqrestore(&(queue->spinlock), level);
455
456 return len;
457 }
458 RTM_EXPORT(rt_data_queue_len);
459
460