菜鸟笔记
提升您的技术认知

面试题

实现一个lru cache 算法lru cache c 三种解法java实现lru算法及编码实现lru策略缓存lru算法常见缓存算法和lru的c 实现设计循环双端队列(deque)lru 缓存结构 (c 哈希双链表实现)lru缓存机制删除单链表中的指定节点linux 内核经典面试题拼多多社招面经:redis是重点,讲一讲redis的内存模型线程、进程、协程的区别c 经典面试题面试官:我们只想要这样的c 工程师linux c/c 学习路线链表操作汇总c 11的智能指针面试题浏览器中输入url后发生的事情常用的限流算法http协议和https协议面试题网络编程面试题目总结c 后台面试题目如何实现lru算法?如何寻找无序数组中的第k大元素?布隆过滤器 - 如何在100个亿url中快速判断某url是否存在?如何实现大整数相加?c 面试题及基本知识点总结c 给定出栈序列判定是否合法消息队列面试题要点redis缓存击穿,失效以及热点keyag真人游戏的解决方案网页在浏览器上的渲染过程几种限流算法lru算法例题c/c 常见面试知识点总结附面试真题----20210529更新引入mq消息队列的作用及其优缺点mysql面试篇社招三年后端面试题60道测开面试题,背完直接涨工资二叉树的层序遍历(两种方法实现)bitmap 海量数据处理字符串倒序输出的五种方法c语言 输入10个数,统计出并输出正数、负数和0的个数字节三面:如何设计一个高并发系统架构,网络 面试36问,ddos攻击原理c 线程池使用 c 11 编写可复用多线程任务池

使用 c 11 编写可复用多线程任务池-ag真人游戏

阅读 : 96

类的功能

  • task (任务基类)
    该类主要实现一个任务类
    virtual int dowork() = 0;

  • taskqueue (任务队列)
    该类主要针对任务的存储、删除、撤回等状态做管理

  • threadpool (线程池)
    整个线程池的核心业务处理类

代码

  • task.h
//任务的基类
#pragma once
#include 
#include 
//任务的基类
class task
{
public:
    //构造、析构函数
    task():_id(_nrequestid  ),_iscancelrequired(false),_createtime(clock()){}
    ~task(){};
    // 任务类虚接口,继承这个类的必须要实现这个接口
    virtual int dowork(void) = 0;
    // 任务已取消回调
    virtual int oncanceled(void)
    {
        return  1;
    }
    // 任务已完成
    virtual int oncompleted(int)
    {
        return 1;
    }
    // 任务是否超时
    virtual bool istimeout(const clock_t& now)
    {
        return ((now - _createtime) > 5000);
    }
    // 获取任务id
    size_t getid(void)
    {
        return _id;
    }
    //获取任务取消状态
    bool iscancelrequired(void)
    {
        return _iscancelrequired;
    }
    //设置任务取消状态
    void setcancelrequired(void)
    {
        _iscancelrequired = true;
    }
protected:
    size_t _id; //任务的唯一标识
    clock_t _createtime;    //任务创建时间,非unix时间戳
private:
    static std::atomic _nrequestid;
    std::atomic _iscancelrequired;    //任务取消状态
};
//selectany可以让我们在.h文件中初始化一个全局变量而不是只能放在.cpp中。
//这样的代码来初始化这个全局变量。既是该.h被多次include,链接器也会为我们剔除多重定义的错误。
__declspec(selectany) std::atomic task::_nrequestid = 100000;
  • taskqueue.h
