Sorry, your browser cannot access this site
This page requires browser support (enable) JavaScript
Learn more >

发展出线程池的背景

网络通信情景下

**多进程模型:**服务器要支持多个客户端的话,最简单的方式就是为每个客户端创建一个进程处理请求,当服务完成后再销毁进程。因为进程包含一套资源,在创建和销毁的时候需要分配资源和回收资源,极端情况下,当有大量的请求来,系统中给PCB的描述符还不够呢! CPU 将大量的精力用于频繁的创建和销毁进程,还有多少精力给真正需要服务的请求呢?

注意:服务端支持多客户端,服务端创建一个socket,并在这个socket上绑定 ip 和端口进行监听,当客户端调用 connect()函数时,就开始了三次握手建立连接,主进程fork子进程,让子进程处理请求任务,所以在高并发情境下,可能因为主进程忘记调用 wait 导致子进程编程僵尸进程(僵尸进程是指一个进程只有其PCB数据结构还在系统中),当系统中的僵尸进程越来越多的时候,系统的可用资源越来越少,例如进程pid 会被耗尽。

所以多进程模型在高并发情景下不仅可能会因为进程频繁上下切换耗尽CPU资源,也可能因为忘记回收子进程导致系统内僵尸进程变多,耗尽系统资源。

**多线程模型:**因为进程包含一套资源,而线程是轻量级进程,创建和销毁线程消耗的资源肯定是比进程要小,但是当请求量太大时,频繁创建和销毁线程也是挺要命的。

单机主机多任务

现在的处理器是多核处理器,也就支持多任务,单机中处理任务也是支持多进程模型和多线程模型的,实现真正的并行

多进程模型:为每个任务创建一个进程处理

多线程线程模型:为每个线程创建一个线程处理

线程池机制

定义预先创建一批线程,当有任务来时,线程池依据设定好的调度算法,为任务分配一个空闲线程,避免了因为等待线程创建浪费的时间,得以让任务能够被立刻被执行,任务执行完后,线程没有被销毁,而是重新放到了线程池中,等待下一次任务的到来。

核心思想:复用线程提高响应速度,降低时延

线程池的使用场景

网络服务器:每当有一个请求来,就在线程池中取一个线程取处理这个请求,请求处理完后,这个线程又会回到线程池中,等待下一个任务的到来,这样提高了响应的时间,也减少了资源的消耗,这就是reactor和practor的网络模型

游戏开发:游戏开发包含大量的计算,包括图形渲染、数据计算等多任务,多线程配合多核使得游戏更加流畅。

数据库连接池:

线程池的核心参数

  1. 核心线程数(corePoolSize)

    • 线程池中保持的最小线程数量

    • 即使线程空闲也不会被回收

    • 新任务会优先使用核心线程处理

  2. 最大线程数(maxPoolSize)

    • 线程池允许创建的最大线程数量

    • 每次在线程池中取线程时,都会先判断线程是否空闲,如果没有空闲线程,就会创建新的线程,但是程序中存在的线程数据量最大不能超过 maxPoolSize

  3. 工作队列(workQueue)

    • 用于保存新任务,由任务调度取出
  4. 线程空闲时间(keepAliveTime)

    • 非核心线程空闲时的存活时间

    • 超过此时间且线程数大于corePoolSize时,线程会被回收

  5. uint

    keepAliveTime 的计量单位

  6. 线程工厂(threadFactory)

    • 用于创建新线程的工厂

    • 可以自定义线程名称、优先级等

  7. 拒绝策略(handler)

    • 当线程池和工作队列都饱和时的对新任务的处理策略
    • 常见策略:
      • AbortPolicy:默认策略,抛出RejectedExecutionException
      • CallerRunsPolicy:由调用线程执行该任务
      • DiscardPolicy:直接丢弃任务
      • DiscardOldestPolicy:丢弃队列中最旧的任务并重试

C++线程池关键部分

1.线程队列—— std::vector<std::thread> workers

这个是一个线程队列,循环创建corePoolSize个核心线程,并将其添加到vector容器中,每个线程都去执行 work() 函数

1
2
3
4
std::vector<std::thread> threads;
for (size_t i = 0; i < corePoolSize; ++i) {
    threads.emplace_back([this] { this->worker(); });
}

