20#include <winpr/config.h>
23#include <winpr/wlog.h>
25#include <winpr/collections.h>
/* Log tag of this file; it is the tag handed to WLog_WARN() and WLog_Get() further down. */
29#define TAG WINPR_TAG("utils.streampool")
/*
 * Element type of the pool's bookkeeping arrays.
 * The code in this file reads a stream pointer `s` from every entry and, in
 * WITH_STREAMPOOL_DEBUG builds only, `msg` / `lines`, which add_entry() fills
 * from winpr_backtrace_symbols().
 */
31struct s_StreamPoolEntry
33#if defined(WITH_STREAMPOOL_DEBUG)
/*
 * The two members below are reached through a wStreamPool pointer elsewhere in
 * this file (pool->aArray / pool->uArray); the header of the struct that holds
 * them is not part of this excerpt.
 * aArray is the array StreamPool_Take() scans for a reusable stream,
 * uArray is the array StreamPool_AddUsed() appends handed-out streams to.
 */
44 struct s_StreamPoolEntry* aArray;
48 struct s_StreamPoolEntry* uArray;
/**
 * Release what a pool entry holds.
 *
 * @param entry          slot to clean up
 * @param discardStream  when TRUE and the entry holds a stream, that stream is
 *                       destroyed with Stream_Free(); when FALSE the stream is
 *                       left alone (the Shift* helpers pass FALSE because the
 *                       stream only leaves one of the pool's arrays)
 */
55static void discard_entry(
struct s_StreamPoolEntry* entry, BOOL discardStream)
60#if defined(WITH_STREAMPOOL_DEBUG)
/* Debug builds attach a symbolized backtrace to every entry; free it here. */
61 free((
void*)entry->msg);
/* The stream's own isAllocatedStream flag is forwarded as the second Stream_Free() argument. */
64 if (discardStream && entry->s)
65 Stream_Free(entry->s, entry->s->isAllocatedStream);
/* Zeroed template entry; presumably copied over *entry to reset the slot
 * (that assignment is not visible in this excerpt - TODO confirm). */
67 const struct s_StreamPoolEntry empty = { 0 };
/**
 * Build the bookkeeping entry for stream @p s.
 *
 * The entry starts out zero-initialized. In WITH_STREAMPOOL_DEBUG builds the
 * current call stack is captured and symbolized so that it can be printed later
 * by StreamPool_GetStatistics().
 * NOTE(review): storing @p s in the entry and returning it happen on lines that
 * are not visible in this excerpt.
 */
71static struct s_StreamPoolEntry add_entry(
wStream* s)
73 struct s_StreamPoolEntry entry = { 0 };
75#if defined(WITH_STREAMPOOL_DEBUG)
/* 20 is the argument given to winpr_backtrace(); presumably the maximum number
 * of frames to record - confirm against the winpr_backtrace() documentation. */
76 void* stack = winpr_backtrace(20);
/* entry.msg receives the symbol strings, entry.lines how many there are. */
78 entry.msg = winpr_backtrace_symbols(stack, &entry.lines);
/* The raw backtrace handle is released right after symbolization. */
79 winpr_backtrace_free(stack);
/**
 * Enter the pool's critical section.
 * Does nothing for pools whose `synchronized` flag is not set.
 */
90static INLINE
void StreamPool_Lock(wStreamPool* pool)
93 if (pool->synchronized)
94 EnterCriticalSection(&pool->lock);
/**
 * Leave the pool's critical section; counterpart of StreamPool_Lock().
 * Does nothing for pools whose `synchronized` flag is not set.
 */
101static INLINE
void StreamPool_Unlock(wStreamPool* pool)
104 if (pool->synchronized)
105 LeaveCriticalSection(&pool->lock);
/**
 * Resize one of the pool's entry arrays so that @p count more entries fit.
 *
 * @param pool             pool to operate on
 * @param count            number of additional entries that must fit
 * @param usedOrAvailable  TRUE selects the used array (uArray/uSize/uCapacity),
 *                         FALSE the available array (aArray/aSize/aCapacity)
 * @return BOOL; StreamPool_New() treats a false result as failure.
 *
 * NOTE(review): several conditions and assignments of this function are not
 * visible in this excerpt; the comments below describe the visible lines only.
 */
