生产者-消费者模式

最近在学习线程池,深度剖析了之后,发现线程池实际上是一个典型的“生产者-消费者模式”,现在来重新整理回顾一下该模型,或者说叫做设计模式(pattern),加深自己的理解。

什么是生产者-消费者模式?#

假设某个模块(包括类、函数、线程、进程)负责产生数据,而这些数据由另一个模块来负责处理,则将产生数据的模块,就形象地称为【生产者】;而处理数据的模块,就称为【消费者】,在该模式中,生产者和消费者还共享了一个缓冲区,生产者把数据放入缓冲区,而消费者从缓冲区取出数据。大概的结构如下图所示:

Screen Shot 2022-02-06 at 19.39.53

为神马要使用该模式?#

在实际的软件开发中,也经常使用到这种设计模式,其优点主要有下面几点:

1. 解耦#

如果生产者和消费者分别是两个类。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。将来如果消费者的代码发生变化,可能会影响到生产者。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也就相应降低了。

例如如果要构建web服务器,处理线程的请求:生产者(也就是select、poll或者epoll)将任务提交给线程池,线程池创建线程处理任务,如果需要运行的任务数大于线程池的基本线程数,那么就把任务扔到阻塞队列(通过线程池+阻塞队列的方式比只使用一个阻塞队列的效率高很多,因为消费者能够处理就直接处理掉了,不用每个消费者都要先从阻塞队列中取出任务再执行)。

2. 支持并发(concurrency)#

生产者直接调用消费者的某个方法,还有另一个弊端。由于函数调用是同步的(或者叫阻塞的),在消费者的方法没有返回之前,生产者只好一直等在那边。万一消费者处理数据很慢,生产者就会白白糟蹋大好时光。
使用了生产者/消费者模式之后,生产者和消费者可以是两个独立的并发主体,生产者把制造出来的数据往缓冲区一丢,就可以再去生产下一个数据。基本上不用依赖消费者的处理速度。

3. 支持分布式#

生产者和消费者通过队列进行通讯,所以不需要运行在同一台机器上,在分布式环境中可以通过redis的list作为队列,而消费者只需要轮询队列中是否有数据。同时还能支持集群的伸缩性,当某台机器宕掉的时候,不会导致整个集群宕掉。

实现生产者消费者模式#

使用队列缓冲区#

最传统、最常见的方式。也就是单个生产者对应单个消费者,用队列(FIFO)作缓冲。下面我用C语言的互斥量简单实现一下,当然,也可以使用信号量进行实现,在这里就不赘述了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
// 实现生产者消费者模型
struct msg
{
int num;
struct msg *next;
};
struct msg *header;

pthread_mutex_t mutex;
pthread_cond_t hasData;

void pthread_err(int ret, char *str)
{
if (ret != 0)
{
fprintf(stdout, "%s:%s", str, strerror(ret));
pthread_exit(NULL);
}
}

void* consumer(void *arg)
{
while (1)
{
struct msg *mp;

// 上来就要加锁
int ret = pthread_mutex_lock(&mutex);
pthread_err(ret, "wrong lock!\n");
if (header == NULL)
{
// 如果条件变量不满足,则阻塞在此处,并将mutex解锁
// 满足后,加锁mutex
pthread_cond_wait(&hasData, &mutex);
}
// 当阻塞结束,去读数据
mp = header;
header = mp->next;

pthread_mutex_unlock(&mutex);
printf("---------consumer:%d\n", mp->num);

free(mp);
sleep(rand() % 3);
}

}

void* producer(void *arg)
{
while (1)
{
struct msg *mp = malloc(sizeof(struct msg));
// 模拟生产一个数据
mp->num = rand() % 1000 + 1;
printf("--produce %d\n", mp->num);

// 并挂上去
int ret = pthread_mutex_lock(&mutex);
mp->next = header;
header = mp; // 写公共区域
pthread_mutex_unlock(&mutex);

// 通知条件变量
pthread_cond_signal(&hasData);
sleep(rand() % 3);
}
}
int main(void)
{
int ret;
pthread_t cid, pid;
srand(time(NULL));
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&hasData, NULL);

ret = pthread_create(&cid, NULL, &consumer, NULL);
pthread_err(ret, "wrong create!\n");
ret = pthread_create(&pid, NULL, &producer, NULL);
pthread_err(ret, "wrong create!\n");
pthread_join(cid, NULL);
pthread_join(pid, NULL);

pthread_exit(NULL);
}

开销#

  • 内存分配的性能开销

    在上面的代码中,生产者和消费者各是一个线程,生产者将数据写入队列头(push),消费者从队尾读出数据(pop),当队列为空,消费者就进入休眠状态,而当队列满了,生产者就进入休眠状态,流程很简单。

    但是在这个过程中的一个主要的问题就是关于内存分配的性能开销。对于常见的队列实现:在每次 push 时,可能涉及到对于堆内存的分配;在每次 pop 时,可能涉及堆内存的释放。假如生产者和消费者频繁地 push、pop,那内存分配的开销就会比较大,而对于C或者C++而言,new 或 malloc是线程安全的,也会有加锁的开销和用户态/核心态切换的开销。

  • 同步和互斥的性能

    由于两个线程共用一个队列,自然就会涉及到线程间诸如同步、互斥以及死锁等问题,比如生产者和消费者要对于队列进行读写时,应该加锁保护(如互斥量、信号量),所以也会有一定的开销。

环形缓冲区 VS 队列缓冲区#

可以把环形缓冲区的读出端(以下简称 R)和写入端(以下简称 W)想象成是两个人在体育场跑道上追逐。当 R 追上 W 的时候,就是缓冲区为空;当 W 追上 R 的时候(W 比 R 多跑一圈),就是缓冲区满。

Screen Shot 2022-02-06 at 20.21.32

环形缓冲区所有的 push/pop 操作都是在一个已经分配好了的的存储空间内进行。而队列缓冲区在 push 的时候,可能会分配存储空间用于存储新元素;在 pop 时,可能会释放废弃元素的存储空间。所以环形方式相比队列方式,少掉了对于缓冲区元素所用存储空间的分配、释放。这是环形缓冲区的一个主要优势。

环形缓冲区的实现#