首页 > 编程语言 > 详细

生产者和消费者线程实现

时间:2015-12-02 20:14:43      阅读:319      评论:0      收藏:0      [点我收藏+]

一个简单的生产者消费者程序,大致思想如下:一个生产者线程,多个消费者线程,一个缓存区。缓冲区由整形数组实现,以数值-1表示清零,写入的数据为非负数。以一个结构体包含缓冲区和必要的线程锁与条件变量。以下分别有C++和C语言实现:

C++实现:

  1 #include <stdio.h>  
  2 #include <stdlib.h>
  3 #include <pthread.h>
  4 #include <unistd.h>
  5 #include <sys/select.h>
  6 #include <string.h>
  7 
  8 #define BUFF_SIZE 16    // 缓存区大小
  9 #define CONSUMER_NUM 3    // 消费者人数
 10 
 11 struct prodcons 
 12 {
 13     int buffer[BUFF_SIZE];
 14     pthread_mutex_t lock;
 15     int readpos,writepos;
 16     pthread_cond_t notEmpty;
 17     pthread_cond_t notFull;
 18 };
 19 
 20 typedef struct prodcons prodcons_t;
 21 
 22 class ProducterConsumer
 23 {
 24 public:
 25     ProducterConsumer();
 26     static void initial(prodcons_t* shareStruct);
 27     static void* producer(void* arg);            // 生产者线程执行函数
 28     static void* consumer(void* threadNum);    // 消费者线程执行函数
 29     static void stopThread();        // 销毁生产者消费者线程
 30 
 31 protected:    
 32     static void writeIn(prodcons_t *shareStruct,int data);
 33     static int readOut(prodcons_t *shareStruct, int threadNum);
 34     void createThread();    // 创建生产者和消费者
 35     
 36 private:
 37     static prodcons_t m_prodconsStruct;    // 共享结构体
 38     static bool m_isStop;        // 判断停止线程
 39     static pthread_t m_pidPro;    // 生产者线程id
 40     static pthread_t m_pidCon[CONSUMER_NUM];    // 消费者线程id
 41     
 42 };
 43 
 44 prodcons_t ProducterConsumer:: m_prodconsStruct;
 45 bool  ProducterConsumer::m_isStop = false;
 46 pthread_t ProducterConsumer::m_pidPro = 0;
 47 pthread_t ProducterConsumer::m_pidCon[CONSUMER_NUM] = {0};
 48 
 49 ProducterConsumer::ProducterConsumer()
 50 {
 51     initial(&m_prodconsStruct);
 52     createThread();
 53 }
 54 
 55 void ProducterConsumer::initial(prodcons_t *shareStruct)
 56 {
 57     pthread_mutex_init(&shareStruct->lock,NULL);
 58     pthread_cond_init(&shareStruct->notEmpty,NULL);
 59     pthread_cond_init(&shareStruct->notFull,NULL);
 60     shareStruct->readpos=0;
 61     shareStruct->writepos=0;
 62     memset(shareStruct->buffer, -1, sizeof(int)*BUFF_SIZE); // -1 表示未写入数据
 63 }
 64 
 65 void* ProducterConsumer::producer(void* arg)
 66 {
 67     int data = 0;
 68     while(!m_isStop)
 69     {
 70         writeIn(&m_prodconsStruct, data);
 71         data++;
 72     }
 73     
 74     return NULL;
 75 }
 76 
 77 void* ProducterConsumer::consumer(void* threadNum)
 78 {
 79     while(!m_isStop)    // C语言无bool类型,也就无true和false
 80     {
 81         readOut(&m_prodconsStruct,(int)threadNum);
 82     }
 83     
 84     return NULL;
 85 }
 86 
 87 void ProducterConsumer::writeIn(prodcons_t *shareStruct,int data)
 88 {
 89     pthread_mutex_lock(&shareStruct->lock);
 90     
 91     /*
 92     写指针追上读指针并且写指针指向的下一个缓存空间有数据,则表示缓存被写满,此时不可再写,否则会覆盖未读数据,
 93     我们对已经读过的缓存清零(即赋值-1),以此判断是否是写满状态,另一种方案是读写指针用一直累加的方法判断谁前谁后从而知道写满的状态
 94     */
 95     while((shareStruct->writepos == shareStruct->readpos) 
 96         && (shareStruct->buffer[(shareStruct->writepos+1)%BUFF_SIZE] != -1) 
 97         && !m_isStop) 
 98     {
 99         printf("wait for empty space\n");
100         pthread_cond_wait(&shareStruct->notFull,&shareStruct->lock);
101     }
102     
103     if(m_isStop)
104     {
105         pthread_mutex_unlock(&shareStruct->lock);
106         pthread_exit((void *)2);
107     }
108     
109     shareStruct->buffer[shareStruct->writepos % BUFF_SIZE] = data;
110     printf("writeIn----->%d\n",data);
111     shareStruct->writepos++;
112     if(shareStruct->writepos >= BUFF_SIZE)  
113         shareStruct->writepos = 0; // 缓存存满后,从头开始再存
114     
115     pthread_cond_signal(&shareStruct->notEmpty);
116     pthread_mutex_unlock(&shareStruct->lock);
117 }
118 
119 int ProducterConsumer::readOut(prodcons_t *shareStruct, int threadNum)
120 {
121     pthread_mutex_lock(&shareStruct->lock);
122     
123     while((shareStruct->readpos == shareStruct->writepos)
124         && (shareStruct->buffer[(shareStruct->readpos+1)%BUFF_SIZE] == -1)
125         && !m_isStop) 
126     {
127         printf("wait for data, Consumer Number %d\n", threadNum);
128         pthread_cond_wait(&shareStruct->notEmpty, &shareStruct->lock);
129     }
130     
131     if(m_isStop)
132     {
133         pthread_mutex_unlock(&shareStruct->lock);
134         pthread_exit((void *)2);
135     }
136     
137     int data = shareStruct->buffer[shareStruct->readpos % BUFF_SIZE];
138     printf("Consumer Number %d thread : 0x%0x and data = %d\n",threadNum,((unsigned int)pthread_self()), data);
139     shareStruct->buffer[shareStruct->readpos % BUFF_SIZE] = -1;    // 读之后清数据
140     shareStruct->readpos++;
141     if(shareStruct->readpos >= BUFF_SIZE)  
142         shareStruct->readpos=0; // 缓存读完后,从头再读
143     
144     pthread_cond_signal(&shareStruct->notFull);
145     pthread_mutex_unlock(&shareStruct->lock);
146     return data;
147 }
148 
149 void ProducterConsumer::createThread()
150 {
151     pthread_create(&m_pidPro,NULL, producer, NULL);
152     for(int i=0; i < CONSUMER_NUM; i++)
153     {
154         pthread_create(&m_pidCon[i],NULL, consumer,(void*)i);
155     }
156 }
157 
158 void ProducterConsumer::stopThread()
159 {
160     int err = -1;
161     void* ret;
162     m_isStop = true;
163     pthread_cond_broadcast(&m_prodconsStruct.notEmpty);
164     pthread_cond_broadcast(&m_prodconsStruct.notFull);
165     err = pthread_join(m_pidPro,&ret);
166     if(err != 0)
167         printf("can not join producter thread :0x%0x\n", m_pidPro);
168     else
169         printf("exit producter thread :0x%0x, exit code: %d\n", m_pidPro, (int)ret);
170     
171     for(int i = 0; i < CONSUMER_NUM; i++)
172     {
173         err = pthread_join(m_pidCon[i],&ret);
174         if(err != 0)
175             printf("can not join consumer thread :0x%0x\n", m_pidCon[i]);
176         else
177             printf("exit consumer thread :0x%0x, exit code: %d\n", m_pidCon[i], (int)ret);
178     }
179 }
180 
181 int main(void)
182 {
183     ProducterConsumer pc;
184     printf("start...\n");
185     
186     // 延迟-----决定线程跑多久
187     struct timeval timeout;
188     timeout.tv_sec = 1;    // 1s
189     timeout.tv_usec = 0;
190     select(0, NULL, NULL, NULL, &timeout);
191     
192     pc.stopThread();
193     printf("end...\n");
194     exit(0);
195 }
196 
197 /*
198 1.可以将输出打印到文件中查看,命令如下:
199     $./hhd_producter_and_consumer > logs.log   文件末尾而不是覆盖
200     $./hhd_producter_and_consumer >> logs.log  覆盖之前文件
201     
202 2.类的static变量在程序进入main()函数前初始化.
203 3.线程执行函数中类中必须定义为static类型,原因貌似是线程执行函数必须是普通函数,而类的成员函数依赖类的实例对象,
204   需要加static改变这种依赖成为全局函数,在C代码中线程执行函数无需加static.
205 */

