FreeRDP
Loading...
Searching...
No Matches
MessageQueue.c
1
20#include <winpr/config.h>
21
22#include <winpr/crt.h>
23#include <winpr/sysinfo.h>
24#include <winpr/assert.h>
25
26#include <winpr/collections.h>
27
28struct s_wMessageQueue
29{
30 size_t head;
31 size_t tail;
32 size_t size;
33 size_t capacity;
34 BOOL closed;
35 wMessage* array;
37 HANDLE event;
38
39 wObject object;
40};
41
51wObject* MessageQueue_Object(wMessageQueue* queue)
52{
53 WINPR_ASSERT(queue);
54 return &queue->object;
55}
56
61HANDLE MessageQueue_Event(wMessageQueue* queue)
62{
63 WINPR_ASSERT(queue);
64 return queue->event;
65}
66
71size_t MessageQueue_Size(wMessageQueue* queue)
72{
73 WINPR_ASSERT(queue);
74 EnterCriticalSection(&queue->lock);
75 const size_t ret = queue->size;
76 LeaveCriticalSection(&queue->lock);
77 return ret;
78}
79
80size_t MessageQueue_Capacity(wMessageQueue* queue)
81{
82 WINPR_ASSERT(queue);
83 EnterCriticalSection(&queue->lock);
84 const size_t ret = queue->capacity;
85 LeaveCriticalSection(&queue->lock);
86 return ret;
87}
88
93BOOL MessageQueue_Wait(wMessageQueue* queue)
94{
95 BOOL status = FALSE;
96
97 WINPR_ASSERT(queue);
98 if (WaitForSingleObject(queue->event, INFINITE) == WAIT_OBJECT_0)
99 status = TRUE;
100
101 return status;
102}
103
104static BOOL MessageQueue_EnsureCapacity(wMessageQueue* queue, size_t count)
105{
106 const size_t increment = 128;
107 WINPR_ASSERT(queue);
108
109 const size_t required = queue->size + count;
110 // check for overflow
111 if ((required < queue->size) || (required < count) ||
112 (required > (SIZE_MAX - increment) / sizeof(wMessage)))
113 return FALSE;
114
115 if (required > queue->capacity)
116 {
117 const size_t old_capacity = queue->capacity;
118 const size_t new_capacity = required + increment;
119
120 wMessage* new_arr = (wMessage*)realloc(queue->array, sizeof(wMessage) * new_capacity);
121 if (!new_arr)
122 return FALSE;
123 queue->array = new_arr;
124 queue->capacity = new_capacity;
125 ZeroMemory(&(queue->array[old_capacity]), (new_capacity - old_capacity) * sizeof(wMessage));
126
127 /* rearrange wrapped entries:
128 * fill up the newly available space and move tail
129 * back by the amount of elements that have been moved to the newly
130 * allocated space.
131 */
132 if (queue->tail <= queue->head)
133 {
134 size_t tocopy = queue->tail;
135 size_t slots = new_capacity - old_capacity;
136 const size_t batch = (tocopy < slots) ? tocopy : slots;
137 CopyMemory(&(queue->array[old_capacity]), queue->array, batch * sizeof(wMessage));
138
139 /* Tail is decremented. if the whole thing is appended
140 * just move the existing tail by old_capacity */
141 if (tocopy < slots)
142 {
143 ZeroMemory(queue->array, batch * sizeof(wMessage));
144 queue->tail += old_capacity;
145 }
146 else
147 {
148 const size_t remain = queue->tail - batch;
149 const size_t movesize = remain * sizeof(wMessage);
150 memmove_s(queue->array, queue->tail * sizeof(wMessage), &queue->array[batch],
151 movesize);
152
153 const size_t zerooffset = remain;
154 const size_t zerosize = (queue->tail - remain) * sizeof(wMessage);
155 ZeroMemory(&queue->array[zerooffset], zerosize);
156 queue->tail -= batch;
157 }
158 }
159 }
160
161 return TRUE;
162}
163
164BOOL MessageQueue_Dispatch(wMessageQueue* queue, const wMessage* message)
165{
166 wMessage* dst = NULL;
167 BOOL ret = FALSE;
168 WINPR_ASSERT(queue);
169
170 if (!message)
171 return FALSE;
172
173 WINPR_ASSERT(queue);
174 EnterCriticalSection(&queue->lock);
175
176 if (queue->closed)
177 goto out;
178
179 if (!MessageQueue_EnsureCapacity(queue, 1))
180 goto out;
181
182 dst = &(queue->array[queue->tail]);
183 *dst = *message;
184 dst->time = GetTickCount64();
185
186 queue->tail = (queue->tail + 1) % queue->capacity;
187 queue->size++;
188
189 if (queue->size > 0)
190 (void)SetEvent(queue->event);
191
192 if (message->id == WMQ_QUIT)
193 queue->closed = TRUE;
194
195 ret = TRUE;
196out:
197 LeaveCriticalSection(&queue->lock);
198 return ret;
199}
200
201BOOL MessageQueue_Post(wMessageQueue* queue, void* context, UINT32 type, void* wParam, void* lParam)
202{
203 wMessage message = { 0 };
204
205 message.context = context;
206 message.id = type;
207 message.wParam = wParam;
208 message.lParam = lParam;
209 message.Free = NULL;
210
211 return MessageQueue_Dispatch(queue, &message);
212}
213
214BOOL MessageQueue_PostQuit(wMessageQueue* queue, int nExitCode)
215{
216 return MessageQueue_Post(queue, NULL, WMQ_QUIT, (void*)(size_t)nExitCode, NULL);
217}
218
219int MessageQueue_Get(wMessageQueue* queue, wMessage* message)
220{
221 int status = -1;
222
223 if (!MessageQueue_Wait(queue))
224 return status;
225
226 EnterCriticalSection(&queue->lock);
227
228 if (queue->size > 0)
229 {
230 CopyMemory(message, &(queue->array[queue->head]), sizeof(wMessage));
231 ZeroMemory(&(queue->array[queue->head]), sizeof(wMessage));
232 queue->head = (queue->head + 1) % queue->capacity;
233 queue->size--;
234
235 if (queue->size < 1)
236 (void)ResetEvent(queue->event);
237
238 status = (message->id != WMQ_QUIT) ? 1 : 0;
239 }
240
241 LeaveCriticalSection(&queue->lock);
242
243 return status;
244}
245
246int MessageQueue_Peek(wMessageQueue* queue, wMessage* message, BOOL remove)
247{
248 int status = 0;
249
250 WINPR_ASSERT(queue);
251 EnterCriticalSection(&queue->lock);
252
253 if (queue->size > 0)
254 {
255 CopyMemory(message, &(queue->array[queue->head]), sizeof(wMessage));
256 status = 1;
257
258 if (remove)
259 {
260 ZeroMemory(&(queue->array[queue->head]), sizeof(wMessage));
261 queue->head = (queue->head + 1) % queue->capacity;
262 queue->size--;
263
264 if (queue->size < 1)
265 (void)ResetEvent(queue->event);
266 }
267 }
268
269 LeaveCriticalSection(&queue->lock);
270
271 return status;
272}
273
278wMessageQueue* MessageQueue_New(const wObject* callback)
279{
280 wMessageQueue* queue = NULL;
281
282 queue = (wMessageQueue*)calloc(1, sizeof(wMessageQueue));
283 if (!queue)
284 return NULL;
285
286 if (!InitializeCriticalSectionAndSpinCount(&queue->lock, 4000))
287 goto fail;
288
289 if (!MessageQueue_EnsureCapacity(queue, 32))
290 goto fail;
291
292 queue->event = CreateEvent(NULL, TRUE, FALSE, NULL);
293 if (!queue->event)
294 goto fail;
295
296 if (callback)
297 queue->object = *callback;
298
299 return queue;
300
301fail:
302 WINPR_PRAGMA_DIAG_PUSH
303 WINPR_PRAGMA_DIAG_IGNORED_MISMATCHED_DEALLOC
304 MessageQueue_Free(queue);
305 WINPR_PRAGMA_DIAG_POP
306 return NULL;
307}
308
309void MessageQueue_Free(wMessageQueue* queue)
310{
311 if (!queue)
312 return;
313
314 if (queue->event)
315 MessageQueue_Clear(queue);
316
317 (void)CloseHandle(queue->event);
318 DeleteCriticalSection(&queue->lock);
319
320 free(queue->array);
321 free(queue);
322}
323
324int MessageQueue_Clear(wMessageQueue* queue)
325{
326 int status = 0;
327
328 WINPR_ASSERT(queue);
329 WINPR_ASSERT(queue->event);
330
331 EnterCriticalSection(&queue->lock);
332
333 while (queue->size > 0)
334 {
335 wMessage* msg = &(queue->array[queue->head]);
336
337 /* Free resources of message. */
338 if (queue->object.fnObjectUninit)
339 queue->object.fnObjectUninit(msg);
340 if (queue->object.fnObjectFree)
341 queue->object.fnObjectFree(msg);
342
343 ZeroMemory(msg, sizeof(wMessage));
344
345 queue->head = (queue->head + 1) % queue->capacity;
346 queue->size--;
347 }
348 (void)ResetEvent(queue->event);
349 queue->closed = FALSE;
350
351 LeaveCriticalSection(&queue->lock);
352
353 return status;
354}
This struct contains function pointers to initialize/free objects.
Definition collections.h:52