线程池

处理大量的数据

线程池是可用于执行任务的线程的集合。这是一种管理和重用线程的方法,可以减少创建和销毁线程的开销。

在线程池中,会创建和维护固定数量的线程。当任务提交到线程池时,将分配一个可用线程来执行该任务。一旦任务完成,线程就会返回到池中,并可用于执行另一个任务。

线程池通常用于需要同时执行多个任务的应用程序。它们可以通过减少创建和销毁线程的开销,以及允许并行执行任务来提高性能。ThreadPoolExecutor类用于创建和管理线程池,submit方法用于将任务提交到池中执行。以下是如何使用ThreadPoolExecutor类创建线程池的示例:

1
2
3
4
5
6
7
8
from concurrent.futures import ThreadPoolExecutor

# Create a thread pool with 4 threads
with ThreadPoolExecutor(max_workers=4) as executor:
# Submit a task to the pool
future = executor.submit(my_function, arg1, arg2)
# Wait for the task to complete and get the result
result = future.result()

在本例中,使用ThreadPoolExecutor类创建了一个包含4个线程的线程池。使用submit方法将任务提交到池中,使用result方法等待任务完成并获得结果。

读写锁是一种同步机制,用于控制对共享资源的访问。读写锁允许多个线程同时读取共享资源,但只允许一个线程写入共享资源。这可以提高并发性能,因为多个线程可以同时读取共享资源,而不会相互干扰。在Python中,可以使用threading模块中的RLock类来实现读写锁。

条件变量是一种同步机制,用于在多个线程之间传递信号。条件变量允许一个或多个线程等待某个条件变为真,然后再继续执行。在Python中,可以使用threading模块中的Condition类来实现条件变量。Condition类包含wait,notify和notifyall等方法,用于等待条件变为真,通知等待线程条件已经满足,以及通知所有等待线程条件已经满足。条件变量通常与锁一起使用,以确保线程安全。在ThreadPoolExecutor类中,可以使用条件变量来控制线程池中任务的执行顺序,例如等待某个任务完成后再执行另一个任务。

状态变量是用于跟踪线程池中任务的状态的变量。在ThreadPoolExecutor类中,有几个状态变量用于跟踪线程池中任务的状态,例如activecount,completedcount和shutdown等。activecount变量用于跟踪当前正在执行的任务数,completedcount变量用于跟踪已完成的任务数,shutdown变量用于指示线程池是否已关闭。这些状态变量可以帮助你了解线程池中任务的状态,并根据需要采取适当的措施。

读写锁和条件变量是两种不同的同步机制,用于不同的目的。读写锁用于控制对共享资源的访问,以提高并发性能。条件变量用于在多个线程之间传递信号,以控制线程的执行顺序。

读写锁允许多个线程同时读取共享资源,但只允许一个线程写入共享资源。这可以提高并发性能,因为多个线程可以同时读取共享资源,而不会相互干扰。在Python中,可以使用threading模块中的RLock类来实现读写锁。

条件变量允许一个或多个线程等待某个条件变为真,然后再继续执行。在Python中,可以使用threading模块中的Condition类来实现条件变量。条件变量通常与锁一起使用,以确保线程安全。在ThreadPoolExecutor类中,可以使用条件变量来控制线程池中任务的执行顺序,例如等待某个任务完成后再执行另一个任务。

互斥体

互斥体(Mutex)是一种同步机制,用于保护共享资源免受并发访问的影响。在线程池中,互斥体可以用来保护共享资源,例如共享变量或共享数据结构,以确保在任何给定时间只有一个线程可以访问它们。这可以防止多个线程同时修改共享资源,从而避免数据竞争和其他并发问题。因此,在线程池中使用互斥体是非常重要的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import concurrent.futures
import threading

# 共享变量
counter = 0

# 创建一个互斥体
lock = threading.Lock()

def worker(num):
global counter
"""模拟一个耗时的任务"""
print(f"Worker {num} starting")
# 获取互斥体
with lock:
# 修改共享变量
counter += 1
print(f"Worker {num} finished")

if __name__ == '__main__':
# 创建一个包含4个线程的线程池
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
# 向线程池中添加10个任务
for i in range(10):
executor.submit(worker, i)
# 打印共享变量的值
print(f"Counter value: {counter}")