2. 任务队列——std::queue<std::function<void()> tasks

这是一个任务队列,当一个新任务来时会将它放到任务队列中,等待被处理。因为任务队列对于线程池来说是个共享变量,所以任务队列需要使用互斥锁保证多线程的互斥访问

1
2
std::queue<std::function<void()>> tasks;
std::mutex queueMutex;

3.互斥锁—— std::mutex queueMutex

这是一个互斥锁,因为工作线程是需要从任务队列中取出任务执行,所以任务队列对线程池中的线程来说就是共享数据,多线程访问共享数据是需要互斥访问的,互斥需要通过锁机制实现,确保某一时刻只能有一个线程访问任务队列。互斥锁如果被其他线程持有,那么这个线程就会被阻塞,直到互斥锁被释放

注意:通过mutex.lock()方式获取互斥锁,还需要手动释放锁来避免死锁

1
2
3
4
std::mutex mutex
mutex.lock()
/*访问共享变量的代码*/
mutex.unlock()

为了避免因为忘记手动释放锁导致死锁,可以使用 std::lock_guard 或者 std::unique_lock,这两种加锁方式可以在作用域结束时自动释放锁

1
2
3
4
{
std::unique_lock<std::mutex> lock(mutex);
/*访问共享变量的代码*/
}// lock自动析构释放锁

4. 条件变量——std::condition_variable condition

这个是条件变量,用于线程池中多线程的同步通信。它和互斥锁配合,当任务队列中没有任务时,工作线程可以通过条件变量进入等待状态,释放互斥锁,让出CPU资源。当有新任务添加到任务队列时,可以听过条件变量通知等待线程,唤醒线程获取互斥锁,从任务队列中取出任务执行

1
2
3
4
5
6
7
std::condition_variable condition;
std::unique_lock<std::mutex> lock(mutex);
while (tasks.empty()) {
    condition.wait(lock);
}
auto task = std::move(tasks.front());
tasks.pop();

condition.wait(lock)是无条件等待,它会让线程释放互斥锁并进入等待状态,直到其他线程执行 condition.notify_all() 或 condition.notify_one() 时,线程才会被唤醒。

线程池的工作过程

step1:每当有新任务到来时,新任务都会被添加进入任务队列中,再通过条件变量通知线程获取

step2:创建线程的时候就就让线程去执行一个工作函数,这个工作函数是不断地获取任务队列的互斥锁然后添加

C++实现线程池过程

线程池类构造的大框架

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

class ThreadPool {
public:
ThreadPool(size_t corePoolSize,size_t maxPoolSize,
std::chrono::milliseconds keepAliveTime);

~ThreadPool();

//添加任务到任务队列中
template<class F, class... Args>
auto submit(F&& f, Args&&... args)

private:
// 核心参数
const size_t corePoolSize; // 核心线程数
const size_t maxPoolSize; // 最大线程数
const std::chrono::milliseconds keepAliveTime; // 空闲线程存活时间

// 线程管理
std::vector<std::thread> threads; // 工作线程
std::queue<std::function<void()>> tasks; // 任务队列

// 同步原语
std::mutex queueMutex;
std::condition_variable condition;

// 状态控制
std::atomic<bool> running;
std::atomic<size_t> currentThreads; // 当前线程数
std::atomic<size_t> idleThreads; // 空闲线程数
};

构造函数

线程池的构造函数的主要功能是:初始化核心线程数量(corePoolSize)、最大线程数量(maxPoolSize)、空闲线程存活时间(keepAliveTime)。然后循环创建corePoolSize个核心线程,线程创建后立刻互斥访问任务队列获取任务执行。因为存在corePoolSize、maxPoolSize、keepAliveTime等参数,线程互斥访问任务队列获取任务有以下两种不同的情况。

情况一:带超时的任务等待(适用于非核心线程)

最多等待keepAliveTime时间,如果超时且没有任务,进入线程回收判断,如果当前的线程数大于核心线程数,就会进入回收非核心线程阶段