C 语言实现:

  1 #include <stdio.h>
  2 #include <stdlib.h>
  3 #include <pthread.h>
  4 #include <unistd.h>
  5 #include <sys/select.h>
  6 #include <string.h>
  7 
  8 #define BUFF_SIZE 16    // 缓存区大小
  9 #define CONSUMER_NUM 3    // 消费者人数
 10 
 11 int isStop = 0;        // 判断停止线程
 12 
 13 struct prodcons 
 14 {
 15     int buffer[BUFF_SIZE];
 16     pthread_mutex_t lock;
 17     int readpos,writepos;
 18     pthread_cond_t notEmpty;
 19     pthread_cond_t notFull;
 20 }m_prodconsStruct;
 21 
 22 typedef struct prodcons prodcons_t;
 23 
 24 void initial(struct prodcons* shareStruct)
 25 {
 26     pthread_mutex_init(&shareStruct->lock,NULL);
 27     pthread_cond_init(&shareStruct->notEmpty,NULL);
 28     pthread_cond_init(&shareStruct->notFull,NULL);
 29     shareStruct->readpos=0;
 30     shareStruct->writepos=0;
 31     memset(shareStruct->buffer, -1, sizeof(int)*BUFF_SIZE); // -1 表示未写入数据
 32 }
 33 
 34 void writeIn(prodcons_t *shareStruct,int data)
 35 {
 36     pthread_mutex_lock(&shareStruct->lock);
 37     
 38     /*
 39     线程挂起条件:
 40     对写指针而言,缓存是环形。
 41     1.写指针追上读指针表示缓存被写满,此时不可再写,否则会覆盖未读数据;
 42     2.收到线程停止指令;
 43     此处是对已经读过的缓存清零以便下次写时判断是否是写满状态,另一方案读写指针用一直累加的方法判断谁前谁后.
 44     */
 45     while((shareStruct->writepos == shareStruct->readpos) 
 46         && (shareStruct->buffer[(shareStruct->writepos+1)%BUFF_SIZE] != -1) 
 47         && !isStop) 
 48     {
 49         printf("wait for empty space\n");
 50         pthread_cond_wait(&shareStruct->notFull,&shareStruct->lock);
 51     }
 52     
 53     if(isStop)
 54     {
 55         pthread_mutex_unlock(&shareStruct->lock);
 56         pthread_exit((void *)2);
 57     }
 58     
 59     shareStruct->buffer[shareStruct->writepos % BUFF_SIZE] = data;
 60     printf("writeIn----->%d\n",data);
 61     shareStruct->writepos++;
 62     if(shareStruct->writepos >= BUFF_SIZE)  
 63         shareStruct->writepos = 0; // 缓存存满后,从头开始再存
 64     
 65     pthread_cond_signal(&shareStruct->notEmpty);
 66     pthread_mutex_unlock(&shareStruct->lock);
 67 }
 68 
 69 int readOut(prodcons_t *shareStruct, int threadNum)
 70 {
 71     pthread_mutex_lock(&shareStruct->lock);
 72     
 73     while((shareStruct->readpos == shareStruct->writepos)
 74         && (shareStruct->buffer[(shareStruct->readpos+1)%BUFF_SIZE] == -1)
 75         && !isStop) 
 76     {
 77         printf("wait for data, Consumer Number %d\n", threadNum);
 78         pthread_cond_wait(&shareStruct->notEmpty, &shareStruct->lock);
 79     }
 80     
 81     if(isStop)
 82     {
 83         pthread_mutex_unlock(&shareStruct->lock);
 84         pthread_exit((void *)2);
 85     }
 86     
 87     int data = shareStruct->buffer[shareStruct->readpos % BUFF_SIZE];
 88     printf("Consumer Number %d thread : 0x%0x and data = %d\n",threadNum,((unsigned int)pthread_self()), data);
 89     shareStruct->buffer[shareStruct->readpos % BUFF_SIZE] = -1;    // 读之后清数据
 90     shareStruct->readpos++;
 91     if(shareStruct->readpos >= BUFF_SIZE)  
 92         shareStruct->readpos=0; // 缓存读完后,从头再读
 93     
 94     pthread_cond_signal(&shareStruct->notFull);
 95     pthread_mutex_unlock(&shareStruct->lock);
 96     return data;
 97 }
 98 
 99 void* producer(void* arg)
