Restarting a TCP boost::asio io_service with uncompleted completion handlers

I wrote an asynchronous boost::asio TCP application that uses a pool of threads declared as std::vector<std::thread> mIOServicePool . These threads asynchronously read and write TCP data to a server. The following code is taken from the GUI's start push button event handler.

// launch multiple asio service threads to
// handle the protocol instances - effectively
// thread pooling the ioservice
//mpIOService->reset();
for (auto i=0; i<3; i++) {
mIOServicePool.emplace_back(
    std::thread([this]() { mpIOService->run(); }));
}

The code is part of a Qt based GUI application with the mIOServicePool stored as a member of the GUI's mainwindow class.

This works fine when I start the application and leave it running; however, when I attempt to restart the connection to the back-end server, things start to go wrong. The problem is most likely related to uncompleted handlers, which I thought would have been flushed when I reset the io_service::work associated with the io_service (when the GUI stop button is pressed). The problem manifests itself (on Windows at least) when I attempt to start the TCP communications again: an access violation occurs while reading the memory of the stream buffer used with the asio socket. As you can see from the stack trace below, the thread is running a completion handler associated with the socket read.

app739.exe!boost::asio::basic_streambuf<std::allocator<char> >::commit(unsigned __int64 n) Line 226 C++
app739.exe!boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > >::operator()(const boost::system::error_code & ec, unsigned __int64 bytes_transferred, int start) Line 624    C++
app739.exe!boost::asio::detail::binder2<boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > >,boost::system::error_code,unsigned __int64>::operator()() Line 129  C++
app739.exe!boost::asio::asio_handler_invoke<boost::asio::detail::binder2<boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > >,boost::system::error_code,unsigned __int64> >(boost::asio::detail::binder2<boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const &>,boost::_bi::list2<boost::_bi::value<VCDUProtocol *>,boost::arg<1> > > >,boost::system::error_code,unsigned __int64> & function, ...) Line 70    C++
app739.exe!boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder2<boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > >,boost::system::error_code,unsigned __int64>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > >(boost::asio::detail::binder2<boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const &>,boost::_bi::list2<boost::_bi::value<VCDUProtocol *>,boost::arg<1> > > >,boost::system::error_code,unsigned __int64> & function, boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const &>,boost::_bi::list2<boost::_bi::value<VCDUProtocol *>,boost::arg<1> > > & context) Line 39 C++
app739.exe!boost::asio::detail::asio_handler_invoke<boost::asio::detail::binder2<boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > >,boost::system::error_code,unsigned __int64>,boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > >(boost::asio::detail::binder2<boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const &>,boost::_bi::list2<boost::_bi::value<VCDUProtocol *>,boost::arg<1> > > >,boost::system::error_code,unsigned __int64> & function, boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const &>,boost::_bi::list2<boost::_bi::value<VCDUProtocol *>,boost::arg<1> > > > * this_handler) Line 685  C++
app739.exe!boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder2<boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > >,boost::system::error_code,unsigned __int64>,boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > > >(boost::asio::detail::binder2<boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const &>,boost::_bi::list2<boost::_bi::value<VCDUProtocol *>,boost::arg<1> > > >,boost::system::error_code,unsigned __int64> & function, boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const &>,boost::_bi::list2<boost::_bi::value<VCDUProtocol *>,boost::arg<1> > > > & context) Line 39 C++
app739.exe!boost::asio::detail::win_iocp_socket_recv_op<boost::asio::mutable_buffers_1,boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > > >::do_complete(boost::asio::detail::win_iocp_io_service * owner, boost::asio::detail::win_iocp_operation * base, const boost::system::error_code & result_ec, unsigned __int64 bytes_transferred) Line 97    C++
app739.exe!boost::asio::detail::win_iocp_operation::complete(boost::asio::detail::win_iocp_io_service & owner, const boost::system::error_code & ec, unsigned __int64 bytes_transferred) Line 47    C++
app739.exe!boost::asio::detail::win_iocp_io_service::do_one(bool block, boost::system::error_code & ec) Line 406    C++
app739.exe!boost::asio::detail::win_iocp_io_service::run(boost::system::error_code & ec) Line 164   C++
app739.exe!boost::asio::io_service::run() Line 59   C++
app739.exe!MainWindow::on_pushButtonStart_clicked::__l13::<lambda>() Line 943   C++