#pragma once
#include 
#include 
#include 
#include <unordered_map>
#include 
#include 
//任务队列
template
class taskqueue
{
public:
    //向队列的末尾插入任务,task是任务类
    void put_back(std::shared_ptr& task)
    {
        std::unique_lock lock(_mutexqueue);
        _queue.push_back(task);
        _conditput.notify_one();
    }
    //向队列的头部插入任务
    void put_front(std::shared_ptr& task)
    {
        std::unique_lock lock(_mutexqueue);
        _queue.push_front(task);
        _conditput.notify_one();
    }
    //获取队首(并将任务加到运行任务列表中),返回tase是任务类
    std::shared_ptr get(void) {
        std::unique_lock lock(_mutexqueue);
        if (_queue.empty())
            return nullptr;
        //lock_guard取代了mutex的lock()和unlock();
        std::lock_guard lock_doing_task(_mutexdoingtask);
        std::shared_ptr& task = _queue.front();
        _mapdoingtask.insert(std::make_pair(task->getid(), task));
        _queue.pop_front();
        return task;
    }
    //获取双向链表queue的大小
    size_t size(void)
    {
        std::unique_lock lock(_mutexqueue);
        return _queue.size();
    }
    //释放队列
    void release(void)
    {
        deletealltasks();
        _conditput.notify_all();
    }
    //删除任务(从就绪队列删除,如果就绪队列没有,则看执行队列有没有,有的话置下取消状态位)
    int deletetask(size_t nid)
    {
        std::unique_lock lock(_mutexqueue, std::defer_lock);
        lock.lock();
        auto it = _queue.begin();
        for (; it != _queue.end();   it) 
        {
            if ((*it)->getid() == nid) 
            {
                _queue.erase(it);
                lock.unlock();
                return 0;
            }
        }
        //下面的逻辑可能会造成死锁,这里要及时释放
        lock.unlock();
        // 试图取消正在执行的任务
        {
            std::lock_guard lock_doing_task(_mutexdoingtask);
            auto it_map = _mapdoingtask.find(nid);
            if (it_map != _mapdoingtask.end())
                it_map->second->setcancelrequired();
        }
        //任务执行完后再返回
        while (_mapdoingtask.count(nid))
            std::this_thread::sleep_for(std::chrono::milliseconds(20));
        return 0;
    }
    //删除所有任务
    int deletealltasks(void)
    {
        std::unique_lock lock(_mutexqueue, std::defer_lock);
        lock.lock();
        if (!_queue.empty())
            _queue.clear();//清空
        {
            std::lock_guard lock_doing_task(_mutexdoingtask);
            if (!_mapdoingtask.empty()) 
            {
                auto it_map = _mapdoingtask.begin();
                for (; it_map != _mapdoingtask.end();   it_map)
                    it_map->second->setcancelrequired();
            }
        }
        lock.unlock();
        //任务执行完后再返回
        while (!_mapdoingtask.empty())
            std::this_thread::sleep_for(std::chrono::milliseconds(50));
        return 0;
    }
    //任务完成回调(从运行列表中删除指定任务)
    int ontaskfinished(size_t nid)
    {
        std::lock_guard lock_doing_task(_mutexdoingtask);
        auto it_map = _mapdoingtask.find(nid);
        if (it_map != _mapdoingtask.end())
            _mapdoingtask.erase(it_map);
        return 0;
    }
    //判断任务是否执行完毕
    std::shared_ptr istaskprocessed(size_t nid)
    {
        std::lock_guard lock_queue(_mutexqueue);
        auto it = _queue.begin();
        for (; it != _queue.end();   it) {
            if ((*it)->getid() == nid)
                return *it;
        }
        std::lock_guard lock_doing_task(_mutexdoingtask);
        auto it_map = _mapdoingtask.find(nid);
        if (it_map != _mapdoingtask.end())
            return it_map->second;
        return nullptr;
    }
    //等待有任务到达(带超时:超时自动唤醒)
    bool wait(std::chrono::milliseconds millsec)
    {
        std::unique_lock lock(_mutexconditput);
        _conditput.wait_for(lock, millsec);
        return true;
    }
private:
    //就绪的任务
    std::mutex _mutexqueue;
    std::deque> _queue;
    //条件变量
    std::mutex _mutexconditput;
    std::condition_variable _conditput;
    //运行的任务
    std::mutex _mutexdoingtask;
    std::unordered_map > _mapdoingtask;
};
  • threadpool.h
