FreeRDP
MessageQueue.c
1 
20 #include <winpr/config.h>
21 
22 #include <winpr/crt.h>
23 #include <winpr/sysinfo.h>
24 #include <winpr/assert.h>
25 
26 #include <winpr/collections.h>
27 
28 struct s_wMessageQueue
29 {
30  size_t head;
31  size_t tail;
32  size_t size;
33  size_t capacity;
34  BOOL closed;
35  wMessage* array;
36  CRITICAL_SECTION lock;
37  HANDLE event;
38 
39  wObject object;
40 };
41 
51 wObject* MessageQueue_Object(wMessageQueue* queue)
52 {
53  WINPR_ASSERT(queue);
54  return &queue->object;
55 }
56 
61 HANDLE MessageQueue_Event(wMessageQueue* queue)
62 {
63  WINPR_ASSERT(queue);
64  return queue->event;
65 }
66 
71 size_t MessageQueue_Size(wMessageQueue* queue)
72 {
73  WINPR_ASSERT(queue);
74  EnterCriticalSection(&queue->lock);
75  const size_t ret = queue->size;
76  LeaveCriticalSection(&queue->lock);
77  return ret;
78 }
79 
84 BOOL MessageQueue_Wait(wMessageQueue* queue)
85 {
86  BOOL status = FALSE;
87 
88  WINPR_ASSERT(queue);
89  if (WaitForSingleObject(queue->event, INFINITE) == WAIT_OBJECT_0)
90  status = TRUE;
91 
92  return status;
93 }
94 
95 static BOOL MessageQueue_EnsureCapacity(wMessageQueue* queue, size_t count)
96 {
97  WINPR_ASSERT(queue);
98 
99  if (queue->size + count >= queue->capacity)
100  {
101  wMessage* new_arr = NULL;
102  size_t old_capacity = queue->capacity;
103  size_t new_capacity = queue->capacity * 2;
104 
105  if (new_capacity < queue->size + count)
106  new_capacity = queue->size + count;
107 
108  new_arr = (wMessage*)realloc(queue->array, sizeof(wMessage) * new_capacity);
109  if (!new_arr)
110  return FALSE;
111  queue->array = new_arr;
112  queue->capacity = new_capacity;
113  ZeroMemory(&(queue->array[old_capacity]), (new_capacity - old_capacity) * sizeof(wMessage));
114 
115  /* rearrange wrapped entries */
116  if (queue->tail <= queue->head)
117  {
118  CopyMemory(&(queue->array[old_capacity]), queue->array, queue->tail * sizeof(wMessage));
119  queue->tail += old_capacity;
120  }
121  }
122 
123  return TRUE;
124 }
125 
126 BOOL MessageQueue_Dispatch(wMessageQueue* queue, const wMessage* message)
127 {
128  wMessage* dst = NULL;
129  BOOL ret = FALSE;
130  WINPR_ASSERT(queue);
131 
132  if (!message)
133  return FALSE;
134 
135  WINPR_ASSERT(queue);
136  EnterCriticalSection(&queue->lock);
137 
138  if (queue->closed)
139  goto out;
140 
141  if (!MessageQueue_EnsureCapacity(queue, 1))
142  goto out;
143 
144  dst = &(queue->array[queue->tail]);
145  *dst = *message;
146  dst->time = GetTickCount64();
147 
148  queue->tail = (queue->tail + 1) % queue->capacity;
149  queue->size++;
150 
151  if (queue->size > 0)
152  (void)SetEvent(queue->event);
153 
154  if (message->id == WMQ_QUIT)
155  queue->closed = TRUE;
156 
157  ret = TRUE;
158 out:
159  LeaveCriticalSection(&queue->lock);
160  return ret;
161 }
162 
163 BOOL MessageQueue_Post(wMessageQueue* queue, void* context, UINT32 type, void* wParam, void* lParam)
164 {
165  wMessage message = { 0 };
166 
167  message.context = context;
168  message.id = type;
169  message.wParam = wParam;
170  message.lParam = lParam;
171  message.Free = NULL;
172 
173  return MessageQueue_Dispatch(queue, &message);
174 }
175 
176 BOOL MessageQueue_PostQuit(wMessageQueue* queue, int nExitCode)
177 {
178  return MessageQueue_Post(queue, NULL, WMQ_QUIT, (void*)(size_t)nExitCode, NULL);
179 }
180 
181 int MessageQueue_Get(wMessageQueue* queue, wMessage* message)
182 {
183  int status = -1;
184 
185  if (!MessageQueue_Wait(queue))
186  return status;
187 
188  EnterCriticalSection(&queue->lock);
189 
190  if (queue->size > 0)
191  {
192  CopyMemory(message, &(queue->array[queue->head]), sizeof(wMessage));
193  ZeroMemory(&(queue->array[queue->head]), sizeof(wMessage));
194  queue->head = (queue->head + 1) % queue->capacity;
195  queue->size--;
196 
197  if (queue->size < 1)
198  (void)ResetEvent(queue->event);
199 
200  status = (message->id != WMQ_QUIT) ? 1 : 0;
201  }
202 
203  LeaveCriticalSection(&queue->lock);
204 
205  return status;
206 }
207 
208 int MessageQueue_Peek(wMessageQueue* queue, wMessage* message, BOOL remove)
209 {
210  int status = 0;
211 
212  WINPR_ASSERT(queue);
213  EnterCriticalSection(&queue->lock);
214 
215  if (queue->size > 0)
216  {
217  CopyMemory(message, &(queue->array[queue->head]), sizeof(wMessage));
218  status = 1;
219 
220  if (remove)
221  {
222  ZeroMemory(&(queue->array[queue->head]), sizeof(wMessage));
223  queue->head = (queue->head + 1) % queue->capacity;
224  queue->size--;
225 
226  if (queue->size < 1)
227  (void)ResetEvent(queue->event);
228  }
229  }
230 
231  LeaveCriticalSection(&queue->lock);
232 
233  return status;
234 }
235 
240 wMessageQueue* MessageQueue_New(const wObject* callback)
241 {
242  wMessageQueue* queue = NULL;
243 
244  queue = (wMessageQueue*)calloc(1, sizeof(wMessageQueue));
245  if (!queue)
246  return NULL;
247 
248  if (!InitializeCriticalSectionAndSpinCount(&queue->lock, 4000))
249  goto fail;
250 
251  if (!MessageQueue_EnsureCapacity(queue, 32))
252  goto fail;
253 
254  queue->event = CreateEvent(NULL, TRUE, FALSE, NULL);
255  if (!queue->event)
256  goto fail;
257 
258  if (callback)
259  queue->object = *callback;
260 
261  return queue;
262 
263 fail:
264  WINPR_PRAGMA_DIAG_PUSH
265  WINPR_PRAGMA_DIAG_IGNORED_MISMATCHED_DEALLOC
266  MessageQueue_Free(queue);
267  WINPR_PRAGMA_DIAG_POP
268  return NULL;
269 }
270 
271 void MessageQueue_Free(wMessageQueue* queue)
272 {
273  if (!queue)
274  return;
275 
276  if (queue->event)
277  MessageQueue_Clear(queue);
278 
279  (void)CloseHandle(queue->event);
280  DeleteCriticalSection(&queue->lock);
281 
282  free(queue->array);
283  free(queue);
284 }
285 
286 int MessageQueue_Clear(wMessageQueue* queue)
287 {
288  int status = 0;
289 
290  WINPR_ASSERT(queue);
291  WINPR_ASSERT(queue->event);
292 
293  EnterCriticalSection(&queue->lock);
294 
295  while (queue->size > 0)
296  {
297  wMessage* msg = &(queue->array[queue->head]);
298 
299  /* Free resources of message. */
300  if (queue->object.fnObjectUninit)
301  queue->object.fnObjectUninit(msg);
302  if (queue->object.fnObjectFree)
303  queue->object.fnObjectFree(msg);
304 
305  ZeroMemory(msg, sizeof(wMessage));
306 
307  queue->head = (queue->head + 1) % queue->capacity;
308  queue->size--;
309  }
310  (void)ResetEvent(queue->event);
311  queue->closed = FALSE;
312 
313  LeaveCriticalSection(&queue->lock);
314 
315  return status;
316 }
This struct contains function pointers to initialize/free objects.
Definition: collections.h:57