菜鸟笔记
提升您的技术认知

可伸缩多线程任务队列-ag真人游戏

  在我们的工作中,我们经常需要异步执行一些任务,下面介绍的这个可伸缩多线程队列,可满足我们的需求。

  主要有以下几个功能:

    1、任务队列是多线程,许多任务可以异步进行,任务队列使用线程池来执行任务。

    2、任务队列支持优先级,优先级高的任务优先执行(即使是后来添加的)

    3、任务队列可以被暂停,但是用户还是可以添加任务,当任务队列被唤醒时,任务可以继续执行下去

    4、在运行过程中,任务队列使用的线程池,用户可以自行增加和减少

  大体框架主要由3个类构成

    1、cjob,任务类,用户需要从该类派生来实现自身需要完成的任务

    2、cjobexecuter,任务执行类,任务均由该类来调用执行,每一个类相当于对应一个线程

    3、cmthreadedjobq,多线程任务队列,添加任务已经任务的分发均由该类完成,该类维护一个任务队列和一个完成队列的线程池。

  类图如下:

  该例子中,cjobexecuter和cmthreadjobq这两个类的调用关系是非常值得我们学习的,同时,cjob作为一个基类,子类派生可以实现不同的任务,可扩展性也不错。源代码解析如下:

  job.h文件:

class cjob
{
public:
    cjob();
    virtual ~cjob();
    
    bool m_completed;         //任务是否完成:true 完成,false 未完成
    static long lastusedid;   //最后的id
    
    //================================================================================================
    //函数名:                  setpriority
    //函数描述:                设置任务优先级
    //输入:                    [in] priority 优先级别
    //输出:                    无
    //返回:                    无
    //================================================================================================
    void setpriority(int priority);
    //================================================================================================
    //函数名:                  getpriority
    //函数描述:                返回任务优先级
    //输入:                    无
    //输出:                    无
    //返回:                    任务优先级
    //================================================================================================
    int getpriority();
    
    //================================================================================================
    //函数名:                  getid
    //函数描述:                返回任务id
    //输入:                    无
    //输出:                    无
    //返回:                    任务id
    //================================================================================================
    long getid();
    
    //================================================================================================
    //函数名:                  setautodelete
    //函数描述:                设置完成任务后是否删除任务
    //输入:                    [in] autodeleteflag
    //输出:                    无
    //返回:                    无
    //================================================================================================
    void setautodelete(bool autodeleteflag = true);
    //================================================================================================
    //函数名:                  autodelete
    //函数描述:                返回删除任务标记
    //输入:                    无
    //输出:                    无
    //返回:                    任务标记
    //================================================================================================
    bool autodelete();
    //================================================================================================
    //函数名:                  execute
    //函数描述:                任务真正工作的函数,纯虚函数,需要子类化实现
    //输入:                    无
    //输出:                    无
    //返回:                    任务id
    //================================================================================================
    virtual void execute() = 0;    
private:
    long m_id;               //任务id
    bool m_autodeleteflag;   //是否自动删除任务标记,true 删除,false 不删除,默认为true
    int m_priority;          //任务优先级,默认为5

};

  job.cpp文件:

long cjob::lastusedid = 0;
cjob::cjob()
{
    this->m_id = interlockedincrement(&lastusedid);
    this->m_autodeleteflag = true;
    this->m_priority = 5;
    this->m_completed= false;
}
cjob::~cjob()
{
}
bool cjob::autodelete()
{
    return m_autodeleteflag;
}
void cjob::setautodelete(bool autodeleteflag)
{
    m_autodeleteflag = autodeleteflag;
}
long cjob::getid()
{
    return this->m_id;
}
int cjob::getpriority()
{
    return this->m_priority;    
}
void cjob::setpriority(int priority)
{
    this->m_priority = priority;
}

  jobexecuter.h文件:

//一个对象对应一个线程,执行任务job
class cjobexecuter
{
public:
    cjobexecuter(cmthreadedjobq *pjobq);
    virtual ~cjobexecuter();
    
    //================================================================================================
    //函数名:                  stop
    //函数描述:                停止执行任务
    //输入:                    无
    //输出:                    无
    //返回:                    无
    //================================================================================================
    void stop();
    
