20 #include <winpr/config.h>
22 #include <winpr/crt.h>
23 #include <winpr/wlog.h>
25 #include <winpr/collections.h>
27 #include "../stream.h"
29 #define TAG WINPR_TAG("utils.streampool")
/* Bookkeeping record for one pooled stream; the stream is reached as entry->s elsewhere in this file. */
31 struct s_StreamPoolEntry
/* Debug-only members follow; code under the same guard in this file uses `msg` and `lines`. */
33 #if defined(WITH_STREAMPOOL_DEBUG)
/* Entry array selected when usedOrAvailable is FALSE in StreamPool_EnsureCapacity; walked by the *Available helpers. */
44 struct s_StreamPoolEntry* aArray;
/* Entry array selected when usedOrAvailable is TRUE in StreamPool_EnsureCapacity; walked by the *Used helpers. */
48 struct s_StreamPoolEntry* uArray;
/*
 * Release what a pool entry holds.
 *
 * entry         - entry to discard
 * discardStream - when TRUE and entry->s is set, the stream is handed to Stream_Free
 */
55 static void discard_entry(
struct s_StreamPoolEntry* entry, BOOL discardStream)
/* Debug builds: free the entry's msg pointer (add_entry assigns it from winpr_backtrace_symbols). */
60 #if defined(WITH_STREAMPOOL_DEBUG)
61 free((
void*)entry->msg);
/* The second Stream_Free argument forwards the stream's own isAllocatedStream flag. */
64 if (discardStream && entry->s)
65 Stream_Free(entry->s, entry->s->isAllocatedStream);
/* Zeroed template entry. NOTE(review): presumably copied over *entry to reset it - the assignment is not visible here. */
67 const struct s_StreamPoolEntry empty = { 0 };
/*
 * Build a pool entry for stream `s` and return it by value.
 * NOTE(review): the line that stores `s` in the entry is not visible in this excerpt.
 */
71 static struct s_StreamPoolEntry add_entry(
wStream* s)
/* Start from an all-zero entry. */
73 struct s_StreamPoolEntry entry = { 0 };
/* Debug builds: capture a backtrace (argument 20, presumably the frame limit) and keep its symbol lines. */
75 #if defined(WITH_STREAMPOOL_DEBUG)
76 void* stack = winpr_backtrace(20);
/* entry.msg receives the symbol array, entry.lines the number of lines in it. */
78 entry.msg = winpr_backtrace_symbols(stack, &entry.lines);
/* The raw backtrace handle is released right away; only msg is kept in the entry. */
79 winpr_backtrace_free(stack);
/* Enter pool->lock, but only for pools whose `synchronized` flag is set. */
90 static INLINE
void StreamPool_Lock(wStreamPool* pool)
93 if (pool->synchronized)
94 EnterCriticalSection(&pool->lock);
/* Leave pool->lock; mirrors StreamPool_Lock and is likewise a no-op for unsynchronized pools. */
101 static INLINE
void StreamPool_Unlock(wStreamPool* pool)
104 if (pool->synchronized)
105 LeaveCriticalSection(&pool->lock);
/*
 * Size one of the pool's two entry arrays so it can take `count` more entries.
 *
 * pool            - pool whose array is adjusted
 * count           - number of additional entries wanted
 * usedOrAvailable - TRUE selects uCapacity/uSize/uArray, FALSE selects aCapacity/aSize/aArray
 *
 * Returns a BOOL; StreamPool_New tests the result with `!`.
 */
108 static BOOL StreamPool_EnsureCapacity(wStreamPool* pool,
size_t count, BOOL usedOrAvailable)
/* Work through pointers so the same code serves both arrays. */
112 size_t* cap = (usedOrAvailable) ? &pool->uCapacity : &pool->aCapacity;
113 size_t* size = (usedOrAvailable) ? &pool->uSize : &pool->aSize;
114 struct s_StreamPoolEntry** array = (usedOrAvailable) ? &pool->uArray : &pool->aArray;
/* First branch of the sizing chain (its condition is not visible in this excerpt): exactly size + count. */
118 new_cap = *size + count;
/* Not enough room: new capacity is (size + count + 2) / 2 * 3 in integer arithmetic. */
119 else if (*size + count > *cap)
120 new_cap = (*size + count + 2) / 2 * 3;
/* Needed slots are below a third of the capacity. NOTE(review): branch body not visible; presumably shrinks - confirm. */
121 else if ((*size + count) < *cap / 3)
/* The realloc below resizes the selected array to new_cap entries. */
126 struct s_StreamPoolEntry* new_arr = NULL;
/* True when the current capacity cannot hold size + count entries (body not visible here). */
128 if (*cap < *size + count)
132 (
struct s_StreamPoolEntry*)realloc(*array,
sizeof(
struct s_StreamPoolEntry) * new_cap);
/*
 * Drop the entry at `index` from the used array and close the gap.
 * The stream itself is not freed: discard_entry is called with FALSE.
 */