This answer indicates that the problem may have to do with io_service.reset(). The stack trace at the time of the access violation shows that the thread was processing an asio completion handler. From reading other posts, I think the key to resolving this is to correctly sequence the io_service.stop() and io_service.reset() calls on the boost::asio::io_service object. It may also be important to reset the socket prior to stopping the io_service or resetting the sentinel work object.

The code below shows how I attempt to stop the io_service threads. While debugging the code I do see all the threads complete their joins, so I do not understand why there are completion handlers outstanding.

/**
 * Stops the running system.
 *
 * Releases the io_service work sentinel, asks the protocol object to shut
 * down and then waits for every thread in mIOServicePool to finish before
 * emptying the pool.
 */
void
MainWindow::stopSys()
{
    // drop the work sentinel so it no longer keeps the services alive
    mpWork.reset();

    // without a protocol object no protocol threads were started, so
    // there is nothing further to tear down
    if (!mVCDUProtocol) {
        return;
    }

    // ask the protocol to terminate (sets the shared shutdown flag)
    mVCDUProtocol->shutdown();

    // once each thread notices the shutdown request it terminates, so
    // joining here waits for the whole pool to drain
    for (std::thread& worker : mIOServicePool) {
        worker.join();
    }
    mIOServicePool.clear();
}

My code shown below is pretty straightforward. It kicks off an asynchronous resolve - which is handled within a lambda handler. From there, it calls the start_async_ops(endPointIter) to perform an asynchronous connect() and from this lambda, the code calls VCDUProtocol::do_read() which performs a boost::asio::async_read_until() to wait for the data from the server.

void
VCDUProtocol::prosimAsyncIOThreadFn()
{
    static auto& gLogger = gpLogger->getLoggerRef(
        gUseSysLog ? Logger::LogDest::SysLog :
        Logger::LogDest::EventLog);
    try {
        // convert the host-name/port to a usable endpoint
        tcp::resolver resolver(*mpIOService);
        tcp::resolver::query query(mProtocolConfig.getProsimHostName(),
            std::to_string(mProtocolConfig.getProsimPortNum()));
        const auto endPointIter = std::find_if(
            resolver.resolve(query), tcp::resolver::iterator(),
            [](const tcp::endpoint& next) {
                return next.protocol() == tcp::v4();
            });
        if (endPointIter != tcp::resolver::iterator()) {
            mpSocket = std::make_unique<tcp::socket>(*mpIOService);
            mpSocketTimer = std::make_unique<deadline_timer>(*mpIOService);
            start_async_ops(endPointIter);
        }
    } catch (std::exception& rEx) {
        LOG_ERROR(gLogger, gChannel) << boost::format(
            "%1%: %2%")
            % __FUNCTION__
            % rEx.what();
    }
}

/**
 * Starts the connect operation and arms the deadline timer wait.
 *
 * @param endpoint_iter  [in] First endpoint to try connecting to.
 */
void
VCDUProtocol::start_async_ops(tcp::resolver::iterator endpoint_iter)
{
    // Connect actor goes first.
    do_connect(endpoint_iter);

    // Then the deadline actor. No particular deadline is set at this
    // point; the connect and input actors adjust the deadline themselves
    // before each asynchronous operation they start.
    mpSocketTimer->async_wait(
        [this](const boost::system::error_code& waitResult) {
            check_deadline(waitResult);
        });
}

/**
 * Starts an asynchronous connect with a five second deadline, or shuts
 * down when there is no endpoint left to try.
 *
 * @param endpoint_iter  [in] Endpoint(s) to connect to; an end iterator
 *                       means there is nothing left to try.
 */
void
VCDUProtocol::do_connect(
    tcp::resolver::iterator endpoint_iter)
{
    if (endpoint_iter == tcp::resolver::iterator()) {
        // No more endpoints to try.
        shutdown();
        return;
    }

    // Give the connect operation a deadline to complete by.
    mpSocketTimer->expires_from_now(boost::posix_time::seconds(5));
    boost::asio::async_connect(*mpSocket, endpoint_iter,
        [this](boost::system::error_code connectResult,
               tcp::resolver::iterator) {
            // ignore the completion when shutting down or on failure
            if (mShutdownFlag || connectResult) {
                return;
            }
            // connected - cancel the connect timer and start the
            // prosim read operation
            mpSocketTimer->cancel();
            do_read();
        });
}

