| #!/usr/bin/python |
| # Script for testing the Attribute D-Bus API |
| |
| import sys |
| from optparse import OptionParser, OptionValueError |
| from binascii import hexlify, unhexlify |
| |
| import gobject |
| import dbus |
| import dbus.service |
| import dbus.mainloop.glib |
| |
| def handle_set_property(option, opt_str, value, parser): |
| """Handle --set-property parameter.""" |
| |
| char_path = parser.values.char_path |
| if not char_path: |
| raise OptionValueError, "%s requires --char-path" % opt_str |
| if len(value.split("=")) != 2: |
| raise OptionValueError, "%s must have format \"property=value\"" % opt_str |
| if not getattr(parser.values, option.dest, None): |
| setattr(parser.values, option.dest, {}) |
| props = getattr(parser.values, option.dest) |
| if not props.get(char_path): |
| props[char_path] = [] |
| props[char_path].append(value.split("=")) |
| |
| def command_parse(): |
| """Parse command line options.""" |
| |
| usage = """ |
| Usage: %s [options]""" % sys.argv[0] |
| parser = OptionParser(usage=usage) |
| parser.add_option("-i", "--adapter", action="store", type="string", |
| help="Specify local adapter interface") |
| parser.add_option("--listen", action="store_true", |
| help="Listen for notifications and indications (requires policy changes)") |
| parser.add_option("--char-path", action="store", type="string", |
| help="D-Bus path for specific characteristic") |
| parser.add_option("--set-property", action="callback", type="string", |
| metavar="NAME=VALUE", callback=handle_set_property, |
| help="Set property for characteristic") |
| return parser.parse_args() |
| |
| def dbus_type_to_str(d): |
| """Convert a D-Bus array to a hexdump.""" |
| |
| if isinstance(d, dbus.Array): |
| return hexlify("".join([str(x) for x in d])) |
| else: |
| return str(d) |
| |
| def hexdump_to_dbus(data): |
| """Convert an hexadecimal dump to D-Bus array.""" |
| |
| d = [dbus.Byte(ord(i)) for i in unhexlify(data)] |
| return dbus.Array(d, signature=dbus.Signature('y'), variant_level=1) |
| |
| class Watcher(dbus.service.Object): |
| @dbus.service.method("org.bluez.Watcher", in_signature="oay", out_signature="") |
| def ValueChanged(self, char, newvalue): |
| print "Watcher: new value for %s: %s" % (char, dbus_type_to_str(newvalue)) |
| |
| def handle_characteristics(char): |
| for (path, props) in char.GetCharacteristics().iteritems(): |
| if options.char_path and path != options.char_path: |
| continue |
| |
| if not options.set_property: |
| ret = "Characteristic: %s\nProperties:\n" % path |
| for (k, v) in props.iteritems(): |
| ret += "\t%s: %s\n" % (k, dbus_type_to_str(v)) |
| print ret |
| elif options.set_property.get(path, None): |
| for (k, v) in options.set_property[path]: |
| char2 = dbus.Interface(bus.get_object("org.bluez", |
| path), "org.bluez.Characteristic") |
| if k == "Value": |
| char2.SetProperty(k, hexdump_to_dbus(v)) |
| else: |
| char2.SetProperty(k, v) |
| |
| def handle_services(device): |
| for s in device.GetProperties()["Services"]: |
| char = dbus.Interface(bus.get_object("org.bluez", s), |
| "org.bluez.Characteristic") |
| if options.listen: |
| char.RegisterCharacteristicsWatcher(watcher_path) |
| else: |
| handle_characteristics(char) |
| |
| if __name__ == "__main__": |
| (options, args) = command_parse() |
| |
| dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) |
| bus = dbus.SystemBus() |
| manager = dbus.Interface(bus.get_object("org.bluez", "/"), |
| "org.bluez.Manager") |
| |
| if options.adapter: |
| path = manager.FindAdapter(options.adapter) |
| else: |
| path = manager.DefaultAdapter() |
| |
| adapter = dbus.Interface(bus.get_object("org.bluez", path), |
| "org.bluez.Adapter") |
| |
| if options.listen: |
| watcher_path = "/test/watcher" |
| watcher = Watcher(bus, watcher_path) |
| |
| for d in adapter.GetProperties()["Devices"]: |
| device = dbus.Interface(bus.get_object("org.bluez", d), |
| "org.bluez.Device") |
| handle_services(device) |
| |
| if options.listen: |
| print "Waiting for incoming notifications/indications..." |
| mainloop = gobject.MainLoop() |
| mainloop.run() |