145 static void StreamPool_ShiftUsed(wStreamPool* pool,
size_t index)
/* Exactly one entry is removed per call. */
149 const size_t pcount = 1;
150 const size_t off = index + pcount;
/* Only act when index + pcount does not exceed the number of used entries. */
151 if (pool->uSize >= off)
153 for (
size_t x = 0; x < pcount; x++)
155 struct s_StreamPoolEntry* cur = &pool->uArray[index + x];
156 discard_entry(cur, FALSE);
/* Move the tail of the array down by pcount slots over the removed entry. */
158 MoveMemory(&pool->uArray[index], &pool->uArray[index + pcount],
159 (pool->uSize - index - pcount) *
sizeof(
struct s_StreamPoolEntry));
160 pool->uSize -= pcount;
/* Record stream `s` in the pool's used array. */
168 static void StreamPool_AddUsed(wStreamPool* pool,
wStream* s)
/* NOTE(review): the BOOL result of StreamPool_EnsureCapacity is ignored; uArray is written on the next line regardless. */
170 StreamPool_EnsureCapacity(pool, 1, TRUE);
/* Store the new entry at slot uSize. NOTE(review): the uSize increment is not visible in this excerpt. */
171 pool->uArray[pool->uSize] = add_entry(s);
/*
 * Remove stream `s` from the used array.
 * NOTE(review): the comparison that picks the matching entry is not visible here; presumably cur->s == s.
 */
179 static void StreamPool_RemoveUsed(wStreamPool* pool,
wStream* s)
/* Linear scan over the used entries. */
182 for (
size_t index = 0; index < pool->uSize; index++)
184 struct s_StreamPoolEntry* cur = &pool->uArray[index];
/* Discards the entry at `index` (stream kept alive) and compacts the array. */
187 StreamPool_ShiftUsed(pool, index);
/*
 * Drop the entry at `index` from the available array and close the gap.
 * The stream itself is not freed: discard_entry is called with FALSE.
 */
193 static void StreamPool_ShiftAvailable(wStreamPool* pool,
size_t index)
/* Exactly one entry is removed per call. */
197 const size_t pcount = 1;
198 const size_t off = index + pcount;
/* Only act when index + pcount does not exceed the number of available entries. */
199 if (pool->aSize >= off)
201 for (
size_t x = 0; x < pcount; x++)
203 struct s_StreamPoolEntry* cur = &pool->aArray[index + x];
204 discard_entry(cur, FALSE);
/* Move the tail of the array down by pcount slots over the removed entry. */
207 MoveMemory(&pool->aArray[index], &pool->aArray[index + pcount],
208 (pool->aSize - index - pcount) *
sizeof(
struct s_StreamPoolEntry));
209 pool->aSize -= pcount;
/*
 * Hand out a stream with at least `size` bytes of capacity and track it as used.
 *
 * pool - pool to take from; its lock is held for the duration of the call
 * size - requested capacity
 *
 * NOTE(review): the return statement is not visible in this excerpt; presumably returns the stream `s`.
 */
217 wStream* StreamPool_Take(wStreamPool* pool,
size_t size)
220 size_t foundIndex = 0;
223 StreamPool_Lock(pool);
/* Fall back to the pool's default size. NOTE(review): guarding condition not visible; presumably size == 0. */
226 size = pool->defaultSize;
/* Search the available array for a stream that is large enough. */
228 for (
size_t index = 0; index < pool->aSize; index++)
230 struct s_StreamPoolEntry* cur = &pool->aArray[index];
/* A stream qualifies when its capacity is at least the requested size. */
233 if (Stream_Capacity(s) >= size)
/* Create a new stream via Stream_New with no caller buffer. NOTE(review): presumably only when nothing matched. */
243 s = Stream_New(NULL, size);
/* Reset position to 0 and length to the full capacity, then drop the entry at foundIndex from the available array. */
249 Stream_SetPosition(s, 0);
250 Stream_SetLength(s, Stream_Capacity(s));
251 StreamPool_ShiftAvailable(pool, foundIndex);
/* Track the stream in the used array. */
258 StreamPool_AddUsed(pool, s);
262 StreamPool_Unlock(pool);
/*
 * Move stream `s` from the used array to the available array.
 * Takes no lock itself; the visible callers hold the pool lock around it.
 */
