I'd like to make a non-blocking call to a function in a remote D-Bus service using PyQt4.QtDBus. Adapting from Qt's C++ documentation, I came up with the following test program:
from PyQt4 import QtCore, QtDBus
class DeviceInterface(QtDBus.QDBusAbstractInterface):
def __init__(self, service, path, connection, parent=None):
super().__init__(service, path, 'org.freedesktop.UDisks.Device',
connection, parent)
@QtCore.pyqtSlot(QtDBus.QDBusArgument)
def callFinishedSlot(self, arg):
print("Got result:", arg)
if __name__ == '__main__':
import sys
app = QtCore.QCoreApplication(sys.argv)
dev = DeviceInterface('org.freedesktop.UDisks',
'/org/freedesktop/UDisks/devices/sda1',
QtDBus.QDBusConnection.systemBus(), app)
async = dev.asyncCall("FilesystemListOpenFiles");
watcher = QtDBus.QDBusPendingCallWatcher(async, dev)
watcher.finished.connect(dev.callFinishedSlot)
sys.exit(app.exec_())
It seems to work. When I run it, it prints:
Got result: <PyQt4.QtDBus.QDBusPendingCallWatcher object at 0xb740c77c>
The problem is, I don't know how to convert the QDBusPendingCallWatcher
to something (e.g., QDBusMessage
) that I can extract the results from. The example from the C++ documentation does this:
void MyClass.callFinishedSlot(QDBusPendingCallWatcher *call)
{
QDBusPendingReply<QString, QByteArray> reply = *call;
if (reply.isError()) {
showError();
} else {
QString text = reply.argumentAt<0>();
QByteArray data = reply.argumentAt<1>();
showReply(text, data);
}
call->deleteLater();
}
Can anyone tell me how to translate the C++ slot to something that will work with PyQt4? (I'm using PyQt4.9.1 with Qt 4.8.1.)
Okay, the trick seems to be to construct a QDBusPendingReply
from the QDBusPendingCallWatcher
instance (many thanks to Phil Thompson for pointing this out on the PyQt mailing list). As it turns out, this same technique also works in C++. I had the UDisk object path wrong in my original code, plus a few other minor typos, so here's a complete, working example for posterity:
from PyQt4 import QtCore, QtDBus
class DeviceInterface(QtDBus.QDBusAbstractInterface):
def __init__(self, service, path, connection, parent=None):
super().__init__(service, path, 'org.freedesktop.UDisks.Device',
connection, parent)
def callFinishedSlot(self, call):
# Construct a reply object from the QDBusPendingCallWatcher
reply = QtDBus.QDBusPendingReply(call)
if reply.isError():
print(reply.error().message())
else:
print(" PID UID COMMAND")
print("------- ------- ------------------------------------")
for pid, uid, cmd in reply.argumentAt(0):
print("{0:>7d} {1:>7d} {2}".format(pid, uid, cmd))
# Important: Tell Qt we are finished processing this message
call.deleteLater()
if __name__ == '__main__':
import sys
import signal
signal.signal(signal.SIGINT, signal.SIG_DFL)
app = QtCore.QCoreApplication(sys.argv)
dev = DeviceInterface('org.freedesktop.UDisks',
'/org/freedesktop/UDisks/devices/sda1',
QtDBus.QDBusConnection.systemBus(), app)
async = dev.asyncCall("FilesystemListOpenFiles");
watcher = QtDBus.QDBusPendingCallWatcher(async, dev)
watcher.finished.connect(dev.callFinishedSlot)
sys.exit(app.exec_())
This should work on most recent Linux distros, and is a good example of using PyQt to invoke a D-Bus method that returns a compound type.