20#include <winpr/config.h>
23#include <winpr/wlog.h>
25#include <winpr/collections.h>
/* Log tag handed to the WLog calls in this file (WLog_WARN, WLog_Get). */
29#define TAG WINPR_TAG("utils.streampool")
/* One slot of a pool array. The code below reads its stream pointer `s`;
 * WITH_STREAMPOOL_DEBUG builds additionally use `msg` and `lines`. */
31struct s_StreamPoolEntry
33#if defined(WITH_STREAMPOOL_DEBUG)
/* Entry array paired with aSize/aCapacity: the "available" list
 * (see StreamPool_ShiftAvailable). */
44 struct s_StreamPoolEntry* aArray;
/* Entry array paired with uSize/uCapacity: the "used" list
 * (see StreamPool_ShiftUsed). */
48 struct s_StreamPoolEntry* uArray;
/**
 * Release the resources referenced by a pool entry.
 *
 * @param entry         entry to release
 * @param discardStream when TRUE and the entry holds a stream, the stream
 *                      itself is freed as well
 */
55static void discard_entry(
struct s_StreamPoolEntry* entry, BOOL discardStream)
/* The msg pointer only exists in WITH_STREAMPOOL_DEBUG builds. */
60#if defined(WITH_STREAMPOOL_DEBUG)
61 free((
void*)entry->msg);
/* Free the stream, forwarding its own isAllocatedStream flag to Stream_Free. */
64 if (discardStream && entry->s)
65 Stream_Free(entry->s, entry->s->isAllocatedStream);
/* Template entry built from the WINPR_C_ARRAY_INIT initialiser
 * (NOTE(review): presumably used to reset *entry - confirm). */
67 const struct s_StreamPoolEntry empty = WINPR_C_ARRAY_INIT;
/**
 * Build a pool entry for a stream.
 *
 * @param s stream the entry is created for
 * @return the entry, by value
 */
71static struct s_StreamPoolEntry add_entry(
wStream* s)
73 struct s_StreamPoolEntry entry = WINPR_C_ARRAY_INIT;
/* Debug builds record where the entry was created: the backtrace symbols go
 * to entry.msg and their count to entry.lines. The argument 20 is presumably
 * the maximum number of frames - confirm against winpr_backtrace. */
75#if defined(WITH_STREAMPOOL_DEBUG)
76 void* stack = winpr_backtrace(20);
78 entry.msg = winpr_backtrace_symbols(stack, &entry.lines);
/* The backtrace handle is released once the symbols were requested. */
79 winpr_backtrace_free(stack);
/**
 * Enter the pool's critical section.
 * Does nothing for pools created with synchronized == FALSE.
 *
 * @param pool pool to lock
 */
90static inline void StreamPool_Lock(wStreamPool* pool)
93 if (pool->synchronized)
94 EnterCriticalSection(&pool->lock);
/**
 * Leave the pool's critical section.
 * Does nothing for pools created with synchronized == FALSE.
 *
 * @param pool pool to unlock
 */
101static inline void StreamPool_Unlock(wStreamPool* pool)
104 if (pool->synchronized)
105 LeaveCriticalSection(&pool->lock);
/**
 * Resize one of the two pool arrays so that `count` more entries fit.
 *
 * @param pool            pool whose array is resized
 * @param count           number of additional entries that must fit
 * @param usedOrAvailable TRUE selects the used list (uArray/uSize/uCapacity),
 *                        FALSE the available list (aArray/aSize/aCapacity)
 * @return BOOL; StreamPool_New treats a zero result as failure
 */