271 static void StreamPool_Remove(wStreamPool* pool,
wStream* s)
/* Make room for one more available entry. NOTE(review): the BOOL result is ignored. */
273 StreamPool_EnsureCapacity(pool, 1, FALSE);
274 Stream_EnsureValidity(s);
/* Walk the available entries. NOTE(review): the loop body is not visible; presumably checks whether `s` is already listed. */
275 for (
size_t x = 0; x < pool->aSize; x++)
277 wStream* cs = pool->aArray[x].s;
/* Append to the available array (aSize is post-incremented), then drop `s` from the used array. */
281 pool->aArray[(pool->aSize)++] = add_entry(s);
282 StreamPool_RemoveUsed(pool, s);
/*
 * Under the pool lock, hand stream `s` back to the pool via StreamPool_Remove.
 * NOTE(review): lines between the lock and the call are not visible here; presumably a
 * reference-count check decides whether the stream is actually returned - confirm.
 */
285 static void StreamPool_ReleaseOrReturn(wStreamPool* pool,
wStream* s)
287 StreamPool_Lock(pool);
291 StreamPool_Remove(pool, s);
292 StreamPool_Unlock(pool);
/*
 * Give stream `s` back to `pool`: moves it from the used to the available array
 * while holding the pool lock.
 * NOTE(review): any argument checks before the lock are not visible in this excerpt.
 */
295 void StreamPool_Return(wStreamPool* pool,
wStream* s)
301 StreamPool_Lock(pool);
302 StreamPool_Remove(pool, s);
303 StreamPool_Unlock(pool);
315 StreamPool_Lock(s->pool);
317 StreamPool_Unlock(s->pool);
/*
 * Release a pooled stream by delegating to StreamPool_ReleaseOrReturn on s->pool.
 * NOTE(review): the guard around the call is not visible here; presumably it checks s->pool.
 */
325 void Stream_Release(
wStream* s)
329 StreamPool_ReleaseOrReturn(s->pool, s);
/*
 * Look up the used stream whose buffer contains `ptr`.
 *
 * pool - pool to search; its lock is held during the scan
 * ptr  - address to locate
 *
 * NOTE(review): the result handling and return are not visible in this excerpt.
 */
336 wStream* StreamPool_Find(wStreamPool* pool,
const BYTE* ptr)
340 StreamPool_Lock(pool);
/* Only the used array is searched. */
342 for (
size_t index = 0; index < pool->uSize; index++)
344 struct s_StreamPoolEntry* cur = &pool->uArray[index];
/* Half-open range test: buffer <= ptr < buffer + capacity. */
346 if ((ptr >= Stream_Buffer(cur->s)) &&
347 (ptr < (Stream_Buffer(cur->s) + Stream_Capacity(cur->s))))
354 StreamPool_Unlock(pool);
/*
 * Discard every entry of the pool, freeing the streams (discard_entry is called with TRUE)
 * in both the available and the used array, all under the pool lock.
 * NOTE(review): the resets of aSize/uSize are not visible in this excerpt.
 */
