1 /*
2 * COPYRIGHT (C) 2011-2021, Real-Thread Information Technology Ltd
3 *
4 * SPDX-License-Identifier: Apache-2.0
5 *
6 * Change Logs:
7 * Date Author Notes
8 * 2013-11-04 Grissiom add comment
9 */
10
11 #include <rthw.h>
12 #include <rtthread.h>
13
14 #include "prio_queue.h"
15
/*
 * Link header placed in front of every payload stored in the queue.
 * Items of the same priority are chained through it as a singly linked list.
 */
struct rt_prio_queue_item
{
    /* following item at the same priority, or a null pointer for the last one */
    struct rt_prio_queue_item *next;
    /* the payload bytes are stored directly behind this header */
};
20
/*
 * Append an item to the end of the list kept for one priority level.
 *
 * que  - queue to modify
 * prio - priority level (index into head[]/tail[] and bit number in bitmap);
 *        range-checked by rt_prio_queue_push() before this is called
 * item - item to link in; rt_prio_queue_push() clears item->next beforehand
 *
 * No locking is done here: rt_prio_queue_push() calls this with interrupts
 * disabled.
 */
static void _do_push(struct rt_prio_queue *que,
                     rt_uint8_t prio,
                     struct rt_prio_queue_item *item)
{
    if (que->head[prio] == RT_NULL)
    {
        /* first item at this level: start the list and mark the level busy */
        que->head[prio] = item;
        /* use an unsigned constant: shifting a signed 1 into the sign bit
         * (prio == 31) is undefined behavior */
        que->bitmap |= 1UL << prio;
    }
    else
    {
        /* a non-empty list must have a tail to append to */
        RT_ASSERT(que->tail[prio]);
        que->tail[prio]->next = item;
    }
    que->tail[prio] = item;
}
37
_do_pop(struct rt_prio_queue * que)38 static struct rt_prio_queue_item* _do_pop(struct rt_prio_queue *que)
39 {
40 int ffs;
41 struct rt_prio_queue_item *item;
42
43 ffs = __rt_ffs(que->bitmap);
44 if (ffs == 0)
45 return RT_NULL;
46 ffs--;
47
48 item = que->head[ffs];
49 RT_ASSERT(item);
50
51 que->head[ffs] = item->next;
52 if (que->head[ffs] == RT_NULL)
53 {
54 que->bitmap &= ~(1 << ffs);
55 }
56
57 return item;
58 }
59
rt_prio_queue_init(struct rt_prio_queue * que,const char * name,void * buf,rt_size_t bufsz,rt_size_t itemsz)60 rt_err_t rt_prio_queue_init(struct rt_prio_queue *que,
61 const char *name,
62 void *buf,
63 rt_size_t bufsz,
64 rt_size_t itemsz)
65 {
66 RT_ASSERT(que);
67
68 rt_memset(que, 0, sizeof(*que));
69
70 rt_list_init(&(que->suspended_pop_list));
71
72 rt_mp_init(&que->pool, name, buf, bufsz,
73 sizeof(struct rt_prio_queue_item) + itemsz);
74
75 que->item_sz = itemsz;
76
77 return RT_EOK;
78 }
79
rt_prio_queue_detach(struct rt_prio_queue * que)80 void rt_prio_queue_detach(struct rt_prio_queue *que)
81 {
82 /* wake up all suspended pop threads, push thread is suspended on mempool.
83 */
84 while (!rt_list_isempty(&(que->suspended_pop_list)))
85 {
86 rt_thread_t thread;
87
88 /* disable interrupt */
89 rt_base_t level = rt_hw_interrupt_disable();
90
91 /* get next suspend thread */
92 thread = RT_THREAD_LIST_NODE_ENTRY(que->suspended_pop_list.next);
93 /* set error code to -RT_ERROR */
94 thread->error = -RT_ERROR;
95
96 rt_thread_resume(thread);
97
98 /* enable interrupt */
99 rt_hw_interrupt_enable(level);
100 }
101 rt_mp_detach(&que->pool);
102 }
103
104 #ifdef RT_USING_HEAP
/*
 * Allocate a priority queue together with its item storage from the heap.
 *
 * name    - name handed to the backing memory pool
 * item_nr - number of elements to reserve room for; must be non-zero
 * item_sz - payload size in bytes of each element
 *
 * Returns the new queue, or RT_NULL when the requested size cannot be
 * represented in rt_size_t or the allocation fails. Release the queue with
 * rt_prio_queue_delete().
 *
 * NOTE(review): the per-element size does not round the block size up to the
 * memory pool's alignment; if rt_mp_init() aligns blocks, the pool may hold
 * fewer than item_nr elements for unaligned item_sz - confirm.
 */