    //================================================================================================
    //函数名:                  execute
    //函数描述:                执行一个任务
    //输入:                    [in] pjob 任务指针
    //输出:                    无
    //返回:                    无
    //================================================================================================
    void execute(cjob* pjob);
    
    static uint threadfunction(lpvoid pparam); //线程函数
    
    cmthreadedjobq* m_pjobq;                   //指向线程任务队列指针
    cjob* m_pjob2do;                           //指向正在执行任务的指针
    int m_flag;                                //线程执行标记
    cwinthread* m_pexecuterthread;             //线程标识符
};

  jobexecuter.cpp文件:

#define stop_working -1
#define keep_working  0
cjobexecuter::cjobexecuter(cmthreadedjobq *pjobq)
{
    this->m_pjobq= pjobq;
    this->m_pexecuterthread= afxbeginthread(threadfunction,this);
    this->m_pjob2do = null;
    this->m_flag = keep_working;
}
cjobexecuter::~cjobexecuter()
{
    if(this->m_pexecuterthread!= null )    
    {
        this->m_pexecuterthread->exitinstance();
        delete m_pexecuterthread;    
    }
}
uint cjobexecuter::threadfunction(lpvoid pparam)
{    
    cjobexecuter *pexecuter = (cjobexecuter *)pparam;
    pexecuter->m_flag = 1;
    ::sleep(1);
    csinglelock singlelock(&pexecuter->m_pjobq->m_cs);        
    while(pexecuter->m_flag !=stop_working )
    {
        if(pexecuter->m_pjob2do!=  null)
        {
            pexecuter->m_pjob2do->execute();
            pexecuter->m_pjob2do->m_completed = true;    
            if(pexecuter->m_pjob2do->autodelete())
                delete pexecuter->m_pjob2do;
            pexecuter->m_pjob2do = null;
        }
        if(pexecuter->m_pjobq == null) break;
        
        csinglelock singlelock(&pexecuter->m_pjobq->m_cs);        
        singlelock.lock();
        if(pexecuter->m_pjobq->getnoofexecuter() > pexecuter->m_pjobq->getmaxnoofexecuter()) //cjobexecuter个数大于最大值,自动销毁
        {
            pexecuter->stop();    
            singlelock.unlock();    
        }
        else
        {
            pexecuter->m_pjobq->addfreejobexecuter(pexecuter);      //完成任务后,添加到cmthreadedjobq的空闲队列中
            singlelock.unlock();    
            pexecuter->m_pjobq->m_pobserverthread->resumethread();        
            pexecuter->m_pexecuterthread->suspendthread();        
        }                
    }
    
    if(pexecuter->m_pjobq != null)
    {
        pexecuter->m_pjobq->deletejobexecuter(pexecuter);
    }
    else
    {
        delete pexecuter;
    }
    return 0;
}
void cjobexecuter::execute(cjob* pjob)
{
    this->m_pjob2do = pjob;
    ::sleep(0);
    this->m_pexecuterthread->resumethread();
}
void cjobexecuter::stop()
{
    this->m_flag = stop_working;
    this->m_pexecuterthread->resumethread();
}

  mthreadedjobq.h文件:

typedef ctypedptrlist< cptrlist ,cjob*>cjobqlist;
//线程池任务队列
class cmthreadedjobq
{
public:
    typedef struct thnode
    {
        cjobexecuter* pexecuter;
        thnode * pnext ;
    } thnode;
    
    cmthreadedjobq();
    virtual ~cmthreadedjobq();
    //================================================================================================
    //函数名:                  deletejobexecuter
    //函数描述:                删除一个jobexecuter对象
    //输入:                    [in] pex
    //输出:                    无
    //返回:                    无
    //================================================================================================
    void deletejobexecuter(cjobexecuter *pex);
    
    //================================================================================================
    //函数名:                  setmaxnoofexecuter
    //函数描述:                设置cjobexecuter的个数
    //输入:                    [in] value
    //输出:                    无
    //返回:                    无
    //================================================================================================
    void setmaxnoofexecuter(int value);
    //================================================================================================
    //函数名:                  addjobexecuter
    //函数描述:                添加一个cjobexecuter
    //输入:                    [in] pex
    //输出:                    无
    //返回:                    无
    //================================================================================================
    void addjobexecuter(cjobexecuter *pex);
    
