FreeRDP
Loading...
Searching...
No Matches
MessageQueue.c
1
20#include <winpr/config.h>
21
22#include <winpr/crt.h>
23#include <winpr/sysinfo.h>
24#include <winpr/assert.h>
25
26#include <winpr/collections.h>
27
28struct s_wMessageQueue
29{
30 size_t head;
31 size_t tail;
32 size_t size;
33 size_t capacity;
34 BOOL closed;
35 wMessage* array;
37 HANDLE event;
38
39 wObject object;
40};
41
51wObject* MessageQueue_Object(wMessageQueue* queue)
52{
53 WINPR_ASSERT(queue);
54 return &queue->object;
55}
56
61HANDLE MessageQueue_Event(wMessageQueue* queue)
62{
63 WINPR_ASSERT(queue);
64 return queue->event;
65}
66
71size_t MessageQueue_Size(wMessageQueue* queue)
72{
73 WINPR_ASSERT(queue);
74 EnterCriticalSection(&queue->lock);
75 const size_t ret = queue->size;
76 LeaveCriticalSection(&queue->lock);
77 return ret;
78}
79
80size_t MessageQueue_Capacity(wMessageQueue* queue)
81{
82 WINPR_ASSERT(queue);
83 EnterCriticalSection(&queue->lock);
84 const size_t ret = queue->capacity;
85 LeaveCriticalSection(&queue->lock);
86 return ret;
87}
88
93BOOL MessageQueue_Wait(wMessageQueue* queue)
94{
95 BOOL status = FALSE;
96
97 WINPR_ASSERT(queue);
98 if (WaitForSingleObject(queue->event, INFINITE) == WAIT_OBJECT_0)
99 status = TRUE;
100
101 return status;
102}
103
104static BOOL MessageQueue_EnsureCapacity(wMessageQueue* queue, size_t count)
105{
106 BOOL res = TRUE;
107 const size_t increment = 128;
108 WINPR_ASSERT(queue);
109
110 const size_t required = queue->size + count;
111 // check for overflow
112 if ((required < queue->size) || (required < count) ||
113 (required > (SIZE_MAX - increment) / sizeof(wMessage)))
114 return FALSE;
115
116 if (required > queue->capacity)
117 {
118 const size_t old_capacity = queue->capacity;
119 const size_t new_capacity = required + increment;
120
121 wMessage* new_arr = (wMessage*)realloc(queue->array, sizeof(wMessage) * new_capacity);
122 if (!new_arr)
123 return FALSE;
124 queue->array = new_arr;
125 queue->capacity = new_capacity;
126 ZeroMemory(&(queue->array[old_capacity]), (new_capacity - old_capacity) * sizeof(wMessage));
127
128 /* rearrange wrapped entries:
129 * fill up the newly available space and move tail
130 * back by the amount of elements that have been moved to the newly
131 * allocated space.
132 */
133 if (queue->tail <= queue->head)
134 {
135 size_t tocopy = queue->tail;
136 size_t slots = new_capacity - old_capacity;
137 const size_t batch = (tocopy < slots) ? tocopy : slots;
138 CopyMemory(&(queue->array[old_capacity]), queue->array, batch * sizeof(wMessage));
139
140 /* Tail is decremented. if the whole thing is appended
141 * just move the existing tail by old_capacity */
142 if (tocopy < slots)
143 {
144 ZeroMemory(queue->array, batch * sizeof(wMessage));
145 queue->tail += old_capacity;
146 }
147 else
148 {
149 const size_t remain = queue->tail - batch;
150 const size_t movesize = remain * sizeof(wMessage);
151 res = memmove_s(queue->array, queue->tail * sizeof(wMessage), &queue->array[batch],
152 movesize) >= 0;
153
154 const size_t zerooffset = remain;
155 const size_t zerosize = (queue->tail - remain) * sizeof(wMessage);
156 ZeroMemory(&queue->array[zerooffset], zerosize);
157 queue->tail -= batch;
158 }
159 }
160 }
161
162 return res;
163}
164
165BOOL MessageQueue_Dispatch(wMessageQueue* queue, const wMessage* message)
166{
167 wMessage* dst = nullptr;
168 BOOL ret = FALSE;
169 WINPR_ASSERT(queue);
170
171 if (!message)
172 return FALSE;
173
174 WINPR_ASSERT(queue);
175 EnterCriticalSection(&queue->lock);
176
177 if (queue->closed)
178 goto out;
179
180 if (!MessageQueue_EnsureCapacity(queue, 1))
181 goto out;
182
183 dst = &(queue->array[queue->tail]);
184 *dst = *message;
185 dst->time = GetTickCount64();
186
187 queue->tail = (queue->tail + 1) % queue->capacity;
188 queue->size++;
189
190 if (queue->size > 0)
191 (void)SetEvent(queue->event);
192
193 if (message->id == WMQ_QUIT)
194 queue->closed = TRUE;
195
196 ret = TRUE;
197out:
198 LeaveCriticalSection(&queue->lock);
199 return ret;
200}
201
202BOOL MessageQueue_Post(wMessageQueue* queue, void* context, UINT32 type, void* wParam, void* lParam)
203{
204 wMessage message = WINPR_C_ARRAY_INIT;
205
206 message.context = context;
207 message.id = type;
208 message.wParam = wParam;
209 message.lParam = lParam;
210 message.Free = nullptr;
211
212 return MessageQueue_Dispatch(queue, &message);
213}
214
215BOOL MessageQueue_PostQuit(wMessageQueue* queue, int nExitCode)
216{
217 return MessageQueue_Post(queue, nullptr, WMQ_QUIT, (void*)(size_t)nExitCode, nullptr);
218}
219
220int MessageQueue_Get(wMessageQueue* queue, wMessage* message)
221{
222 int status = -1;
223
224 if (!MessageQueue_Wait(queue))
225 return status;
226
227 EnterCriticalSection(&queue->lock);
228
229 if (queue->size > 0)
230 {
231 CopyMemory(message, &(queue->array[queue->head]), sizeof(wMessage));
232 ZeroMemory(&(queue->array[queue->head]), sizeof(wMessage));
233 queue->head = (queue->head + 1) % queue->capacity;
234 queue->size--;
235
236 if (queue->size < 1)
237 (void)ResetEvent(queue->event);
238
239 status = (message->id != WMQ_QUIT) ? 1 : 0;
240 }
241
242 LeaveCriticalSection(&queue->lock);
243
244 return status;
245}
246
247int MessageQueue_Peek(wMessageQueue* queue, wMessage* message, BOOL remove)
248{
249 int status = 0;
250
251 WINPR_ASSERT(queue);
252 EnterCriticalSection(&queue->lock);
253
254 if (queue->size > 0)
255 {
256 CopyMemory(message, &(queue->array[queue->head]), sizeof(wMessage));
257 status = 1;
258
259 if (remove)
260 {
261 ZeroMemory(&(queue->array[queue->head]), sizeof(wMessage));
262 queue->head = (queue->head + 1) % queue->capacity;
263 queue->size--;
264
265 if (queue->size < 1)
266 {
267 if (!ResetEvent(queue->event))
268 status = -1;
269 }
270 }
271 }
272
273 LeaveCriticalSection(&queue->lock);
274
275 return status;
276}
277
282wMessageQueue* MessageQueue_New(const wObject* callback)
283{
284 wMessageQueue* queue = nullptr;
285
286 queue = (wMessageQueue*)calloc(1, sizeof(wMessageQueue));
287 if (!queue)
288 return nullptr;
289
290 if (!InitializeCriticalSectionAndSpinCount(&queue->lock, 4000))
291 goto fail;
292
293 if (!MessageQueue_EnsureCapacity(queue, 32))
294 goto fail;
295
296 queue->event = CreateEvent(nullptr, TRUE, FALSE, nullptr);
297 if (!queue->event)
298 goto fail;
299
300 if (callback)
301 queue->object = *callback;
302
303 return queue;
304
305fail:
306 WINPR_PRAGMA_DIAG_PUSH
307 WINPR_PRAGMA_DIAG_IGNORED_MISMATCHED_DEALLOC
308 MessageQueue_Free(queue);
309 WINPR_PRAGMA_DIAG_POP
310 return nullptr;
311}
312
313void MessageQueue_Free(wMessageQueue* queue)
314{
315 if (!queue)
316 return;
317
318 if (queue->event)
319 MessageQueue_Clear(queue);
320
321 (void)CloseHandle(queue->event);
322 DeleteCriticalSection(&queue->lock);
323
324 free(queue->array);
325 free(queue);
326}
327
328int MessageQueue_Clear(wMessageQueue* queue)
329{
330 int status = 0;
331
332 WINPR_ASSERT(queue);
333 WINPR_ASSERT(queue->event);
334
335 EnterCriticalSection(&queue->lock);
336
337 while (queue->size > 0)
338 {
339 wMessage* msg = &(queue->array[queue->head]);
340
341 /* Free resources of message. */
342 if (queue->object.fnObjectUninit)
343 queue->object.fnObjectUninit(msg);
344 if (queue->object.fnObjectFree)
345 queue->object.fnObjectFree(msg);
346
347 ZeroMemory(msg, sizeof(wMessage));
348
349 queue->head = (queue->head + 1) % queue->capacity;
350 queue->size--;
351 }
352 if (!ResetEvent(queue->event))
353 status = -1;
354 queue->closed = FALSE;
355
356 LeaveCriticalSection(&queue->lock);
357
358 return status;
359}
This struct contains function pointers to initialize/free objects.
Definition collections.h:52