FreeRDP
Loading...
Searching...
No Matches
StreamPool.c
1
20#include <winpr/config.h>
21
22#include <winpr/crt.h>
23#include <winpr/wlog.h>
24
25#include <winpr/collections.h>
26
27#include "../stream.h"
28#include "../log.h"
29#define TAG WINPR_TAG("utils.streampool")
30
31struct s_StreamPoolEntry
32{
33#if defined(WITH_STREAMPOOL_DEBUG)
34 char** msg;
35 size_t lines;
36#endif
37 wStream* s;
38};
39
40struct s_wStreamPool
41{
42 size_t aSize;
43 size_t aCapacity;
44 struct s_StreamPoolEntry* aArray;
45
46 size_t uSize;
47 size_t uCapacity;
48 struct s_StreamPoolEntry* uArray;
49
51 BOOL synchronized;
52 size_t defaultSize;
53};
54
55static void discard_entry(struct s_StreamPoolEntry* entry, BOOL discardStream)
56{
57 if (!entry)
58 return;
59
60#if defined(WITH_STREAMPOOL_DEBUG)
61 free((void*)entry->msg);
62#endif
63
64 if (discardStream && entry->s)
65 Stream_Free(entry->s, entry->s->isAllocatedStream);
66
67 const struct s_StreamPoolEntry empty = WINPR_C_ARRAY_INIT;
68 *entry = empty;
69}
70
71static struct s_StreamPoolEntry add_entry(wStream* s)
72{
73 struct s_StreamPoolEntry entry = WINPR_C_ARRAY_INIT;
74
75#if defined(WITH_STREAMPOOL_DEBUG)
76 void* stack = winpr_backtrace(20);
77 if (stack)
78 entry.msg = winpr_backtrace_symbols(stack, &entry.lines);
79 winpr_backtrace_free(stack);
80#endif
81
82 entry.s = s;
83 return entry;
84}
85
90static inline void StreamPool_Lock(wStreamPool* pool)
91{
92 WINPR_ASSERT(pool);
93 if (pool->synchronized)
94 EnterCriticalSection(&pool->lock);
95}
96
101static inline void StreamPool_Unlock(wStreamPool* pool)
102{
103 WINPR_ASSERT(pool);
104 if (pool->synchronized)
105 LeaveCriticalSection(&pool->lock);
106}
107
108static BOOL StreamPool_EnsureCapacity(wStreamPool* pool, size_t count, BOOL usedOrAvailable)
109{
110 WINPR_ASSERT(pool);
111
112 size_t* cap = (usedOrAvailable) ? &pool->uCapacity : &pool->aCapacity;
113 size_t* size = (usedOrAvailable) ? &pool->uSize : &pool->aSize;
114 struct s_StreamPoolEntry** array = (usedOrAvailable) ? &pool->uArray : &pool->aArray;
115
116 size_t new_cap = 0;
117 if (*cap == 0)
118 new_cap = *size + count;
119 else if (*size + count > *cap)
120 new_cap = (*size + count + 2) / 2 * 3;
121 else if ((*size + count) < *cap / 3)
122 new_cap = *cap / 2;
123
124 if (new_cap > 0)
125 {
126 struct s_StreamPoolEntry* new_arr = nullptr;
127
128 if (*cap < *size + count)
129 *cap += count;
130
131 new_arr =
132 (struct s_StreamPoolEntry*)realloc(*array, sizeof(struct s_StreamPoolEntry) * new_cap);
133 if (!new_arr)
134 return FALSE;
135 *cap = new_cap;
136 *array = new_arr;
137 }
138 return TRUE;
139}
140
145static void StreamPool_ShiftUsed(wStreamPool* pool, size_t index)
146{
147 WINPR_ASSERT(pool);
148
149 const size_t pcount = 1;
150 const size_t off = index + pcount;
151 if (pool->uSize >= off)
152 {
153 for (size_t x = 0; x < pcount; x++)
154 {
155 struct s_StreamPoolEntry* cur = &pool->uArray[index + x];
156 discard_entry(cur, FALSE);
157 }
158 MoveMemory(&pool->uArray[index], &pool->uArray[index + pcount],
159 (pool->uSize - index - pcount) * sizeof(struct s_StreamPoolEntry));
160 pool->uSize -= pcount;
161 }
162}
163
168static void StreamPool_AddUsed(wStreamPool* pool, wStream* s)
169{
170 StreamPool_EnsureCapacity(pool, 1, TRUE);
171 pool->uArray[pool->uSize] = add_entry(s);
172 pool->uSize++;
173}
174
179static void StreamPool_RemoveUsed(wStreamPool* pool, wStream* s)
180{
181 WINPR_ASSERT(pool);
182 for (size_t index = 0; index < pool->uSize; index++)
183 {
184 struct s_StreamPoolEntry* cur = &pool->uArray[index];
185 if (cur->s == s)
186 {
187 StreamPool_ShiftUsed(pool, index);
188 break;
189 }
190 }
191}
192
193static void StreamPool_ShiftAvailable(wStreamPool* pool, size_t index)
194{
195 WINPR_ASSERT(pool);
196
197 const size_t pcount = 1;
198 const size_t off = index + pcount;
199 if (pool->aSize >= off)
200 {
201 for (size_t x = 0; x < pcount; x++)
202 {
203 struct s_StreamPoolEntry* cur = &pool->aArray[index + x];
204 discard_entry(cur, FALSE);
205 }
206
207 MoveMemory(&pool->aArray[index], &pool->aArray[index + pcount],
208 (pool->aSize - index - pcount) * sizeof(struct s_StreamPoolEntry));
209 pool->aSize -= pcount;
210 }
211}
212
217wStream* StreamPool_Take(wStreamPool* pool, size_t size)
218{
219 BOOL found = FALSE;
220 size_t foundIndex = 0;
221 wStream* s = nullptr;
222
223 StreamPool_Lock(pool);
224
225 if (size == 0)
226 size = pool->defaultSize;
227
228 for (size_t index = 0; index < pool->aSize; index++)
229 {
230 struct s_StreamPoolEntry* cur = &pool->aArray[index];
231 s = cur->s;
232
233 if (Stream_Capacity(s) >= size)
234 {
235 found = TRUE;
236 foundIndex = index;
237 break;
238 }
239 }
240
241 if (!found)
242 {
243 s = Stream_New(nullptr, size);
244 if (!s)
245 goto out_fail;
246 }
247 else if (s)
248 {
249 Stream_ResetPosition(s);
250 if (!Stream_SetLength(s, Stream_Capacity(s)))
251 goto out_fail;
252 StreamPool_ShiftAvailable(pool, foundIndex);
253 }
254
255 if (s)
256 {
257 s->pool = pool;
258 s->count = 1;
259 StreamPool_AddUsed(pool, s);
260 }
261
262out_fail:
263 StreamPool_Unlock(pool);
264
265 return s;
266}
267
272static void StreamPool_Remove(wStreamPool* pool, wStream* s)
273{
274 StreamPool_EnsureCapacity(pool, 1, FALSE);
275 Stream_EnsureValidity(s);
276 for (size_t x = 0; x < pool->aSize; x++)
277 {
278 wStream* cs = pool->aArray[x].s;
279 if (cs == s)
280 return;
281 }
282 pool->aArray[(pool->aSize)++] = add_entry(s);
283 StreamPool_RemoveUsed(pool, s);
284}
285
286static void StreamPool_ReleaseOrReturn(wStreamPool* pool, wStream* s)
287{
288 StreamPool_Lock(pool);
289 StreamPool_Remove(pool, s);
290 StreamPool_Unlock(pool);
291}
292
293void StreamPool_Return(wStreamPool* pool, wStream* s)
294{
295 WINPR_ASSERT(pool);
296 if (!s)
297 return;
298
299 StreamPool_Lock(pool);
300 StreamPool_Remove(pool, s);
301 StreamPool_Unlock(pool);
302}
303
308void Stream_AddRef(wStream* s)
309{
310 WINPR_ASSERT(s);
311 s->count++;
312}
313
318void Stream_Release(wStream* s)
319{
320 WINPR_ASSERT(s);
321
322 if (s->count > 0)
323 s->count--;
324 if (s->count == 0)
325 {
326 if (s->pool)
327 StreamPool_ReleaseOrReturn(s->pool, s);
328 else
329 Stream_Free(s, TRUE);
330 }
331}
332
337wStream* StreamPool_Find(wStreamPool* pool, const BYTE* ptr)
338{
339 wStream* s = nullptr;
340
341 StreamPool_Lock(pool);
342
343 for (size_t index = 0; index < pool->uSize; index++)
344 {
345 struct s_StreamPoolEntry* cur = &pool->uArray[index];
346
347 if ((ptr >= Stream_Buffer(cur->s)) &&
348 (ptr < (Stream_Buffer(cur->s) + Stream_Capacity(cur->s))))
349 {
350 s = cur->s;
351 break;
352 }
353 }
354
355 StreamPool_Unlock(pool);
356
357 return s;
358}
359
364void StreamPool_Clear(wStreamPool* pool)
365{
366 StreamPool_Lock(pool);
367
368 for (size_t x = 0; x < pool->aSize; x++)
369 {
370 struct s_StreamPoolEntry* cur = &pool->aArray[x];
371 discard_entry(cur, TRUE);
372 }
373 pool->aSize = 0;
374
375 if (pool->uSize > 0)
376 {
377 WLog_WARN(TAG, "Clearing StreamPool, but there are %" PRIuz " streams currently in use",
378 pool->uSize);
379 for (size_t x = 0; x < pool->uSize; x++)
380 {
381 struct s_StreamPoolEntry* cur = &pool->uArray[x];
382 discard_entry(cur, TRUE);
383 }
384 pool->uSize = 0;
385 }
386
387 StreamPool_Unlock(pool);
388}
389
390size_t StreamPool_UsedCount(wStreamPool* pool)
391{
392 StreamPool_Lock(pool);
393 size_t usize = pool->uSize;
394 StreamPool_Unlock(pool);
395 return usize;
396}
397
402wStreamPool* StreamPool_New(BOOL synchronized, size_t defaultSize)
403{
404 wStreamPool* pool = nullptr;
405
406 pool = (wStreamPool*)calloc(1, sizeof(wStreamPool));
407
408 if (pool)
409 {
410 pool->synchronized = synchronized;
411 pool->defaultSize = defaultSize;
412
413 if (!StreamPool_EnsureCapacity(pool, 32, FALSE))
414 goto fail;
415 if (!StreamPool_EnsureCapacity(pool, 32, TRUE))
416 goto fail;
417
418 if (!InitializeCriticalSectionAndSpinCount(&pool->lock, 4000))
419 goto fail;
420 }
421
422 return pool;
423fail:
424 WINPR_PRAGMA_DIAG_PUSH
425 WINPR_PRAGMA_DIAG_IGNORED_MISMATCHED_DEALLOC
426 StreamPool_Free(pool);
427 WINPR_PRAGMA_DIAG_POP
428 return nullptr;
429}
430
431void StreamPool_Free(wStreamPool* pool)
432{
433 if (pool)
434 {
435 StreamPool_Clear(pool);
436
437 DeleteCriticalSection(&pool->lock);
438
439 free(pool->aArray);
440 free(pool->uArray);
441
442 free(pool);
443 }
444}
445
446char* StreamPool_GetStatistics(wStreamPool* pool, char* buffer, size_t size)
447{
448 WINPR_ASSERT(pool);
449
450 if (!buffer || (size < 1))
451 return nullptr;
452
453 size_t used = 0;
454 int offset = _snprintf(buffer, size - 1,
455 "aSize =%" PRIuz ", uSize =%" PRIuz ", aCapacity=%" PRIuz
456 ", uCapacity=%" PRIuz,
457 pool->aSize, pool->uSize, pool->aCapacity, pool->uCapacity);
458 if ((offset > 0) && ((size_t)offset < size))
459 used += (size_t)offset;
460
461#if defined(WITH_STREAMPOOL_DEBUG)
462 StreamPool_Lock(pool);
463
464 offset = _snprintf(&buffer[used], size - 1 - used, "\n-- dump used array take locations --\n");
465 if ((offset > 0) && ((size_t)offset < size - used))
466 used += (size_t)offset;
467 for (size_t x = 0; x < pool->uSize; x++)
468 {
469 const struct s_StreamPoolEntry* cur = &pool->uArray[x];
470 WINPR_ASSERT(cur->msg || (cur->lines == 0));
471
472 for (size_t y = 0; y < cur->lines; y++)
473 {
474 offset = _snprintf(&buffer[used], size - 1 - used, "[%" PRIuz " | %" PRIuz "]: %s\n", x,
475 y, cur->msg[y]);
476 if ((offset > 0) && ((size_t)offset < size - used))
477 used += (size_t)offset;
478 }
479 }
480
481 offset = _snprintf(&buffer[used], size - 1 - used, "\n-- statistics called from --\n");
482 if ((offset > 0) && ((size_t)offset < size - used))
483 used += (size_t)offset;
484
485 struct s_StreamPoolEntry entry = WINPR_C_ARRAY_INIT;
486 void* stack = winpr_backtrace(20);
487 if (stack)
488 entry.msg = winpr_backtrace_symbols(stack, &entry.lines);
489 winpr_backtrace_free(stack);
490
491 for (size_t x = 0; x < entry.lines; x++)
492 {
493 const char* msg = entry.msg[x];
494 offset = _snprintf(&buffer[used], size - 1 - used, "[%" PRIuz "]: %s\n", x, msg);
495 if ((offset > 0) && ((size_t)offset < size - used))
496 used += (size_t)offset;
497 }
498 free((void*)entry.msg);
499 StreamPool_Unlock(pool);
500#endif
501 buffer[used] = '\0';
502 return buffer;
503}
504
505BOOL StreamPool_WaitForReturn(wStreamPool* pool, UINT32 timeoutMS)
506{
507 wLog* log = WLog_Get(TAG);
508
509 /* HACK: We disconnected the transport above, now wait without a read or write lock until all
510 * streams in use have been returned to the pool. */
511 while (timeoutMS > 0)
512 {
513 const size_t used = StreamPool_UsedCount(pool);
514 if (used == 0)
515 return TRUE;
516 WLog_Print(log, WLOG_DEBUG, "%" PRIuz " streams still in use, sleeping...", used);
517
518 char buffer[4096] = WINPR_C_ARRAY_INIT;
519 StreamPool_GetStatistics(pool, buffer, sizeof(buffer));
520 WLog_Print(log, WLOG_TRACE, "Pool statistics: %s", buffer);
521
522 UINT32 diff = 10;
523 if (timeoutMS != INFINITE)
524 {
525 diff = timeoutMS > 10 ? 10 : timeoutMS;
526 timeoutMS -= diff;
527 }
528 Sleep(diff);
529 }
530
531 return FALSE;
532}