108static BOOL StreamPool_EnsureCapacity(wStreamPool* pool,
size_t count, BOOL usedOrAvailable)
/* Pick the capacity, size and array members of the selected list once. */
112 size_t* cap = (usedOrAvailable) ? &pool->uCapacity : &pool->aCapacity;
113 size_t* size = (usedOrAvailable) ? &pool->uSize : &pool->aSize;
114 struct s_StreamPoolEntry** array = (usedOrAvailable) ? &pool->uArray : &pool->aArray;
/* First sizing branch: exactly the required number of entries
 * (its guarding condition is outside this excerpt). */
118 new_cap = *size + count;
/* Growth: the requirement exceeds the capacity, so over-allocate to roughly
 * 1.5 times the required count (integer arithmetic). */
119 else if (*size + count > *cap)
120 new_cap = (*size + count + 2) / 2 * 3;
/* The requirement is below a third of the capacity; the action taken in this
 * branch is not visible here (presumably a shrink - TODO confirm). */
121 else if ((*size + count) < *cap / 3)
126 struct s_StreamPoolEntry* new_arr = NULL;
128 if (*cap < *size + count)
/* Resize the selected array to new_cap entries; the realloc() result is cast
 * to the entry pointer type (what is done with it is not visible here). */
132 (
struct s_StreamPoolEntry*)realloc(*array,
sizeof(
struct s_StreamPoolEntry) * new_cap);
/**
 * Remove the entry at @p index from the used array and close the gap.
 *
 * The stream itself is not freed (discard_entry() is called with FALSE);
 * only the entry's bookkeeping is released.
 */
145static void StreamPool_ShiftUsed(wStreamPool* pool,
size_t index)
/* Exactly one entry is removed per call. */
149 const size_t pcount = 1;
150 const size_t off = index + pcount;
/* Only act when the removed range lies inside the used array. */
151 if (pool->uSize >= off)
153 for (
size_t x = 0; x < pcount; x++)
155 struct s_StreamPoolEntry* cur = &pool->uArray[index + x];
156 discard_entry(cur, FALSE);
/* Slide the tail down over the removed slot; the length argument is a byte
 * count (remaining entries times the entry size). */
158 MoveMemory(&pool->uArray[index], &pool->uArray[index + pcount],
159 (pool->uSize - index - pcount) *
sizeof(
struct s_StreamPoolEntry));
160 pool->uSize -= pcount;
/**
 * Record stream @p s in the pool's used array.
 *
 * NOTE(review): the BOOL result of StreamPool_EnsureCapacity() is not checked
 * on the visible lines, so a failed resize would still be followed by the
 * store into uArray[uSize].
 * NOTE(review): the increment of pool->uSize is not visible in this excerpt.
 */
168static void StreamPool_AddUsed(wStreamPool* pool,
wStream* s)
/* Make room for one more entry in the used (TRUE) array. */
170 StreamPool_EnsureCapacity(pool, 1, TRUE);
/* The new entry goes into the first free slot, i.e. index uSize. */
171 pool->uArray[pool->uSize] = add_entry(s);
/**
 * Walk the used array and drop an entry through StreamPool_ShiftUsed().
 *
 * NOTE(review): the condition that selects the entry is not visible in this
 * excerpt; presumably it compares the entry's stream with @p s - TODO confirm.
 */
179static void StreamPool_RemoveUsed(wStreamPool* pool,
wStream* s)
182 for (
size_t index = 0; index < pool->uSize; index++)
184 struct s_StreamPoolEntry* cur = &pool->uArray[index];
/* ShiftUsed releases the entry's bookkeeping but keeps the stream alive. */
187 StreamPool_ShiftUsed(pool, index);
/**
 * Remove the entry at @p index from the available array and close the gap.
 *
 * Mirror image of StreamPool_ShiftUsed() for aArray/aSize. The stream itself
 * is not freed (discard_entry() is called with FALSE).
 */