1
2
3
4
5
6
7
8
9
10
11
12
13
if (keepAliveTime.count() > 0) {  // 非核心线程处理逻辑, 带超时的等待
if (!condition.wait_for(lock, keepAliveTime,
[this] { return !running || !tasks.empty(); })) {

// 超时且当前线程数超过核心线程数,结束该线程
if (currentThreads > corePoolSize) {
--currentThreads;
--idleThreads;
return;
}
continue;
}
}

情况二:无限等待(适用于核心线程)

1
2
// 无限等待
condition.wait(lock,[this] { return !running || !tasks.empty(); });
判断依据 核心线程 非核心线程
线程计数 currentThreads <= corePoolSize currentThreads > corePoolSize
等待方式 无限等待 (wait) 超时等待 (wait_for)
退出条件 仅线程池关闭时退出 空闲超时且线程数超标时退出
  1. 线程执行任务
1
2
3
4
5
6
7
--idleThreads;
if (!running && tasks.empty()) {
--currentThreads;
return;
}
task = std::move(tasks.front());
tasks.pop();

构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
class ThreadPool {
public:
ThreadPool(size_t corePoolSize,size_t maxPoolSize,std::chrono::milliseconds keepAliveTime)
: corePoolSize(corePoolSize),
maxPoolSize(maxPoolSize),
keepAliveTime(keepAliveTime),
running(true),
currentThreads(0),
idleThreads(0)
{
// 初始化核心线程
for (size_t i = 0; i < corePoolSize; ++i) {
addWorkerThread();
}
}

private:
void addWorkerThread() {
threads.emplace_back([this] {
while (true) {
std::function<void()> task;

{
std::unique_lock<std::mutex> lock(queueMutex);

++idleThreads;

// 等待任务或关闭信号
if (keepAliveTime.count() > 0) { // 非核心线程处理逻辑
// 带超时的等待
if (!condition.wait_for(lock, keepAliveTime,
[this] { return !running || !tasks.empty(); })) {

// 超时且当前线程数超过核心线程数,结束该线程
if (currentThreads > corePoolSize) {
--currentThreads;
--idleThreads;
return;
}
continue;
}
}
else { // 核心线程处理逻辑
// 无限等待
condition.wait(lock, [this] { return !running || !tasks.empty(); });
}

--idleThreads;

// 检查是否需要退出
if (!running && tasks.empty()) {
--currentThreads;
return;
}

// 获取任务
task = std::move(tasks.front());
tasks.pop();
}

// 执行任务
task();
}
});

++currentThreads;
}
};

小结

构造函数主要干三个事

  1. 创建多个线程
  2. 互斥访问任务队列获取任务,判断任务等待逻辑(核心线程、非核心线程)
  3. 执行任务

析构函数

析构函数释放线程池的资源。

首先,在一个临界区内(通过std::unique_lock自动管理锁)将stop标志设置为true,表示线程池要停止;然后,通过condition.notify_all()通知所有等待的线程,让它们有机会检查stop标志并退出;最后,通过循环调用thread.join()等待所有线程执行完毕,释放线程资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class ThreadPool {
public:
~ThreadPool()
{
shutdown();
}
void shutdown() {
{
std::unique_lock<std::mutex> lock(queueMutex);
running = false;
}

condition.notify_all();

for (std::thread& workerThread : threads) {
if (workerThread.joinable()) {
workerThread.join();
}
}
}
};

任务提交函数

submit 函数是线程池对外提供的核心接口,用于向线程池提交任务并获取任务的异步结果。使用 C++ 的模板特性和std::function、std::bind来实现任务的封装。std::function<void()>类型的成员变量func用于存储可调用对象,通过std::bind将传入的函数f和参数args绑定成一个无参的可调用对象,赋值给func。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
template<class F, class... Args>
auto submit(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type> {

using return_type = typename std::result_of<F(Args...)>::type;

auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);

std::future<return_type> res = task->get_future();

{
std::unique_lock<std::mutex> lock(queueMutex);

if (!running) {
throw std::runtime_error("submit on stopped ThreadPool");
}

// 如果当前线程数小于最大线程数且任务队列已满,添加新线程
if (tasks.size() >= currentThreads - idleThreads &&
currentThreads < maxPoolSize) {
addWorkerThread();
}

tasks.emplace([task]() { (*task)(); });
}

condition.notify_one();
return res;
}