    //================================================================================================
    //函数名:                  getjobexecuter
    //函数描述:                返回一个cjobexecuter
    //输入:                    无
    //输出:                    无
    //返回:                    处理任务的指针
    //================================================================================================
    cjobexecuter* getjobexecuter();
    //================================================================================================
    //函数名:                  addfreejobexecuter
    //函数描述:                添加一个cjobexecuter
    //输入:                    [in] pex
    //输出:                    无
    //返回:                    无
    //================================================================================================
    void addfreejobexecuter(cjobexecuter *pex);
    //================================================================================================
    //函数名:                  addjob
    //函数描述:                添加一个任务
    //输入:                    [in] pjob
    //输出:                    无
    //返回:                    无
    //================================================================================================
    void addjob(cjob *pjob);
    
    //================================================================================================
    //函数名:                  getmaxnoofexecuter
    //函数描述:                获取cjobexecuter个数的最大值
    //输入:                    无
    //输出:                    无
    //返回:                    无
    //================================================================================================
    int getmaxnoofexecuter();
    
    //================================================================================================
    //函数名:                  getnoofexecuter
    //函数描述:                获取当前cjobexecuter的个数
    //输入:                    无
    //输出:                    无
    //返回:                    无
    //================================================================================================
    int getnoofexecuter();
    static uint jobobserverthreadfunction(lpvoid);
    //================================================================================================
    //函数名:                  pause
    //函数描述:                挂起jobobserverthread线程
    //输入:                    无
    //输出:                    无
    //返回:                    无
    //================================================================================================
    void pause();
    //================================================================================================
    //函数名:                  resume
    //函数描述:                唤醒jobobserverthread线程
    //输入:                    无
    //输出:                    无
    //返回:                    无
    //================================================================================================
    void resume();    
        
    cwinthread* m_pobserverthread; //向空闲的executer线程添加任务的线程
    ccriticalsection m_cs;         //关键代码段,用于互斥
    cjobqlist m_jobqlist;          //任务队列
private :
    bool m_pause;                  //jobobserverthread线程运行标记
    int m_maxnoofexecuter;         //cjobexecuter最大个数
    int m_noofexecuter;            //当前cjobexecuter个数
    thnode* m_pfreeelist;          //维护空闲处理任务线程的队列
    thnode* m_pallelist;           //维护所有处理任务线程的队列
};

  mthreadedjobq.cpp文件:

