23 #include <winpr/config.h>
25 #include <winpr/winpr.h>
26 #include <winpr/assert.h>
28 #include <winpr/handle.h>
30 #include <winpr/thread.h>
33 #define MIN(x, y) (((x) < (y)) ? (x) : (y))
37 #define MAX(x, y) (((x) > (y)) ? (x) : (y))
82 #include <winpr/crt.h>
83 #include <winpr/platform.h>
86 #ifdef WINPR_HAVE_UNISTD_H
90 #ifdef WINPR_HAVE_SYS_EVENTFD_H
91 #include <sys/eventfd.h>
94 #include <winpr/debug.h>
99 #include <winpr/collections.h>
104 #include "../handle/handle.h"
106 #define TAG WINPR_TAG("thread")
108 static WINPR_THREAD mainThread;
110 #if defined(WITH_THREAD_LIST)
111 static wListDictionary* thread_list = NULL;
114 static BOOL ThreadCloseHandle(HANDLE handle);
115 static void cleanup_handle(
void* obj);
117 static BOOL ThreadIsHandled(HANDLE handle)
119 return WINPR_HANDLE_IS_HANDLED(handle, HANDLE_TYPE_THREAD, FALSE);
122 static int ThreadGetFd(HANDLE handle)
124 WINPR_THREAD* pThread = (WINPR_THREAD*)handle;
126 if (!ThreadIsHandled(handle))
129 return pThread->event.fds[0];
132 #define run_mutex_init(fkt, mux, arg) run_mutex_init_(fkt, #fkt, mux, arg)
133 static BOOL run_mutex_init_(
int (*fkt)(pthread_mutex_t*,
const pthread_mutexattr_t*),
134 const char* name, pthread_mutex_t* mutex,
135 const pthread_mutexattr_t* mutexattr)
142 rc = fkt(mutex, mutexattr);
145 char ebuffer[256] = { 0 };
146 WLog_WARN(TAG,
"[%s] failed with [%s]", name, winpr_strerror(rc, ebuffer,
sizeof(ebuffer)));
151 #define run_mutex_fkt(fkt, mux) run_mutex_fkt_(fkt, #fkt, mux)
152 static BOOL run_mutex_fkt_(
int (*fkt)(pthread_mutex_t* mux),
const char* name,
153 pthread_mutex_t* mutex)
163 char ebuffer[256] = { 0 };
164 WLog_WARN(TAG,
"[%s] failed with [%s]", name, winpr_strerror(rc, ebuffer,
sizeof(ebuffer)));
169 #define run_cond_init(fkt, cond, arg) run_cond_init_(fkt, #fkt, cond, arg)
170 static BOOL run_cond_init_(
int (*fkt)(pthread_cond_t*,
const pthread_condattr_t*),
const char* name,
171 pthread_cond_t* condition,
const pthread_condattr_t* conditionattr)
176 WINPR_ASSERT(condition);
178 rc = fkt(condition, conditionattr);
181 char ebuffer[256] = { 0 };
182 WLog_WARN(TAG,
"[%s] failed with [%s]", name, winpr_strerror(rc, ebuffer,
sizeof(ebuffer)));
187 #define run_cond_fkt(fkt, cond) run_cond_fkt_(fkt, #fkt, cond)
188 static BOOL run_cond_fkt_(
int (*fkt)(pthread_cond_t* mux),
const char* name,
189 pthread_cond_t* condition)
194 WINPR_ASSERT(condition);
199 char ebuffer[256] = { 0 };
200 WLog_WARN(TAG,
"[%s] failed with [%s]", name, winpr_strerror(rc, ebuffer,
sizeof(ebuffer)));
205 static int pthread_mutex_checked_unlock(pthread_mutex_t* mutex)
208 WINPR_ASSERT(pthread_mutex_trylock(mutex) == EBUSY);
209 return pthread_mutex_unlock(mutex);
214 WINPR_ASSERT(bundle);
217 if (!run_mutex_init(pthread_mutex_init, &bundle->mux, NULL))
220 if (!run_cond_init(pthread_cond_init, &bundle->cond, NULL))
229 WINPR_ASSERT(bundle);
231 run_cond_fkt(pthread_cond_destroy, &bundle->cond);
232 run_mutex_fkt(pthread_mutex_destroy, &bundle->mux);
239 WINPR_ASSERT(bundle);
241 if (!run_mutex_fkt(pthread_mutex_lock, &bundle->mux))
244 if (!run_cond_fkt(pthread_cond_signal, &bundle->cond))
246 if (!run_mutex_fkt(pthread_mutex_checked_unlock, &bundle->mux))
253 WINPR_ASSERT(bundle);
254 return run_mutex_fkt(pthread_mutex_lock, &bundle->mux);
259 WINPR_ASSERT(bundle);
260 return run_mutex_fkt(pthread_mutex_checked_unlock, &bundle->mux);
267 WINPR_ASSERT(bundle);
269 WINPR_ASSERT(pthread_mutex_trylock(&bundle->mux) == EBUSY);
273 int r = pthread_cond_wait(&bundle->cond, &bundle->mux);
276 char ebuffer[256] = { 0 };
277 WLog_ERR(TAG,
"failed to wait for %s [%s]", name,
278 winpr_strerror(r, ebuffer,
sizeof(ebuffer)));
281 case ENOTRECOVERABLE:
299 static BOOL signal_thread_ready(WINPR_THREAD* thread)
301 WINPR_ASSERT(thread);
303 return mux_condition_bundle_signal(&thread->isCreated);
306 static BOOL signal_thread_is_running(WINPR_THREAD* thread)
308 WINPR_ASSERT(thread);
310 return mux_condition_bundle_signal(&thread->isRunning);
313 static DWORD ThreadCleanupHandle(HANDLE handle)
315 DWORD status = WAIT_FAILED;
316 WINPR_THREAD* thread = (WINPR_THREAD*)handle;
318 if (!ThreadIsHandled(handle))
321 if (!run_mutex_fkt(pthread_mutex_lock, &thread->mutex))
326 int rc = pthread_join(thread->thread, NULL);
330 char ebuffer[256] = { 0 };
331 WLog_ERR(TAG,
"pthread_join failure: [%d] %s", rc,
332 winpr_strerror(rc, ebuffer,
sizeof(ebuffer)));
336 thread->joined = TRUE;
339 status = WAIT_OBJECT_0;
342 if (!run_mutex_fkt(pthread_mutex_checked_unlock, &thread->mutex))
370 static void dump_thread(WINPR_THREAD* thread)
372 #if defined(WITH_DEBUG_THREADS)
373 void* stack = winpr_backtrace(20);
376 WLog_DBG(TAG,
"Called from:");
377 msg = winpr_backtrace_symbols(stack, &used);
379 for (
size_t i = 0; i < used; i++)
380 WLog_DBG(TAG,
"[%" PRIdz
"]: %s", i, msg[i]);
383 winpr_backtrace_free(stack);
384 WLog_DBG(TAG,
"Thread handle created still not closed!");
385 msg = winpr_backtrace_symbols(thread->create_stack, &used);
387 for (
size_t i = 0; i < used; i++)
388 WLog_DBG(TAG,
"[%" PRIdz
"]: %s", i, msg[i]);
394 WLog_DBG(TAG,
"Thread still running!");
396 else if (!thread->exit_stack)
398 WLog_DBG(TAG,
"Thread suspended.");
402 WLog_DBG(TAG,
"Thread exited at:");
403 msg = winpr_backtrace_symbols(thread->exit_stack, &used);
405 for (
size_t i = 0; i < used; i++)
406 WLog_DBG(TAG,
"[%" PRIdz
"]: %s", i, msg[i]);
411 WINPR_UNUSED(thread);
419 static BOOL set_event(WINPR_THREAD* thread)
421 return winpr_event_set(&thread->event);
424 static BOOL reset_event(WINPR_THREAD* thread)
426 return winpr_event_reset(&thread->event);
429 #if defined(WITH_THREAD_LIST)
430 static BOOL thread_compare(
const void* a,
const void* b)
432 const pthread_t* p1 = a;
433 const pthread_t* p2 = b;
434 BOOL rc = pthread_equal(*p1, *p2);
439 static INIT_ONCE threads_InitOnce = INIT_ONCE_STATIC_INIT;
440 static pthread_t mainThreadId;
441 static DWORD currentThreadTlsIndex = TLS_OUT_OF_INDEXES;
443 static BOOL initializeThreads(
PINIT_ONCE InitOnce, PVOID Parameter, PVOID* Context)
445 if (!apc_init(&mainThread.apc))
447 WLog_ERR(TAG,
"failed to initialize APC");
451 mainThread.common.Type = HANDLE_TYPE_THREAD;
452 mainThreadId = pthread_self();
454 currentThreadTlsIndex = TlsAlloc();
455 if (currentThreadTlsIndex == TLS_OUT_OF_INDEXES)
457 WLog_ERR(TAG,
"Major bug, unable to allocate a TLS value for currentThread");
460 #if defined(WITH_THREAD_LIST)
461 thread_list = ListDictionary_New(TRUE);
465 WLog_ERR(TAG,
"Couldn't create global thread list");
466 goto error_thread_list;
469 thread_list->objectKey.fnObjectEquals = thread_compare;
476 static BOOL signal_and_wait_for_ready(WINPR_THREAD* thread)
480 WINPR_ASSERT(thread);
482 if (!mux_condition_bundle_lock(&thread->isRunning))
485 if (!signal_thread_ready(thread))
488 if (!mux_condition_bundle_wait(&thread->isRunning,
"threadIsRunning"))
491 #if defined(WITH_THREAD_LIST)
492 if (!ListDictionary_Contains(thread_list, &thread->thread))
494 WLog_ERR(TAG,
"Thread not in thread_list, startup failed!");
502 if (!mux_condition_bundle_unlock(&thread->isRunning))
511 static void* thread_launcher(
void* arg)
514 WINPR_THREAD* thread = (WINPR_THREAD*)arg;
515 LPTHREAD_START_ROUTINE fkt = NULL;
519 WLog_ERR(TAG,
"Called with invalid argument %p", arg);
523 if (!TlsSetValue(currentThreadTlsIndex, thread))
525 WLog_ERR(TAG,
"thread %d, unable to set current thread value", pthread_self());
529 if (!(fkt = thread->lpStartAddress))
533 LPTHREAD_START_ROUTINE fkt;
537 WLog_ERR(TAG,
"Thread function argument is %p", cnv.pv);
541 if (!signal_and_wait_for_ready(thread))
544 rc = fkt(thread->lpParameter);
549 apc_cleanupThread(thread);
552 thread->dwExitCode = rc;
556 (void)signal_thread_ready(thread);
558 if (thread->detached || !thread->started)
559 cleanup_handle(thread);
565 static BOOL winpr_StartThread(WINPR_THREAD* thread)
569 pthread_attr_t attr = { 0 };
571 if (!mux_condition_bundle_lock(&thread->isCreated))
575 pthread_attr_init(&attr);
576 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
578 if (thread->dwStackSize > 0)
579 pthread_attr_setstacksize(&attr, thread->dwStackSize);
581 thread->started = TRUE;
584 #if defined(WITH_THREAD_LIST)
585 if (!ListDictionary_Add(thread_list, &thread->thread, thread))
587 WLog_ERR(TAG,
"failed to add the thread to the thread list");
592 if (pthread_create(&thread->thread, &attr, thread_launcher, thread))
595 if (!mux_condition_bundle_wait(&thread->isCreated,
"threadIsCreated"))
599 if (!mux_condition_bundle_unlock(&thread->isCreated))
602 if (!signal_thread_is_running(thread))
604 WLog_ERR(TAG,
"failed to signal the thread was ready");
612 if (!mux_condition_bundle_unlock(&thread->isCreated))
616 pthread_attr_destroy(&attr);
624 BOOL SetThreadPriority(HANDLE hThread,
int nPriority)
629 if (!winpr_Handle_GetInfo(hThread, &Type, &Object) || Object->Type != HANDLE_TYPE_THREAD)
634 const int diff = (max - min);
635 const int normal = min + diff / 2;
636 const int off = MIN(1, diff / 4);
637 int sched_priority = -1;
639 switch (nPriority & ~(THREAD_MODE_BACKGROUND_BEGIN | THREAD_MODE_BACKGROUND_END))
641 case THREAD_PRIORITY_ABOVE_NORMAL:
642 sched_priority = MIN(normal + off, max);
644 case THREAD_PRIORITY_BELOW_NORMAL:
645 sched_priority = MAX(normal - off, min);
647 case THREAD_PRIORITY_HIGHEST:
648 sched_priority = max;
650 case THREAD_PRIORITY_IDLE:
651 sched_priority = min;
653 case THREAD_PRIORITY_LOWEST:
654 sched_priority = min;
656 case THREAD_PRIORITY_TIME_CRITICAL:
657 sched_priority = max;
660 case THREAD_PRIORITY_NORMAL:
661 sched_priority = normal;
664 #if defined(_POSIX_C_SOURCE) && (_POSIX_C_SOURCE >= 200809L) && defined(PTHREAD_SETSCHEDPRIO)
665 WINPR_THREAD* thread = (WINPR_THREAD*)Object;
666 const int rc = pthread_setschedprio(thread->thread, sched_priority);
669 char buffer[256] = { 0 };
670 WLog_ERR(TAG,
"pthread_setschedprio(%d) %s [%d]", sched_priority,
671 winpr_strerror(rc, buffer,
sizeof(buffer)), rc);
675 WLog_WARN(TAG,
"pthread_setschedprio(%d) not implemented, requires POSIX 2008 or later",
681 HANDLE CreateThread(LPSECURITY_ATTRIBUTES lpThreadAttributes,
size_t dwStackSize,
682 LPTHREAD_START_ROUTINE lpStartAddress, LPVOID lpParameter,
683 DWORD dwCreationFlags, LPDWORD lpThreadId)
685 HANDLE handle = NULL;
686 WINPR_THREAD* thread = (WINPR_THREAD*)calloc(1,
sizeof(WINPR_THREAD));
691 thread->dwStackSize = dwStackSize;
692 thread->lpParameter = lpParameter;
693 thread->lpStartAddress = lpStartAddress;
694 thread->lpThreadAttributes = lpThreadAttributes;
695 thread->common.ops = &ops;
696 #if defined(WITH_DEBUG_THREADS)
697 thread->create_stack = winpr_backtrace(20);
701 if (!winpr_event_init(&thread->event))
703 WLog_ERR(TAG,
"failed to create event");
707 if (!run_mutex_init(pthread_mutex_init, &thread->mutex, NULL))
709 WLog_ERR(TAG,
"failed to initialize thread mutex");
713 if (!apc_init(&thread->apc))
715 WLog_ERR(TAG,
"failed to initialize APC");
719 if (!mux_condition_bundle_init(&thread->isCreated))
721 if (!mux_condition_bundle_init(&thread->isRunning))
724 WINPR_HANDLE_SET_TYPE_AND_MODE(thread, HANDLE_TYPE_THREAD, WINPR_FD_READ);
725 handle = (HANDLE)thread;
727 InitOnceExecuteOnce(&threads_InitOnce, initializeThreads, NULL, NULL);
729 if (!(dwCreationFlags & CREATE_SUSPENDED))
731 if (!winpr_StartThread(thread))
736 if (!set_event(thread))
742 cleanup_handle(thread);
746 void cleanup_handle(
void* obj)
748 WINPR_THREAD* thread = (WINPR_THREAD*)obj;
752 if (!apc_uninit(&thread->apc))
753 WLog_ERR(TAG,
"failed to destroy APC");
755 mux_condition_bundle_uninit(&thread->isCreated);
756 mux_condition_bundle_uninit(&thread->isRunning);
757 run_mutex_fkt(pthread_mutex_destroy, &thread->mutex);
759 winpr_event_uninit(&thread->event);
761 #if defined(WITH_THREAD_LIST)
762 ListDictionary_Remove(thread_list, &thread->thread);
764 #if defined(WITH_DEBUG_THREADS)
766 if (thread->create_stack)
767 winpr_backtrace_free(thread->create_stack);
769 if (thread->exit_stack)
770 winpr_backtrace_free(thread->exit_stack);
776 BOOL ThreadCloseHandle(HANDLE handle)
778 WINPR_THREAD* thread = (WINPR_THREAD*)handle;
780 #if defined(WITH_THREAD_LIST)
783 WLog_ERR(TAG,
"Thread list does not exist, check call!");
786 else if (!ListDictionary_Contains(thread_list, &thread->thread))
788 WLog_ERR(TAG,
"Thread list does not contain this thread! check call!");
793 ListDictionary_Lock(thread_list);
797 if ((thread->started) && (WaitForSingleObject(thread, 0) != WAIT_OBJECT_0))
799 WLog_DBG(TAG,
"Thread running, setting to detached state!");
800 thread->detached = TRUE;
801 pthread_detach(thread->thread);
805 cleanup_handle(thread);
808 #if defined(WITH_THREAD_LIST)
809 ListDictionary_Unlock(thread_list);
816 HANDLE CreateRemoteThread(HANDLE hProcess, LPSECURITY_ATTRIBUTES lpThreadAttributes,
817 size_t dwStackSize, LPTHREAD_START_ROUTINE lpStartAddress,
818 LPVOID lpParameter, DWORD dwCreationFlags, LPDWORD lpThreadId)
820 WLog_ERR(TAG,
"not implemented");
821 SetLastError(ERROR_CALL_NOT_IMPLEMENTED);
825 VOID ExitThread(DWORD dwExitCode)
827 #if defined(WITH_THREAD_LIST)
829 pthread_t tid = pthread_self();
833 WLog_ERR(TAG,
"function called without existing thread list!");
834 #if defined(WITH_DEBUG_THREADS)
839 else if (!ListDictionary_Contains(thread_list, &tid))
841 WLog_ERR(TAG,
"function called, but no matching entry in thread list!");
842 #if defined(WITH_DEBUG_THREADS)
849 WINPR_THREAD* thread;
850 ListDictionary_Lock(thread_list);
851 thread = ListDictionary_GetItemValue(thread_list, &tid);
852 WINPR_ASSERT(thread);
853 thread->exited = TRUE;
854 thread->dwExitCode = dwExitCode;
855 #if defined(WITH_DEBUG_THREADS)
856 thread->exit_stack = winpr_backtrace(20);
858 ListDictionary_Unlock(thread_list);
860 rc = thread->dwExitCode;
862 if (thread->detached || !thread->started)
863 cleanup_handle(thread);
865 pthread_exit((
void*)(
size_t)rc);
868 WINPR_UNUSED(dwExitCode);
872 BOOL GetExitCodeThread(HANDLE hThread, LPDWORD lpExitCode)
876 WINPR_THREAD* thread = NULL;
878 if (!winpr_Handle_GetInfo(hThread, &Type, &Object) || Object->Type != HANDLE_TYPE_THREAD)
880 WLog_ERR(TAG,
"hThread is not a thread");
881 SetLastError(ERROR_INVALID_PARAMETER);
885 thread = (WINPR_THREAD*)Object;
886 *lpExitCode = thread->dwExitCode;
890 WINPR_THREAD* winpr_GetCurrentThread(VOID)
892 WINPR_THREAD* ret = NULL;
894 InitOnceExecuteOnce(&threads_InitOnce, initializeThreads, NULL, NULL);
895 if (mainThreadId == pthread_self())
896 return (HANDLE)&mainThread;
898 ret = TlsGetValue(currentThreadTlsIndex);
902 HANDLE _GetCurrentThread(VOID)
904 return (HANDLE)winpr_GetCurrentThread();
907 DWORD GetCurrentThreadId(VOID)
909 pthread_t tid = pthread_self();
912 uintptr_t ptid = WINPR_REINTERPRET_CAST(tid, pthread_t, uintptr_t);
913 return ptid & UINT32_MAX;
920 ULONG_PTR completionArg;
923 static void userAPC(LPVOID arg)
925 UserApcItem* userApc = (UserApcItem*)arg;
927 userApc->completion(userApc->completionArg);
929 userApc->apc.markedForRemove = TRUE;
932 DWORD QueueUserAPC(PAPCFUNC pfnAPC, HANDLE hThread, ULONG_PTR dwData)
936 WINPR_APC_ITEM* apc = NULL;
937 UserApcItem* apcItem = NULL;
942 if (!winpr_Handle_GetInfo(hThread, &Type, &Object) || Object->Type != HANDLE_TYPE_THREAD)
944 WLog_ERR(TAG,
"hThread is not a thread");
945 SetLastError(ERROR_INVALID_PARAMETER);
949 apcItem = calloc(1,
sizeof(*apcItem));
952 SetLastError(ERROR_INVALID_PARAMETER);
957 apc->type = APC_TYPE_USER;
958 apc->markedForFree = TRUE;
959 apc->alwaysSignaled = TRUE;
960 apc->completion = userAPC;
961 apc->completionArgs = apc;
962 apcItem->completion = pfnAPC;
963 apcItem->completionArg = dwData;
964 apc_register(hThread, apc);
968 DWORD ResumeThread(HANDLE hThread)
972 WINPR_THREAD* thread = NULL;
974 if (!winpr_Handle_GetInfo(hThread, &Type, &Object) || Object->Type != HANDLE_TYPE_THREAD)
976 WLog_ERR(TAG,
"hThread is not a thread");
977 SetLastError(ERROR_INVALID_PARAMETER);
981 thread = (WINPR_THREAD*)Object;
983 if (!run_mutex_fkt(pthread_mutex_lock, &thread->mutex))
986 if (!thread->started)
988 if (!winpr_StartThread(thread))
990 run_mutex_fkt(pthread_mutex_checked_unlock, &thread->mutex);
995 WLog_WARN(TAG,
"Thread already started!");
997 if (!run_mutex_fkt(pthread_mutex_checked_unlock, &thread->mutex))
1003 DWORD SuspendThread(HANDLE hThread)
1005 WLog_ERR(TAG,
"not implemented");
1006 SetLastError(ERROR_CALL_NOT_IMPLEMENTED);
1010 BOOL SwitchToThread(VOID)
1016 if (sched_yield() != 0)
1022 BOOL TerminateThread(HANDLE hThread, DWORD dwExitCode)
1026 WINPR_THREAD* thread = NULL;
1028 if (!winpr_Handle_GetInfo(hThread, &Type, &Object) || Object->Type != HANDLE_TYPE_THREAD)
1031 thread = (WINPR_THREAD*)Object;
1032 thread->exited = TRUE;
1033 thread->dwExitCode = dwExitCode;
1035 if (!run_mutex_fkt(pthread_mutex_lock, &thread->mutex))
1039 pthread_cancel(thread->thread);
1041 WLog_ERR(TAG,
"Function not supported on this platform!");
1044 if (!run_mutex_fkt(pthread_mutex_checked_unlock, &thread->mutex))
1051 VOID DumpThreadHandles(
void)
1053 #if defined(WITH_DEBUG_THREADS)
1056 void* stack = winpr_backtrace(20);
1057 WLog_DBG(TAG,
"---------------- Called from ----------------------------");
1058 msg = winpr_backtrace_symbols(stack, &used);
1060 for (
size_t i = 0; i < used; i++)
1062 WLog_DBG(TAG,
"[%" PRIdz
"]: %s", i, msg[i]);
1066 winpr_backtrace_free(stack);
1067 WLog_DBG(TAG,
"---------------- Start Dumping thread handles -----------");
1069 #if defined(WITH_THREAD_LIST)
1072 WLog_DBG(TAG,
"All threads properly shut down and disposed of.");
1076 ULONG_PTR* keys = NULL;
1077 ListDictionary_Lock(thread_list);
1078 int x, count = ListDictionary_GetKeys(thread_list, &keys);
1079 WLog_DBG(TAG,
"Dumping %d elements", count);
1081 for (
size_t x = 0; x < count; x++)
1083 WINPR_THREAD* thread = ListDictionary_GetItemValue(thread_list, (
void*)keys[x]);
1084 WLog_DBG(TAG,
"Thread [%d] handle created still not closed!", x);
1085 msg = winpr_backtrace_symbols(thread->create_stack, &used);
1087 for (
size_t i = 0; i < used; i++)
1089 WLog_DBG(TAG,
"[%" PRIdz
"]: %s", i, msg[i]);
1094 if (thread->started)
1096 WLog_DBG(TAG,
"Thread [%d] still running!", x);
1100 WLog_DBG(TAG,
"Thread [%d] exited at:", x);
1101 msg = winpr_backtrace_symbols(thread->exit_stack, &used);
1103 for (
size_t i = 0; i < used; i++)
1104 WLog_DBG(TAG,
"[%" PRIdz
"]: %s", i, msg[i]);
1111 ListDictionary_Unlock(thread_list);
1115 WLog_DBG(TAG,
"---------------- End Dumping thread handles -------------");