/**
 * Starts (or continues) an asynchronous line read on the socket.
 *
 * Data is read into the stream buffer until it contains the CR LF
 * delimiter; handle_read() is then invoked with the resulting error code.
 */
void
VCDUProtocol::do_read()
{
    // The delimiter has to be the escape sequences "\r\n". Written without
    // the backslashes ("rn") the read would only complete once the two
    // letters 'r' 'n' showed up in the data, not at the end of a line.
    async_read_until(*mpSocket, *mTLS->mSocketStreamBuf, "\r\n",
        boost::bind(&VCDUProtocol::handle_read, this, _1));
}

This is the asynchronous read completion handler - THIS NEEDS TO BE CANCELLED. I read somewhere that simply closing the socket is insufficient, as the completion handlers will not get called. Should I call cancel() on the socket instead?

/**
 * Asynchronous read callback.
 *
 * Does nothing once mShutdownFlag is set. On success it drains the
 * thread's socket stream buffer line by line, forwards page updates to
 * mpListener and then issues the next do_read(). On error it logs the
 * error message and calls shutdown().
 *
 * NOTE: part of the per-line processing is elided (". . .") in this
 * listing, so the braces shown below do not balance as written.
 *
 * @param ec      [in] Boost ASIO library error code.
 */
void
VCDUProtocol::handle_read(const boost::system::error_code& ec)
{
    static auto& gLogger = gpLogger->getLoggerRef(
        gUseSysLog ? Logger::LogDest::SysLog :
        Logger::LogDest::EventLog);
    // ignore completions that arrive after shutdown was requested
    if (!mShutdownFlag) {
        if (!ec) {
            // Extract the newline-delimited message from the buffer.
            std::string line;
            std::istream is(mTLS->mSocketStreamBuf.get());
            while (std::getline(is, line)) {
                // Critical Section
                // (gMutexGuard is held for the rest of this iteration)
                std::lock_guard<std::mutex> lock (gMutexGuard);
                // handle partial line reads
                // eof() after a successful getline means the buffered data
                // ran out before a '\n' was seen: stash the fragment and
                // let the next read complete it
                if (is.eof()) {
                    mTLS->mPartialLine = std::move(line);
                    continue;
                } else if (!mTLS->mPartialLine.empty()) {
                    // prepend the fragment saved by an earlier callback
                    // NOTE(review): mPartialLine is moved-from here and is
                    // not cleared in the visible code - confirm it is reset
                    line = std::move(mTLS->mPartialLine) + line;
                }
                . . .
                // update GUI
                // (bRefreshCDU is set up in the elided code above)
                mpListener->handlePageUpdate(
                    mProtocolConfig.getCduID(),
                    mTLS->mVCDUPage, bRefreshCDU);
                }
                // not really required
                line.clear();
            }
            // keep reading
            do_read();
        } else {
            // read failed: report the error and shut the protocol down
            LOG_ERROR(gLogger, gChannel) << boost::format(
                "CDU_%1%: handle_read - error[%2%]")
                % mProtocolConfig.getCduID()
                % ec.message();
            shutdown();
        }
    }
}

restarting tcp boost asio io_service with uncompleted completion handlers

AFAICT that's impossible.

The documentation clearly specifies that reset() must be called before run() can be called again.

I think the only viable alternative is to create your own event loop based on eg poll_one() and thereby prevent having to stop the service in the first place.

This is the asynchronous read completion handler - THIS NEEDS TO BE CANCELLED, I read somewhere that simply closing the socket is insufficient as the completions handlers will not get called.

That's not true. Cancelling a socket will cancel the operations in flight and they will cause the completion handlers to be invoked with ec == operation_aborted . Closing the socket will probably cause different error codes like bad_socket .

链接地址: http://www.djcxy.com/p/62684.html

上一篇: 如何在同一个select语句中使用count和group

下一篇: 服务与未完成的完成处理程序