struct rt_prio_queue* rt_prio_queue_create(const char *name,
                                           rt_size_t item_nr,
                                           rt_size_t item_sz)
{
    struct rt_prio_queue *que;
    rt_size_t per_item;
    rt_size_t bufsz;

    /* validate before item_nr is used in the size computation */
    RT_ASSERT(item_nr);

    /* link header + payload + one pointer of per-block bookkeeping */
    per_item = sizeof(struct rt_prio_queue_item)
               + item_sz
               + sizeof(void*);
    if (per_item < item_sz)
        return RT_NULL;     /* per-element size wrapped around */

    bufsz = item_nr * per_item;
    if (item_nr != 0 && bufsz / item_nr != per_item)
        return RT_NULL;     /* total pool size wrapped around */

    if (sizeof(*que) + bufsz < bufsz)
        return RT_NULL;     /* queue object + pool does not fit in rt_size_t */

    que = rt_malloc(sizeof(*que) + bufsz);
    if (!que)
        return RT_NULL;

    /* the pool storage lives directly behind the queue object */
    rt_prio_queue_init(que, name, que+1, bufsz, item_sz);

    return que;
}
126
/*
 * Destroy a queue obtained from rt_prio_queue_create(): detach it, then
 * return its memory to the heap.
 *
 * Passing a null pointer is a no-op, so the result of a failed
 * rt_prio_queue_create() can be handed to this function safely.
 */
void rt_prio_queue_delete(struct rt_prio_queue *que)
{
    if (!que)
        return;

    rt_prio_queue_detach(que);
    rt_free(que);
}
132 #endif
133
rt_prio_queue_push(struct rt_prio_queue * que,rt_uint8_t prio,void * data,rt_int32_t timeout)134 rt_err_t rt_prio_queue_push(struct rt_prio_queue *que,
135 rt_uint8_t prio,
136 void *data,
137 rt_int32_t timeout)
138 {
139 rt_base_t level;
140 struct rt_prio_queue_item *item;
141
142 RT_ASSERT(que);
143
144 if (prio >= RT_PRIO_QUEUE_PRIO_MAX)
145 return -RT_ERROR;
146
147 item = rt_mp_alloc(&que->pool, timeout);
148 if (item == RT_NULL)
149 return -RT_ENOMEM;
150
151 rt_memcpy(item+1, data, que->item_sz);
152 item->next = RT_NULL;
153
154 level = rt_hw_interrupt_disable();
155
156 _do_push(que, prio, item);
157
158 if (!rt_list_isempty(&(que->suspended_pop_list)))
159 {
160 rt_thread_t thread;
161
162 /* get thread entry */
163 thread = RT_THREAD_LIST_NODE_ENTRY(que->suspended_pop_list.next);
164 /* resume it */
165 rt_thread_resume(thread);
166 rt_hw_interrupt_enable(level);
167
168 /* perform a schedule */
169 rt_schedule();
170
171 return RT_EOK;
172 }
173
174 rt_hw_interrupt_enable(level);
175
176 return RT_EOK;
177 }
178
rt_prio_queue_pop(struct rt_prio_queue * que,void * data,rt_int32_t timeout)179 rt_err_t rt_prio_queue_pop(struct rt_prio_queue *que,
180 void *data,
181 rt_int32_t timeout)
182 {
183 rt_base_t level;
184 struct rt_prio_queue_item *item;
185
186 RT_ASSERT(que);
187 RT_ASSERT(data);
188
189 level = rt_hw_interrupt_disable();
190 for (item = _do_pop(que);
191 item == RT_NULL;
192 item = _do_pop(que))
193 {
194 rt_thread_t thread;
195
196 if (timeout == 0)
197 {
198 rt_hw_interrupt_enable(level);
199 return -RT_ETIMEOUT;
200 }
201
202 RT_DEBUG_NOT_IN_INTERRUPT;
203
204 thread = rt_thread_self();
205 thread->error = RT_EOK;
206 rt_thread_suspend(thread);
207
208 rt_list_insert_before(&(que->suspended_pop_list), &RT_THREAD_LIST_NODE(thread));
209
210 if (timeout > 0)
211 {
212 rt_timer_control(&(thread->thread_timer),
213 RT_TIMER_CTRL_SET_TIME,
214 &timeout);
215 rt_timer_start(&(thread->thread_timer));
216 }
217
218 rt_hw_interrupt_enable(level);
219
220 rt_schedule();
221
222 /* thread is waked up */
223 if (thread->error != RT_EOK)
224 return thread->error;
225 level = rt_hw_interrupt_disable();
226 }
227
228 rt_hw_interrupt_enable(level);
229
230 rt_memcpy(data, item+1, que->item_sz);
231 rt_mp_free(item);
232
233 return RT_EOK;
234 }
235
rt_prio_queue_dump(struct rt_prio_queue * que)236 void rt_prio_queue_dump(struct rt_prio_queue *que)
237 {
238 int level = 0;
239
240 rt_kprintf("bitmap: %08x\n", que->bitmap);
241 for (level = 0; level < RT_PRIO_QUEUE_PRIO_MAX; level++)
242 {
243 struct rt_prio_queue_item *item;
244
245 rt_kprintf("%2d: ", level);
246 for (item = que->head[level];
247 item;
248 item = item->next)
249 {
250 rt_kprintf("%p, ", item);
251 }
252 rt_kprintf("\n");
253 }
254 }
255
256