线程池 处理大量的数据
线程池是可用于执行任务的线程的集合。这是一种管理和重用线程的方法,可以减少创建和销毁线程的开销。
在线程池中,会创建和维护固定数量的线程。当任务提交到线程池时,将分配一个可用线程来执行该任务。一旦任务完成,线程就会返回到池中,并可用于执行另一个任务。
线程池通常用于需要同时执行多个任务的应用程序。它们可以通过减少创建和销毁线程的开销,以及允许并行执行任务来提高性能。ThreadPoolExecutor类用于创建和管理线程池,submit方法用于将任务提交到池中执行。以下是如何使用ThreadPoolExecutor类创建线程池的示例:
1 2 3 4 5 6 7 8 from concurrent.futures import ThreadPoolExecutorwith ThreadPoolExecutor(max_workers=4 ) as executor: future = executor.submit(my_function, arg1, arg2) result = future.result()
在本例中,使用ThreadPoolExecutor类创建了一个包含4个线程的线程池。使用submit方法将任务提交到池中,使用result方法等待任务完成并获得结果。
读写锁是一种同步机制,用于控制对共享资源的访问。读写锁允许多个线程同时读取共享资源,但只允许一个线程写入共享资源。这可以提高并发性能,因为多个线程可以同时读取共享资源,而不会相互干扰。在Python中,可以使用threading模块中的RLock类来实现读写锁。
条件变量是一种同步机制,用于在多个线程之间传递信号。条件变量允许一个或多个线程等待某个条件变为真,然后再继续执行。在Python中,可以使用threading模块中的Condition类来实现条件变量。Condition类包含wait,notify和notifyall等方法,用于等待条件变为真,通知等待线程条件已经满足,以及通知所有等待线程条件已经满足。条件变量通常与锁一起使用,以确保线程安全。在ThreadPoolExecutor类中,可以使用条件变量来控制线程池中任务的执行顺序,例如等待某个任务完成后再执行另一个任务。
状态变量是用于跟踪线程池中任务的状态的变量。在ThreadPoolExecutor类中,有几个状态变量用于跟踪线程池中任务的状态,例如activecount,completed count和shutdown等。activecount变量用于跟踪当前正在执行的任务数,completed count变量用于跟踪已完成的任务数,shutdown变量用于指示线程池是否已关闭。这些状态变量可以帮助你了解线程池中任务的状态,并根据需要采取适当的措施。
读写锁和条件变量是两种不同的同步机制,用于不同的目的。读写锁用于控制对共享资源的访问,以提高并发性能。条件变量用于在多个线程之间传递信号,以控制线程的执行顺序。
读写锁允许多个线程同时读取共享资源,但只允许一个线程写入共享资源。这可以提高并发性能,因为多个线程可以同时读取共享资源,而不会相互干扰。在Python中,可以使用threading模块中的RLock类来实现读写锁。
条件变量允许一个或多个线程等待某个条件变为真,然后再继续执行。在Python中,可以使用threading模块中的Condition类来实现条件变量。条件变量通常与锁一起使用,以确保线程安全。在ThreadPoolExecutor类中,可以使用条件变量来控制线程池中任务的执行顺序,例如等待某个任务完成后再执行另一个任务。
互斥体 互斥体(Mutex)是一种同步机制,用于保护共享资源免受并发访问的影响。在线程池中,互斥体可以用来保护共享资源,例如共享变量或共享数据结构,以确保在任何给定时间只有一个线程可以访问它们。这可以防止多个线程同时修改共享资源,从而避免数据竞争和其他并发问题。因此,在线程池中使用互斥体是非常重要的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import concurrent.futuresimport threadingcounter = 0 lock = threading.Lock() def worker (num ): global counter """模拟一个耗时的任务""" print (f"Worker {num} starting" ) with lock: counter += 1 print (f"Worker {num} finished" ) if __name__ == '__main__' : with concurrent.futures.ThreadPoolExecutor(max_workers=4 ) as executor: for i in range (10 ): executor.submit(worker, i) print (f"Counter value: {counter} " )
为了避免多个线程同时修改counter,我们使用了一个互斥体lock来保护它。在worker函数中,我们首先获取互斥体,然后修改共享变量counter。这样,我们就可以确保在任何给定时间只有一个线程可以访问counter,从而避免数据竞争和其他并发问题。
在计算机科学中,互斥体(Mutex)和锁(Lock)是两个常用的概念,它们都是用于保护共享资源免受并发访问的影响。在实际应用中,这两个概念有时会被混淆使用,但它们并不完全相同。
互斥体是一种同步机制,用于保护共享资源免受并发访问的影响。在多线程编程中,互斥体可以用来保护共享资源,例如共享变量或共享数据结构,以确保在任何给定时间只有一个线程可以访问它们。这可以防止多个线程同时修改共享资源,从而避免数据竞争和其他并发问题。在Python中,可以使用threading模块中的Lock类来实现互斥体的功能。
锁是一种更加通用的同步机制,它可以用于实现互斥体、信号量、条件变量等多种同步原语。在Python中,可以使用threading模块中的Lock类来实现锁的功能。Lock类提供了acquire和release方法,用于获取和释放锁。当一个线程获取了锁之后,其他线程就无法获取锁,直到该线程释放锁为止。因此,锁可以用来保护临界区,以确保在任何给定时间只有一个线程可以访问它们。
在实际应用中,互斥体和锁有时会被混淆使用,因为它们的功能有一定的重叠。在某些情况下,互斥体和锁可以互换使用,但在其他情况下,它们可能会有所不同。例如,在某些操作系统中,互斥体和锁的实现方式可能不同,因此它们的性能和行为也可能会有所不同。
在Python中,Lock类通常被用作互斥体的实现方式。因此,在线程池中使用Lock类来保护共享资源是非常常见的做法。
假唤醒 虚假唤醒(Spurious Wakeup)是指在多线程编程中,一个线程在等待某个条件变量时,即使没有其他线程通知它,它也会被唤醒。这种情况可能会导致程序出现错误或异常行为,因为线程可能会在没有满足条件的情况下被唤醒。一般是由于在设置超时的时候发生。
在Python中,可以使用threading模块中的Condition类来实现条件变量。Condition类提供了wait、notify和notify_all等方法,用于等待条件变量、通知等待线程和通知所有等待线程。在使用Condition类时,需要注意虚假唤醒的问题。为了避免虚假唤醒,可以在wait方法中使用循环来检查条件是否满足,例如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 import threadingcounter = 0 cond = threading.Condition() def worker (): global counter """模拟一个耗时的任务""" print ("Worker starting" ) with cond: while counter < 10 : cond.wait() counter += 1 print ("Worker finished" ) if __name__ == '__main__' : t = threading.Thread(target=worker) t.start() with cond: counter += 1 cond.notify()
在这个示例中,我们首先定义了一个共享变量counter,它将被多个线程同时访问和修改。然后,我们创建了一个条件变量cond,它将用于等待和通知线程。在worker函数中,我们首先获取条件变量,然后使用循环来检查条件是否满足。如果条件不满足,线程将等待条件变量。在主线程中,我们修改了共享变量counter,然后通知等待线程。在worker函数中,线程被唤醒后,会再次检查条件是否满足,如果条件满足,就会修改共享变量counter。这样,我们就可以避免虚假唤醒的问题,确保线程只有在满足条件的情况下才会被唤醒。
总之,虚假唤醒是多线程编程中常见的问题,需要特别注意。在Python中,可以使用threading模块中的Condition类来实现条件变量,并使用循环来避免虚假唤醒的问题。
在c++中,如果要解决,需要在传入timeout的同时传入一个函数指针,当二者同时满足的时候才能继续向下执行。
nginx线程池 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 #pragma once #include <windows.h> #include <iostream> #include <tchar.h> using namespace std;#define ngx_thread_pool_queue_init(q) \ (q)->first = NULL; \ (q)->last = &(q)->first struct ngx_thread_task_t { ngx_thread_task_t * next; unsigned int id; void * ctx; void (*handler)(void * ctx); }; typedef struct { ngx_thread_task_t * first; ngx_thread_task_t ** last; } ngx_thread_pool_queue_t ; struct ngx_thread_pool_t { SRWLOCK lock; ngx_thread_pool_queue_t queue; int waiting; CONDITION_VARIABLE cond; int threads; int max_queue; }; ngx_thread_pool_t *ngx_thread_pool_config (unsigned int threads) ;bool ngx_thread_pool_init_worker (ngx_thread_pool_t * tp) ;void ngx_thread_pool_exit_worker (ngx_thread_pool_t * tp) ;unsigned int ngx_thread_task_post (ngx_thread_pool_t * tp, ngx_thread_task_t * task) ;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 #include "浅谈nginx线程池.h" static ngx_thread_pool_t g_tp; static unsigned int ngx_thread_pool_task_id;static unsigned int ngx_thread_pool_init (ngx_thread_pool_t * tp) ;static DWORD WINAPIngx_thread_pool_cycle (void * ctx) ;static void ngx_thread_pool_destroy (ngx_thread_pool_t * tp) ;static void ngx_thread_pool_exit_handler (void * ctx) ;ngx_thread_pool_t *ngx_thread_pool_config (unsigned int threads) { unsigned int max_queue = 65536 ; if (threads < 1 ) { threads = 1 ; } ngx_thread_pool_t * tp = &g_tp; if (tp->threads) { return tp; } tp->threads = threads; tp->max_queue = max_queue; return tp; } bool ngx_thread_pool_init_worker (ngx_thread_pool_t * tp) { if (ngx_thread_pool_init (tp) != true ) { return false ; } return true ; } static unsigned int ngx_thread_pool_init (ngx_thread_pool_t * tp) { int err; DWORD tid; SECURITY_ATTRIBUTES SecurityAttributes = {0 }; bool IsOk = true ; ngx_thread_pool_queue_init (&(tp->queue)); InitializeSRWLock (&(tp->lock)); InitializeConditionVariable (&(tp->cond)); if (IsOk == false ) { return false ; } int n = 0 ; for (n = 0 ; n < tp->threads; n++) { HANDLE ThreadHandle = CreateThread (NULL ,0 ,(LPTHREAD_START_ROUTINE)ngx_thread_pool_cycle, tp,0 ,&tid); if (ThreadHandle == NULL ) { return false ; } } return true ; } static DWORD WINAPIngx_thread_pool_cycle (LPVOID ctx) { ngx_thread_pool_t * tp = (ngx_thread_pool_t *)ctx; ngx_thread_task_t * task; for (;;) { AcquireSRWLockExclusive (&(tp->lock)); tp->waiting--; while (tp->queue.first == NULL ) { if (SleepConditionVariableSRW (&tp->cond, &tp->lock, INFINITE, NULL ) == false ) { ReleaseSRWLockExclusive (&tp->lock); return -1 ; } } task = tp->queue.first; if (task->next != NULL ) { tp->queue.first = task->next; } if (tp->queue.first == NULL ) { tp->queue.last = &tp->queue.first; } ReleaseSRWLockExclusive (&tp->lock); task->handler (task->ctx); task->next = NULL ; } } unsigned int ngx_thread_task_post (ngx_thread_pool_t * tp, ngx_thread_task_t * task) { AcquireSRWLockExclusive (&tp->lock); if (tp->waiting >= tp->max_queue) { ReleaseSRWLockExclusive (&tp->lock); return false ; } task->id = ngx_thread_pool_task_id++; task->next = NULL ; WakeConditionVariable (&tp->cond); *tp->queue.last = task; tp->queue.last = &task->next; tp->waiting++; ReleaseSRWLockExclusive (&tp->lock); return true ; } void ngx_thread_pool_exit_worker (ngx_thread_pool_t * tp) { ngx_thread_pool_destroy (tp); } static void ngx_thread_pool_destroy (ngx_thread_pool_t * tp) { unsigned int n; ngx_thread_task_t task; volatile unsigned int lock; ZeroMemory (&task, sizeof (ngx_thread_task_t )); task.handler = ngx_thread_pool_exit_handler; task.ctx = (void *)&lock; for (n = 0 ; n < tp->threads; n++) { lock = 1 ; if (ngx_thread_task_post (tp, &task) != true ) { return ; } while (lock) { Sleep (0 ); } } } static void ngx_thread_pool_exit_handler (void * data) { _tprintf(_T("ThreadIdentify:%d ngx_thread_pool_exit_handler()\r\n" ), GetCurrentThreadId ()); unsigned int * lock = (unsigned int *)data; *lock = 0 ; ExitThread (0 ); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 #include "浅谈nginx线程池.h" static void ngx_thread_handler (void * ctx) ; void _tmain(){ ngx_thread_task_t task[2 ]; ZeroMemory (task, sizeof (ngx_thread_task_t ) * 2 ); int ctx1 = 1 ; int ctx2 = 2 ; task[0 ].handler = ngx_thread_handler; task[0 ].ctx = (void *)&ctx1; task[1 ].handler = ngx_thread_handler; task[1 ].ctx = (void *)&ctx2; SYSTEM_INFO SystemInfo = { 0 }; GetSystemInfo (&SystemInfo); DWORD dwNumberOfProcessors = SystemInfo.dwNumberOfProcessors; ngx_thread_pool_t * tp = ngx_thread_pool_config (3 ); if (true != ngx_thread_pool_init_worker (tp)) { goto done; } if (ngx_thread_task_post (tp, &task[0 ]) != true ) { return ; } if (ngx_thread_task_post (tp, &task[1 ]) != true ) { return ; } done: ngx_thread_pool_exit_worker (tp); return ; } static void ngx_thread_handler (void * ctx) { int v1 = *(int *)ctx; _tprintf(_T("ThreadIdentify:%d ngx_thread_handler() %d\r\n" ),GetCurrentThreadId (),v1); }