线程基类头文件:
#ifndef YTHREAD_H
#define YTHREAD_H
#include <pthread.h>
#include <sched.h>

#include <atomic>
#include <iostream>
using namespace std;
//! Write "error:" followed by the streamed expression and a newline to standard output.
//! The argument is deliberately not parenthesized so callers can pass a stream chain,
//! e.g. CERR("code " << n). std::cout is fully qualified so the macro does not depend
//! on a using-directive being in effect at the point of use.
#define CERR(str) (std::cout << "error:" << str << std::endl)
//! Write "info:" followed by the streamed expression and a newline to standard output.
#define CINFO(str) (std::cout << "info:" << str << std::endl)
//! Generic result codes used by the threading classes.
enum ResCode
{
    RES_ERR = -1,   //!< failure
    RES_OK  = 0     //!< success
};
//! Scoped guard for a pthread mutex: locks it on construction and unlocks it on
//! destruction, or earlier if release() is called.
class TAutoLock
{
public:
    //! Lock @p lock and remember it so it can be unlocked later.
    explicit TAutoLock(pthread_mutex_t & lock) : m_lock(&lock) { pthread_mutex_lock(m_lock); }

    //! Unlock the mutex unless release() already did.
    ~TAutoLock(void) { release(); }

    //! Copying a guard would make two objects unlock the same mutex, so it is forbidden.
    TAutoLock(const TAutoLock &) = delete;
    TAutoLock & operator=(const TAutoLock &) = delete;

    //! Unlock the mutex now. Later calls, and the destructor, then do nothing.
    void release(void)
    {
        if (m_lock)
        {
            pthread_mutex_unlock(m_lock);
            m_lock = NULL;   // marks the guard as no longer owning the lock
        }
    }

protected:
    mutable pthread_mutex_t * m_lock;   //!< guarded mutex, or NULL once released
};
//! Holds a thread's life-cycle state and result code together with the mutex and
//! condition variable that other classes use to wait for state changes.
class YThreadState
{
public:
    //! Life-cycle states. The integer values index the description table used by
    //! getStateString().
    enum typeState
    {
        TS_NONE,
        TS_BUILD,
        TS_WAITTING,
        TS_RUNNING,
        TS_TOEND,
        TS_ENDING,
        TS_DEF_END
    };

    //! Return the descriptive text for @p nState.
    static const char * getStateString(typeState nState);

    //! Start in TS_NONE with result RES_OK and initialize the mutex and condition variable.
    YThreadState(void) : m_nState(TS_NONE), m_nResult(RES_OK)
    {
        pthread_mutex_init(&m_LockState, NULL);
        pthread_cond_init(&m_ConditionState, NULL);
    }

    //! Destroy the condition variable and the mutex.
    virtual ~YThreadState(void)
    {
        pthread_cond_destroy(&m_ConditionState);
        pthread_mutex_destroy(&m_LockState);
    }

    //! The object owns a pthread mutex and condition variable; a copy would copy
    //! them bitwise and destroy them twice, so copying is forbidden.
    YThreadState(const YThreadState &) = delete;
    YThreadState & operator=(const YThreadState &) = delete;

    //! Return the current state. The read is not protected by m_LockState.
    typeState getState(void) const { return m_nState; }

protected:
    //! True once a stop has been requested (state is TS_TOEND); a sub-class may
    //! poll this inside run() to decide whether to exit.
    bool mustExit(void) const { return m_nState == TS_TOEND; }

    //! Overwrite the object's state. The caller is responsible for any locking.
    void setState(typeState state) { m_nState = state; }

    volatile typeState m_nState;        //!< current life-cycle state
    int m_nResult;                      //!< result code of the thread
    pthread_mutex_t m_LockState;        //!< mutex paired with m_ConditionState
    pthread_cond_t m_ConditionState;    //!< signalled when the state changes
};
//! Thin wrapper around the pthread primitives needed to create, join, detach and
//! leave a thread. Sub-classes supply the thread body through run().
class YThreadBase
{
public:
    //! Relative priority levels accepted by the start functions.
    enum THREAD_PRIORITY
    {
        PRIORITY_HIGHEST = 7,
        PRIORITY_ABOVE_NORMAL = 1,
        PRIORITY_NORMAL = 0,
        PRIORITY_BELOW_NORMAL = -1,
        PRIORITY_LOWEST = -7,
    };