100 {
101     int data = 0;
102     while(1)
103     {
104         writeIn(&m_prodconsStruct, data);
105         data++;
106     }
107     
108     return NULL;
109 }
110 
111 void* consumer(void* threadNum)
112 {
113     while(1)    // C语言无bool类型,也就无true和false
114     {
115         readOut(&m_prodconsStruct,(int)threadNum);
116     }
117     
118     return NULL;
119 }
120 
121 int main(void)
122 {
123     int err = -1;
124     void* ret;
125     pthread_t pid_pro;
126     pthread_t pid_con[CONSUMER_NUM];
127     
128     // 初始化共享结构体
129     initial(&m_prodconsStruct);
130     
131     // 创建生产者和消费者
132     pthread_create(&pid_pro,NULL,producer, NULL);
133     for(int i=0; i < CONSUMER_NUM; i++)
134     {
135         pthread_create(&pid_con[i],NULL,consumer,(void*)i);
136     }
137     
138     // 延迟-----觉得线程能跑多久
139     struct timeval timeout;
140     timeout.tv_sec = 1;    // 1s
141     timeout.tv_usec = 0;
142     select(0, NULL, NULL, NULL, &timeout);
143     
144     // 销毁生产者消费者线程
145     isStop = 1;
146     pthread_cond_broadcast(&m_prodconsStruct.notEmpty);
147     pthread_cond_broadcast(&m_prodconsStruct.notFull);
148     err = pthread_join(pid_pro,&ret);
149     if(err != 0)
150         printf("can not join producter thread :0x%0x\n", pid_pro);
151     else
152         printf("exit producter thread :0x%0x, exit code: %d\n", pid_pro, (int)ret);
153     
154     for(int i = 0; i < CONSUMER_NUM; i++)
155     {
156         err = pthread_join(pid_con[i],&ret);
157         if(err != 0)
158             printf("can not join consumer thread :0x%0x\n", pid_con[i]);
159         else
160             printf("exit consumer thread :0x%0x, exit code: %d\n", pid_con[i], (int)ret);
161     }
162 
163     printf("end...\n");
164     exit(0);
165 }

 

生产者和消费者线程实现

原文:http://www.cnblogs.com/feixiang927/p/5013782.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!