108static BOOL StreamPool_EnsureCapacity(wStreamPool* pool,
size_t count, BOOL usedOrAvailable)
/* Work through pointers so both lists share the same resize logic. */
112 size_t* cap = (usedOrAvailable) ? &pool->uCapacity : &pool->aCapacity;
113 size_t* size = (usedOrAvailable) ? &pool->uSize : &pool->aSize;
114 struct s_StreamPoolEntry** array = (usedOrAvailable) ? &pool->uArray : &pool->aArray;
/* Exact fit: the new capacity is the required entry count. */
118 new_cap = *size + count;
/* Required count exceeds the capacity: grow to about 1.5x the requirement. */
119 else if (*size + count > *cap)
120 new_cap = (*size + count + 2) / 2 * 3;
/* Required count is below a third of the capacity. */
121 else if ((*size + count) < *cap / 3)
126 struct s_StreamPoolEntry* new_arr =
nullptr;
128 if (*cap < *size + count)
/* Resize the array to new_cap entries.
 * NOTE(review): the multiplication by new_cap is not checked for overflow. */
132 (
struct s_StreamPoolEntry*)realloc(*array,
sizeof(
struct s_StreamPoolEntry) * new_cap);
/**
 * Remove the entry at `index` from the used list and close the gap.
 * The stream referenced by the entry is not freed.
 *
 * @param pool  pool to modify
 * @param index position of the entry inside uArray
 */
145static void StreamPool_ShiftUsed(wStreamPool* pool,
size_t index)
/* Exactly one entry is removed per call. */
149 const size_t pcount = 1;
150 const size_t off = index + pcount;
/* Only act when the removed range lies inside the used list. */
151 if (pool->uSize >= off)
153 for (
size_t x = 0; x < pcount; x++)
155 struct s_StreamPoolEntry* cur = &pool->uArray[index + x];
/* FALSE: release the entry's bookkeeping but keep the stream alive. */
156 discard_entry(cur, FALSE);
/* Move the entries behind the removed one down by pcount slots. */
158 MoveMemory(&pool->uArray[index], &pool->uArray[index + pcount],
159 (pool->uSize - index - pcount) *
sizeof(
struct s_StreamPoolEntry));
160 pool->uSize -= pcount;
/**
 * Record a stream in the used list.
 *
 * @param pool pool to modify
 * @param s    stream to record
 */
168static void StreamPool_AddUsed(wStreamPool* pool,
wStream* s)
/* NOTE(review): the BOOL result of StreamPool_EnsureCapacity is ignored here,
 * so the store below is executed even if the resize reported failure. */
170 StreamPool_EnsureCapacity(pool, 1, TRUE);
/* Store the new entry in the slot at index uSize. */
171 pool->uArray[pool->uSize] = add_entry(s);
/**
 * Remove a stream's entry from the used list.
 *
 * @param pool pool to modify
 * @param s    stream to remove (NOTE(review): presumably matched against
 *             cur->s - the comparison is not shown next to the loop)
 */
179static void StreamPool_RemoveUsed(wStreamPool* pool,
wStream* s)
/* Linear scan over all used entries. */
182 for (
size_t index = 0; index < pool->uSize; index++)
184 struct s_StreamPoolEntry* cur = &pool->uArray[index];
/* Drop the entry at this index; the stream itself stays alive. */
187 StreamPool_ShiftUsed(pool, index);
/**
 * Remove the entry at `index` from the available list and close the gap.
 * The stream referenced by the entry is not freed.
 *
 * @param pool  pool to modify
 * @param index position of the entry inside aArray
 */
193static void StreamPool_ShiftAvailable(wStreamPool* pool,
size_t index)
/* Exactly one entry is removed per call. */
197 const size_t pcount = 1;
198 const size_t off = index + pcount;
/* Only act when the removed range lies inside the available list. */
199 if (pool->aSize >= off)
201 for (
size_t x = 0; x < pcount; x++)
203 struct s_StreamPoolEntry* cur = &pool->aArray[index + x];
/* FALSE: release the entry's bookkeeping but keep the stream alive. */
204 discard_entry(cur, FALSE);
/* Move the entries behind the removed one down by pcount slots. */
207 MoveMemory(&pool->aArray[index], &pool->aArray[index + pcount],
208 (pool->aSize - index - pcount) *
sizeof(
struct s_StreamPoolEntry));
209 pool->aSize -= pcount;
/**
 * Take a stream with at least the requested capacity out of the pool.
 *
 * Runs between StreamPool_Lock and StreamPool_Unlock. The available list is
 * searched for a stream whose capacity is sufficient; Stream_New is the
 * allocation path. The stream is recorded in the used list.
 *
 * @param pool pool to take the stream from
 * @param size requested capacity; pool->defaultSize can replace it
 *             (NOTE(review): presumably when size is 0 - confirm)
 * @return a wStream pointer (NOTE(review): presumably nullptr on failure -
 *         confirm)
 */