    YThreadBase(void);
    virtual ~YThreadBase(void) {}

protected:
//! Return type of a thread entry function.
#define ThreadFuncReturnCode void*
    //! Signature of a thread entry function.
    using ThreadFunc = void * (*)(void * arg);
    //! Native thread identifier type.
    using typeThreaID = pthread_t;

    typeThreaID m_nPthreadID;   //!< id of the thread created by threadStart()

    //! Terminate the calling thread with @p nValue as its exit value.
    static int threadExit(int nValue);

    //! Create a thread running @p func(@p arg) and store its id in @p pThreadID.
    static int threadStartImp(ThreadFunc func, void * arg, int nStackSize, int nPrio,
                              typeThreaID * pThreadID, bool boDetach);

    //! Thread body, implemented by the sub-class.
    virtual int run(void) = 0;

protected:
    //! Create a thread and record its id in m_nPthreadID.
    int threadStart(ThreadFunc func, void * arg, int nStackSize, int nPriority, bool boDetach)
    {
        typeThreaID * const pIdSlot = &m_nPthreadID;
        return threadStartImp(func, arg, nStackSize, nPriority, pIdSlot, boDetach);
    }

    //! Join the thread identified by m_nPthreadID; true on success.
    bool threadJoin(void ** status = NULL);
    //! Detach the thread identified by m_nPthreadID; true on success.
    bool threadDetach(void);
    //! Give up the processor.
    void threadYield(void);
};
//! A startable and stoppable thread object: combines the pthread wrapper
//! (YThreadBase) with the state and condition bookkeeping (YThreadState).
class YThread : public YThreadBase, public YThreadState
{
public:
    YThread(void) : YThreadBase(), YThreadState() {}
    virtual ~YThread(void) {}

    //! Start the thread with the default stack size and priority.
    virtual void start(void);

    //! Start the thread with an explicit stack size (bytes) and priority.
    void startEx(int nStackSize = 64 * 1024,
                 int nPriority = YThreadBase::PRIORITY_NORMAL);

    //! Request the thread to stop without waiting for it.
    virtual void stop(void);

    //! Request the thread to stop, wait until it has ended, and return its result.
    virtual int stopEx(void);

    //! Hook called by stopEx() after the stop request has been recorded.
    virtual void beforeStop(void) = 0;

