1import asyncio 2from dbus_fast.aio import MessageBus 3from dbus_fast.service import ServiceInterface, method 4import dbus_fast.introspection as intr 5from dbus_fast import BusType 6 7 8class SampleInterface(ServiceInterface): 9 def __init__(self): 10 super().__init__('test.interface') 11 12 @method() 13 def Ping(self): 14 pass 15 16 @method() 17 def ConcatStrings(self, what1: 's', what2: 's') -> 's': # noqa: F821 18 return what1 + what2 19 20 21async def main(): 22 bus_name = 'dbus.fast.sample' 23 obj_path = '/test/path' 24 25 bus = await MessageBus(bus_type=BusType.SYSTEM).connect() 26 bus2 = await MessageBus(bus_type=BusType.SYSTEM).connect() 27 28 await bus.request_name(bus_name) 29 30 service_interface = SampleInterface() 31 bus.export(obj_path, service_interface) 32 33 introspection = await bus2.introspect(bus_name, obj_path) 34 assert type(introspection) is intr.Node 35 obj = bus2.get_proxy_object(bus_name, obj_path, introspection) 36 interface = obj.get_interface(service_interface.name) 37 38 result = await interface.call_ping() 39 assert result is None 40 41 result = await interface.call_concat_strings('hello ', 'world') 42 assert result == 'hello world' 43 44 45asyncio.run(main()) 46