363 void StreamPool_Clear(wStreamPool* pool)
365 StreamPool_Lock(pool);
/* Free all streams that are currently available. */
367 for (
size_t x = 0; x < pool->aSize; x++)
369 struct s_StreamPoolEntry* cur = &pool->aArray[x];
370 discard_entry(cur, TRUE);
/* Streams still in use are reported with a warning and then freed as well. */
375 WLog_WARN(TAG,
"Clearing StreamPool, but there are %" PRIuz
" streams currently in use",
377 for (
size_t x = 0; x < pool->uSize; x++)
379 struct s_StreamPoolEntry* cur = &pool->uArray[x];
380 discard_entry(cur, TRUE);
384 StreamPool_Unlock(pool);
/*
 * Read the number of used entries (pool->uSize) while holding the pool lock.
 * NOTE(review): the return statement is not visible here; presumably returns `usize`.
 */
387 size_t StreamPool_UsedCount(wStreamPool* pool)
389 StreamPool_Lock(pool);
390 size_t usize = pool->uSize;
391 StreamPool_Unlock(pool);
/*
 * Allocate and set up a stream pool.
 *
 * synchronized - stored in pool->synchronized; StreamPool_Lock/Unlock only touch the lock when it is set
 * defaultSize  - stored in pool->defaultSize, which StreamPool_Take can fall back to
 *
 * NOTE(review): the return statements are not visible in this excerpt.
 */
399 wStreamPool* StreamPool_New(BOOL
synchronized,
size_t defaultSize)
401 wStreamPool* pool = NULL;
/* calloc leaves every field of the pool zeroed. */
403 pool = (wStreamPool*)calloc(1,
sizeof(wStreamPool));
407 pool->synchronized =
synchronized;
408 pool->defaultSize = defaultSize;
/* Reserve 32 entries in the available array, then 32 in the used array. */
410 if (!StreamPool_EnsureCapacity(pool, 32, FALSE))
412 if (!StreamPool_EnsureCapacity(pool, 32, TRUE))
/*
 * NOTE(review): the lock is initialized only after both capacity checks, while the failure
 * path below calls StreamPool_Free, which (via StreamPool_Clear) locks a synchronized pool and
 * then deletes pool->lock. Confirm this is safe on a zeroed, uninitialized critical section.
 */
415 InitializeCriticalSectionAndSpinCount(&pool->lock, 4000);
/* Failure cleanup; the mismatched-dealloc diagnostic is suppressed around the call. */
420 WINPR_PRAGMA_DIAG_PUSH
421 WINPR_PRAGMA_DIAG_IGNORED_MISMATCHED_DEALLOC
422 StreamPool_Free(pool);
423 WINPR_PRAGMA_DIAG_POP
/*
 * Tear down a pool: discard all entries and their streams, then delete the lock.
 * NOTE(review): the release of the arrays and of the pool itself is not visible in this excerpt.
 */
427 void StreamPool_Free(wStreamPool* pool)
431 StreamPool_Clear(pool);
433 DeleteCriticalSection(&pool->lock);
/*
 * Format the pool's counters into a caller-supplied buffer.
 *
 * pool   - pool to describe
 * buffer - destination for the text
 * size   - size of `buffer` in bytes
 *
 * With WITH_STREAMPOOL_DEBUG defined, the backtrace lines stored with each used entry and
 * the backtrace of this call are appended as well.
 * NOTE(review): the return statements are not visible in this excerpt.
 */
442 char* StreamPool_GetStatistics(wStreamPool* pool,
char* buffer,
size_t size)
/* Nothing can be written without a buffer of at least one byte. */
446 if (!buffer || (size < 1))
/* Summary line; every write is limited to size - 1 bytes minus what is already used. */
450 int offset = _snprintf(buffer, size - 1,
451 "aSize =%" PRIuz
", uSize =%" PRIuz
", aCapacity=%" PRIuz
452 ", uCapacity=%" PRIuz,
453 pool->aSize, pool->uSize, pool->aCapacity, pool->uCapacity);
/* Advance `used` only when the reported length is positive and below the buffer size. */
454 if ((offset > 0) && ((
size_t)offset < size))
455 used += (size_t)offset;
/* Debug builds: the per-entry dump below runs under the pool lock. */
457 #if defined(WITH_STREAMPOOL_DEBUG)
458 StreamPool_Lock(pool);
460 offset = _snprintf(&buffer[used], size - 1 - used,
"\n-- dump used array take locations --\n");
461 if ((offset > 0) && ((
size_t)offset < size - used))
462 used += (size_t)offset;
/* One block per used entry: x indexes the entry, y the line of its stored backtrace. */
463 for (
size_t x = 0; x < pool->uSize; x++)
465 const struct s_StreamPoolEntry* cur = &pool->uArray[x];
/* An entry may lack a message array only if it also reports zero lines. */
466 WINPR_ASSERT(cur->msg || (cur->lines == 0));
468 for (
size_t y = 0; y < cur->lines; y++)
470 offset = _snprintf(&buffer[used], size - 1 - used,
"[%" PRIuz
" | %" PRIuz
"]: %s\n", x,
472 if ((offset > 0) && ((
size_t)offset < size - used))
473 used += (size_t)offset;
477 offset = _snprintf(&buffer[used], size - 1 - used,
"\n-- statistics called from --\n");
478 if ((offset > 0) && ((
size_t)offset < size - used))
479 used += (size_t)offset;
/* Capture the backtrace of this call into a temporary entry, same way add_entry does. */
481 struct s_StreamPoolEntry entry = { 0 };
482 void* stack = winpr_backtrace(20);
484 entry.msg = winpr_backtrace_symbols(stack, &entry.lines);
485 winpr_backtrace_free(stack);
487 for (
size_t x = 0; x < entry.lines; x++)
489 const char* msg = entry.msg[x];
490 offset = _snprintf(&buffer[used], size - 1 - used,
"[%" PRIuz
"]: %s\n", x, msg);
491 if ((offset > 0) && ((
size_t)offset < size - used))
492 used += (size_t)offset;
/* The temporary symbol array is freed before the lock is dropped. */
494 free((
void*)entry.msg);
495 StreamPool_Unlock(pool);
501 BOOL StreamPool_WaitForReturn(wStreamPool* pool, UINT32 timeoutMS)
503 wLog* log = WLog_Get(TAG);
507 while (timeoutMS > 0)
509 const size_t used = StreamPool_UsedCount(pool);
512 WLog_Print(log, WLOG_DEBUG,
"%" PRIuz
" streams still in use, sleeping...", used);
514 char buffer[4096] = { 0 };
515 StreamPool_GetStatistics(pool, buffer,
sizeof(buffer));
516 WLog_Print(log, WLOG_TRACE,
"Pool statistics: %s", buffer);
519 if (timeoutMS != INFINITE)
521 diff = timeoutMS > 10 ? 10 : timeoutMS;