FreeRDP
StreamPool.c
1 
20 #include <winpr/config.h>
21 
22 #include <winpr/crt.h>
23 #include <winpr/wlog.h>
24 
25 #include <winpr/collections.h>
26 
27 #include "../stream.h"
28 #include "../log.h"
29 #define TAG WINPR_TAG("utils.streampool")
30 
31 struct s_StreamPoolEntry
32 {
33 #if defined(WITH_STREAMPOOL_DEBUG)
34  char** msg;
35  size_t lines;
36 #endif
37  wStream* s;
38 };
39 
40 struct s_wStreamPool
41 {
42  size_t aSize;
43  size_t aCapacity;
44  struct s_StreamPoolEntry* aArray;
45 
46  size_t uSize;
47  size_t uCapacity;
48  struct s_StreamPoolEntry* uArray;
49 
50  CRITICAL_SECTION lock;
51  BOOL synchronized;
52  size_t defaultSize;
53 };
54 
55 static void discard_entry(struct s_StreamPoolEntry* entry, BOOL discardStream)
56 {
57  if (!entry)
58  return;
59 
60 #if defined(WITH_STREAMPOOL_DEBUG)
61  free((void*)entry->msg);
62 #endif
63 
64  if (discardStream && entry->s)
65  Stream_Free(entry->s, entry->s->isAllocatedStream);
66 
67  const struct s_StreamPoolEntry empty = { 0 };
68  *entry = empty;
69 }
70 
71 static struct s_StreamPoolEntry add_entry(wStream* s)
72 {
73  struct s_StreamPoolEntry entry = { 0 };
74 
75 #if defined(WITH_STREAMPOOL_DEBUG)
76  void* stack = winpr_backtrace(20);
77  if (stack)
78  entry.msg = winpr_backtrace_symbols(stack, &entry.lines);
79  winpr_backtrace_free(stack);
80 #endif
81 
82  entry.s = s;
83  return entry;
84 }
85 
90 static INLINE void StreamPool_Lock(wStreamPool* pool)
91 {
92  WINPR_ASSERT(pool);
93  if (pool->synchronized)
94  EnterCriticalSection(&pool->lock);
95 }
96 
101 static INLINE void StreamPool_Unlock(wStreamPool* pool)
102 {
103  WINPR_ASSERT(pool);
104  if (pool->synchronized)
105  LeaveCriticalSection(&pool->lock);
106 }
107 
108 static BOOL StreamPool_EnsureCapacity(wStreamPool* pool, size_t count, BOOL usedOrAvailable)
109 {
110  WINPR_ASSERT(pool);
111 
112  size_t* cap = (usedOrAvailable) ? &pool->uCapacity : &pool->aCapacity;
113  size_t* size = (usedOrAvailable) ? &pool->uSize : &pool->aSize;
114  struct s_StreamPoolEntry** array = (usedOrAvailable) ? &pool->uArray : &pool->aArray;
115 
116  size_t new_cap = 0;
117  if (*cap == 0)
118  new_cap = *size + count;
119  else if (*size + count > *cap)
120  new_cap = (*size + count + 2) / 2 * 3;
121  else if ((*size + count) < *cap / 3)
122  new_cap = *cap / 2;
123 
124  if (new_cap > 0)
125  {
126  struct s_StreamPoolEntry* new_arr = NULL;
127 
128  if (*cap < *size + count)
129  *cap += count;
130 
131  new_arr =
132  (struct s_StreamPoolEntry*)realloc(*array, sizeof(struct s_StreamPoolEntry) * new_cap);
133  if (!new_arr)
134  return FALSE;
135  *cap = new_cap;
136  *array = new_arr;
137  }
138  return TRUE;
139 }
140 
145 static void StreamPool_ShiftUsed(wStreamPool* pool, size_t index)
146 {
147  WINPR_ASSERT(pool);
148 
149  const size_t pcount = 1;
150  const size_t off = index + pcount;
151  if (pool->uSize >= off)
152  {
153  for (size_t x = 0; x < pcount; x++)
154  {
155  struct s_StreamPoolEntry* cur = &pool->uArray[index + x];
156  discard_entry(cur, FALSE);
157  }
158  MoveMemory(&pool->uArray[index], &pool->uArray[index + pcount],
159  (pool->uSize - index - pcount) * sizeof(struct s_StreamPoolEntry));
160  pool->uSize -= pcount;
161  }
162 }
163 
168 static void StreamPool_AddUsed(wStreamPool* pool, wStream* s)
169 {
170  StreamPool_EnsureCapacity(pool, 1, TRUE);
171  pool->uArray[pool->uSize] = add_entry(s);
172  pool->uSize++;
173 }
174 
179 static void StreamPool_RemoveUsed(wStreamPool* pool, wStream* s)
180 {
181  WINPR_ASSERT(pool);
182  for (size_t index = 0; index < pool->uSize; index++)
183  {
184  struct s_StreamPoolEntry* cur = &pool->uArray[index];
185  if (cur->s == s)
186  {
187  StreamPool_ShiftUsed(pool, index);
188  break;
189  }
190  }
191 }
192 
193 static void StreamPool_ShiftAvailable(wStreamPool* pool, size_t index)
194 {
195  WINPR_ASSERT(pool);
196 
197  const size_t pcount = 1;
198  const size_t off = index + pcount;
199  if (pool->aSize >= off)
200  {
201  for (size_t x = 0; x < pcount; x++)
202  {
203  struct s_StreamPoolEntry* cur = &pool->aArray[index + x];
204  discard_entry(cur, FALSE);
205  }
206 
207  MoveMemory(&pool->aArray[index], &pool->aArray[index + pcount],
208  (pool->aSize - index - pcount) * sizeof(struct s_StreamPoolEntry));
209  pool->aSize -= pcount;
210  }
211 }
212 
217 wStream* StreamPool_Take(wStreamPool* pool, size_t size)
218 {
219  BOOL found = FALSE;
220  size_t foundIndex = 0;
221  wStream* s = NULL;
222 
223  StreamPool_Lock(pool);
224 
225  if (size == 0)
226  size = pool->defaultSize;
227 
228  for (size_t index = 0; index < pool->aSize; index++)
229  {
230  struct s_StreamPoolEntry* cur = &pool->aArray[index];
231  s = cur->s;
232 
233  if (Stream_Capacity(s) >= size)
234  {
235  found = TRUE;
236  foundIndex = index;
237  break;
238  }
239  }
240 
241  if (!found)
242  {
243  s = Stream_New(NULL, size);
244  if (!s)
245  goto out_fail;
246  }
247  else if (s)
248  {
249  Stream_SetPosition(s, 0);
250  Stream_SetLength(s, Stream_Capacity(s));
251  StreamPool_ShiftAvailable(pool, foundIndex);
252  }
253 
254  if (s)
255  {
256  s->pool = pool;
257  s->count = 1;
258  StreamPool_AddUsed(pool, s);
259  }
260 
261 out_fail:
262  StreamPool_Unlock(pool);
263 
264  return s;
265 }
266 
271 static void StreamPool_Remove(wStreamPool* pool, wStream* s)
272 {
273  StreamPool_EnsureCapacity(pool, 1, FALSE);
274  Stream_EnsureValidity(s);
275  for (size_t x = 0; x < pool->aSize; x++)
276  {
277  wStream* cs = pool->aArray[x].s;
278  if (cs == s)
279  return;
280  }
281  pool->aArray[(pool->aSize)++] = add_entry(s);
282  StreamPool_RemoveUsed(pool, s);
283 }
284 
285 static void StreamPool_ReleaseOrReturn(wStreamPool* pool, wStream* s)
286 {
287  StreamPool_Lock(pool);
288  if (s->count > 0)
289  s->count--;
290  if (s->count == 0)
291  StreamPool_Remove(pool, s);
292  StreamPool_Unlock(pool);
293 }
294 
295 void StreamPool_Return(wStreamPool* pool, wStream* s)
296 {
297  WINPR_ASSERT(pool);
298  if (!s)
299  return;
300 
301  StreamPool_Lock(pool);
302  StreamPool_Remove(pool, s);
303  StreamPool_Unlock(pool);
304 }
305 
310 void Stream_AddRef(wStream* s)
311 {
312  WINPR_ASSERT(s);
313  if (s->pool)
314  {
315  StreamPool_Lock(s->pool);
316  s->count++;
317  StreamPool_Unlock(s->pool);
318  }
319 }
320 
325 void Stream_Release(wStream* s)
326 {
327  WINPR_ASSERT(s);
328  if (s->pool)
329  StreamPool_ReleaseOrReturn(s->pool, s);
330 }
331 
336 wStream* StreamPool_Find(wStreamPool* pool, const BYTE* ptr)
337 {
338  wStream* s = NULL;
339 
340  StreamPool_Lock(pool);
341 
342  for (size_t index = 0; index < pool->uSize; index++)
343  {
344  struct s_StreamPoolEntry* cur = &pool->uArray[index];
345 
346  if ((ptr >= Stream_Buffer(cur->s)) &&
347  (ptr < (Stream_Buffer(cur->s) + Stream_Capacity(cur->s))))
348  {
349  s = cur->s;
350  break;
351  }
352  }
353 
354  StreamPool_Unlock(pool);
355 
356  return s;
357 }
358 
363 void StreamPool_Clear(wStreamPool* pool)
364 {
365  StreamPool_Lock(pool);
366 
367  for (size_t x = 0; x < pool->aSize; x++)
368  {
369  struct s_StreamPoolEntry* cur = &pool->aArray[x];
370  discard_entry(cur, TRUE);
371  }
372 
373  if (pool->uSize > 0)
374  {
375  WLog_WARN(TAG, "Clearing StreamPool, but there are %" PRIuz " streams currently in use",
376  pool->uSize);
377  for (size_t x = 0; x < pool->uSize; x++)
378  {
379  struct s_StreamPoolEntry* cur = &pool->uArray[x];
380  discard_entry(cur, TRUE);
381  }
382  }
383 
384  StreamPool_Unlock(pool);
385 }
386 
387 size_t StreamPool_UsedCount(wStreamPool* pool)
388 {
389  StreamPool_Lock(pool);
390  size_t usize = pool->uSize;
391  StreamPool_Unlock(pool);
392  return usize;
393 }
394 
399 wStreamPool* StreamPool_New(BOOL synchronized, size_t defaultSize)
400 {
401  wStreamPool* pool = NULL;
402 
403  pool = (wStreamPool*)calloc(1, sizeof(wStreamPool));
404 
405  if (pool)
406  {
407  pool->synchronized = synchronized;
408  pool->defaultSize = defaultSize;
409 
410  if (!StreamPool_EnsureCapacity(pool, 32, FALSE))
411  goto fail;
412  if (!StreamPool_EnsureCapacity(pool, 32, TRUE))
413  goto fail;
414 
415  InitializeCriticalSectionAndSpinCount(&pool->lock, 4000);
416  }
417 
418  return pool;
419 fail:
420  WINPR_PRAGMA_DIAG_PUSH
421  WINPR_PRAGMA_DIAG_IGNORED_MISMATCHED_DEALLOC
422  StreamPool_Free(pool);
423  WINPR_PRAGMA_DIAG_POP
424  return NULL;
425 }
426 
427 void StreamPool_Free(wStreamPool* pool)
428 {
429  if (pool)
430  {
431  StreamPool_Clear(pool);
432 
433  DeleteCriticalSection(&pool->lock);
434 
435  free(pool->aArray);
436  free(pool->uArray);
437 
438  free(pool);
439  }
440 }
441 
442 char* StreamPool_GetStatistics(wStreamPool* pool, char* buffer, size_t size)
443 {
444  WINPR_ASSERT(pool);
445 
446  if (!buffer || (size < 1))
447  return NULL;
448 
449  size_t used = 0;
450  int offset = _snprintf(buffer, size - 1,
451  "aSize =%" PRIuz ", uSize =%" PRIuz ", aCapacity=%" PRIuz
452  ", uCapacity=%" PRIuz,
453  pool->aSize, pool->uSize, pool->aCapacity, pool->uCapacity);
454  if ((offset > 0) && ((size_t)offset < size))
455  used += (size_t)offset;
456 
457 #if defined(WITH_STREAMPOOL_DEBUG)
458  StreamPool_Lock(pool);
459 
460  offset = _snprintf(&buffer[used], size - 1 - used, "\n-- dump used array take locations --\n");
461  if ((offset > 0) && ((size_t)offset < size - used))
462  used += (size_t)offset;
463  for (size_t x = 0; x < pool->uSize; x++)
464  {
465  const struct s_StreamPoolEntry* cur = &pool->uArray[x];
466  WINPR_ASSERT(cur->msg || (cur->lines == 0));
467 
468  for (size_t y = 0; y < cur->lines; y++)
469  {
470  offset = _snprintf(&buffer[used], size - 1 - used, "[%" PRIuz " | %" PRIuz "]: %s\n", x,
471  y, cur->msg[y]);
472  if ((offset > 0) && ((size_t)offset < size - used))
473  used += (size_t)offset;
474  }
475  }
476 
477  offset = _snprintf(&buffer[used], size - 1 - used, "\n-- statistics called from --\n");
478  if ((offset > 0) && ((size_t)offset < size - used))
479  used += (size_t)offset;
480 
481  struct s_StreamPoolEntry entry = { 0 };
482  void* stack = winpr_backtrace(20);
483  if (stack)
484  entry.msg = winpr_backtrace_symbols(stack, &entry.lines);
485  winpr_backtrace_free(stack);
486 
487  for (size_t x = 0; x < entry.lines; x++)
488  {
489  const char* msg = entry.msg[x];
490  offset = _snprintf(&buffer[used], size - 1 - used, "[%" PRIuz "]: %s\n", x, msg);
491  if ((offset > 0) && ((size_t)offset < size - used))
492  used += (size_t)offset;
493  }
494  free((void*)entry.msg);
495  StreamPool_Unlock(pool);
496 #endif
497  buffer[used] = '\0';
498  return buffer;
499 }
500 
501 BOOL StreamPool_WaitForReturn(wStreamPool* pool, UINT32 timeoutMS)
502 {
503  wLog* log = WLog_Get(TAG);
504 
505  /* HACK: We disconnected the transport above, now wait without a read or write lock until all
506  * streams in use have been returned to the pool. */
507  while (timeoutMS > 0)
508  {
509  const size_t used = StreamPool_UsedCount(pool);
510  if (used == 0)
511  return TRUE;
512  WLog_Print(log, WLOG_DEBUG, "%" PRIuz " streams still in use, sleeping...", used);
513 
514  char buffer[4096] = { 0 };
515  StreamPool_GetStatistics(pool, buffer, sizeof(buffer));
516  WLog_Print(log, WLOG_TRACE, "Pool statistics: %s", buffer);
517 
518  UINT32 diff = 10;
519  if (timeoutMS != INFINITE)
520  {
521  diff = timeoutMS > 10 ? 10 : timeoutMS;
522  timeoutMS -= diff;
523  }
524  Sleep(diff);
525  }
526 
527  return FALSE;
528 }