实验目的
①实现生产者—消费者问题的模拟,以便更好的理解此经典进程同步问题。生产者-消费者问题是典型的PV操作问题,假设系统中有一个比较大的缓冲池,生产者的任务是只要缓冲池未满就可以将生产出的产品放入其中,而消费者的任务是只要缓冲池未空就可以从缓冲池中拿走产品。缓冲池被占用时,任何进程都不能访问。
②每一个生产者都要把自己生产的产品放入缓冲池,每个消费者从缓冲池中取走产品消费。在这种情况下,生产者消费者进程同步,因为只有通过互通消息才知道是否能存入产品或者取走产品。他们之间也存在互斥,即生产者消费者必须互斥访问缓冲池,即不能有两个以上的进程同时进行。
实验原理
在同一个进程地址空间内执行两个线程。生产者线程生产物品,然后将物品放置在一个空缓冲区中供消费者线程消费。消费者线程从缓冲区中获得物品,然后释放缓冲区。当生产者线程生产物品时,如果没有空缓冲区可用,那么生产者线程必须等待消费者线程释放一个空缓冲区。当消费者线程消费物品时,如果没有满的缓冲区,那么消费者线程将被阻挡,直到新的物品被生产出来。
生产者流程图
消费者流程图
注意点
①本次实验是关于生产者与消费者之间互斥和同步的问题。问题的是指是P、V操作,实验设一个共享缓冲区,生产者和消费者互斥的使用,当一个线程使用缓冲区的时候,另一个让其等待直到前一个线程释放缓冲区为止。
②生产者与消费者是一个与现实有关的经验问题,通过此原理举一反三可以解决其他类似的问题。 通过本实验设计,我们对操作系统的P、V进一步的认识,深入的了解P、V操作的实质和其重要性。课本的理论知识进一步阐述了现实中的实际问题。
③Linux环境下编写变异C语言有Windows稍有不同,注意在Linux中编译带有线程<pthread.h>头文件的源程序需要加上参数-lpthread
并且如果要在Linux控制台输出中文还得更改为utf-8编码。
④在编写生产者消费者函数的时候,最好把P、V操作整体嵌入其中,不必在另外设置函数体。否则,如果像:
Semaphore mutex=1//此时mutex为全局变量
如果在生产者(消费者)函数体中这样写:
Producer()
{
……
Wait(mutex);
……
}
此时根本无法改变全局变量的值,即mutex值没有变。
如果非要用函数,则需要对每个信号量设置一个P函数,一个V函数或者不对mutex设为全局变量。
代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
#include <stdio.h> #include <pthread.h> #include <windows.h> #define N 100 #define true 1 #define producerNum 10 #define consumerNum 5 #define sleepTime 1000 typedef int semaphore; typedef int item; item buffer[N] = {0}; int in = 0; int out = 0; int proCount = 0; semaphore mutex = 1, empty = N, full = 0, proCmutex = 1; void * producer(void * a){ while(true){ while(proCmutex <= 0); proCmutex--; proCount++; printf("生产一个产品ID%d, 缓冲区位置为%d\n",proCount,in); proCmutex++; while(empty <= 0){ printf("缓冲区已满!\n"); } empty--; while(mutex <= 0); mutex--; buffer[in] = proCount; in = (in + 1) % N; mutex++; full++; Sleep(sleepTime); } } void * consumer(void *b){ while(true){ while(full <= 0){ printf("缓冲区为空!\n"); } full--; while(mutex <= 0); mutex--; int nextc = buffer[out]; buffer[out] = 0;//消费完将缓冲区设置为0 out = (out + 1) % N; mutex++; empty++; printf("\t\t\t\t消费一个产品ID%d,缓冲区位置为%d\n", nextc,out); Sleep(sleepTime); } } int main() { pthread_t threadPool[producerNum+consumerNum]; int i; for(i = 0; i < producerNum; i++){ pthread_t temp; if(pthread_create(&temp, NULL, producer, NULL) == -1){ printf("ERROR, fail to create producer%d\n", i); exit(1); } threadPool[i] = temp; }//创建生产者进程放入线程池 for(i = 0; i < consumerNum; i++){ pthread_t temp; if(pthread_create(&temp, NULL, consumer, NULL) == -1){ printf("ERROR, fail to create consumer%d\n", i); exit(1); } threadPool[i+producerNum] = temp; }//创建消费者进程放入线程池 void * result; for(i = 0; i < producerNum+consumerNum; i++){ if(pthread_join(threadPool[i], &result) == -1){ printf("fail to recollect\n"); exit(1); } }//运行线程池 return 0; } |
推荐使用pthread_mutex_lock(&mutex)来加锁,判断无法生产或生产已达上限后,应进入循环将线程阻塞,大概为 while(!canProduce() && !produceStop) { pthread_cond_wait(&canProduceCond, &mutex); }
最近在烤漆,结束后会整理一下。
好的,谢谢
如果是在SMP系统上代码不能正常工作。问题在于线程可能运行在不同的core上并行地修改全局变量,所以用于做mutex或者semaphore的变量在修改时必须是RMW(Read modify write),而且操作还应该能返回供判断的结果。所以正确地实现他们需要使用处理器提供的指令。例如:在x86中bts指令加上lock prefix可以实现lock, 它通过设置位并将原值返回到状态位CF中,从而实现了对共享变量的RMW.
感谢指导。
抱歉,之前评论时没有全面考虑问题,这段代码在单核处理器上也是有问题的。这里还有一个更重要的问题,就是对信号的操作,操作系统相关的书籍在提及wait和signal时一般都会说它们是”原语”,就是说它们是原子的,要么执行要么不执行,是不能被中断的。但这个例子中对信号量的修改
while(semaphore 线程A回到就绪队列 -> 线程B观测semaphore值。
顺便说一下注意点4里的问题,一般操作系统提供的此类函数都以指针作为参数,所以并不需要对每个信号量都设置一个wait和signal函数。
丢失的部分:但这个例子中对信号量的修改while(semaphore 线程A回到就绪队列 -> 线程B观测semaphore值。
这个列子 生产者while(proCmutex <= 0);这里加多几个生产者还是会发生抢占,覆盖掉前一个写入的缓存区
感谢提醒,请问有什么办法解决吗