我可以使用std :: async而不用等待将来的限制吗?

高水平
我想在异步模式下调用一些没有返回值的函数,而不用等待它们完成。 如果我使用std :: async,那么在任务结束之前未来的对象不会被破坏,这会导致呼叫在我的情况下不同步。

void sendMail(const std::string& address, const std::string& message)
{
    //sending the e-mail which takes some time...
}

myResonseType processRequest(args...)
{
    //Do some processing and valuate the address and the message...

    //Sending the e-mail async
    auto f = std::async(std::launch::async, sendMail, address, message);

    //returning the response ASAP to the client
    return myResponseType;

} //<-- I'm stuck here until the async call finish to allow f to be destructed.
  // gaining no benefit from the async call.

我的问题是

  • 有没有办法克服这个限制?
  • 如果(1)不是,我是否应该实施一个线程,将采取这些“僵尸”期货,并等待他们?
  • 是(1)和(2)是否,是否有其他选择,然后建立我自己的线程池?
  • 注意:
    我宁愿不使用thread + detach选项(由@ galop1n建议),因为创建一个新的线程有我想避免的开销。 虽然使用std :: async(至少在MSVC上)正在使用内部线程池。

    谢谢。


    您可以将未来移动到全局对象中,因此当本地未来的析构函数运行时,它不必等待异步线程完成。

    std::vector<std::future<void>> pending_futures;
    
    myResonseType processRequest(args...)
    {
        //Do some processing and valuate the address and the message...
    
        //Sending the e-mail async
        auto f = std::async(std::launch::async, sendMail, address, message);
    
        // transfer the future's shared state to a longer-lived future
        pending_futures.push_back(std::move(f));
    
        //returning the response ASAP to the client
        return myResponseType;
    
    }
    

    注意如果异步线程引用了processRequest函数中的任何局部变量,则这并不安全。

    虽然使用std::async (至少在MSVC上)正在使用内部线程池。

    这实际上是不符合的,标准明确指出,使用std::launch::async运行的任务必须像新线程一样运行,所以任何线程局部变量都不能从一个任务持续到另一个任务。 但它通常不重要。


    如果你不在乎加入,你为什么不开始一个线程并分离?

    std::thread{ sendMail, address, message}.detach();   
    

    std :: async被绑定到std :: future的生命期,它返回并且它们是没有其他选择的。

    将std :: future放入由其他线程读取的等待队列中将需要与接收新任务的池相同的安全机制,例如容器周围的互斥体。

    那么你最好的选择就是一个线程池,它消耗直接在线程安全队列中推送的任务。 这并不取决于具体的实施。

    在一个线程池实现采取任何可调用的参数,线程在队列上进行极化,更好的实现应该使用条件变量(coliru):

    #include <iostream>
    #include <queue>
    #include <memory>
    #include <thread>
    #include <mutex>
    #include <functional>
    #include <string>
    
    struct ThreadPool {
        struct Task {
            virtual void Run() const = 0;
            virtual ~Task() {};
        };   
    
        template < typename task_, typename... args_ >
        struct RealTask : public Task {
            RealTask( task_&& task, args_&&... args ) : fun_( std::bind( std::forward<task_>(task), std::forward<args_>(args)... ) ) {}
            void Run() const override {
                fun_();
            }
        private:
            decltype( std::bind(std::declval<task_>(), std::declval<args_>()... ) ) fun_;
        };
    
        template < typename task_, typename... args_ >
        void AddTask( task_&& task, args_&&... args ) {
            auto lock = std::unique_lock<std::mutex>{mtx_};
            using FinalTask = RealTask<task_, args_... >;
            q_.push( std::unique_ptr<Task>( new FinalTask( std::forward<task_>(task), std::forward<args_>(args)... ) ) );
        }
    
        ThreadPool() {
            for( auto & t : pool_ )
                t = std::thread( [=] {
                    while ( true ) {
                        std::unique_ptr<Task> task;
                        {
                            auto lock = std::unique_lock<std::mutex>{mtx_};
                            if ( q_.empty() && stop_ ) 
                                break;
                            if ( q_.empty() )
                                continue;
                            task = std::move(q_.front());
                            q_.pop();
                        }
                        if (task)
                            task->Run();
                    }
                } );
        }
        ~ThreadPool() {
            {
                auto lock = std::unique_lock<std::mutex>{mtx_};
                stop_ = true;
            }
            for( auto & t : pool_ )
                t.join();
        }
    private:
        std::queue<std::unique_ptr<Task>> q_;
        std::thread pool_[8]; 
        std::mutex mtx_;
        volatile bool stop_ {};
    };
    
    void foo( int a, int b ) {
        std::cout << a << "." << b;
    }
    void bar( std::string const & s) {
        std::cout << s;
    }
    
    int main() {
        ThreadPool pool;
        for( int i{}; i!=42; ++i ) {
            pool.AddTask( foo, 3, 14 );    
            pool.AddTask( bar, " - " );    
        }
    }
    

    您可以将其移入异步调用函数的本地范围 ,而不是将未来移动到全局对象中 (并手动管理未使用的期货的删除)。

    “让异步功能走自己的未来”,可以这么说。

    我想出了这个适用于我的模板包装器(在Windows上测试过):

    #include <future>
    
    template<class Function, class... Args>
    void async_wrapper(Function&& f, Args&&... args, std::future<void>& future,
                       std::future<void>&& is_valid, std::promise<void>&& is_moved) {
        is_valid.wait(); // Wait until the return value of std::async is written to "future"
        auto our_future = std::move(future); // Move "future" to a local variable
        is_moved.set_value(); // Only now we can leave void_async in the main thread
    
        // This is also used by std::async so that member function pointers work transparently
        auto functor = std::bind(f, std::forward<Args>(args)...);
        functor();
    }
    
    template<class Function, class... Args> // This is what you call instead of std::async
    void void_async(Function&& f, Args&&... args) {
        std::future<void> future; // This is for std::async return value
        // This is for our synchronization of moving "future" between threads
        std::promise<void> valid;
        std::promise<void> is_moved;
        auto valid_future = valid.get_future();
        auto moved_future = is_moved.get_future();
    
        // Here we pass "future" as a reference, so that async_wrapper
        // can later work with std::async's return value
        future = std::async(
            async_wrapper<Function, Args...>,
            std::forward<Function>(f), std::forward<Args>(args)...,
            std::ref(future), std::move(valid_future), std::move(is_moved)
        );
        valid.set_value(); // Unblock async_wrapper waiting for "future" to become valid
        moved_future.wait(); // Wait for "future" to actually be moved
    }
    

    我有点惊讶它的工作原理,因为我认为移动的未来的析构函数将阻塞,直到我们离开async_wrapper。 它应该等待async_wrapper返回,但它正在等待这个函数。 从逻辑上讲,它应该是一个僵局,但事实并非如此。

    我还尝试在async_wrapper的末尾添加一行以手动清空未来对象:

    our_future = std::future<void>();
    

    这也不会阻止。

    链接地址: http://www.djcxy.com/p/30811.html

    上一篇: Can I use std::async without waiting for the future limitation?

    下一篇: How and When to use `async` and `await`