193static void StreamPool_ShiftAvailable(wStreamPool* pool,
size_t index)
/* Exactly one entry is removed per call. */
197 const size_t pcount = 1;
198 const size_t off = index + pcount;
/* Only act when the removed range lies inside the available array. */
199 if (pool->aSize >= off)
201 for (
size_t x = 0; x < pcount; x++)
203 struct s_StreamPoolEntry* cur = &pool->aArray[index + x];
204 discard_entry(cur, FALSE);
/* Slide the tail down over the removed slot; the length argument is a byte
 * count (remaining entries times the entry size). */
207 MoveMemory(&pool->aArray[index], &pool->aArray[index + pcount],
208 (pool->aSize - index - pcount) *
sizeof(
struct s_StreamPoolEntry));
209 pool->aSize -= pcount;
/**
 * Hand out a stream with a capacity of at least @p size bytes.
 *
 * The visible code scans the available array for a stream that is large enough,
 * can create a new stream with Stream_New(), and registers the stream in the
 * used array. The whole operation runs between StreamPool_Lock() and
 * StreamPool_Unlock().
 *
 * NOTE(review): the branch conditions that choose between reusing and
 * allocating, and the return statement, are not visible in this excerpt.
 */
217wStream* StreamPool_Take(wStreamPool* pool,
size_t size)
220 size_t foundIndex = 0;
223 StreamPool_Lock(pool);
/* Fall back to the pool's default size (the guarding condition is outside this
 * excerpt; presumably size == 0 - TODO confirm). */
226 size = pool->defaultSize;
/* Look through the available streams for one whose capacity suffices. */
228 for (
size_t index = 0; index < pool->aSize; index++)
230 struct s_StreamPoolEntry* cur = &pool->aArray[index];
233 if (Stream_Capacity(s) >= size)
/* Fresh allocation; NULL is passed as the buffer argument of Stream_New(). */
243 s = Stream_New(NULL, size);
/* Reset a stream before handing it out: position 0, length = full capacity,
 * and take its entry out of the available array. */
249 Stream_SetPosition(s, 0);
250 Stream_SetLength(s, Stream_Capacity(s));
251 StreamPool_ShiftAvailable(pool, foundIndex);
/* Track the stream as in use. */
258 StreamPool_AddUsed(pool, s);
262 StreamPool_Unlock(pool);
/**
 * Move stream @p s from the used array to the available array.
 *
 * No locking happens on the visible lines; the visible callers
 * (StreamPool_ReleaseOrReturn() and StreamPool_Return()) take the pool lock
 * around this call.
 *
 * NOTE(review): the BOOL result of StreamPool_EnsureCapacity() is not checked
 * on the visible lines.
 */
271static void StreamPool_Remove(wStreamPool* pool,
wStream* s)
/* Make room for one more entry in the available (FALSE) array. */
273 StreamPool_EnsureCapacity(pool, 1, FALSE);
274 Stream_EnsureValidity(s);
/* Walk the streams that are already available; what is done with `cs` is not
 * visible here (presumably a check against returning @p s twice - TODO confirm). */
275 for (
size_t x = 0; x < pool->aSize; x++)
277 wStream* cs = pool->aArray[x].s;
/* Append to the available array (aSize is post-incremented in the index
 * expression), then drop the stream from the used array. */
281 pool->aArray[(pool->aSize)++] = add_entry(s);
282 StreamPool_RemoveUsed(pool, s);
/**
 * Give stream @p s back to @p pool: runs StreamPool_Remove() while holding the
 * pool lock.
 */
285static void StreamPool_ReleaseOrReturn(wStreamPool* pool,
wStream* s)
287 StreamPool_Lock(pool);
288 StreamPool_Remove(pool, s);
289 StreamPool_Unlock(pool);
/**
 * Return stream @p s to @p pool: runs StreamPool_Remove() while holding the
 * pool lock, which moves the stream from the used to the available array.
 *
 * NOTE(review): the lines between the signature and the lock call are not
 * visible in this excerpt (presumably argument checks - TODO confirm).
 */