为了避免多个线程同时修改counter,我们使用了一个互斥体lock来保护它。在worker函数中,我们首先获取互斥体,然后修改共享变量counter。这样,我们就可以确保在任何给定时间只有一个线程可以访问counter,从而避免数据竞争和其他并发问题。

在计算机科学中,互斥体(Mutex)和锁(Lock)是两个常用的概念,它们都是用于保护共享资源免受并发访问的影响。在实际应用中,这两个概念有时会被混淆使用,但它们并不完全相同。

互斥体是一种同步机制,用于保护共享资源免受并发访问的影响。在多线程编程中,互斥体可以用来保护共享资源,例如共享变量或共享数据结构,以确保在任何给定时间只有一个线程可以访问它们。这可以防止多个线程同时修改共享资源,从而避免数据竞争和其他并发问题。在Python中,可以使用threading模块中的Lock类来实现互斥体的功能。

锁是一种更加通用的同步机制,它可以用于实现互斥体、信号量、条件变量等多种同步原语。在Python中,可以使用threading模块中的Lock类来实现锁的功能。Lock类提供了acquire和release方法,用于获取和释放锁。当一个线程获取了锁之后,其他线程就无法获取锁,直到该线程释放锁为止。因此,锁可以用来保护临界区,以确保在任何给定时间只有一个线程可以访问它们。

在实际应用中,互斥体和锁有时会被混淆使用,因为它们的功能有一定的重叠。在某些情况下,互斥体和锁可以互换使用,但在其他情况下,它们可能会有所不同。例如,在某些操作系统中,互斥体和锁的实现方式可能不同,因此它们的性能和行为也可能会有所不同。

在Python中,Lock类通常被用作互斥体的实现方式。因此,在线程池中使用Lock类来保护共享资源是非常常见的做法。

假唤醒

虚假唤醒(Spurious Wakeup)是指在多线程编程中,一个线程在等待某个条件变量时,即使没有其他线程通知它,它也会被唤醒。这种情况可能会导致程序出现错误或异常行为,因为线程可能会在没有满足条件的情况下被唤醒。一般是由于在设置超时的时候发生。

在Python中,可以使用threading模块中的Condition类来实现条件变量。Condition类提供了wait、notify和notify_all等方法,用于等待条件变量、通知等待线程和通知所有等待线程。在使用Condition类时,需要注意虚假唤醒的问题。为了避免虚假唤醒,可以在wait方法中使用循环来检查条件是否满足,例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import threading

# 共享变量
counter = 0

# 创建一个条件变量
cond = threading.Condition()

def worker():
global counter
"""模拟一个耗时的任务"""
print("Worker starting")
# 获取条件变量
with cond:
# 检查条件是否满足
while counter < 10:
# 等待条件变量
cond.wait()
# 修改共享变量
counter += 1
print("Worker finished")

if __name__ == '__main__':
# 创建一个线程
t = threading.Thread(target=worker)
t.start()
# 修改共享变量
with cond:
counter += 1
# 通知等待线程
cond.notify()

在这个示例中,我们首先定义了一个共享变量counter,它将被多个线程同时访问和修改。然后,我们创建了一个条件变量cond,它将用于等待和通知线程。在worker函数中,我们首先获取条件变量,然后使用循环来检查条件是否满足。如果条件不满足,线程将等待条件变量。在主线程中,我们修改了共享变量counter,然后通知等待线程。在worker函数中,线程被唤醒后,会再次检查条件是否满足,如果条件满足,就会修改共享变量counter。这样,我们就可以避免虚假唤醒的问题,确保线程只有在满足条件的情况下才会被唤醒。

总之,虚假唤醒是多线程编程中常见的问题,需要特别注意。在Python中,可以使用threading模块中的Condition类来实现条件变量,并使用循环来避免虚假唤醒的问题。

在c++中,如果要解决,需要在传入timeout的同时传入一个函数指针,当二者同时满足的时候才能继续向下执行。

nginx线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#pragma once
#include <windows.h>
#include <iostream>
#include <tchar.h>
using namespace std;