#pragma once
#include 
#include 
#include 
#include 
#include 
#include "task.h"
#include "taskqueue.h"
class threadpool
{
public:
    // 线程池配置参数
    typedef struct tagthreadpoolconfig {
        int nmaxthreadsnum; // 最大线程数量
        int nminthreadsnum; // 最小线程数量
        double dbtaskaddthreadrate; // 增 最大线程任务比 (任务数量与线程数量,什么比例的时候才加)
        double dbtasksubthreadrate; // 减 最小线程任务比 (任务数量与线程数量,什么比例的时候才减)
    } threadpoolconfig;
public:
    //构造函数
    threadpool(void):_taskqueue(new taskqueue()), _atccurtotalthrnum(0), _atcworking(true){}
    //析构函数
    ~threadpool(void)
    {
        release();
    }
    //初始化资源
    int init(const threadpoolconfig& threadpoolconfig) {
        // 错误的设置
        if (threadpoolconfig.dbtaskaddthreadrate < threadpoolconfig.dbtasksubthreadrate)
            return 87;
        _threadpoolconfig.nmaxthreadsnum = threadpoolconfig.nmaxthreadsnum;
        _threadpoolconfig.nminthreadsnum = threadpoolconfig.nminthreadsnum;
        _threadpoolconfig.dbtaskaddthreadrate = threadpoolconfig.dbtaskaddthreadrate;
        _threadpoolconfig.dbtasksubthreadrate = threadpoolconfig.dbtasksubthreadrate;
        int ret = 0;
        // 创建线程池
        if (_threadpoolconfig.nminthreadsnum > 0)
            ret = addprothreads(_threadpoolconfig.nminthreadsnum);
        return ret;
    }
    // 添加任务
    int addtask(std::shared_ptr taskptr, bool priority=false)
    {
        const double& rate = getthreadtaskrate();
        int ret = 0;
        if (priority) 
        {
            if (rate > 1000)
                std::this_thread::sleep_for(std::chrono::milliseconds(1));
            _taskqueue->put_front(taskptr);
        }
        else 
        {
            // 检测任务数量
            if (rate > 100) {
                taskptr->oncanceled();
                return 298;
            }
            // 将任务推入队列
            _taskqueue->put_back(taskptr);
        }
        // 检查是否要扩展线程
        if (_atccurtotalthrnum < _threadpoolconfig.nmaxthreadsnum
            && rate > _threadpoolconfig.dbtaskaddthreadrate)
            ret = addprothreads(1);
        return ret;
    }
    // 删除任务(从就绪队列删除,如果就绪队列没有,则看执行队列有没有,有的话置下取消状态位)
    int deletetask(size_t nid)
    {
        return _taskqueue->deletetask(nid);
    }
    // 删除所有任务
    int deletealltasks(void)
    {
        return _taskqueue->deletealltasks();
    }
    std::shared_ptr istaskprocessed(size_t nid)
    {
        return _taskqueue->istaskprocessed(nid);
    }
    // 释放资源(释放线程池、释放任务队列)
    bool release(void)
    {
        // 1、停止线程池。
        // 2、清楚就绪队列。
        // 3、等待执行队列为0
        releasethreadpool();
        _taskqueue->release();
        int i = 0;
        while (_atccurtotalthrnum != 0) 
        {
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
            // 异常等待
            if (i   == 10)
                exit(23);
        }
        _atccurtotalthrnum = 0;
        return true;
    }
    // 获取当前线程任务比
    double getthreadtaskrate(void)
    {
        if (_atccurtotalthrnum != 0)
            return _taskqueue->size() * 1.0 / _atccurtotalthrnum;
        return 0;
    }
    // 当前线程是否需要结束
    bool shouldend(void)
    {
        bool bflag = false;
        double dbthreadtaskrate = getthreadtaskrate();
        // 检查线程与任务比率
        if (!_atcworking || _atccurtotalthrnum > _threadpoolconfig.nminthreadsnum
            && dbthreadtaskrate < _threadpoolconfig.dbtasksubthreadrate)
            bflag = true;
        return bflag;
    }
    // 释放线程池
    bool releasethreadpool(void)
    {
        _threadpoolconfig.nminthreadsnum = 0;
        _threadpoolconfig.dbtasksubthreadrate = 0;
        _atcworking = false;
        return true;
    }
    // 添加指定数量的处理线程
    int addprothreads(int nthreadsnum)
    {
        try {
            for (; nthreadsnum > 0; --nthreadsnum)
                std::thread(&threadpool::taskprocessthread, this).detach();
        }
        catch (...){
            return 155;
        }
        return 0;
    }
    // 任务处理线程函数
    void taskprocessthread(void)
    {
        int ntaskprocret = 0;
        // 线程增加
        _atccurtotalthrnum.fetch_add(1);
        std::chrono::milliseconds mills_sleep(500);
        std::shared_ptr ptask;
        while (_atcworking) 
        {
            // 从任务队列中获取任务
            ptask = _taskqueue->get();  //get会将任务添加到运行任务的map中去
            if (ptask == nullptr) 
            {
                if (shouldend())
                    break;
                // 进入睡眠池
                _taskqueue->wait(mills_sleep);
                continue;
            }
            // 检测任务取消状态
            if (ptask->iscancelrequired())
                ptask->oncanceled();
            else
                // 处理任务
                ptask->oncompleted(ptask->dowork());
            // 从运行任务队列中移除任务
            _taskqueue->ontaskfinished(ptask->getid());
            // 判断线程是否需要结束
            if (shouldend())
                break;
        }
        // 线程个数减一
        _atccurtotalthrnum.fetch_sub(1);
    }
private:
    std::shared_ptr > _taskqueue;   //任务队列
    threadpoolconfig _threadpoolconfig; //线程池配置
    std::atomic _atcworking;  //线程池是否被要求结束
    std::atomic _atccurtotalthrnum;    //当前线程个数
};
  • funtask.h