292void StreamPool_Return(wStreamPool* pool,
wStream* s)
298 StreamPool_Lock(pool);
299 StreamPool_Remove(pool, s);
300 StreamPool_Unlock(pool);
/* NOTE(review): the header and the branch conditions of the function these two
 * statements belong to are not visible in this excerpt. What can be seen: the
 * stream is either handed back to the pool stored in s->pool, or freed directly
 * with Stream_Free(s, TRUE). Presumably the first applies to pooled streams and
 * the second to streams without a pool - TODO confirm. */
326 StreamPool_ReleaseOrReturn(s->pool, s);
328 Stream_Free(s, TRUE);
/**
 * Search the used streams for the one whose buffer contains @p ptr.
 *
 * The scan runs between StreamPool_Lock() and StreamPool_Unlock().
 * NOTE(review): what happens on a match and the return statement are not
 * visible in this excerpt.
 */
336wStream* StreamPool_Find(wStreamPool* pool,
const BYTE* ptr)
340 StreamPool_Lock(pool);
342 for (
size_t index = 0; index < pool->uSize; index++)
344 struct s_StreamPoolEntry* cur = &pool->uArray[index];
/* Half-open range test: buffer start <= ptr < buffer start + capacity. */
346 if ((ptr >= Stream_Buffer(cur->s)) &&
347 (ptr < (Stream_Buffer(cur->s) + Stream_Capacity(cur->s))))
354 StreamPool_Unlock(pool);
/**
 * Release every entry the pool tracks, in both arrays, while holding the pool
 * lock.
 *
 * Entries are discarded with discardStream == TRUE, so the streams they
 * reference are freed as well - including those in the used array.
 */
363void StreamPool_Clear(wStreamPool* pool)
365 StreamPool_Lock(pool);
/* Free all streams that are waiting for reuse. */
367 for (
size_t x = 0; x < pool->aSize; x++)
369 struct s_StreamPoolEntry* cur = &pool->aArray[x];
370 discard_entry(cur, TRUE);
/* Warn about streams that are still handed out (the guarding condition and the
 * format argument are on lines not visible in this excerpt). */