#define ngx_thread_pool_queue_init(q) \
(q)->first = NULL; \
(q)->last = &(q)->first


//任务节点
struct ngx_thread_task_t {
ngx_thread_task_t* next; //使用列表保存任务节点
unsigned int id; //对该任务的一个标识
void* ctx; //用户数据私有数据
void (*handler)(void* ctx); //处理私用数据的函数
};

typedef struct {
ngx_thread_task_t* first;
ngx_thread_task_t** last;
} ngx_thread_pool_queue_t;


//线程池的管理员
struct ngx_thread_pool_t {

SRWLOCK lock; //读写锁 条件变量
ngx_thread_pool_queue_t queue; //存放任务节点的队列 内存池
int waiting;
CONDITION_VARIABLE cond; //条件变量
int threads; //池中的配置的线程最大个数
int max_queue;
};


ngx_thread_pool_t*
ngx_thread_pool_config(unsigned int threads);

bool
ngx_thread_pool_init_worker(ngx_thread_pool_t* tp);
void
ngx_thread_pool_exit_worker(ngx_thread_pool_t* tp);


unsigned int
ngx_thread_task_post(ngx_thread_pool_t* tp, ngx_thread_task_t* task);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
#include "浅谈nginx线程池.h"



//条件变量
//读写锁
static ngx_thread_pool_t g_tp; //定义了一个全局静态的线程池管理员结构体
static unsigned int ngx_thread_pool_task_id;

static unsigned int
ngx_thread_pool_init(ngx_thread_pool_t* tp);
static DWORD WINAPI
ngx_thread_pool_cycle(void* ctx);
static void
ngx_thread_pool_destroy(ngx_thread_pool_t* tp);
static void
ngx_thread_pool_exit_handler(void* ctx);