217wStream* StreamPool_Take(wStreamPool* pool,
size_t size)
/* Index of the matching entry in the available list. */
220 size_t foundIndex = 0;
223 StreamPool_Lock(pool);
226 size = pool->defaultSize;
/* Look through the available entries for a big enough stream. */
228 for (
size_t index = 0; index < pool->aSize; index++)
230 struct s_StreamPoolEntry* cur = &pool->aArray[index];
233 if (Stream_Capacity(s) >= size)
/* Allocate a new stream of the requested size. */
243 s = Stream_New(
nullptr, size);
/* Reused stream: rewind it and expose its full capacity as length. */
249 Stream_ResetPosition(s);
250 if (!Stream_SetLength(s, Stream_Capacity(s)))
/* The reused stream no longer belongs to the available list. */
252 StreamPool_ShiftAvailable(pool, foundIndex);
259 StreamPool_AddUsed(pool, s);
263 StreamPool_Unlock(pool);
/**
 * Move a stream from the used list to the available list.
 * Takes no lock itself; the callers in this file hold the pool lock.
 *
 * @param pool pool to modify
 * @param s    stream being handed back
 */
272static void StreamPool_Remove(wStreamPool* pool,
wStream* s)
/* Make room for one more entry in the available list.
 * NOTE(review): the BOOL result is ignored here. */
274 StreamPool_EnsureCapacity(pool, 1, FALSE);
275 Stream_EnsureValidity(s);
/* Walk the streams already in the available list
 * (NOTE(review): presumably to detect a stream returned twice - confirm). */
276 for (
size_t x = 0; x < pool->aSize; x++)
278 wStream* cs = pool->aArray[x].s;
/* Append to the available list, then drop the entry from the used list. */
282 pool->aArray[(pool->aSize)++] = add_entry(s);
283 StreamPool_RemoveUsed(pool, s);
/**
 * Hand a stream back to the pool while holding the pool lock.
 *
 * @param pool pool the stream goes back to
 * @param s    stream being handed back
 */
286static void StreamPool_ReleaseOrReturn(wStreamPool* pool,
wStream* s)
288 StreamPool_Lock(pool);
/* Moves the stream from the used list to the available list. */
289 StreamPool_Remove(pool, s);
290 StreamPool_Unlock(pool);
/**
 * Return a stream to the pool.
 *
 * @param pool pool the stream is returned to
 * @param s    stream being returned
 */
293void StreamPool_Return(wStreamPool* pool,
wStream* s)
/* The list update happens under the pool lock. */
299 StreamPool_Lock(pool);
300 StreamPool_Remove(pool, s);
301 StreamPool_Unlock(pool);
/* Hand the stream back to the pool referenced by s->pool. */
327 StreamPool_ReleaseOrReturn(s->pool, s);
/* NOTE(review): presumably the alternative for a stream that has no pool -
 * confirm against the enclosing function. */
329 Stream_Free(s, TRUE);
/**
 * Look up the used stream whose buffer contains a given address.
 *
 * @param pool pool to search
 * @param ptr  address to look up
 * @return a wStream pointer (NOTE(review): presumably nullptr when no used
 *         stream contains ptr - confirm)
 */