376 WLog_WARN(TAG,
"Clearing StreamPool, but there are %" PRIuz
" streams currently in use",
/* Entries of the used array are discarded the same way, streams included. */
378 for (
size_t x = 0; x < pool->uSize; x++)
380 struct s_StreamPoolEntry* cur = &pool->uArray[x];
381 discard_entry(cur, TRUE);
386 StreamPool_Unlock(pool);
/**
 * Read the number of entries in the used array (pool->uSize) while holding the
 * pool lock.
 * NOTE(review): the return statement is not visible in this excerpt; presumably
 * the snapshot `usize` is returned.
 */
389size_t StreamPool_UsedCount(wStreamPool* pool)
391 StreamPool_Lock(pool);
392 size_t usize = pool->uSize;
393 StreamPool_Unlock(pool);
401wStreamPool* StreamPool_New(BOOL
synchronized,
size_t defaultSize)
403 wStreamPool* pool = NULL;
405 pool = (wStreamPool*)calloc(1,
sizeof(wStreamPool));
409 pool->synchronized =
synchronized;
410 pool->defaultSize = defaultSize;
412 if (!StreamPool_EnsureCapacity(pool, 32, FALSE))
414 if (!StreamPool_EnsureCapacity(pool, 32, TRUE))
417 InitializeCriticalSectionAndSpinCount(&pool->lock, 4000);
422 WINPR_PRAGMA_DIAG_PUSH
423 WINPR_PRAGMA_DIAG_IGNORED_MISMATCHED_DEALLOC
424 StreamPool_Free(pool);
425 WINPR_PRAGMA_DIAG_POP
/**
 * Tear down a pool.
 *
 * StreamPool_Clear() takes the pool lock for synchronized pools and the lock is
 * deleted right afterwards, so pool->lock is expected to be initialized when
 * this function runs.
 * NOTE(review): releasing the arrays and the pool object itself is not visible
 * in this excerpt.
 */
429void StreamPool_Free(wStreamPool* pool)
/* Frees every tracked stream, available and used alike. */
433 StreamPool_Clear(pool);
435 DeleteCriticalSection(&pool->lock);
/**
 * Format a human-readable summary of the pool into @p buffer.
 *
 * @param pool    pool to describe
 * @param buffer  destination buffer
 * @param size    size of @p buffer in bytes
 *
 * The summary line holds the sizes and capacities of both arrays. In
 * WITH_STREAMPOOL_DEBUG builds it is followed by the backtraces recorded for
 * every used entry and by the backtrace of the current call.
 *
 * NOTE(review): the declaration of `used` and the return statements are not
 * visible in this excerpt.
 */
444char* StreamPool_GetStatistics(wStreamPool* pool,
char* buffer,
size_t size)
/* A missing or zero-sized buffer is rejected up front. */
448 if (!buffer || (size < 1))
/* Summary line; at most size - 1 bytes are offered to _snprintf().
 * NOTE(review): no StreamPool_Lock() call is visible before these reads of the
 * pool counters. */
452 int offset = _snprintf(buffer, size - 1,
453 "aSize =%" PRIuz
", uSize =%" PRIuz
", aCapacity=%" PRIuz
454 ", uCapacity=%" PRIuz,
455 pool->aSize, pool->uSize, pool->aCapacity, pool->uCapacity);
/* `used` only advances when _snprintf() reported a positive length that is
 * smaller than the space it was compared against; otherwise the next write
 * starts at the same position. The same pattern repeats after every write below. */
456 if ((offset > 0) && ((
size_t)offset < size))
457 used += (size_t)offset;
459#if defined(WITH_STREAMPOOL_DEBUG)
/* The debug dump walks the used array, so it runs under the pool lock. */
460 StreamPool_Lock(pool);
462 offset = _snprintf(&buffer[used], size - 1 - used,
"\n-- dump used array take locations --\n");
463 if ((offset > 0) && ((
size_t)offset < size - used))
464 used += (size_t)offset;
/* One block per used entry: x is the entry index, y the backtrace line index. */
465 for (
size_t x = 0; x < pool->uSize; x++)
467 const struct s_StreamPoolEntry* cur = &pool->uArray[x];
/* An entry without symbol strings must report zero lines. */
468 WINPR_ASSERT(cur->msg || (cur->lines == 0));
470 for (
size_t y = 0; y < cur->lines; y++)
472 offset = _snprintf(&buffer[used], size - 1 - used,
"[%" PRIuz
" | %" PRIuz
"]: %s\n", x,
474 if ((offset > 0) && ((
size_t)offset < size - used))
475 used += (size_t)offset;
479 offset = _snprintf(&buffer[used], size - 1 - used,
"\n-- statistics called from --\n");
480 if ((offset > 0) && ((
size_t)offset < size - used))
481 used += (size_t)offset;
/* Capture and symbolize the backtrace of this very call, the same way
 * add_entry() does for streams. */
483 struct s_StreamPoolEntry entry = { 0 };
484 void* stack = winpr_backtrace(20);
486 entry.msg = winpr_backtrace_symbols(stack, &entry.lines);
487 winpr_backtrace_free(stack);
489 for (
size_t x = 0; x < entry.lines; x++)
491 const char* msg = entry.msg[x];
492 offset = _snprintf(&buffer[used], size - 1 - used,
"[%" PRIuz
"]: %s\n", x, msg);
493 if ((offset > 0) && ((
size_t)offset < size - used))
494 used += (size_t)offset;
/* The temporary symbol array belongs to this function and is freed here. */
496 free((
void*)entry.msg);
497 StreamPool_Unlock(pool);
503BOOL StreamPool_WaitForReturn(wStreamPool* pool, UINT32 timeoutMS)
505 wLog* log = WLog_Get(TAG);
509 while (timeoutMS > 0)
511 const size_t used = StreamPool_UsedCount(pool);
514 WLog_Print(log, WLOG_DEBUG,
"%" PRIuz
" streams still in use, sleeping...", used);
516 char buffer[4096] = { 0 };
517 StreamPool_GetStatistics(pool, buffer,
sizeof(buffer));
518 WLog_Print(log, WLOG_TRACE,
"Pool statistics: %s", buffer);
521 if (timeoutMS != INFINITE)
523 diff = timeoutMS > 10 ? 10 : timeoutMS;