I am using BlueZ-obexd and pydbus to write a Python OBEX client for OPP file transfer. File transfer already works, and I now want to call RemoveSession from the API only after the transfer has completed or failed. I know the API also has a Status property, but I have no idea how to use it. I don't have much experience with D-Bus or pydbus, so any help would be great. My code is below.
import pydbus
import time
import subprocess
# Setup of device specific values
address = subprocess.check_output("bluetoothctl paired-devices", shell=True)
dev_id = str(address)[9:26]
TRANSFER_IFACE = 'org.bluez.obex.Transfer1'
ses = pydbus.SessionBus()
obex = ses.get('org.bluez.obex', '/org/bluez/obex')
ses1 = obex.CreateSession(dev_id, {'Target': pydbus.Variant('s', 'OPP')})
ses1_dbus = ses.get('org.bluez.obex', ses1)
props = ses1_dbus.SendFile('/home/pi/Desktop/image.jpg')
obex.RemoveSession(ses1)
The documentation for SendFile says that a DBus object path and a dictionary of properties are returned. The interface of the returned object is org.bluez.obex.Transfer1, which allows the Status to be monitored.
The ideal way to monitor this status is to run the GLib.MainLoop event loop and watch for the PropertiesChanged signal on the DBus object path returned by SendFile.
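The decision the signal handler has to make can be tried out without a bus connection. The following is only a minimal sketch: `transfer_finished` is a hypothetical helper name, and it assumes that 'complete' and 'error' are the only states after which the session should be removed.

```python
OBEX_TRANSFER = 'org.bluez.obex.Transfer1'

# Assumption: these are the only states after which the transfer is over.
TERMINAL_STATUSES = {'complete', 'error'}


def transfer_finished(iface, props_changed):
    """Return True when a PropertiesChanged payload reports a finished transfer.

    iface and props_changed mirror the first two arguments of the
    PropertiesChanged signal. The helper name itself is hypothetical.
    """
    if iface != OBEX_TRANSFER:
        return False
    # Status is only present when it actually changed.
    return props_changed.get('Status') in TERMINAL_STATUSES


print(transfer_finished(OBEX_TRANSFER, {'Status': 'active'}))    # False
print(transfer_finished(OBEX_TRANSFER, {'Status': 'complete'}))  # True
```

A handler attached to PropertiesChanged could call a check like this and only then quit the main loop and call RemoveSession.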
Also, for the paired device information, there is a BlueZ DBus API that provides it without the need for a subprocess call to bluetoothctl.
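To show what that lookup involves, here is a small sketch that filters a dictionary shaped like the reply from GetManagedObjects. The helper name `paired_addresses` and the sample data (object paths and addresses) are made up for illustration. On a real system the dictionary would come from the ObjectManager on the system bus.

```python
BLUEZ_DEVICE = 'org.bluez.Device1'


def paired_addresses(managed_objects):
    """Return the addresses of all paired devices.

    managed_objects maps object paths to {interface name: properties},
    the same shape GetManagedObjects returns. Hypothetical helper name.
    """
    addresses = []
    for path, ifaces in managed_objects.items():
        device = ifaces.get(BLUEZ_DEVICE)
        if device and device.get('Paired'):
            addresses.append(device['Address'])
    return addresses


# Made-up sample data for illustration only.
sample = {
    '/org/bluez/hci0': {
        'org.bluez.Adapter1': {'Address': '00:00:00:00:00:01'}},
    '/org/bluez/hci0/dev_AA_BB_CC_DD_EE_FF': {
        BLUEZ_DEVICE: {'Address': 'AA:BB:CC:DD:EE:FF', 'Paired': True}},
    '/org/bluez/hci0/dev_11_22_33_44_55_66': {
        BLUEZ_DEVICE: {'Address': '11:22:33:44:55:66', 'Paired': False}},
}

print(paired_addresses(sample))  # ['AA:BB:CC:DD:EE:FF']
```

Returning a list rather than the first match makes it easier to pick a specific device when more than one is paired.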
I've tried to do an example of both:
import pydbus
from gi.repository import GLib
OBEX_TRANSFER = 'org.bluez.obex.Transfer1'
BLUEZ_DEVICE = 'org.bluez.Device1'
ses_bus = pydbus.SessionBus()
sys_bus = pydbus.SystemBus()
mngr = sys_bus.get('org.bluez', '/')
mainloop = GLib.MainLoop()
def paired_devices():
    mngd_objs = mngr.GetManagedObjects()
    for path, iface in mngd_objs.items():
        if BLUEZ_DEVICE in iface:
            addr = iface.get(BLUEZ_DEVICE, {}).get('Address')
            name = iface.get(BLUEZ_DEVICE, {}).get('Name', addr)
            paired = iface.get(BLUEZ_DEVICE, {}).get('Paired')
            if paired:
                return addr
def exit_session():
    print('Exit session')
    mainloop.quit()
    obex.RemoveSession(ses1)
def transfer_status_handler(iface, props_changed, props_removed):
    if iface == OBEX_TRANSFER:
        status = props_changed.get('Status')
        if status == 'complete':
            print('Transfer complete')
            exit_session()
        elif status == 'queued':
            print('Still queued')
        elif status == 'active':
            print('transferring')
        elif status == 'suspended':
            print('Suspended')
        elif status == 'error':
            print('error')
            exit_session()
obex = ses_bus.get('org.bluez.obex', '/org/bluez/obex')
my_addr = paired_devices()
ses1 = obex.CreateSession(my_addr, {'Target': pydbus.Variant('s', 'OPP')})
ses1_dbus = ses_bus.get('org.bluez.obex', ses1)
path, props = ses1_dbus.SendFile('/home/pi/.bashrc')
transfer = ses_bus.get('org.bluez.obex', path)
transfer.onPropertiesChanged = transfer_status_handler
try:
    mainloop.run()
except KeyboardInterrupt:
    exit_session()