337wStream* StreamPool_Find(wStreamPool* pool,
const BYTE* ptr)
341 StreamPool_Lock(pool);
/* Only streams in the used list are examined. */
343 for (
size_t index = 0; index < pool->uSize; index++)
345 struct s_StreamPoolEntry* cur = &pool->uArray[index];
/* Match when ptr lies in [buffer, buffer + capacity). */
347 if ((ptr >= Stream_Buffer(cur->s)) &&
348 (ptr < (Stream_Buffer(cur->s) + Stream_Capacity(cur->s))))
355 StreamPool_Unlock(pool);
/**
 * Discard every entry of the pool, freeing the streams they hold.
 * Runs between StreamPool_Lock and StreamPool_Unlock.
 *
 * @param pool pool to clear
 */
364void StreamPool_Clear(wStreamPool* pool)
366 StreamPool_Lock(pool);
/* Available entries: TRUE frees the streams as well. */
368 for (
size_t x = 0; x < pool->aSize; x++)
370 struct s_StreamPoolEntry* cur = &pool->aArray[x];
371 discard_entry(cur, TRUE);
/* Streams still in use are reported with a warning. */
377 WLog_WARN(TAG,
"Clearing StreamPool, but there are %" PRIuz
" streams currently in use",
/* Used entries are discarded as well, including their streams. */
379 for (
size_t x = 0; x < pool->uSize; x++)
381 struct s_StreamPoolEntry* cur = &pool->uArray[x];
382 discard_entry(cur, TRUE);
387 StreamPool_Unlock(pool);
/**
 * Read the number of entries in the used list.
 *
 * @param pool pool to query
 * @return a size_t (NOTE(review): presumably the uSize snapshot read below -
 *         confirm)
 */
390size_t StreamPool_UsedCount(wStreamPool* pool)
/* uSize is read between lock and unlock. */
392 StreamPool_Lock(pool);
393 size_t usize = pool->uSize;
394 StreamPool_Unlock(pool);
/**
 * Create a stream pool.
 *
 * @param synchronized stored in pool->synchronized; StreamPool_Lock and
 *                     StreamPool_Unlock only touch the critical section when
 *                     it is TRUE
 * @param defaultSize  stored in pool->defaultSize, which StreamPool_Take can
 *                     use as the stream size
 * @return a wStreamPool pointer (NOTE(review): presumably nullptr on failure -
 *         confirm)
 */
402wStreamPool* StreamPool_New(BOOL
synchronized,
size_t defaultSize)
404 wStreamPool* pool =
nullptr;
/* calloc returns zero-filled memory, so all counters and array pointers
 * start at 0. */
406 pool = (wStreamPool*)calloc(1,
sizeof(wStreamPool));
410 pool->synchronized =
synchronized;
411 pool->defaultSize = defaultSize;
/* Request room for 32 entries in the available list, then the used list. */
413 if (!StreamPool_EnsureCapacity(pool, 32, FALSE))
415 if (!StreamPool_EnsureCapacity(pool, 32, TRUE))
/* Critical section with a spin count of 4000. */
418 if (!InitializeCriticalSectionAndSpinCount(&pool->lock, 4000))
/* Release the pool through StreamPool_Free; the mismatched-dealloc
 * diagnostic is suppressed around the call. */
424 WINPR_PRAGMA_DIAG_PUSH
425 WINPR_PRAGMA_DIAG_IGNORED_MISMATCHED_DEALLOC
426 StreamPool_Free(pool);
427 WINPR_PRAGMA_DIAG_POP
/**
 * Tear down a stream pool.
 *
 * @param pool pool to tear down
 */
431void StreamPool_Free(wStreamPool* pool)
/* Discard all entries first; StreamPool_Clear takes the lock itself, so the
 * critical section is deleted only afterwards. */
435 StreamPool_Clear(pool);
437 DeleteCriticalSection(&pool->lock);
/**
 * Write a textual summary of the pool counters into a caller-supplied buffer.
 *
 * WITH_STREAMPOOL_DEBUG builds additionally append, under the pool lock, the
 * backtrace lines stored with every used entry and a backtrace of the current
 * call.
 *
 * @param pool   pool to describe
 * @param buffer destination buffer
 * @param size   size of buffer in bytes
 * @return a char pointer (NOTE(review): presumably buffer, or nullptr for an
 *         unusable buffer - confirm)
 */
446char* StreamPool_GetStatistics(wStreamPool* pool,
char* buffer,
size_t size)
/* Guard against a missing or zero-sized buffer. */
450 if (!buffer || (size < 1))
/* size - 1 is passed as the limit, which keeps the last byte of the buffer
 * out of reach of _snprintf. */
454 int offset = _snprintf(buffer, size - 1,
455 "aSize =%" PRIuz
", uSize =%" PRIuz
", aCapacity=%" PRIuz
456 ", uCapacity=%" PRIuz,
457 pool->aSize, pool->uSize, pool->aCapacity, pool->uCapacity);
/* `used` only advances when the result is positive and below the limit. */
458 if ((offset > 0) && ((
size_t)offset < size))
459 used += (size_t)offset;
461#if defined(WITH_STREAMPOOL_DEBUG)
462 StreamPool_Lock(pool);
464 offset = _snprintf(&buffer[used], size - 1 - used,
"\n-- dump used array take locations --\n");
465 if ((offset > 0) && ((
size_t)offset < size - used))
466 used += (size_t)offset;
/* One block per used entry: x is the entry index, y the backtrace line. */
467 for (
size_t x = 0; x < pool->uSize; x++)
469 const struct s_StreamPoolEntry* cur = &pool->uArray[x];
/* An entry without msg must not claim any lines. */
470 WINPR_ASSERT(cur->msg || (cur->lines == 0));
472 for (
size_t y = 0; y < cur->lines; y++)
474 offset = _snprintf(&buffer[used], size - 1 - used,
"[%" PRIuz
" | %" PRIuz
"]: %s\n", x,
476 if ((offset > 0) && ((
size_t)offset < size - used))
477 used += (size_t)offset;
481 offset = _snprintf(&buffer[used], size - 1 - used,
"\n-- statistics called from --\n");
482 if ((offset > 0) && ((
size_t)offset < size - used))
483 used += (size_t)offset;
/* Temporary entry that only carries the backtrace of this call. */
485 struct s_StreamPoolEntry entry = WINPR_C_ARRAY_INIT;
486 void* stack = winpr_backtrace(20);
488 entry.msg = winpr_backtrace_symbols(stack, &entry.lines);
489 winpr_backtrace_free(stack);
491 for (
size_t x = 0; x < entry.lines; x++)
493 const char* msg = entry.msg[x];
494 offset = _snprintf(&buffer[used], size - 1 - used,
"[%" PRIuz
"]: %s\n", x, msg);
495 if ((offset > 0) && ((
size_t)offset < size - used))
496 used += (size_t)offset;
/* The symbol array of the temporary entry is released with free(). */
498 free((
void*)entry.msg);
499 StreamPool_Unlock(pool);
/**
 * Poll the pool's used-stream count, logging progress while timeoutMS is
 * above zero.
 *
 * @param pool      pool to watch
 * @param timeoutMS time budget in milliseconds; INFINITE is handled
 *                  separately from finite values
 * @return BOOL (NOTE(review): presumably TRUE once no stream is in use and
 *         FALSE on timeout - confirm)
 */
505BOOL StreamPool_WaitForReturn(wStreamPool* pool, UINT32 timeoutMS)
507 wLog* log = WLog_Get(TAG);
511 while (timeoutMS > 0)
/* Snapshot of the used-stream count, read under the pool lock. */
513 const size_t used = StreamPool_UsedCount(pool);
516 WLog_Print(log, WLOG_DEBUG,
"%" PRIuz
" streams still in use, sleeping...", used);
/* The full statistics text is logged at trace level. */
518 char buffer[4096] = WINPR_C_ARRAY_INIT;
519 StreamPool_GetStatistics(pool, buffer,
sizeof(buffer));
520 WLog_Print(log, WLOG_TRACE,
"Pool statistics: %s", buffer);
/* For finite timeouts the step is capped at 10 ms or the remaining time. */
523 if (timeoutMS != INFINITE)
525 diff = timeoutMS > 10 ? 10 : timeoutMS;