FreeRDP
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Modules Pages
StreamPool.c
1
20#include <winpr/config.h>
21
22#include <winpr/crt.h>
23#include <winpr/wlog.h>
24
25#include <winpr/collections.h>
26
27#include "../stream.h"
28#include "../log.h"
29#define TAG WINPR_TAG("utils.streampool")
30
31struct s_StreamPoolEntry
32{
33#if defined(WITH_STREAMPOOL_DEBUG)
34 char** msg;
35 size_t lines;
36#endif
37 wStream* s;
38};
39
40struct s_wStreamPool
41{
42 size_t aSize;
43 size_t aCapacity;
44 struct s_StreamPoolEntry* aArray;
45
46 size_t uSize;
47 size_t uCapacity;
48 struct s_StreamPoolEntry* uArray;
49
51 BOOL synchronized;
52 size_t defaultSize;
53};
54
55static void discard_entry(struct s_StreamPoolEntry* entry, BOOL discardStream)
56{
57 if (!entry)
58 return;
59
60#if defined(WITH_STREAMPOOL_DEBUG)
61 free((void*)entry->msg);
62#endif
63
64 if (discardStream && entry->s)
65 Stream_Free(entry->s, entry->s->isAllocatedStream);
66
67 const struct s_StreamPoolEntry empty = { 0 };
68 *entry = empty;
69}
70
71static struct s_StreamPoolEntry add_entry(wStream* s)
72{
73 struct s_StreamPoolEntry entry = { 0 };
74
75#if defined(WITH_STREAMPOOL_DEBUG)
76 void* stack = winpr_backtrace(20);
77 if (stack)
78 entry.msg = winpr_backtrace_symbols(stack, &entry.lines);
79 winpr_backtrace_free(stack);
80#endif
81
82 entry.s = s;
83 return entry;
84}
85
90static INLINE void StreamPool_Lock(wStreamPool* pool)
91{
92 WINPR_ASSERT(pool);
93 if (pool->synchronized)
94 EnterCriticalSection(&pool->lock);
95}
96
101static INLINE void StreamPool_Unlock(wStreamPool* pool)
102{
103 WINPR_ASSERT(pool);
104 if (pool->synchronized)
105 LeaveCriticalSection(&pool->lock);
106}
107
108static BOOL StreamPool_EnsureCapacity(wStreamPool* pool, size_t count, BOOL usedOrAvailable)
109{
110 WINPR_ASSERT(pool);
111
112 size_t* cap = (usedOrAvailable) ? &pool->uCapacity : &pool->aCapacity;
113 size_t* size = (usedOrAvailable) ? &pool->uSize : &pool->aSize;
114 struct s_StreamPoolEntry** array = (usedOrAvailable) ? &pool->uArray : &pool->aArray;
115
116 size_t new_cap = 0;
117 if (*cap == 0)
118 new_cap = *size + count;
119 else if (*size + count > *cap)
120 new_cap = (*size + count + 2) / 2 * 3;
121 else if ((*size + count) < *cap / 3)
122 new_cap = *cap / 2;
123
124 if (new_cap > 0)
125 {
126 struct s_StreamPoolEntry* new_arr = NULL;
127
128 if (*cap < *size + count)
129 *cap += count;
130
131 new_arr =
132 (struct s_StreamPoolEntry*)realloc(*array, sizeof(struct s_StreamPoolEntry) * new_cap);
133 if (!new_arr)
134 return FALSE;
135 *cap = new_cap;
136 *array = new_arr;
137 }
138 return TRUE;
139}
140
145static void StreamPool_ShiftUsed(wStreamPool* pool, size_t index)
146{
147 WINPR_ASSERT(pool);
148
149 const size_t pcount = 1;
150 const size_t off = index + pcount;
151 if (pool->uSize >= off)
152 {
153 for (size_t x = 0; x < pcount; x++)
154 {
155 struct s_StreamPoolEntry* cur = &pool->uArray[index + x];
156 discard_entry(cur, FALSE);
157 }
158 MoveMemory(&pool->uArray[index], &pool->uArray[index + pcount],
159 (pool->uSize - index - pcount) * sizeof(struct s_StreamPoolEntry));
160 pool->uSize -= pcount;
161 }
162}
163
168static void StreamPool_AddUsed(wStreamPool* pool, wStream* s)
169{
170 StreamPool_EnsureCapacity(pool, 1, TRUE);
171 pool->uArray[pool->uSize] = add_entry(s);
172 pool->uSize++;
173}
174
179static void StreamPool_RemoveUsed(wStreamPool* pool, wStream* s)
180{
181 WINPR_ASSERT(pool);
182 for (size_t index = 0; index < pool->uSize; index++)
183 {
184 struct s_StreamPoolEntry* cur = &pool->uArray[index];
185 if (cur->s == s)
186 {
187 StreamPool_ShiftUsed(pool, index);
188 break;
189 }
190 }
191}
192
193static void StreamPool_ShiftAvailable(wStreamPool* pool, size_t index)
194{
195 WINPR_ASSERT(pool);
196
197 const size_t pcount = 1;
198 const size_t off = index + pcount;
199 if (pool->aSize >= off)
200 {
201 for (size_t x = 0; x < pcount; x++)
202 {
203 struct s_StreamPoolEntry* cur = &pool->aArray[index + x];
204 discard_entry(cur, FALSE);
205 }
206
207 MoveMemory(&pool->aArray[index], &pool->aArray[index + pcount],
208 (pool->aSize - index - pcount) * sizeof(struct s_StreamPoolEntry));
209 pool->aSize -= pcount;
210 }
211}
212
217wStream* StreamPool_Take(wStreamPool* pool, size_t size)
218{
219 BOOL found = FALSE;
220 size_t foundIndex = 0;
221 wStream* s = NULL;
222
223 StreamPool_Lock(pool);
224
225 if (size == 0)
226 size = pool->defaultSize;
227
228 for (size_t index = 0; index < pool->aSize; index++)
229 {
230 struct s_StreamPoolEntry* cur = &pool->aArray[index];
231 s = cur->s;
232
233 if (Stream_Capacity(s) >= size)
234 {
235 found = TRUE;
236 foundIndex = index;
237 break;
238 }
239 }
240
241 if (!found)
242 {
243 s = Stream_New(NULL, size);
244 if (!s)
245 goto out_fail;
246 }
247 else if (s)
248 {
249 Stream_SetPosition(s, 0);
250 Stream_SetLength(s, Stream_Capacity(s));
251 StreamPool_ShiftAvailable(pool, foundIndex);
252 }
253
254 if (s)
255 {
256 s->pool = pool;
257 s->count = 1;
258 StreamPool_AddUsed(pool, s);
259 }
260
261out_fail:
262 StreamPool_Unlock(pool);
263
264 return s;
265}
266
271static void StreamPool_Remove(wStreamPool* pool, wStream* s)
272{
273 StreamPool_EnsureCapacity(pool, 1, FALSE);
274 Stream_EnsureValidity(s);
275 for (size_t x = 0; x < pool->aSize; x++)
276 {
277 wStream* cs = pool->aArray[x].s;
278 if (cs == s)
279 return;
280 }
281 pool->aArray[(pool->aSize)++] = add_entry(s);
282 StreamPool_RemoveUsed(pool, s);
283}
284
285static void StreamPool_ReleaseOrReturn(wStreamPool* pool, wStream* s)
286{
287 StreamPool_Lock(pool);
288 StreamPool_Remove(pool, s);
289 StreamPool_Unlock(pool);
290}
291
292void StreamPool_Return(wStreamPool* pool, wStream* s)
293{
294 WINPR_ASSERT(pool);
295 if (!s)
296 return;
297
298 StreamPool_Lock(pool);
299 StreamPool_Remove(pool, s);
300 StreamPool_Unlock(pool);
301}
302
307void Stream_AddRef(wStream* s)
308{
309 WINPR_ASSERT(s);
310 s->count++;
311}
312
317void Stream_Release(wStream* s)
318{
319 WINPR_ASSERT(s);
320
321 if (s->count > 0)
322 s->count--;
323 if (s->count == 0)
324 {
325 if (s->pool)
326 StreamPool_ReleaseOrReturn(s->pool, s);
327 else
328 Stream_Free(s, TRUE);
329 }
330}
331
336wStream* StreamPool_Find(wStreamPool* pool, const BYTE* ptr)
337{
338 wStream* s = NULL;
339
340 StreamPool_Lock(pool);
341
342 for (size_t index = 0; index < pool->uSize; index++)
343 {
344 struct s_StreamPoolEntry* cur = &pool->uArray[index];
345
346 if ((ptr >= Stream_Buffer(cur->s)) &&
347 (ptr < (Stream_Buffer(cur->s) + Stream_Capacity(cur->s))))
348 {
349 s = cur->s;
350 break;
351 }
352 }
353
354 StreamPool_Unlock(pool);
355
356 return s;
357}
358
363void StreamPool_Clear(wStreamPool* pool)
364{
365 StreamPool_Lock(pool);
366
367 for (size_t x = 0; x < pool->aSize; x++)
368 {
369 struct s_StreamPoolEntry* cur = &pool->aArray[x];
370 discard_entry(cur, TRUE);
371 }
372 pool->aSize = 0;
373
374 if (pool->uSize > 0)
375 {
376 WLog_WARN(TAG, "Clearing StreamPool, but there are %" PRIuz " streams currently in use",
377 pool->uSize);
378 for (size_t x = 0; x < pool->uSize; x++)
379 {
380 struct s_StreamPoolEntry* cur = &pool->uArray[x];
381 discard_entry(cur, TRUE);
382 }
383 pool->uSize = 0;
384 }
385
386 StreamPool_Unlock(pool);
387}
388
389size_t StreamPool_UsedCount(wStreamPool* pool)
390{
391 StreamPool_Lock(pool);
392 size_t usize = pool->uSize;
393 StreamPool_Unlock(pool);
394 return usize;
395}
396
401wStreamPool* StreamPool_New(BOOL synchronized, size_t defaultSize)
402{
403 wStreamPool* pool = NULL;
404
405 pool = (wStreamPool*)calloc(1, sizeof(wStreamPool));
406
407 if (pool)
408 {
409 pool->synchronized = synchronized;
410 pool->defaultSize = defaultSize;
411
412 if (!StreamPool_EnsureCapacity(pool, 32, FALSE))
413 goto fail;
414 if (!StreamPool_EnsureCapacity(pool, 32, TRUE))
415 goto fail;
416
417 InitializeCriticalSectionAndSpinCount(&pool->lock, 4000);
418 }
419
420 return pool;
421fail:
422 WINPR_PRAGMA_DIAG_PUSH
423 WINPR_PRAGMA_DIAG_IGNORED_MISMATCHED_DEALLOC
424 StreamPool_Free(pool);
425 WINPR_PRAGMA_DIAG_POP
426 return NULL;
427}
428
429void StreamPool_Free(wStreamPool* pool)
430{
431 if (pool)
432 {
433 StreamPool_Clear(pool);
434
435 DeleteCriticalSection(&pool->lock);
436
437 free(pool->aArray);
438 free(pool->uArray);
439
440 free(pool);
441 }
442}
443
444char* StreamPool_GetStatistics(wStreamPool* pool, char* buffer, size_t size)
445{
446 WINPR_ASSERT(pool);
447
448 if (!buffer || (size < 1))
449 return NULL;
450
451 size_t used = 0;
452 int offset = _snprintf(buffer, size - 1,
453 "aSize =%" PRIuz ", uSize =%" PRIuz ", aCapacity=%" PRIuz
454 ", uCapacity=%" PRIuz,
455 pool->aSize, pool->uSize, pool->aCapacity, pool->uCapacity);
456 if ((offset > 0) && ((size_t)offset < size))
457 used += (size_t)offset;
458
459#if defined(WITH_STREAMPOOL_DEBUG)
460 StreamPool_Lock(pool);
461
462 offset = _snprintf(&buffer[used], size - 1 - used, "\n-- dump used array take locations --\n");
463 if ((offset > 0) && ((size_t)offset < size - used))
464 used += (size_t)offset;
465 for (size_t x = 0; x < pool->uSize; x++)
466 {
467 const struct s_StreamPoolEntry* cur = &pool->uArray[x];
468 WINPR_ASSERT(cur->msg || (cur->lines == 0));
469
470 for (size_t y = 0; y < cur->lines; y++)
471 {
472 offset = _snprintf(&buffer[used], size - 1 - used, "[%" PRIuz " | %" PRIuz "]: %s\n", x,
473 y, cur->msg[y]);
474 if ((offset > 0) && ((size_t)offset < size - used))
475 used += (size_t)offset;
476 }
477 }
478
479 offset = _snprintf(&buffer[used], size - 1 - used, "\n-- statistics called from --\n");
480 if ((offset > 0) && ((size_t)offset < size - used))
481 used += (size_t)offset;
482
483 struct s_StreamPoolEntry entry = { 0 };
484 void* stack = winpr_backtrace(20);
485 if (stack)
486 entry.msg = winpr_backtrace_symbols(stack, &entry.lines);
487 winpr_backtrace_free(stack);
488
489 for (size_t x = 0; x < entry.lines; x++)
490 {
491 const char* msg = entry.msg[x];
492 offset = _snprintf(&buffer[used], size - 1 - used, "[%" PRIuz "]: %s\n", x, msg);
493 if ((offset > 0) && ((size_t)offset < size - used))
494 used += (size_t)offset;
495 }
496 free((void*)entry.msg);
497 StreamPool_Unlock(pool);
498#endif
499 buffer[used] = '\0';
500 return buffer;
501}
502
503BOOL StreamPool_WaitForReturn(wStreamPool* pool, UINT32 timeoutMS)
504{
505 wLog* log = WLog_Get(TAG);
506
507 /* HACK: We disconnected the transport above, now wait without a read or write lock until all
508 * streams in use have been returned to the pool. */
509 while (timeoutMS > 0)
510 {
511 const size_t used = StreamPool_UsedCount(pool);
512 if (used == 0)
513 return TRUE;
514 WLog_Print(log, WLOG_DEBUG, "%" PRIuz " streams still in use, sleeping...", used);
515
516 char buffer[4096] = { 0 };
517 StreamPool_GetStatistics(pool, buffer, sizeof(buffer));
518 WLog_Print(log, WLOG_TRACE, "Pool statistics: %s", buffer);
519
520 UINT32 diff = 10;
521 if (timeoutMS != INFINITE)
522 {
523 diff = timeoutMS > 10 ? 10 : timeoutMS;
524 timeoutMS -= diff;
525 }
526 Sleep(diff);
527 }
528
529 return FALSE;
530}