    //! Native thread id converted to unsigned int.
    unsigned int getThreadID(void) const
    {
        return static_cast<unsigned int>(m_nPthreadID);
    }

protected:
    //bool suspend(int nSec = INFINITE_VALUE,int nNSec = INFINITE_VALUE);

private:
    //! Entry function handed to pthread_create; @p arg is the YThread object.
    static ThreadFuncReturnCode threadProxy(void * arg);
};
#endif // YTHREAD_H
线程基类源文件:
#include "ythread.h"
//#include "ttmutex.h"
#include <assert.h>
#include <pthread.h>
//! Descriptive text returned by YThreadState::getStateString(), indexed by the
//! integer value of YThreadState::typeState.
//! NOTE(review): the table has eight entries but typeState declares only seven
//! enumerators (it has no TS_END), so TS_DEF_END (value 6) selects
//! "Runnable object is Closed" and the final empty string is not reachable through
//! a declared enumerator — confirm which side is intended.
const char * g_lpszThreadState[] = {
"None, runnable object not build", // TS_NONE: the thread has not been created yet
"Just Build the runnable object", // TS_BUILD: the thread has just been created
"Waiting for task", // TS_WAITTING: the thread is waiting for a task
"Running", // TS_RUNNING: the thread is executing
"Will be end", // TS_TOEND: the thread is about to end
"Closing the runnable object", // TS_ENDING: the thread is shutting down
"Runnable object is Closed", // TS_END: the thread has ended
"" // TS_DEF_END
};
//-------------------------------------------------------------------
//! Return the descriptive text for @p nState from g_lpszThreadState.
//! A value outside the table (for example an integer cast to typeState) yields an
//! empty string instead of reading past the end of the array.
const char * YThreadState::getStateString(typeState nState)
{
    const size_t nEntries = sizeof(g_lpszThreadState) / sizeof(g_lpszThreadState[0]);
    const int nIndex = static_cast<int>(nState);
    if (nIndex < 0 || static_cast<size_t>(nIndex) >= nEntries)
        return "";
    return g_lpszThreadState[nIndex];
}
//! Construct with a zero thread id; threadStart() fills in the real id.
YThreadBase::YThreadBase(void)
    : m_nPthreadID()
{
}
//! Create a new thread that executes @p func(@p arg).
//! @param func        thread entry function
//! @param arg         argument passed to @p func
//! @param nStackSize  requested stack size in bytes; values <= 0 keep the system default
//! @param nPriority   requested priority (see the note in the body)
//! @param pThreadID   receives the id of the created thread
//! @param boDetach    when true the thread is created detached
//! @return 0 on success, otherwise the positive error number reported by pthreads
int YThreadBase::threadStartImp(ThreadFunc func,void * arg,int nStackSize,int nPriority,typeThreaID * pThreadID,bool boDetach)
{
    pthread_attr_t attr;
    int nErrno = pthread_attr_init(&attr); // initialize attr with default attributes
    if (nErrno != 0)
        return nErrno;                      // attr is unusable; do not touch it further

    // A non-positive size would turn into a huge size_t; keep the default instead.
    // A rejected size (e.g. below the platform minimum) also leaves the default in place.
    if (nStackSize > 0)
        pthread_attr_setstacksize(&attr, static_cast<size_t>(nStackSize));

    // NOTE(review): a non-zero priority only selects PTHREAD_INHERIT_SCHED; the
    // numeric value itself is never turned into a scheduling parameter.
    if (nPriority != 0)
        pthread_attr_setinheritsched(&attr, PTHREAD_INHERIT_SCHED);

    // Create the thread already detached instead of detaching it afterwards.
    if (boDetach)
        pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

    nErrno = pthread_create(pThreadID, &attr, func, arg);
    pthread_attr_destroy(&attr);
    return nErrno;
}
//! Detach the thread identified by m_nPthreadID.
//! @return true when pthread_detach reports success
bool YThreadBase::threadDetach(void)
{
    const int nErr = pthread_detach(m_nPthreadID);
    return nErr == 0;
}
//! Give up the processor so another runnable thread can be scheduled.
//! Uses the POSIX sched_yield() rather than the non-standard, deprecated
//! pthread_yield().
void YThreadBase::threadYield(void)
{
    sched_yield();
}
//! Wait for the thread identified by m_nPthreadID to terminate.
//! @param pStatus  passed through to pthread_join to receive the exit value
//! @return true when pthread_join reports success
bool YThreadBase::threadJoin(void ** pStatus)
{
    const int nErr = pthread_join(m_nPthreadID, pStatus);
    return nErr == 0;
}
//! Terminate the calling thread, using @p nValue (stored in a pointer-sized
//! value) as its exit status.
int YThreadBase::threadExit(int nValue)
{
    void * const pExitValue = reinterpret_cast<void *>(nValue);
    pthread_exit(pExitValue);
    return nValue;   // only present to satisfy the declared return type
}
//-------------------------------------------------------------------
void YThread::start(void)
{
startEx();
}
void YThread::startEx(int nStackSize,int nPriority)
{
TAutoLock _au(m_LockState);
int res = YThreadBase::threadStart(threadProxy,this,nStackSize,nPriority,true);
if (res<0)
CERR("Start Thread Error");
pthread_cond_wait(&m_ConditionState, &m_LockState);
}
//! Request the thread to stop and wait until it reports TS_NONE.
//! Returns immediately when the thread is not running or a stop is already in
//! progress. beforeStop() is invoked while m_LockState is held.
//! @return the thread's result code (m_nResult)
int YThread::stopEx(void)
{
    TAutoLock stateGuard(m_LockState);

    switch (m_nState)
    {
    case TS_NONE:
    case TS_ENDING:
    case TS_TOEND:
        return m_nResult;   // nothing to stop, or a stop is already under way
    default:
        break;
    }

    m_nState = TS_TOEND;
    beforeStop();

    for (;;)
    {
        if (m_nState == TS_NONE)
            break;
        pthread_cond_wait(&m_ConditionState, &m_LockState);
    }
    return m_nResult;
}
//! Record a stop request (TS_TOEND) without waiting for the thread to end.
//! Does nothing when the thread is not running or a stop is already in progress.
void YThread::stop(void)
{
    TAutoLock stateGuard(m_LockState);

    const bool boNothingToStop =
        (m_nState == TS_NONE) || (m_nState == TS_ENDING) || (m_nState == TS_TOEND);
    if (!boNothingToStop)
    {
        m_nState = TS_TOEND;
        // beforeStop();
    }
}
//! Entry function of the worker thread.
//! Announces TS_BUILD to the waiting startEx(), executes run(), then publishes the
//! result and TS_NONE so that stopEx() can return.
//! @param arg  the YThread object this thread belongs to
//! @return the result of run() stored in a pointer-sized value (0 if run() threw)
ThreadFuncReturnCode YThread::threadProxy(void * arg)
{
    YThread * pThread = static_cast<YThread *>(arg);

    pthread_mutex_lock(&pThread->m_LockState);
    pThread->m_nState = TS_BUILD;
    pthread_cond_signal(&pThread->m_ConditionState);
    pthread_mutex_unlock(&pThread->m_LockState);

    int nResult = 0;
    try
    {
        nResult = pThread->run();
    }
    catch(const std::exception & se)
    {
        CERR(se.what());
    }
    catch(...)
    {
        CERR("unkown exp in thread run");
    }

    {
        // Publish result and final state under the lock, and wake every waiter.
        TAutoLock _au(pThread->m_LockState);
        pThread->m_nResult = nResult;
        pThread->m_nState = TS_NONE;
        pthread_cond_broadcast(&pThread->m_ConditionState);
    }

    // The lock is released before the thread ends, and pThread is not touched
    // again: a caller returning from stopEx() may delete the object right away.
    return reinterpret_cast<void *>(static_cast<long>(nResult));
}
// Thread pool class header file (includes the worker thread)
#ifndef YTHREADPOOL_H
#define YTHREADPOOL_H
#include "ythread.h"
#include <list>
//! One unit of work handed to the thread pool.
struct MYData
{
    std::string strData;   //!< text payload; pushMission() hashes it to pick a worker
    int         nNum;      //!< numeric payload
};
class YMYThreadPool;
class YMYThread : public YThread
{
public:
YMYThread(size_t nThreadNum, YMYThreadPool *pParent) :YThread(),m_nThreadNum(nThreadNum) ,m_pParent(pParent)
{
pthread_mutex_init(&m_lockForDataListAndCond, NULL);
pthread_cond_init(&m_condForDataList,NULL);
}
~YMYThread()
{
pthread_mutex_destroy(&m_lockForDataListAndCond);
pthread_cond_destroy(&m_condForDataList);
}
static void initGobal(size_t nParentThreadCount, size_t nDataListLimits);
virtual void beforeStop(void) ;
virtual int run(void);
bool addDataWithoutSigna(const MYData & mydata);
bool addData(const MYData & mydata);
protected:
void setCurrentState(const std::string & strState)
{
m_tBeginTime = time(NULL);
m_strCurrentState = strState;
}
virtual int handleDataList(void);
void dealWithEachData(const MYData & itemdata);
typedef std::list<MYData> typeDataList;
typeDataList m_datalist;
pthread_mutex_t m_lockForDataListAndCond;
pthread_cond_t m_condForDataList;
YMYThreadPool * m_pParent;
size_t m_nThreadNum; //Thread number
time_t m_tBeginTime;
std::string m_strCurrentState;
static size_t g_nDataListSizeLimits;
static size_t g_nParentPoolThreadCount;
};
//! Fixed-size pool of YMYThread workers. Each mission is routed to one worker by
//! hashing its string payload.
class YMYThreadPool
{
public:
    YMYThreadPool(void);
    ~YMYThreadPool(void);

