一个简单的生产者消费者程序,大致思想如下:一个生产者线程,多个消费者线程,一个缓存区。缓冲区由整形数组实现,以数值-1表示清零,写入的数据为非负数。以一个结构体包含缓冲区和必要的线程锁与条件变量。以下分别有C++和C语言实现:
C++实现:
1 #include <stdio.h> 2 #include <stdlib.h> 3 #include <pthread.h> 4 #include <unistd.h> 5 #include <sys/select.h> 6 #include <string.h> 7 8 #define BUFF_SIZE 16 // 缓存区大小 9 #define CONSUMER_NUM 3 // 消费者人数 10 11 struct prodcons 12 { 13 int buffer[BUFF_SIZE]; 14 pthread_mutex_t lock; 15 int readpos,writepos; 16 pthread_cond_t notEmpty; 17 pthread_cond_t notFull; 18 }; 19 20 typedef struct prodcons prodcons_t; 21 22 class ProducterConsumer 23 { 24 public: 25 ProducterConsumer(); 26 static void initial(prodcons_t* shareStruct); 27 static void* producer(void* arg); // 生产者线程执行函数 28 static void* consumer(void* threadNum); // 消费者线程执行函数 29 static void stopThread(); // 销毁生产者消费者线程 30 31 protected: 32 static void writeIn(prodcons_t *shareStruct,int data); 33 static int readOut(prodcons_t *shareStruct, int threadNum); 34 void createThread(); // 创建生产者和消费者 35 36 private: 37 static prodcons_t m_prodconsStruct; // 共享结构体 38 static bool m_isStop; // 判断停止线程 39 static pthread_t m_pidPro; // 生产者线程id 40 static pthread_t m_pidCon[CONSUMER_NUM]; // 消费者线程id 41 42 }; 43 44 prodcons_t ProducterConsumer:: m_prodconsStruct; 45 bool ProducterConsumer::m_isStop = false; 46 pthread_t ProducterConsumer::m_pidPro = 0; 47 pthread_t ProducterConsumer::m_pidCon[CONSUMER_NUM] = {0}; 48 49 ProducterConsumer::ProducterConsumer() 50 { 51 initial(&m_prodconsStruct); 52 createThread(); 53 } 54 55 void ProducterConsumer::initial(prodcons_t *shareStruct) 56 { 57 pthread_mutex_init(&shareStruct->lock,NULL); 58 pthread_cond_init(&shareStruct->notEmpty,NULL); 59 pthread_cond_init(&shareStruct->notFull,NULL); 60 shareStruct->readpos=0; 61 shareStruct->writepos=0; 62 memset(shareStruct->buffer, -1, sizeof(int)*BUFF_SIZE); // -1 表示未写入数据 63 } 64 65 void* ProducterConsumer::producer(void* arg) 66 { 67 int data = 0; 68 while(!m_isStop) 69 { 70 writeIn(&m_prodconsStruct, data); 71 data++; 72 } 73 74 return NULL; 75 } 76 77 void* ProducterConsumer::consumer(void* threadNum) 78 { 79 while(!m_isStop) // C语言无bool类型,也就无true和false 80 { 81 readOut(&m_prodconsStruct,(int)threadNum); 82 } 83 84 return NULL; 85 } 86 87 void ProducterConsumer::writeIn(prodcons_t *shareStruct,int data) 88 { 89 pthread_mutex_lock(&shareStruct->lock); 90 91 /* 92 写指针追上读指针并且写指针指向的下一个缓存空间有数据,则表示缓存被写满,此时不可再写,否则会覆盖未读数据, 93 我们对已经读过的缓存清零(即赋值-1),以此判断是否是写满状态,另一种方案是读写指针用一直累加的方法判断谁前谁后从而知道写满的状态 94 */ 95 while((shareStruct->writepos == shareStruct->readpos) 96 && (shareStruct->buffer[(shareStruct->writepos+1)%BUFF_SIZE] != -1) 97 && !m_isStop) 98 { 99 printf("wait for empty space\n"); 100 pthread_cond_wait(&shareStruct->notFull,&shareStruct->lock); 101 } 102 103 if(m_isStop) 104 { 105 pthread_mutex_unlock(&shareStruct->lock); 106 pthread_exit((void *)2); 107 } 108 109 shareStruct->buffer[shareStruct->writepos % BUFF_SIZE] = data; 110 printf("writeIn----->%d\n",data); 111 shareStruct->writepos++; 112 if(shareStruct->writepos >= BUFF_SIZE) 113 shareStruct->writepos = 0; // 缓存存满后,从头开始再存 114 115 pthread_cond_signal(&shareStruct->notEmpty); 116 pthread_mutex_unlock(&shareStruct->lock); 117 } 118 119 int ProducterConsumer::readOut(prodcons_t *shareStruct, int threadNum) 120 { 121 pthread_mutex_lock(&shareStruct->lock); 122 123 while((shareStruct->readpos == shareStruct->writepos) 124 && (shareStruct->buffer[(shareStruct->readpos+1)%BUFF_SIZE] == -1) 125 && !m_isStop) 126 { 127 printf("wait for data, Consumer Number %d\n", threadNum); 128 pthread_cond_wait(&shareStruct->notEmpty, &shareStruct->lock); 129 } 130 131 if(m_isStop) 132 { 133 pthread_mutex_unlock(&shareStruct->lock); 134 pthread_exit((void *)2); 135 } 136 137 int data = shareStruct->buffer[shareStruct->readpos % BUFF_SIZE]; 138 printf("Consumer Number %d thread : 0x%0x and data = %d\n",threadNum,((unsigned int)pthread_self()), data); 139 shareStruct->buffer[shareStruct->readpos % BUFF_SIZE] = -1; // 读之后清数据 140 shareStruct->readpos++; 141 if(shareStruct->readpos >= BUFF_SIZE) 142 shareStruct->readpos=0; // 缓存读完后,从头再读 143 144 pthread_cond_signal(&shareStruct->notFull); 145 pthread_mutex_unlock(&shareStruct->lock); 146 return data; 147 } 148 149 void ProducterConsumer::createThread() 150 { 151 pthread_create(&m_pidPro,NULL, producer, NULL); 152 for(int i=0; i < CONSUMER_NUM; i++) 153 { 154 pthread_create(&m_pidCon[i],NULL, consumer,(void*)i); 155 } 156 } 157 158 void ProducterConsumer::stopThread() 159 { 160 int err = -1; 161 void* ret; 162 m_isStop = true; 163 pthread_cond_broadcast(&m_prodconsStruct.notEmpty); 164 pthread_cond_broadcast(&m_prodconsStruct.notFull); 165 err = pthread_join(m_pidPro,&ret); 166 if(err != 0) 167 printf("can not join producter thread :0x%0x\n", m_pidPro); 168 else 169 printf("exit producter thread :0x%0x, exit code: %d\n", m_pidPro, (int)ret); 170 171 for(int i = 0; i < CONSUMER_NUM; i++) 172 { 173 err = pthread_join(m_pidCon[i],&ret); 174 if(err != 0) 175 printf("can not join consumer thread :0x%0x\n", m_pidCon[i]); 176 else 177 printf("exit consumer thread :0x%0x, exit code: %d\n", m_pidCon[i], (int)ret); 178 } 179 } 180 181 int main(void) 182 { 183 ProducterConsumer pc; 184 printf("start...\n"); 185 186 // 延迟-----决定线程跑多久 187 struct timeval timeout; 188 timeout.tv_sec = 1; // 1s 189 timeout.tv_usec = 0; 190 select(0, NULL, NULL, NULL, &timeout); 191 192 pc.stopThread(); 193 printf("end...\n"); 194 exit(0); 195 } 196 197 /* 198 1.可以将输出打印到文件中查看,命令如下: 199 $./hhd_producter_and_consumer > logs.log 文件末尾而不是覆盖 200 $./hhd_producter_and_consumer >> logs.log 覆盖之前文件 201 202 2.类的static变量在程序进入main()函数前初始化. 203 3.线程执行函数中类中必须定义为static类型,原因貌似是线程执行函数必须是普通函数,而类的成员函数依赖类的实例对象, 204 需要加static改变这种依赖成为全局函数,在C代码中线程执行函数无需加static. 205 */
C 语言实现:
1 #include <stdio.h> 2 #include <stdlib.h> 3 #include <pthread.h> 4 #include <unistd.h> 5 #include <sys/select.h> 6 #include <string.h> 7 8 #define BUFF_SIZE 16 // 缓存区大小 9 #define CONSUMER_NUM 3 // 消费者人数 10 11 int isStop = 0; // 判断停止线程 12 13 struct prodcons 14 { 15 int buffer[BUFF_SIZE]; 16 pthread_mutex_t lock; 17 int readpos,writepos; 18 pthread_cond_t notEmpty; 19 pthread_cond_t notFull; 20 }m_prodconsStruct; 21 22 typedef struct prodcons prodcons_t; 23 24 void initial(struct prodcons* shareStruct) 25 { 26 pthread_mutex_init(&shareStruct->lock,NULL); 27 pthread_cond_init(&shareStruct->notEmpty,NULL); 28 pthread_cond_init(&shareStruct->notFull,NULL); 29 shareStruct->readpos=0; 30 shareStruct->writepos=0; 31 memset(shareStruct->buffer, -1, sizeof(int)*BUFF_SIZE); // -1 表示未写入数据 32 } 33 34 void writeIn(prodcons_t *shareStruct,int data) 35 { 36 pthread_mutex_lock(&shareStruct->lock); 37 38 /* 39 线程挂起条件: 40 对写指针而言,缓存是环形。 41 1.写指针追上读指针表示缓存被写满,此时不可再写,否则会覆盖未读数据; 42 2.收到线程停止指令; 43 此处是对已经读过的缓存清零以便下次写时判断是否是写满状态,另一方案读写指针用一直累加的方法判断谁前谁后. 44 */ 45 while((shareStruct->writepos == shareStruct->readpos) 46 && (shareStruct->buffer[(shareStruct->writepos+1)%BUFF_SIZE] != -1) 47 && !isStop) 48 { 49 printf("wait for empty space\n"); 50 pthread_cond_wait(&shareStruct->notFull,&shareStruct->lock); 51 } 52 53 if(isStop) 54 { 55 pthread_mutex_unlock(&shareStruct->lock); 56 pthread_exit((void *)2); 57 } 58 59 shareStruct->buffer[shareStruct->writepos % BUFF_SIZE] = data; 60 printf("writeIn----->%d\n",data); 61 shareStruct->writepos++; 62 if(shareStruct->writepos >= BUFF_SIZE) 63 shareStruct->writepos = 0; // 缓存存满后,从头开始再存 64 65 pthread_cond_signal(&shareStruct->notEmpty); 66 pthread_mutex_unlock(&shareStruct->lock); 67 } 68 69 int readOut(prodcons_t *shareStruct, int threadNum) 70 { 71 pthread_mutex_lock(&shareStruct->lock); 72 73 while((shareStruct->readpos == shareStruct->writepos) 74 && (shareStruct->buffer[(shareStruct->readpos+1)%BUFF_SIZE] == -1) 75 && !isStop) 76 { 77 printf("wait for data, Consumer Number %d\n", threadNum); 78 pthread_cond_wait(&shareStruct->notEmpty, &shareStruct->lock); 79 } 80 81 if(isStop) 82 { 83 pthread_mutex_unlock(&shareStruct->lock); 84 pthread_exit((void *)2); 85 } 86 87 int data = shareStruct->buffer[shareStruct->readpos % BUFF_SIZE]; 88 printf("Consumer Number %d thread : 0x%0x and data = %d\n",threadNum,((unsigned int)pthread_self()), data); 89 shareStruct->buffer[shareStruct->readpos % BUFF_SIZE] = -1; // 读之后清数据 90 shareStruct->readpos++; 91 if(shareStruct->readpos >= BUFF_SIZE) 92 shareStruct->readpos=0; // 缓存读完后,从头再读 93 94 pthread_cond_signal(&shareStruct->notFull); 95 pthread_mutex_unlock(&shareStruct->lock); 96 return data; 97 } 98 99 void* producer(void* arg) 100 { 101 int data = 0; 102 while(1) 103 { 104 writeIn(&m_prodconsStruct, data); 105 data++; 106 } 107 108 return NULL; 109 } 110 111 void* consumer(void* threadNum) 112 { 113 while(1) // C语言无bool类型,也就无true和false 114 { 115 readOut(&m_prodconsStruct,(int)threadNum); 116 } 117 118 return NULL; 119 } 120 121 int main(void) 122 { 123 int err = -1; 124 void* ret; 125 pthread_t pid_pro; 126 pthread_t pid_con[CONSUMER_NUM]; 127 128 // 初始化共享结构体 129 initial(&m_prodconsStruct); 130 131 // 创建生产者和消费者 132 pthread_create(&pid_pro,NULL,producer, NULL); 133 for(int i=0; i < CONSUMER_NUM; i++) 134 { 135 pthread_create(&pid_con[i],NULL,consumer,(void*)i); 136 } 137 138 // 延迟-----觉得线程能跑多久 139 struct timeval timeout; 140 timeout.tv_sec = 1; // 1s 141 timeout.tv_usec = 0; 142 select(0, NULL, NULL, NULL, &timeout); 143 144 // 销毁生产者消费者线程 145 isStop = 1; 146 pthread_cond_broadcast(&m_prodconsStruct.notEmpty); 147 pthread_cond_broadcast(&m_prodconsStruct.notFull); 148 err = pthread_join(pid_pro,&ret); 149 if(err != 0) 150 printf("can not join producter thread :0x%0x\n", pid_pro); 151 else 152 printf("exit producter thread :0x%0x, exit code: %d\n", pid_pro, (int)ret); 153 154 for(int i = 0; i < CONSUMER_NUM; i++) 155 { 156 err = pthread_join(pid_con[i],&ret); 157 if(err != 0) 158 printf("can not join consumer thread :0x%0x\n", pid_con[i]); 159 else 160 printf("exit consumer thread :0x%0x, exit code: %d\n", pid_con[i], (int)ret); 161 } 162 163 printf("end...\n"); 164 exit(0); 165 }
原文:http://www.cnblogs.com/feixiang927/p/5013782.html