cmthreadedjobq::cmthreadedjobq()
{
    m_maxnoofexecuter = 2;
    m_pause = false;
    m_pobserverthread = afxbeginthread(jobobserverthreadfunction,this);
    m_pfreeelist =null;
    m_noofexecuter =0;
    m_pallelist = null;
}
cmthreadedjobq::~cmthreadedjobq()
{
    thnode* ptempnode;
    while (m_pallelist != null) 
    {    
        ptempnode = m_pallelist->pnext;
        delete m_pallelist->pexecuter;        
        delete m_pallelist;        
        m_pallelist = ptempnode;    
    }    
    while (m_pfreeelist != null) 
    {    ptempnode = m_pfreeelist->pnext;        
        delete m_pfreeelist;        
        m_pfreeelist = ptempnode;    
    }    
    m_pobserverthread->exitinstance();    
    delete m_pobserverthread;
}
void cmthreadedjobq::pause()
{
    this->m_pause = true;
}
void cmthreadedjobq::resume()
{
    this->m_pause = false;
    this->m_pobserverthread->resumethread();
}
uint cmthreadedjobq::jobobserverthreadfunction(lpvoid pparam)
{
    cmthreadedjobq *pmtjq = (cmthreadedjobq *)pparam;
    cjobexecuter *pjexecuter;
    while(true)
    {
        sleep(100);
        if(pmtjq->m_pause != true)
        {
            while(!pmtjq->m_jobqlist.isempty() )
            {
                pjexecuter = pmtjq->getjobexecuter();
                if( pjexecuter!=null)
                {                
                    pmtjq->m_cs.lock();
                    pjexecuter->execute(pmtjq->m_jobqlist.gethead());
                    pmtjq->m_jobqlist.removehead();
                    afxgetapp()->m_pmainwnd->postmessage(refresh_list);
                    pmtjq->m_cs.unlock();
                }
                else
                {
                    break;
                }
                if(pmtjq->m_pause == true)
                    break;
            }
        }
        pmtjq->m_pobserverthread->suspendthread();
    }
    return 0;
}
int cmthreadedjobq::getnoofexecuter()
{
    return this->m_noofexecuter;
}
int cmthreadedjobq::getmaxnoofexecuter()
{
    return this->m_maxnoofexecuter;
}
void cmthreadedjobq::addjob(cjob *pjob)
{
    cjob * ptempjob;
    csinglelock slock(&this->m_cs);
    slock.lock();    
    position pos,lastpos;
    pos = this->m_jobqlist.getheadposition();    
    lastpos = pos;
    if(pos != null)
        ptempjob =this->m_jobqlist.gethead();
    while(pos != null )
    {        
        if( pjob->getpriority() > ptempjob->getpriority())
            break;
        lastpos = pos;
        ptempjob =     this->m_jobqlist.getnext(pos);        
    }    
    if(pos == null)    
        this->m_jobqlist.addtail(pjob);
    else
        this->m_jobqlist.insertbefore(lastpos,pjob);
    this->m_pobserverthread->resumethread();
    slock.unlock();
}
void cmthreadedjobq::addfreejobexecuter(cjobexecuter *pex)
{
    m_cs.lock();
    thnode* node = new thnode;
    node->pexecuter = pex;
    node->pnext = this->m_pfreeelist;
    this->m_pfreeelist = node;
    m_cs.unlock();
}
cjobexecuter* cmthreadedjobq::getjobexecuter()
{
    thnode *ptemp;
    cjobexecuter *pex=null;
    m_cs.lock();
    if(this->m_pfreeelist != null)  //有空闲cjobexecuter,就返回
    {
        ptemp = this->m_pfreeelist;
        this->m_pfreeelist = this->m_pfreeelist->pnext;
        pex = ptemp->pexecuter;
        delete ptemp ;
        m_cs.unlock();
        return pex;
    }
    if(this->m_noofexecuter < this->m_maxnoofexecuter) //没有空闲cjobexecuter,并且当前cjobexecuter小于最大值,就生成一个新的cjobexecuter
    {
        pex =  new cjobexecuter(this);
        this->addjobexecuter(pex);
        this->m_noofexecuter  ;
        m_cs.unlock();
        return pex;
    }
    m_cs.unlock();
    return null;
}
void cmthreadedjobq::addjobexecuter(cjobexecuter *pex)
{
    m_cs.lock();
    thnode* node = new thnode;
    node->pexecuter= pex;
    node->pnext = this->m_pallelist;
    this->m_pallelist = node;
    m_cs.unlock();
}
void cmthreadedjobq::setmaxnoofexecuter(int value)
{
    this->m_cs.lock();
    if(value >1 && value <11)
        this->m_maxnoofexecuter = value;
    m_pobserverthread->resumethread();
    this->m_cs.unlock();
}
void cmthreadedjobq::deletejobexecuter(cjobexecuter *pex)
{
    thnode* pnode,*pnodep;
    csinglelock singlelock(&m_cs);    
    singlelock.lock();    
    if(this->m_pallelist != null)
    {
        pnode = this->m_pallelist;
        if(pnode->pexecuter == pex )    
        {
          this->m_pallelist = pnode->pnext;
          delete pnode;          
        }
        else
        {
            pnodep =pnode;
            pnode  = pnode->pnext ;            
            while(pnode != null )
            {
                if(pnode->pexecuter== pex ) break;
                pnodep = pnode;
                pnode  = pnode->pnext ;            
            }
            if(pnode!= null)
            {
                pnodep->pnext = pnode->pnext;
                delete pnode;
            }
        }
    }
    this->m_noofexecuter--;
    singlelock.unlock();
    pex->stop();
    sleep(1);
    delete pex;
}

  以上,就是该可伸缩多线程任务的主体框架,当我们工作需要实现类似这样的需要:异步执行多个不同的任务时,这个例子就是一个很好的参考例子,我研究这些代码只是为了让我在遇到这种问题的时候,可以有一个思路去思考,而不至于无从下手,仅此而已。

网站地图