#pragma once
#include 
#include "task.h"
class functask:public task
{
public:
    functask(std::function f) : _pf(f) {}
    functask(void) : _pf(nullptr){}
    virtual ~functask(){}
    template 
    void asynbind(f(*f)(args...), args... args)
    {
        _pf = std::bind(f, args...);
    }
    virtual int dowork()
    {
        if (_pf == nullptr)
            return 86;
        return _pf();
    }
private:
    typedef std::function pvfunc;
    pvfunc _pf;
};
  • main.cpp
#pragma once
#include 
#include 
#include 
#include 
#include "threadpool.h"
#include "functask.h"
using namespace std;
int vfunction(void)
{
    std::cout << __function__ << std::endl;
    return 0;
}
int counter(int a,int b)
{
    std::cout << a << ":" << b << std::endl;
    return 0;
}
int main()
{
    threadpool::threadpoolconfig threadpoolconfig;
    threadpoolconfig.nmaxthreadsnum = 100;
    threadpoolconfig.nminthreadsnum = 5;
    threadpoolconfig.dbtaskaddthreadrate = 3;
    threadpoolconfig.dbtasksubthreadrate = 0.5;
    clock_t start = clock();
    {
        std::shared_ptr threadpool(new threadpool);
        threadpool->init(threadpoolconfig);
        int i = 1;
        while (true)
        {
            /*std::shared_ptr request(new functask(vfunction));
            threadpool->addtask(request);*/
            std::shared_ptr request(new functask);
            request->asynbind(counter, i  , 1);
            threadpool->addtask(request);
            if (request->getid() == 110000) {
                break;
            }
        }
        threadpool->release();
    }
    clock_t finish = clock();
    std::cout << "duration:" << finish - start << "ms" << std::endl;
    cout << "main:thread" << endl;
    return 0;
}
网站地图