Source code for bleak.backends.bluezdbus.scanner

import sys
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    if sys.platform != "linux":
        assert False, "This backend is only available on Linux"

import logging
from collections.abc import Callable, Coroutine
from typing import Any, Literal, Optional
from warnings import warn

if sys.version_info < (3, 12):
    from typing_extensions import override
else:
    from typing import override

from dbus_fast import Variant

from bleak.args.bluez import BlueZDiscoveryFilters as _BlueZDiscoveryFilters
from bleak.args.bluez import BlueZScannerArgs as _BlueZScannerArgs
from bleak.backends.bluezdbus.defs import Device1
from bleak.backends.bluezdbus.manager import get_global_bluez_manager
from bleak.backends.scanner import (
    AdvertisementData,
    AdvertisementDataCallback,
    BaseBleakScanner,
)
from bleak.exc import BleakError

logger = logging.getLogger(__name__)


_DEPRECATED: dict[str, Any] = {
    "BlueZDiscoveryFilters": _BlueZDiscoveryFilters,
    "BlueZScannerArgs": _BlueZScannerArgs,
}


def __getattr__(name: str):
    if value := _DEPRECATED.get(name):
        warn(
            f"importing {name} from bleak.backends.bluezdbus.scanner is deprecated, use bleak.args.bluez instead",
            DeprecationWarning,
            stacklevel=2,
        )
        return value
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")


[docs] class BleakScannerBlueZDBus(BaseBleakScanner): """The native Linux Bleak BLE Scanner. For possible values for `filters`, see the parameters to the ``SetDiscoveryFilter`` method in the `BlueZ docs <https://github.com/bluez/bluez/blob/master/doc/org.bluez.Adapter.rst#void-setdiscoveryfilterdict-filter>`_ Args: detection_callback: Optional function that will be called each time a device is discovered or advertising data has changed. service_uuids: Optional list of service UUIDs to filter on. Only advertisements containing this advertising data will be received. Specifying this also enables scanning while the screen is off on Android. scanning_mode: Set to ``"passive"`` to avoid the ``"active"`` scanning mode. **bluez: Dictionary of arguments specific to the BlueZ backend. **adapter (str): Bluetooth adapter to use for discovery. """ def __init__( self, detection_callback: Optional[AdvertisementDataCallback], service_uuids: Optional[list[str]], scanning_mode: Literal["active", "passive"], *, bluez: _BlueZScannerArgs, **kwargs: Any, ): super(BleakScannerBlueZDBus, self).__init__(detection_callback, service_uuids) self._scanning_mode = scanning_mode # kwarg "device" is for backwards compatibility self._adapter: Optional[str] = kwargs.get("adapter", kwargs.get("device")) # callback from manager for stopping scanning if it has been started self._stop: Optional[Callable[[], Coroutine[Any, Any, None]]] = None # Discovery filters self._filters: dict[str, Variant] = {} self._filters["Transport"] = Variant("s", "le") self._filters["DuplicateData"] = Variant("b", False) if self._service_uuids: self._filters["UUIDs"] = Variant("as", self._service_uuids) filters = bluez.get("filters") if filters is not None: self.set_scanning_filter(filters=filters) self._or_patterns = bluez.get("or_patterns") if self._scanning_mode == "passive" and service_uuids: logger.warning( "service uuid filtering is not implemented for passive scanning, use bluez or_patterns as a workaround" ) if self._scanning_mode == "passive" and not self._or_patterns: raise BleakError("passive scanning mode requires bluez or_patterns")
[docs] @override async def start(self) -> None: manager = await get_global_bluez_manager() if self._adapter: adapter_path = f"/org/bluez/{self._adapter}" else: adapter_path = manager.get_default_adapter() self.seen_devices = {} if self._scanning_mode == "passive": self._stop = await manager.passive_scan( adapter_path, self._or_patterns, self._handle_advertising_data, self._handle_device_removed, ) else: self._stop = await manager.active_scan( adapter_path, self._filters, self._handle_advertising_data, self._handle_device_removed, )
[docs] @override async def stop(self) -> None: if self._stop: # avoid reentrancy stop, self._stop = self._stop, None await stop()
[docs] def set_scanning_filter(self, **kwargs: Any) -> None: """Sets OS level scanning filters for the BleakScanner. For possible values for `filters`, see the parameters to the ``SetDiscoveryFilter`` method in the `BlueZ docs <https://github.com/bluez/bluez/blob/master/doc/org.bluez.Adapter.rst#void-setdiscoveryfilterdict-filter>`_ See variant types here: <https://python-dbus-next.readthedocs.io/en/latest/type-system/> Keyword Args: filters (dict): A dict of filters to be applied on discovery. """ for k, v in kwargs.get("filters", {}).items(): if k == "UUIDs": self._filters[k] = Variant("as", v) elif k == "RSSI": self._filters[k] = Variant("n", v) elif k == "Pathloss": self._filters[k] = Variant("n", v) elif k == "Transport": self._filters[k] = Variant("s", v) elif k == "DuplicateData": self._filters[k] = Variant("b", v) elif k == "Discoverable": self._filters[k] = Variant("b", v) elif k == "Pattern": self._filters[k] = Variant("s", v) else: logger.warning("Filter '%s' is not currently supported." % k)
# Helper methods def _handle_advertising_data(self, path: str, props: Device1) -> None: """ Handles advertising data received from the BlueZ manager instance. Args: path: The D-Bus object path of the device. props: The D-Bus object properties of the device. """ _service_uuids = props.get("UUIDs", []) if not self.is_allowed_uuid(_service_uuids): return # Get all the information wanted to pack in the advertisement data _local_name = props.get("Name") _manufacturer_data = { k: bytes(v) for k, v in props.get("ManufacturerData", {}).items() } _service_data = {k: bytes(v) for k, v in props.get("ServiceData", {}).items()} # Get tx power data tx_power = props.get("TxPower") # Pack the advertisement data advertisement_data = AdvertisementData( local_name=_local_name, manufacturer_data=_manufacturer_data, service_data=_service_data, service_uuids=_service_uuids, tx_power=tx_power, rssi=props.get("RSSI", -127), platform_data=(path, props), ) device = self.create_or_update_device( path, props["Address"], # BlueZ generates a name based on the address if no name is available. # To match other backends, we replace this with None. ( None if props["Alias"] == props["Address"].replace(":", "-") else props["Alias"] ), {"path": path, "props": props}, advertisement_data, ) self.call_detection_callbacks(device, advertisement_data) def _handle_device_removed(self, device_path: str) -> None: """ Handles a device being removed from BlueZ. """ try: del self.seen_devices[device_path] except KeyError: # The device will not have been added to self.seen_devices if no # advertising data was received, so this is expected to happen # occasionally. pass