FreeRDP
apc.c
1 
19 #ifndef _WIN32
20 
21 #include "apc.h"
22 #include "thread.h"
23 #include "../log.h"
24 #include "../synch/pollset.h"
25 #include <winpr/assert.h>
26 
27 #define TAG WINPR_TAG("apc")
28 
29 BOOL apc_init(APC_QUEUE* apc)
30 {
31  pthread_mutexattr_t attr;
32  BOOL ret = FALSE;
33 
34  WINPR_ASSERT(apc);
35 
36  pthread_mutexattr_init(&attr);
37  if (pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE) != 0)
38  {
39  WLog_ERR(TAG, "failed to initialize mutex attributes to recursive");
40  return FALSE;
41  }
42 
43  memset(apc, 0, sizeof(*apc));
44 
45  if (pthread_mutex_init(&apc->mutex, &attr) != 0)
46  {
47  WLog_ERR(TAG, "failed to initialize main thread APC mutex");
48  goto out;
49  }
50 
51  ret = TRUE;
52 out:
53  pthread_mutexattr_destroy(&attr);
54  return ret;
55 }
56 
57 BOOL apc_uninit(APC_QUEUE* apc)
58 {
59  WINPR_ASSERT(apc);
60  return pthread_mutex_destroy(&apc->mutex) == 0;
61 }
62 
63 void apc_register(WINPR_THREAD* thread, WINPR_APC_ITEM* addItem)
64 {
65  WINPR_APC_ITEM** nextp = NULL;
66  APC_QUEUE* apc = NULL;
67 
68  WINPR_ASSERT(thread);
69  WINPR_ASSERT(addItem);
70 
71  apc = &thread->apc;
72  WINPR_ASSERT(apc);
73 
74  pthread_mutex_lock(&apc->mutex);
75  if (apc->tail)
76  {
77  nextp = &apc->tail->next;
78  addItem->last = apc->tail;
79  }
80  else
81  {
82  nextp = &apc->head;
83  }
84 
85  *nextp = addItem;
86  apc->tail = addItem;
87  apc->length++;
88 
89  addItem->markedForRemove = FALSE;
90  addItem->boundThread = GetCurrentThreadId();
91  addItem->linked = TRUE;
92  pthread_mutex_unlock(&apc->mutex);
93 }
94 
95 static INLINE void apc_item_remove(APC_QUEUE* apc, WINPR_APC_ITEM* item)
96 {
97  WINPR_ASSERT(apc);
98  WINPR_ASSERT(item);
99 
100  if (!item->last)
101  apc->head = item->next;
102  else
103  item->last->next = item->next;
104 
105  if (!item->next)
106  apc->tail = item->last;
107  else
108  item->next->last = item->last;
109 
110  apc->length--;
111 }
112 
113 APC_REMOVE_RESULT apc_remove(WINPR_APC_ITEM* item)
114 {
115  WINPR_THREAD* thread = winpr_GetCurrentThread();
116  APC_QUEUE* apc = NULL;
117  APC_REMOVE_RESULT ret = APC_REMOVE_OK;
118 
119  WINPR_ASSERT(item);
120 
121  if (!item->linked)
122  return APC_REMOVE_OK;
123 
124  if (item->boundThread != GetCurrentThreadId())
125  {
126  WLog_ERR(TAG, "removing an APC entry should be done in the creating thread");
127  return APC_REMOVE_ERROR;
128  }
129 
130  if (!thread)
131  {
132  WLog_ERR(TAG, "unable to retrieve current thread");
133  return APC_REMOVE_ERROR;
134  }
135 
136  apc = &thread->apc;
137  WINPR_ASSERT(apc);
138 
139  pthread_mutex_lock(&apc->mutex);
140  if (apc->treatingCompletions)
141  {
142  item->markedForRemove = TRUE;
143  ret = APC_REMOVE_DELAY_FREE;
144  goto out;
145  }
146 
147  apc_item_remove(apc, item);
148 
149 out:
150  pthread_mutex_unlock(&apc->mutex);
151  item->boundThread = 0xFFFFFFFF;
152  item->linked = FALSE;
153  return ret;
154 }
155 
156 BOOL apc_collectFds(WINPR_THREAD* thread, WINPR_POLL_SET* set, BOOL* haveAutoSignaled)
157 {
158  WINPR_APC_ITEM* item = NULL;
159  BOOL ret = FALSE;
160  APC_QUEUE* apc = NULL;
161 
162  WINPR_ASSERT(thread);
163  WINPR_ASSERT(haveAutoSignaled);
164 
165  apc = &thread->apc;
166  WINPR_ASSERT(apc);
167 
168  *haveAutoSignaled = FALSE;
169  pthread_mutex_lock(&apc->mutex);
170  item = apc->head;
171  for (; item; item = item->next)
172  {
173  if (item->alwaysSignaled)
174  {
175  *haveAutoSignaled = TRUE;
176  }
177  else if (!pollset_add(set, item->pollFd, item->pollMode))
178  goto out;
179  }
180 
181  ret = TRUE;
182 out:
183  pthread_mutex_unlock(&apc->mutex);
184  return ret;
185 }
186 
187 int apc_executeCompletions(WINPR_THREAD* thread, WINPR_POLL_SET* set, size_t startIndex)
188 {
189  APC_QUEUE* apc = NULL;
190  WINPR_APC_ITEM* nextItem = NULL;
191  size_t idx = startIndex;
192  int ret = 0;
193 
194  WINPR_ASSERT(thread);
195 
196  apc = &thread->apc;
197  WINPR_ASSERT(apc);
198 
199  pthread_mutex_lock(&apc->mutex);
200  apc->treatingCompletions = TRUE;
201 
202  /* first pass to compute signaled items */
203  for (WINPR_APC_ITEM* item = apc->head; item; item = item->next)
204  {
205  item->isSignaled = item->alwaysSignaled || pollset_isSignaled(set, idx);
206  if (!item->alwaysSignaled)
207  idx++;
208  }
209 
210  /* second pass: run completions */
211  for (WINPR_APC_ITEM* item = apc->head; item; item = nextItem)
212  {
213  if (item->isSignaled)
214  {
215  if (item->completion && !item->markedForRemove)
216  item->completion(item->completionArgs);
217  ret++;
218  }
219 
220  nextItem = item->next;
221  }
222 
223  /* third pass: to do final cleanup */
224  for (WINPR_APC_ITEM* item = apc->head; item; item = nextItem)
225  {
226  nextItem = item->next;
227 
228  if (item->markedForRemove)
229  {
230  apc_item_remove(apc, item);
231  if (item->markedForFree)
232  free(item);
233  }
234  }
235 
236  apc->treatingCompletions = FALSE;
237  pthread_mutex_unlock(&apc->mutex);
238 
239  return ret;
240 }
241 
242 void apc_cleanupThread(WINPR_THREAD* thread)
243 {
244  WINPR_APC_ITEM* item = NULL;
245  WINPR_APC_ITEM* nextItem = NULL;
246  APC_QUEUE* apc = NULL;
247 
248  WINPR_ASSERT(thread);
249 
250  apc = &thread->apc;
251  WINPR_ASSERT(apc);
252 
253  pthread_mutex_lock(&apc->mutex);
254  item = apc->head;
255  for (; item; item = nextItem)
256  {
257  nextItem = item->next;
258 
259  if (item->type == APC_TYPE_HANDLE_FREE)
260  item->completion(item->completionArgs);
261 
262  item->last = item->next = NULL;
263  item->linked = FALSE;
264  if (item->markedForFree)
265  free(item);
266  }
267 
268  apc->head = apc->tail = NULL;
269  pthread_mutex_unlock(&apc->mutex);
270 }
271 
272 #endif
Definition: apc.h:69