    //! The pool owns its workers through raw pointers; a copy would delete them
    //! twice, so copying is forbidden.
    YMYThreadPool(const YMYThreadPool &) = delete;
    YMYThreadPool & operator=(const YMYThreadPool &) = delete;

    //! Allocate @p nThreadCount workers, each with a queue limit of @p nSunDataLimits.
    void init(size_t nThreadCount,size_t nSunDataLimits);
    //! Start every worker created by init().
    void start(void);
    //! Stop and delete every worker.
    void stop(void);
    //! Return the worker selected by the hash of @p strSid.
    YMYThread & getThread(const std::string & strSid);
    //! Queue @p data on the worker selected by its string payload.
    bool pushMission(const MYData & data);

    //! Exit flag: written by the controlling thread and read by every worker,
    //! hence atomic rather than a plain bool.
    std::atomic<bool> m_boToExit;

private:
    YMYThread ** m_ppThreads;   //!< array of m_nThreadCount owned worker pointers
    size_t m_nThreadCount;      //!< number of workers
    // TMutexEvent m_AllReadyEvent;
    bool m_boInit;              //!< true once init() has completed
};
#endif // YTHREADPOOL_H
线程池类源文件(包括工作线程的具体实现):
#include "ythreadpool.h"
// Defaults used until YMYThread::initGobal() supplies the real values.
const int nLimitsDefault = 1000;        // per-worker queue length limit
const int nThreadCountDefault = 1;      // number of workers in the pool