ngx_thread_pool_t*
ngx_thread_pool_config(unsigned int threads) //设置的线程个数
{
unsigned int max_queue = 65536; //任务队列
if (threads < 1)
{
threads = 1;
}
ngx_thread_pool_t* tp = &g_tp; //全局静态结构体
if (tp->threads)
{
return tp;
}
tp->threads = threads; //设置线程池中的线程个数
tp->max_queue = max_queue;
return tp;
}
bool
ngx_thread_pool_init_worker(ngx_thread_pool_t* tp)
{
//日志信息记录


if (ngx_thread_pool_init(tp) != true)
{
return false;
}
return true;
}
static unsigned int
ngx_thread_pool_init(ngx_thread_pool_t* tp) //网络 文件读写 组件与组件IO操作
{
int err;
DWORD tid;
SECURITY_ATTRIBUTES SecurityAttributes = {0};
bool IsOk = true;



ngx_thread_pool_queue_init(&(tp->queue)); //初始化队列

//创建读写锁
InitializeSRWLock(&(tp->lock));//初始化读写锁 线程同步

//创建条件变量
InitializeConditionVariable(&(tp->cond));


//可以在这里进行线程池中的线程属性设置
//IsOk = pthread_attr_init(&attr);
if (IsOk == false)
{
return false;
}
//为线程池创建线程
int n = 0;
for (n = 0; n < tp->threads; n++)
{
//线程栈的默认大小是1M
HANDLE ThreadHandle = CreateThread(NULL,0,(LPTHREAD_START_ROUTINE)ngx_thread_pool_cycle, tp,0,&tid);
if (ThreadHandle == NULL)
{
//线程回收
//工作队列任务节点进行销毁
return false;
}
}
//(void)pthread_attr_destroy(&attr); //销毁之前为线程设置属性时产生的动态内存
return true;
}
static DWORD WINAPI
ngx_thread_pool_cycle(LPVOID ctx)
{
ngx_thread_pool_t* tp = (ngx_thread_pool_t*)ctx; //线程池管理员
ngx_thread_task_t* task;
for (;;)
{

AcquireSRWLockExclusive(&(tp->lock)); //获得SRW锁 独占权
tp->waiting--; //注意看看有没有初始化
while (tp->queue.first == NULL) //看看有无任务
{
if (SleepConditionVariableSRW(&tp->cond, &tp->lock, INFINITE, NULL) == false) //释放锁 Sleep 等待通知唤醒重新拿到锁的控制权
{
//调用上述函数失败
ReleaseSRWLockExclusive(&tp->lock);
return -1;
}
}
//如果任务存在
task = tp->queue.first;

if (task->next != NULL)
{
tp->queue.first = task->next; //队列任务下移
}
if (tp->queue.first == NULL)
{
//只有上述一个任务的时候 将Last域置空
tp->queue.last = &tp->queue.first;
}
ReleaseSRWLockExclusive(&tp->lock);
//处理队列中的认为
task->handler(task->ctx); //将任务插入到队列的时候都会放置处理函数
task->next = NULL; //当前任务从 任务队列中断开

//假如task是使用了动态申请 一定要主要回收
}
}
unsigned int
ngx_thread_task_post(ngx_thread_pool_t* tp, ngx_thread_task_t* task)
{
AcquireSRWLockExclusive(&tp->lock); //获得SRW锁
if (tp->waiting >= tp->max_queue)
{
ReleaseSRWLockExclusive(&tp->lock);
return false;
}
task->id = ngx_thread_pool_task_id++; //使用全局变量为每个任务节点进行编号
task->next = NULL;
WakeConditionVariable(&tp->cond); //通知工作线程
*tp->queue.last = task;
tp->queue.last = &task->next;
tp->waiting++;
ReleaseSRWLockExclusive(&tp->lock);
return true;
}
void
ngx_thread_pool_exit_worker(ngx_thread_pool_t* tp)
{
ngx_thread_pool_destroy(tp);
}
static void
ngx_thread_pool_destroy(ngx_thread_pool_t* tp)
{
unsigned int n;
ngx_thread_task_t task; //投递退出循环的任务
volatile unsigned int lock; //不允许从寄存器中获取该数据
ZeroMemory(&task, sizeof(ngx_thread_task_t));
task.handler = ngx_thread_pool_exit_handler; //杀死当前线程
task.ctx = (void*)&lock;
for (n = 0; n < tp->threads; n++)
{
lock = 1;
if (ngx_thread_task_post(tp, &task) != true) //开一枪 死一个
{
return;
}
while (lock)
{
Sleep(0);
}
}
}
static void
ngx_thread_pool_exit_handler(void* data)
{

_tprintf(_T("ThreadIdentify:%d ngx_thread_pool_exit_handler()\r\n"), GetCurrentThreadId());
unsigned int* lock = (unsigned int*)data;
*lock = 0;
ExitThread(0);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
#include "浅谈nginx线程池.h"

//条件变量 读写锁
//数据结构
//销毁线程池
//


static void ngx_thread_handler(void* ctx); //只能在当前CPP文件中被调用
void _tmain()
{

ngx_thread_task_t task[2]; //处理任务
ZeroMemory(task, sizeof(ngx_thread_task_t) * 2); //内存进行初始
int ctx1 = 1;
int ctx2 = 2;
task[0].handler = ngx_thread_handler; //处理数据的函数
task[0].ctx = (void*)&ctx1;

task[1].handler = ngx_thread_handler;
task[1].ctx = (void*)&ctx2;



//线程池初始化 必须指定线程池中的线程个数

//获取当前系统信息
SYSTEM_INFO SystemInfo = { 0 };
GetSystemInfo(&SystemInfo);
DWORD dwNumberOfProcessors = SystemInfo.dwNumberOfProcessors;


//线程池进行信息配置
ngx_thread_pool_t* tp = ngx_thread_pool_config(3); //线程池的管理员

//线程池进行初始化
if (true != ngx_thread_pool_init_worker(tp))
{
goto done;
}

//投递任务
if (ngx_thread_task_post(tp, &task[0]) != true)
{
return;
}
if (ngx_thread_task_post(tp, &task[1]) != true)
{
return;
}


done:
ngx_thread_pool_exit_worker(tp); //通知工作线程解散
return;
}



static void ngx_thread_handler(void* ctx) //任务节点的数据处理函数
{
int v1 = *(int*)ctx;

_tprintf(_T("ThreadIdentify:%d ngx_thread_handler() %d\r\n"),GetCurrentThreadId(),v1);

}