There is a request to implement a D-Bus method which calls a coroutine function with a yield_context and delivers the function's result as the response. The problem is that the coroutine function must not be called again before it has returned from the previous call, because the underlying system doesn't allow multiple in-flight messages, and the provided coroutine function library just sends a message to the underlying system and waits for the response with the yield context, without any kind of serialization.
So I implemented the code below. The basic idea is to queue requests and wait until the response arrives, while another coroutine pops and processes the queued requests.
struct RequestResponse
{
    boost::asio::steady_timer& timer;
    const UnderlyingSystemLibrary::ByteArray& request;
    std::chrono::milliseconds timeout;
    boost::system::error_code ec;
    UnderlyingSystemLibrary::ByteArray response;
};

class ExecuteLock
{
  public:
    ExecuteLock(bool& executing) : executing(executing)
    {
        if (!executing)
            executing = locked = true;
        else
            locked = false;
    }
    ~ExecuteLock()
    {
        if (locked)
            executing = false;
    }
    bool isLocked()
    {
        return locked;
    }

  private:
    bool& executing;
    bool locked;
};
......
/* wrapper function to prohibit multiple in-flight messages */
void Wrapper::sendReceiveYield(
    boost::asio::yield_context yield,
    std::shared_ptr<RequestResponse> message)
{
    static constexpr size_t limit = 100;
    ExecuteLock lock(isSending); /* Custom lock to check for another coroutine. It is released on return */
    messageQueue.push(message);  /* Push message into message queue */
    if (!lock.isLocked())        /* If another coroutine holds the lock */
    {
        /* The coroutine that holds the lock will do the work */
        return;
    }
    /* If no other coroutine is already running, drain the queue */
    while (!messageQueue.empty()) /* Pop and process messages until empty */
    {
        auto& message = messageQueue.front();
        std::tie(message->ec, message->response) =
            underlyingSystemLibrary->sendReceiveYield(
                yield,
                message->request,
                message->timeout); /* Call underlying system function with yield context */
        message->timer.cancel();   /* Cancel timer after return from underlying system function */
        messageQueue.pop();
    }
}
std::pair<boost::system::error_code, UnderlyingSystemLibrary::ByteArray>
    Wrapper::sendReceiveYield(
        boost::asio::yield_context yield,
        const UnderlyingSystemLibrary::ByteArray& request,
        std::chrono::milliseconds timeout)
{
    static const std::chrono::milliseconds async_timeout(1000);
    auto executor = boost::asio::get_associated_executor(yield);
    boost::asio::steady_timer timer(executor);
    auto message = std::make_shared<RequestResponse>(
        timer,
        request,
        timeout,
        boost::system::error_code(),
        UnderlyingSystemLibrary::ByteArray());
    boost::asio::spawn(strand, [this, message](boost::asio::yield_context yield) {
        sendReceiveYield(yield, message); /* call wrapper function within a strand */
    });
    auto ec = boost::system::error_code();
    timer.expires_after(std::max(timeout, async_timeout));
    timer.async_wait(yield[ec]); /* Wait on the timer. It will be canceled by the wrapper function. */
    if (ec != boost::asio::error::operation_aborted)
    {
        phosphor::logging::log<phosphor::logging::level::ERR>(
            "transaction timeout"); /* It is an error if the timer wasn't canceled. */
        return std::make_pair(ec, message->response);
    }
    /* Return response */
    return std::make_pair(message->ec, message->response);
}
It works fine, but I don't think this is an optimal solution. Could you give me an example of, or point me to documentation about, this kind of problem?
I spent considerable time making your code self-contained. I cannot really figure it out, as USL/Wrapper seem to only make sense if Wrapper actually derives from USL, but then it doesn't really make sense that you have underlyingSystemLibrary-> spelled out ¯\_(ツ)_/¯.
Regardless, the two sendReceiveYield functions are both defined with Wrapper:: qualification.
To be honest, it seems like the overload returning the pair should actually be a member of RequestResponse. After all:

auto& message = messageQueue.front();
std::tie(message->ec, message->response) =
    sendReceiveYield(yield, message->request, message->timeout); /* Call underlying system function with yield context */
message->timer.cancel(); /* Cancel timer after return from underlying system function */
Can then just look like:

messageQueue.front()->sendReceiveYield(yield);
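For illustration only (this involves an assumption, since I still don't know how you reach the underlying system), such a member could be sketched roughly like this, with usl standing in for that handle:

/* Hypothetical sketch: RequestResponse as a self-contained work item.
 * "usl" is an assumed reference to the underlying system library. */
struct RequestResponse
{
    UnderlyingSystemLibrary& usl; // assumed handle, not in the original code
    boost::asio::steady_timer& timer;
    const UnderlyingSystemLibrary::ByteArray& request;
    std::chrono::milliseconds timeout;
    boost::system::error_code ec;
    UnderlyingSystemLibrary::ByteArray response;

    void sendReceiveYield(boost::asio::yield_context yield)
    {
        std::tie(ec, response) =
            usl.sendReceiveYield(yield, request, timeout); // forward to the underlying system
        timer.cancel();                                    // signal the waiting coroutine
    }
};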
so ... I went with the following:
Live On Coliru
With all that, I still don't know where to start:

- the sendReceiveYield overloads seem to recursively call each other
- per RequestResponse item, a new RequestResponse appears to be instantiated

So let me instead go back to first principles.
Now, note that given the strand, the messageQueue in itself has all the information you needed:
void Wrapper::Send(Message msg) { // assume invoked on strand
    messageQueue.push(std::move(msg));

    if (messageQueue.size() == 1) {
        // "fork" a separate coro to drain the queue
        spawn(strand, [this](yield_context yield) {
            while (!messageQueue.empty()) {
                // do the actual send operation
                actual_operation(messageQueue.front(), yield);
                messageQueue.pop();
            }
        });
    }
}
That at least avoids the weirdness with the ExecuteLock.
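If the caller isn't already running on the strand, it first has to get there; a minimal sketch, assuming msg is a Message in the caller's scope:

// Sketch only: ensure Send() runs on the strand that owns messageQueue
boost::asio::post(strand, [this, msg = std::move(msg)]() mutable {
    Send(std::move(msg));
});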
I don't hate the use of a waitable timer to signal completion. (I would want to separate that from timeouts).
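For example, a sketch of a timer used purely as a signal, assuming executor is the coroutine's associated executor (as in your get_associated_executor call):

// Illustrative only: a steady_timer used as a pure completion signal
boost::asio::steady_timer signal(executor);
signal.expires_at(boost::asio::steady_timer::time_point::max()); // never expires on its own

// waiting side (coroutine):
boost::system::error_code ec;
signal.async_wait(yield[ec]); // suspends until the timer is canceled
// ec == boost::asio::error::operation_aborted here means "result is ready"

// producing side, once the response has been stored:
signal.cancel(); // wakes the waiter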
However, if you need to have the response, there are multiple ways to go about it. One way is to use regular promises. You can replace the request object with:
struct RequestResponse {
    USL::ByteArray const&        request;
    std::promise<USL::ByteArray> response;
};
And arrive at something like:
USL::ByteArray Wrapper::SendReceive(Message msg) { // assume invoked on strand
    auto fut = msg->response.get_future(); // take the future before the message is moved
    messageQueue.push(std::move(msg));

    if (messageQueue.size() == 1) {
        // "fork" a separate coro to drain the queue
        spawn(strand, [this](yield_context yield) {
            while (!messageQueue.empty()) {
                // do the actual send operation
                auto& msg = messageQueue.front();
                try {
                    msg->response.set_value(actual_operation(msg->request, yield));
                } catch (...) {
                    msg->response.set_exception(std::current_exception());
                }
                messageQueue.pop();
            }
        });
    }

    return fut.get();
}
However, that's obviously a blocking operation. If you need to be able to asynchronously await the result, you'll have to write a proper asynchronous operation, with an initiation function that allows you to pass any completion token (a callback, use_future, yield_context, etc.).
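Very roughly, and only as a sketch: enqueue below is a hypothetical helper that stores the request together with the completion handler, so the draining coroutine can invoke the handler when the reply arrives.

// Sketch of an initiating function built on boost::asio::async_initiate;
// "enqueue" and "asyncSendReceive" are hypothetical names.
template <typename CompletionToken>
auto Wrapper::asyncSendReceive(USL::ByteArray request, CompletionToken&& token)
{
    return boost::asio::async_initiate<CompletionToken,
        void(boost::system::error_code, USL::ByteArray)>(
        [this](auto handler, USL::ByteArray req) {
            // hop onto the strand so the queue is only ever touched there
            boost::asio::post(strand,
                [this, req = std::move(req), handler = std::move(handler)]() mutable {
                    // enqueue(): store {req, handler}; the drain coroutine later calls
                    // handler(ec, response) once the underlying system answers
                    enqueue(std::move(req), std::move(handler));
                });
        },
        token, std::move(request));
}

The drain coroutine then completes each stored handler instead of setting a promise, and the caller picks the completion style per call, e.g. asyncSendReceive(req, yield) inside a coroutine or asyncSendReceive(req, boost::asio::use_future) elsewhere.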