size_t YMYThread::g_nDataListSizeLimits(static_cast<size_t>(nLimitsDefault));
size_t YMYThread::g_nParentPoolThreadCount(static_cast<size_t>(nThreadCountDefault));
//! Store the values shared by all workers.
//! @param nPoolThreadCount  number of workers in the owning pool
//! @param nDataListLimits   maximum number of queued items per worker
void YMYThread::initGobal(size_t nPoolThreadCount, size_t nDataListLimits)
{
    g_nDataListSizeLimits = nDataListLimits;
    g_nParentPoolThreadCount = nPoolThreadCount;
}
void YMYThread::beforeStop(void)
{
TAutoLock au(m_lockForDataListAndCond);
pthread_cond_signal(&m_condForDataList);
}
int YMYThread::run(void)
{
while(!m_pParent->m_boToExit)
{
try
{
setCurrentState( "Wait for mission" );
pthread_mutex_lock(&m_lockForDataListAndCond);
if (m_datalist.empty())
pthread_cond_wait(&m_condForDataList, &m_lockForDataListAndCond);
pthread_mutex_unlock(&m_lockForDataListAndCond);
if ( m_pParent->m_boToExit )
break;
setCurrentState( "Begin handle search missions" );
handleDataList();
}
catch(...)
{
}
}
return int();
}
//! Append @p data to the queue without locking or signalling.
//! @return false when the queue has already reached g_nDataListSizeLimits
bool YMYThread::addDataWithoutSigna(const MYData & data)
{
    const bool boHasRoom = m_datalist.size() < g_nDataListSizeLimits;
    if (boHasRoom)
        m_datalist.push_back(data);
    return boHasRoom;
}
//! Append @p data under the queue lock and signal the worker.
//! @return false (without signalling) when the queue is full
bool YMYThread::addData(const MYData & data)
{
    TAutoLock queueGuard(m_lockForDataListAndCond);

    const bool boQueued = addDataWithoutSigna(data);
    if (boQueued)
        pthread_cond_signal(&m_condForDataList);
    return boQueued;
}
//! Process one item: print the worker number and the item's payload.
void YMYThread::dealWithEachData(const MYData & data)
{
    std::cout << "deal each data;" << std::endl
              << "thread num:" << m_nThreadNum
              << ", data str:" << data.strData
              << ", data num:" << data.nNum
              << std::endl;
}
int YMYThread::handleDataList(void)
{
if ( m_pParent->m_boToExit )
return int();
typeDataList datalist; //list of mission must to do
pthread_mutex_lock(&m_lockForDataListAndCond); //Condition for the mission
datalist.swap( m_datalist );
pthread_mutex_unlock(&m_lockForDataListAndCond);
setCurrentState( "Handling mission." );
typeDataList::iterator it = datalist.begin();
while (it != datalist.end())
{
dealWithEachData(*it);
it++;
}
}
///////////////////////////////////////////////////////////////////////
//! Map a string id onto a worker index in the range [0, nThreadCount).
//! @param strSID        id to hash
//! @param nThreadCount  number of workers
//! @return std::hash of @p strSID modulo @p nThreadCount; 0 when @p nThreadCount
//!         is 0 (a modulo by zero would be undefined) — the caller must still
//!         make sure a worker with index 0 exists before using the result
inline size_t getHashIndex(const std::string & strSID,size_t nThreadCount)
{
    if (nThreadCount == 0)
        return 0;
    const std::hash<std::string> hash_fn; // std::hash is a C++11 feature
    return hash_fn(strSID) % nThreadCount;
}
//! Construct an empty pool that has not been initialized yet.
//! The initializers are written in member declaration order.
YMYThreadPool::YMYThreadPool()
    : m_boToExit(false)
    , m_ppThreads(NULL)
    , m_nThreadCount(0)
    , m_boInit(false)
{
}
//! Stop and release all workers on destruction.
YMYThreadPool::~YMYThreadPool(void)
{
    this->stop();
}
void YMYThreadPool::start(void)
{
if (!m_boInit)
return;
for (size_t i = 0; i < m_nThreadCount; ++i)
if ( m_ppThreads[i] != NULL )
m_ppThreads[i]->startEx( 2 * 1024 * 1024 );
}
void YMYThreadPool::stop(void)
{
if (m_ppThreads == NULL)
return;
m_boToExit = true;
for (size_t i = 0; i < m_nThreadCount; ++i)
{
if ( m_ppThreads[i] != NULL )
{
m_ppThreads[i]->stopEx();
delete m_ppThreads[i];
}
}
delete []m_ppThreads;
m_ppThreads = NULL;
m_nThreadCount = 0;
}
void YMYThreadPool::init(size_t nThreadCount,size_t nSunDataLimits)
{
YMYThread::initGobal(nThreadCount, nSunDataLimits);
m_nThreadCount = nThreadCount;
m_ppThreads = new YMYThread *[nThreadCount];
for (size_t i = 0 ; i < m_nThreadCount; ++i)
{
m_ppThreads[i] = NULL;
m_ppThreads[i] = new YMYThread(i,this/*,&m_AllReadyEvent*/);
if (m_ppThreads[i] == NULL)
CERR( "Memory not enough for Sub-Thread" );
}
m_boInit = true;
}
bool YMYThreadPool::pushMission(const MYData & data)
{
if (!m_boInit)
return false;
unsigned nIndex = getHashIndex(data.strData, m_nThreadCount);
return m_ppThreads[nIndex]->addData(data);
}
//! Return the worker responsible for @p strSid, selected by hash modulo the
//! number of workers. The pool must have been initialized.
YMYThread & YMYThreadPool::getThread(const std::string & strSid)
{
    const size_t nWorker = getHashIndex(strSid, m_nThreadCount);
    YMYThread * const pWorker = m_ppThreads[nWorker];
    return *pWorker;
}
// main function; note: create the thread pool through a singleton
#include "ythreadpool.h"
#include <unistd.h>
int main()
{
YMYThreadPool p;
p.init(6, 100);
p.start();
MYData da1, da2;
da1.strData = "ds1";
da1.nNum = 11;
da2.strData = "ds2";
da2.nNum = 22;
sleep(2);
p.pushMission(da1);
p.pushMission(da2);
sleep(2);
p.stop();
return 0;
}原文:http://mingtangduyao.blog.51cto.com/4876371/1893030