服务器项目解析(一):线程池设计

本项目使用线程池 + Reactor模式并发处理用户请求,主线程负责监听IO事件的发生,一旦socket可以读写,则通知工作线程(线程池中的线程)负责处理读写+逻辑(HTTP请求报文的解析等等)。这就是Reactor模式所做的,主线程(I/O处理单元)仅仅负责监听文件描述符上是否有事件发生即(可读、可写)事件,若有事件发生,则立即通知工作线程(逻辑单元),将socket可读可写事件放入请求队列,交给工作线程处理。

所以线程池主要做两件事情:

  1. 处理读写(write()read()函数)。
  2. 处理HTTP请求报文,包括解析HTTP请求报文和响应HTTP请求。

为什么要使用线程池?#

使用线程池,有以下三个原因:

  1. 频繁申请/销毁资源和调度资源,将带来额外的消耗,可能会非常巨大。
  2. 对资源无限申请缺少抑制手段,易引发系统资源耗尽的风险。
  3. 系统无法合理管理内部的资源分布,会降低系统的稳定性。

本质上说,客户端和服务端的连接是一个生产者消费者问题,客户端可以类比为生产者,服务端则是消费者,消费客户端发送过来的数据,所以每当有一个客户端连接到服务器,发送数据时,都可以开启一个新的线程来处理这些数据,当你需要限制你应用程序中同时运行的线程数时,线程池非常有用。因为启动一个新线程会带来性能开销,每个线程也会为其堆栈分配一些内存等。为了任务的并发执行,我们可以将这些任务任务传递到线程池,而不是为每个任务动态开启一个新的线程。

Screen Shot 2022-03-03 at 21.12.41

任务对象是如何插入到线程池中的#

在eventLoop()函数的逻辑中,通过epoll_wait()阻塞监听文件描述符上的事件,如果有事件发生并且事件发生在listenfd的话,则调用dealWithClientData()函数,将listenfd上到达的connection通过 accept()接收,并返回一个新的socket文件描述符connfd用于和用户通信,并对用户请求返回响应,同时将这个connfd注册到内核事件表中,等用户发来请求报文,如下代码:

1
2
3
4
5
// 将cfd加入到红黑树上
addfd(m_epollfd, cfd, true);

// 为该次连接(用户)初始化
users[cfd].init(cfd, address);

同时,在上述的逻辑中,如果epoll_wait()阻塞监听文件描述符上的事件为读事件(EPOLLIN)或者写事件(EPOLLOUT),那么主线程就会调用读和写事件的处理函数:

1
2
3
4
5
6
7
8
9
10
// 如果是读事件
else if (events[i].events & EPOLLIN)
{
dealWithRead(sockfd);
}
// 如果是写事件
else if (events[i].events & EPOLLOUT)
{
dealWithWrite(sockfd);
}

这两个函数所做的事情也很简单,仅仅是将该任务对象(users)插入到线程池的任务队列中:

1
2
3
4
5
6
7
void webserver::dealWithRead(int sockfd) /* 处理读事件 */
{
// reactor模式
// 将socket可读事件放入请求队列,交给工作线程处理,0代表读事件
m_pool->append(users + sockfd, 0);
}
/* 写事件同理 */

线程池主要结构#

  • 所谓线程池,就是一个pthread_t类型的普通数组,通过pthread_create()函数创建m_thread_number线程,用来执行worker()函数以执行每个请求处理函数(HTTP请求的process函数),通过pthread_detach()将线程设置成脱离态(detached)后,当这一线程运行结束时,它的资源会被系统自动回收,而不再需要在其它线程中对其进行 pthread_join() 操作。
  • 操作工作队列一定要加锁(locker),因为它被所有线程共享。
  • 我们用信号量来标识请求队列中的请求数,通过m_queuestat.wait();来等待一个请求队列中待处理的HTTP请求,然后交给线程池中的空闲线程来处理。

如何实现线程池的同步互斥的?#

处理线程同步互斥问题可以考虑互斥锁、条件变量、读写锁和信号量。本线程池为1:N模型,主线程负责监听并负责添加任务(建立连接 + 创建参数)到线程池中,之后worker线程负责取任务并执行,可供选择的同步策略可以是”互斥锁 + 条件变量”或信号量来完成。

  • 互斥锁 + 条件变量(线程同步)
1
2
3
4
int pthread_mutex_lock(pthread_mutex_t *mptr);
int pthread_mutex_unlock(pthread_mutex_t *mptr);
int pthread_cond_wait(pthread_cond_t *cptr, pthread_mutex_t *mptr);
int pthread_cond_signal(pthread_cond_t *cptr);
  • 信号量(进程、线程同步)
1
2
3
4
int sem_init(sem_t *sem, int shared, unsigned int value);
int sem_wait(sem_t *sem);
int sem_post(sem_t *sem);
int sem_destory(sem_t *sem);

其实信号量初值设为1(二元信号量)时,可以实现互斥锁功能,信号量初值为N时可以实现条件变量功能。不过信号量主要上锁和解锁可以在不同线程,同步操作容易写错,另外,信号量必须先处理同步信号量再用互斥信号量包住临界区,这里写错会发生死锁情况。所以本线程池使用互斥锁 + 条件变量来实现。

线程池线程数目如何确定?#

查阅了一些资料,获取到了下面的内容:

线程池数目

并发任务的执行情况和任务类型相关,IO密集型和CPU密集型的任务运行起来的情况差异非常大,但这种占比是较难合理预估的,这导致很难有一个简单有效的通用公式帮我们直接计算出结果。

参考资料#

  1. Java线程池实现原理及其在美团业务中的实践

  2. How to set an ideal thread pool size