模板参数

​ F:任务函数类型

​ Args…:任务函数的参数类型

返回值

​ std::future<typename std::result_of<F(Args…)>::type>:

​ 一个future对象,可用于获取任务的返回值(通过future::get()

1
2
3
template<class F, class... Args>
auto submit(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>;

将传入的函数f和参数args...封装为一个无参函数对象。

使用std::packaged_task包装该函数对象,用于异步执行

通过packaged_task::get_future()获取future对象,用于获取任务结果。

1
2
3
4
5
6
7
using return_type = typename std::result_of<F(Args...)>::type;

auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);

std::future<return_type> res = task->get_future();

包含了一个动态扩容机制,当前活跃线程数(currentThreads - idleThreads)小于任务队列中的任务数,也就是发生了任务积压,且当前线程数小于系统中最大线程上限(maxPoolSize)就扩增新线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
{
std::unique_lock<std::mutex> lock(queueMutex);

if (!running) {
throw std::runtime_error("submit on stopped ThreadPool");
}

// 动态调整线程数的逻辑
if (tasks.size() >= currentThreads - idleThreads &&
currentThreads < maxPoolSize) {
addWorkerThread();
}

tasks.emplace([task]() { (*task)(); });
}

当任务进入队列后,需要通过 condition.notify_one() 唤醒线程去执行任务函数。

1
condition.notify_one();

C++ 线程池的优化

调整线程数量

线程的上下文切换是需要消耗CPU资源的,线程的数量并不是越多越好,线程的上下文切换需要消耗较多的CPU资源,CPU要将它更多的精力放在执行任务上,增加系统的吞吐量。线程的数量也不能过少,这样会任务队列中任务堆积,任务的响应能力下降。所以如何设置线程数量也是有依据的。

对于CPU密集型任务:对于CPU密集型任务应当将CPU资源尽可能都给任务的处理,所以线程数量应该尽可能和CPU核心数持平,例如CPU核心数是3个,那么线程数应当在2~4个

对于I/O密集型任务:这类任务在执行的过程中大部分的时间都是在等待I/O操作,CPU的利用率比较低,适当增加线程数量充分利用起CPU资源,线程数量可以设置为 CPU 核心数的 2 - 3 倍。

优化任务队列数量

如果系统中只有一个任务队列,线程在互斥访问任务队列取任务时,其他的线程只能阻塞等待,当任务队列中任务较多时,从宏观角度看,就是提交的任务的响应时间很长,严重影响用户体验。根本原因就是一个所有的任务都放在一个队列中,一个队列只有一个互斥锁,想要访问任务队列只能排队获取锁,性能低下。

可以将系统中的队列优化多个,按照任务的属性进行分配,每个任务队列都有它的互斥锁和条件变量。这想当于将单通道变成多通道,系统的吞吐量就上去了

C++线程池测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#include <iostream>
#include <chrono>
#include <thread>
#include "ThreadPool.h"

// 定义一个简单的任务函数
std::mutex coutMutex;
int simpleTask(int num) {
std::lock_guard<std::mutex> lock(coutMutex);
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "Task " << num << " executed by thread " << std::this_thread::get_id() << std::endl;
return num; // 返回任务编号
}

int main() {
// 创建线程池:核心2线程,最大5线程,空闲线程存活1秒
ThreadPool pool(2, 5, std::chrono::seconds(1));

// 提交10个任务
std::vector<std::future<int>> results;
for (int i = 0; i < 10; ++i) {
results.emplace_back(pool.submit(simpleTask, i));
}

// 获取结果
for (auto& result : results) {
std::cout << result.get() << " ";
}
system("pause");
// 线程池会在析构时自动关闭
return 0;
}

image-20250516114203351

C++实现线程池和 Java 实现线程池的区别

因为线程池在项目中应用场景很高,网络上有非常多关于实现线程池的方法,作为一个C++选手,看到很多用 Java 实现线程池的方法,发现 JAVA 可以内置多种拒绝策略、能提供ThreadFactory接口以及工作队列。而C++是需要自行调整 线程数、自行实现拒绝策略、直接创建线程代表线程工厂、C++还需要自己实现逻辑队列

  1. 动态调整:Java线程池会自动根据负载调整线程数,而C++需要手动实现
  2. 拒绝策略:Java内置多种拒绝策略,C++需要自行实现
  3. 线程工厂:Java提供ThreadFactory接口,C++通常直接创建线程
  4. 队列选择:Java提供多种BlockingQueue实现,C++需要自己实现队列逻辑

思考

1. 线程池为什么需要核心线程数和最大线程数的区分?

线程池区分核心线程数最大线程数主要是为了在资源利用率系统稳定性之间找到平衡。这种设计允许线程池根据任务负载动态调整资源分配,避免资源浪费或系统过载。

避免频繁创建 / 销毁线程应对突发任务高峰

线程的创建和销毁是有开销的(涉及内核调度、内存分配等)。对于长期存在的任务,如果每次都创建新线程,会导致额外的性能损耗

核心线程数定义了线程池的常驻线程数量。即使这些线程空闲,也不会被销毁,而是保持活跃状态等待新任务。这样,当有新任务提交时,核心线程可以立即处理,减少了任务的响应延迟

应对突发任务高峰(最大线程数存在的意义)

在任务高峰期,核心线程可能不足以快速处理所有任务,导致任务在队列中堆积,影响响应时间。

当任务队列已满且核心线程都在忙碌时,线程池会创建额外线程(直到达到最大线程数)来处理积压的任务。这些额外线程在任务处理完后,如果空闲时间超过设定的keepAliveTime,会被回收,从而释放系统资源。

防止资源耗尽

如果无限制地创建线程,会导致系统资源耗尽(如内存溢出、CPU 上下文切换开销过大),甚至引发系统崩溃。

最大线程数作为上限约束,确保线程池不会创建过多线程。当线程数达到最大线程数且任务队列已满时,新任务会触发拒绝策略(如抛出异常、丢弃任务或由调用线程自行执行)。

灵活适配不同类型的任务

  • CPU 密集型任务
    核心线程数可设置为接近 CPU 核心数(如N+1,其中N为 CPU 核心数),避免过多线程导致上下文切换开销。最大线程数可与核心线程数相同,因为增加额外线程不会提升性能。
  • IO 密集型任务
    核心线程数可设置较小(如N/2),但最大线程数可设置较大(如2N),以处理大量 IO 等待任务。当部分线程在等待 IO 时,额外线程可继续处理其他任务,提高 CPU 利用率

小结

核心线程数 最大线程数
保持系统处理稳定处理基础负载 应对突发高峰,提升吞吐量
减少线程的创建/销毁 防止资源耗尽
适用于长期稳定的任务流 适用于短期、爆发的任务流

2. 当任务队列积压时,除了扩容线程,还有那些优化手段?

任务队列积压指的是:线程池中的任务队列中待处理的任务数量持续增加,超过了系统的处理能力,导致新任务无法被及时执行

方式一:任务队列优化

多队列分流

将任务按照它的类型拆分队列,例如可以将任务按照 IO密集型、CPU密集型分队列,避免他们相互阻塞

动态队列长度调整

1
2
3
4
5
if (queueSize > highWatermark) {
// 拒绝新任务或触发降级
} else if (queueSize < lowWatermark && !isMaxCapacity()) {
// 扩大队列容量
}

设置优先级队列

将任务按照优先级排序,高优先级任务优先处理

1
std::priority_queue<Task, std::vector<Task>, ComparePriority> priorityQueue;

方式二:任务调度策略优化

为任务设置提交超时时间,避免长时间排队

1
2
3
4
5
6
7
bool submitWithTimeout(Task task, Duration timeout) {
if (!queue.tryEnqueue(task, timeout)) {
// 超时处理(如降级、记录日志)
return false;
}
return true;
}

根据系统资源使用情况动态调整调度策略

如果系统CPU使用率很高,那么优先处理 IO型的任务,反之优先处理CPU密集型的任务

方式三:系统级别优化

创建两个线程池,一个线程池用来处理核心任务,另一个线程池处理非核心业务,避免相互影响,当队列积压时,关闭非核心的线程池,通过降级策略尽快处理完紧急任务。

方式四:架构层面

引入中间件,消息队列

任务分片,将任务分发到不同的服务器中处理

评论