使用现代化C++实现线程池的基础上优化。

class ThreadPool {
private:
    // 任务的抽象基类
    class ITask {
    public:
        virtual ~ITask() = default;
        virtual void execute() = 0;
    };

    // 使用模板派生类来包装任意类型的任务
    template<typename F>
    class Task : public ITask {
    private:
        F func_;
    public:
        explicit Task(F&& func) : func_(std::forward<F>(func)) {}
        void execute() override {
            func_();
        }
    };

    std::atomic<bool> shutdown_;
    BlockingQueue<std::unique_ptr<ITask>> queue_; // 队列存储任务的智能指针
    std::vector<std::thread> threads_;

    void worker_thread() {
        while (!shutdown_) {
            auto task = queue_.pop(); // 从队列中获取任务
            if (task) {
                task->execute();
            }
        }
    }

public:
    explicit ThreadPool(size_t nThreads = std::thread::hardware_concurrency() * 2)
        : shutdown_(false) {
        if (nThreads == 0) {
            nThreads = 8;
        }
        threads_.reserve(nThreads);
        for (size_t i = 0; i < nThreads; ++i) {
            threads_.emplace_back(&ThreadPool::worker_thread, this);
        }
    }

    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    ~ThreadPool() {
        shutdown();
    }

    void shutdown() {
        if (shutdown_.exchange(true)) {
            return;
        }
        // 为了唤醒所有可能在 pop 等待的线程,需要向队列中放入足够多的空任务
        for (size_t i = 0; i < threads_.size(); ++i) {
            queue_.push(nullptr);
        }
        for (auto& thread : threads_) {
            if (thread.joinable()) {
                thread.join();
            }
        }
    }

    template<typename F, typename... Args>
    auto submit(F&& f, Args&&... args) -> std::future<std::invoke_result_t<F, Args...>> {
        // 将函数和参数绑定
        auto bound_task = std::bind(std::forward<F>(f), std::forward<Args>(args)...);

        // 使用 packaged_task 来获取 future
        using ResultType = std::invoke_result_t<F, Args...>;
        using PackagedTaskType = std::packaged_task<ResultType()>;

        auto task_ptr = std::make_shared<PackagedTaskType>(std::move(bound_task));

        // 从 packaged_task 获取 future
        std::future<ResultType> future = task_ptr->get_future();

        // 将 packaged_task 包装到自定义任务类中
        // 使用 lambda 捕获智能指针,避免直接拷贝 packaged_task
        auto wrapper_func = [task_ptr]() {
            (*task_ptr)();
        };

        // 创建自定义任务并放入队列
        queue_.push(std::make_unique<Task<decltype(wrapper_func)>>(std::move(wrapper_func)));

        return future;
    }
};

文章作者: Liccsu
本文链接:
版权声明: 本站所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Liccsu's blog
喜欢就支持一下吧
打赏
微信 微信