ubluetooth — 低功耗蓝牙

该模块提供低功耗蓝牙控制接口。当前,它在中央,外围设备,广播和观察者角色中支持蓝牙低功耗(BLE),并且设备可以同时在多个角色中运行。

此API旨在与低功耗蓝牙协议相匹配,并为更高级的抽象(如特定的设备类型)提供构建模块。

Note

该模块仍在开发中,其类,功能,方法和常量可能会发生变化。

BLE 类

构建

class ubluetooth.BLE

返回 BLE 对象

配置

BLE.active([active])

(可选)更改BLE无线电的活动状态,并返回当前状态。

在使用此类的任何其他方法之前,必须使无线电处于活动状态。

BLE.config('param')
BLE.config(param=value, ...)

获取或设置BLE接口的配置值。为了获得一个值,参数名称应该用字符串引号,并且一次只查询一个参数。要设置值,请使用关键字语法,一次可以设置一个或多个参数。

当前支持的值为:

  • 'mac': 返回设备的MAC地址。如果设备具有固定地址(例如PYBD),则将其返回。否则(例如ESP32),当BLE接口处于活动状态时,将生成一个随机地址。

  • 'rxbuf': 设置用于存储传入事件的内部缓冲区的大小(以字节为单位)。该缓冲区是整个BLE驱动程序的全局缓冲区,因此可以处理所有事件(包括所有特征)的传入数据。增加此值可以更好地处理突发的传入数据(例如,扫描结果),并可以使中央设备接收较大的特征值。

事件处理

BLE.irq(handler, trigger=0xffff)

为BLE堆栈中的事件注册回调。handler接收两个参数,event (看下文的事件代码)和 data (其是值的特定事件元组)。

可选的 trigger 参数允许您设置程序感兴趣的事件的掩码。默认值为所有事件。

注: addr, adv_datauuid 元组中的项是引用的数据管理 ubluetooth 模块(即相同的实例将被重新使用多次调用到事件处理程序)。 如果您的程序想在处理程序之外使用此数据,则它必须首先复制它们,例如使用 bytes(addr) or bluetooth.UUID(uuid)

一个事件处理程序显示所有可能的事件:

def bt_irq(event, data):
    if event == _IRQ_CENTRAL_CONNECT:
        # 中央设备已经连接到这个外围设备
        conn_handle, addr_type, addr = data
    elif event == _IRQ_CENTRAL_DISCONNECT:
        # 中央设备已与此外围设备断开
        conn_handle, addr_type, addr = data
    elif event == _IRQ_GATTS_WRITE:
        # 中央设备已写入此特征或描述符
        conn_handle, attr_handle = data
    elif event == _IRQ_GATTS_READ_REQUEST:
        # 中央设备已发出读请求. Note: 这是一个硬件IRQ
        # 返回None来拒绝读操作
        # Note: 这事件不支持 ESP32.
        conn_handle, attr_handle = data
    elif event == _IRQ_SCAN_RESULT:
        # 一次扫描的结果
        addr_type, addr, connectable, rssi, adv_data = data
    elif event == _IRQ_SCAN_COMPLETE:
        # 扫描持续时间已完成或手动停止
        pass
    elif event == _IRQ_PERIPHERAL_CONNECT:
        #  gap_connect()连接成功
        conn_handle, addr_type, addr = data
    elif event == _IRQ_PERIPHERAL_DISCONNECT:
        # 已连接的外围设备已断开
        conn_handle, addr_type, addr = data
    elif event == _IRQ_GATTC_SERVICE_RESULT:
        # 调用gattc_discover_services()找到的每个服务
        conn_handle, start_handle, end_handle, uuid = data
    elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT:
        # 调用gattc_discover_services()找到的每个特征
        conn_handle, def_handle, value_handle, properties, uuid = data
    elif event == _IRQ_GATTC_DESCRIPTOR_RESULT:
        # 调用gattc_discover_descriptors()找到的每个描述符
        conn_handle, dsc_handle, uuid = data
    elif event == _IRQ_GATTC_READ_RESULT:
        # gattc_read() 已完成
        conn_handle, value_handle, char_data = data
    elif event == _IRQ_GATTC_WRITE_STATUS:
        # gattc_write() 已完成
        conn_handle, value_handle, status = data
    elif event == _IRQ_GATTC_NOTIFY:
        # 外围设备已发出通知请求
        conn_handle, value_handle, notify_data = data
    elif event == _IRQ_GATTC_INDICATE:
        #外围设备发出指示请求
        conn_handle, value_handle, notify_data = data

事件代码:

from micropython import const
_IRQ_CENTRAL_CONNECT                 = const(1 << 0)
_IRQ_CENTRAL_DISCONNECT              = const(1 << 1)
_IRQ_GATTS_WRITE                     = const(1 << 2)
_IRQ_GATTS_READ_REQUEST              = const(1 << 3)
_IRQ_SCAN_RESULT                     = const(1 << 4)
_IRQ_SCAN_COMPLETE                   = const(1 << 5)
_IRQ_PERIPHERAL_CONNECT              = const(1 << 6)
_IRQ_PERIPHERAL_DISCONNECT           = const(1 << 7)
_IRQ_GATTC_SERVICE_RESULT            = const(1 << 8)
_IRQ_GATTC_CHARACTERISTIC_RESULT     = const(1 << 9)
_IRQ_GATTC_DESCRIPTOR_RESULT         = const(1 << 10)
_IRQ_GATTC_READ_RESULT               = const(1 << 11)
_IRQ_GATTC_WRITE_STATUS              = const(1 << 12)
_IRQ_GATTC_NOTIFY                    = const(1 << 13)
_IRQ_GATTC_INDICATE                  = const(1 << 14)

为了节省固件中的空间,这些常量不包括在 ubluetooth 。将您需要的从上面的列表中添加到您的程序中。

广播者(Advertiser)

BLE.gap_advertise(interval_us, adv_data=None, resp_data=None, connectable=True)

以指定的时间间隔(以微秒为单位)开始广播。该间隔将四舍五入到最接近的625微妙。要停止广播,请将 interval_us 设置 为None。

adv_dataresp_data 可以是任何 buffer 类型 (例如 bytes, bytearray, str)。 adv_data 包含在所有广播中,并发送 resp_data 以应答有效的扫描。

注意:如果 adv_data (或 resp_data )为None,则将重用传递到上一个调用的数据 gap_advertise 。 这样一来,广播者就可以使用来恢复广播 gap_advertise(interval_us) 。为了清除广播负载,传递一个空的bytes,即b’’。

观察者 (Scanner)

BLE.gap_scan(duration_ms[, interval_us][, window_us])

运行持续指定时间(以毫秒为单位)的扫描操作。

要无限期扫描,请将 duration_ms 设置为 0 。要停止扫描,请将 duration_ms 设置为 None

使用 interval_uswindow_us 可以选择配置占空比。 扫描器将每间隔一微秒运行一次 window_us 微秒,总计持续时间为毫秒。默认间隔和窗口分别为1.28秒和11.25毫秒。

对于每个扫描结果,_IRQ_SCAN_RESULT 将引发该事件。

停止扫描(由于持续时间结束或明确停止)时,_IRQ_SCAN_COMPLETE 将引发该事件。

外围设备 (GATT Server)

BLE外围设备具有一组注册服务。每个服务可能包含特性,每个特性都有一个值。特征也可以包含描述符,描述符本身具有值。

这些值存储在本地,并通过在服务注册过程中生成的“值柄”进行访问。它们也可以被远程的中央设备读取或写入。 此外,外围设备可以通过连接句柄将特征“通知”到已连接的中央设备。

特征和描述符的默认最大为20个字节。任何由中央设备写给它们的都会被截短到这个长度。但是,任何本地写操作都会增加最大大小, 所以,如果你写想更长的数据,请注册后使用 gatts_write 。例如, gatts_write(char_handle, bytes(100))

BLE.gatts_register_services(services_definition)

使用指定的服务配置外围设备,替换所有现有服务。

services_definition 是一个服务的列表,其中每个服务都是一个包含UUID和特征列表的二元元组。

每个特征都是一个包含 UUIDflags 值以及一个可选的描述符列表的2或3元素元组。

每个描述符是一个包含UUID和一个flags值的二元元组。

flags是一个按位或组合的 ubluetooth.FLAG_READubluetooth.FLAG_WRITEubluetooth.FLAG_NOTIFY 。如下文所定义的值:

返回值是元组的列表(每个服务一个元素)(每个元素是一个值句柄)。特征和描述符句柄按照定义的顺序被展平到相同的元组中。

以下示例注册了两个服务 (Heart Rate, and Nordic UART):

HR_UUID = bluetooth.UUID(0x180D)
HR_CHAR = (bluetooth.UUID(0x2A37), bluetooth.FLAG_READ | bluetooth.FLAG_NOTIFY,)
HR_SERVICE = (HR_UUID, (HR_CHAR,),)
UART_UUID = bluetooth.UUID('6E400001-B5A3-F393-E0A9-E50E24DCCA9E')
UART_TX = (bluetooth.UUID('6E400003-B5A3-F393-E0A9-E50E24DCCA9E'), bluetooth.FLAG_READ | bluetooth.FLAG_NOTIFY,)
UART_RX = (bluetooth.UUID('6E400002-B5A3-F393-E0A9-E50E24DCCA9E'), bluetooth.FLAG_WRITE,)
UART_SERVICE = (UART_UUID, (UART_TX, UART_RX,),)
SERVICES = (HR_SERVICE, UART_SERVICE,)
( (hr,), (tx, rx,), ) = bt.gatts_register_services(SERVICES)

这三个值柄(hr, tx, rx)可与使用 gatts_read, gatts_write, 和 gatts_notify

注意:注册服务之前,必须停止广告。

BLE.gatts_read(value_handle)

读取本地的值柄 (该值由 gatts_write 或远程的中央设备写入)。

BLE.gatts_write(value_handle, data)

写入本地的值柄,该值可由中央设备读取。

BLE.gatts_notify(conn_handle, value_handle[, data])

通知连接的中央设备此值已更改,并且应发出此外围设备的当前值的读取值。

如果指定了数据,则将该值作为通知的一部分发送到中央设备,从而避免了需要单独的读取请求的情况。请注意,这不会更新存储的本地值。

BLE.gatts_set_buffer(value_handle, len, append=False)

设置内部缓冲区大小(以字节为单位)。这将限制可以接收的最大值。默认值为20。 将 append 设置为 True 会将所有远程写入追加到当前值,而不是替换当前值。这样最多可以缓冲len个字节。 使用时 gatts_read ,将在读取后清除该值。这个功能在实现某些东西时很有用,比如Nordic UART服务。

中央设备 (GATT Client)

BLE.gap_connect(addr_type, addr, scan_duration_ms=2000)

连接到外围设备。成功,将触发 _IRQ_PERIPHERAL_CONNECT 事件。

BLE.gap_disconnect(conn_handle)

断开指定的连接句柄。成功,将触发 _IRQ_PERIPHERAL_DISCONNECT 事件。 如果未连接连接句柄,返回 False ,否则返回 True

BLE.gattc_discover_services(conn_handle)

查询已连接的外围设备的服务。

对于发现的每个服务, 会触发 _IRQ_GATTC_SERVICE_RESULT 事件。

BLE.gattc_discover_characteristics(conn_handle, start_handle, end_handle)

在已连接的外围设备上查询指定范围内的特征。 每次特征发现,会触发 _IRQ_GATTC_CHARACTERISTIC_RESULT 事件。

BLE.gattc_discover_descriptors(conn_handle, start_handle, end_handle)

在连接的外围设备中查询指定范围内的描述符。

每次特征发现,会触发 _IRQ_GATTC_DESCRIPTOR_RESULT 事件。

BLE.gattc_read(conn_handle, value_handle)

向连接的外围设备发出远程读取,以获取指定的特性或描述符句柄。

如果成功,会触发 _IRQ_GATTC_READ_RESULT 事件

BLE.gattc_write(conn_handle, value_handle, data, mode=0)

针对指定的特征或描述符句柄向连接的外围设备发出远程写操作。

  • mode

    • mode=0 (默认)是无响应写操作:写操作将发送到远程外围设备,但不会返回确认信息,也不会引发任何事件。

    • mode=1 i是响应写入:请求远程外围设备发送其已接收到数据的响应/确认。

如果从远程外围设备收到响应,_IRQ_GATTC_WRITE_STATUS 事件将触发。

UUID 类

构建

class ubluetooth.UUID(value)

用指定的值创建一个UUID实例。

该值可以是:

  • 一个16位整数。例如 0x2908.

  • 128位UUID字符串。例如 '6E400001-B5A3-F393-E0A9-E50E24DCCA9E'.

常量

ubluetooth.FLAG_READ
ubluetooth.FLAG_WRITE
ubluetooth.FLAG_NOTIFY
ble_advertising.py(BLE广播)
 1# Helpers for generating BLE advertising payloads.
 2
 3from micropython import const
 4import struct
 5import bluetooth
 6
 7# Advertising payloads are repeated packets of the following form:
 8#   1 byte data length (N + 1)
 9#   1 byte type (see constants below)
10#   N bytes type-specific data
11
12_ADV_TYPE_FLAGS = const(0x01)
13_ADV_TYPE_NAME = const(0x09)
14_ADV_TYPE_UUID16_COMPLETE = const(0x3)
15_ADV_TYPE_UUID32_COMPLETE = const(0x5)
16_ADV_TYPE_UUID128_COMPLETE = const(0x7)
17_ADV_TYPE_UUID16_MORE = const(0x2)
18_ADV_TYPE_UUID32_MORE = const(0x4)
19_ADV_TYPE_UUID128_MORE = const(0x6)
20_ADV_TYPE_APPEARANCE = const(0x19)
21
22
23# Generate a payload to be passed to gap_advertise(adv_data=...).
24def advertising_payload(limited_disc=False, br_edr=False, name=None, services=None, appearance=0):
25    payload = bytearray()
26
27    def _append(adv_type, value):
28        nonlocal payload
29        payload += struct.pack('BB', len(value) + 1, adv_type) + value
30
31    _append(_ADV_TYPE_FLAGS, struct.pack('B', (0x01 if limited_disc else 0x02) + (0x00 if br_edr else 0x04)))
32
33    if name:
34        _append(_ADV_TYPE_NAME, name)
35
36    if services:
37        for uuid in services:
38            b = bytes(uuid)
39            if len(b) == 2:
40                _append(_ADV_TYPE_UUID16_COMPLETE, b)
41            elif len(b) == 4:
42                _append(_ADV_TYPE_UUID32_COMPLETE, b)
43            elif len(b) == 16:
44                _append(_ADV_TYPE_UUID128_COMPLETE, b)
45
46    # See org.bluetooth.characteristic.gap.appearance.xml
47    _append(_ADV_TYPE_APPEARANCE, struct.pack('<h', appearance))
48
49    return payload
50
51
52def decode_field(payload, adv_type):
53    i = 0
54    result = []
55    while i + 1 < len(payload):
56        if payload[i + 1] == adv_type:
57            result.append(payload[i + 2:i + payload[i] + 1])
58        i += 1 + payload[i]
59    return result
60
61
62def decode_name(payload):
63    n = decode_field(payload, _ADV_TYPE_NAME)
64    return str(n[0], 'utf-8') if n else ''
65
66
67def decode_services(payload):
68    services = []
69    for u in decode_field(payload, _ADV_TYPE_UUID16_COMPLETE):
70        services.append(bluetooth.UUID(struct.unpack('<h', u)[0]))
71    for u in decode_field(payload, _ADV_TYPE_UUID32_COMPLETE):
72        services.append(bluetooth.UUID(struct.unpack('<d', u)[0]))
73    for u in decode_field(payload, _ADV_TYPE_UUID128_COMPLETE):
74        services.append(bluetooth.UUID(u))
75    return services
76
77
78def demo():
79    payload = advertising_payload(name='micropython', services=[bluetooth.UUID(0x181A), bluetooth.UUID('6E400001-B5A3-F393-E0A9-E50E24DCCA9E')])
80    print(payload)
81    print(decode_name(payload))
82    print(decode_services(payload))
83
84if __name__ == '__main__':
85    demo()
这个例子演示了一个简单的温度传感器外设
 1# This example demonstrates a simple temperature sensor peripheral.
 2#
 3# The sensor's local value updates every second, and it will notify
 4# any connected central every 10 seconds.
 5
 6import bluetooth
 7import random
 8import struct
 9import time
10from ble_advertising import advertising_payload
11
12from micropython import const
13_IRQ_CENTRAL_CONNECT                 = const(1 << 0)
14_IRQ_CENTRAL_DISCONNECT              = const(1 << 1)
15
16# org.bluetooth.service.environmental_sensing
17_ENV_SENSE_UUID = bluetooth.UUID(0x181A)
18# org.bluetooth.characteristic.temperature
19_TEMP_CHAR = (bluetooth.UUID(0x2A6E), bluetooth.FLAG_READ|bluetooth.FLAG_NOTIFY,)
20_ENV_SENSE_SERVICE = (_ENV_SENSE_UUID, (_TEMP_CHAR,),)
21
22# org.bluetooth.characteristic.gap.appearance.xml
23_ADV_APPEARANCE_GENERIC_THERMOMETER = const(768)
24
25class BLETemperature:
26    def __init__(self, ble, name='mpy-temp'):
27        self._ble = ble
28        self._ble.active(True)
29        self._ble.irq(handler=self._irq)
30        ((self._handle,),) = self._ble.gatts_register_services((_ENV_SENSE_SERVICE,))
31        self._connections = set()
32        self._payload = advertising_payload(name=name, services=[_ENV_SENSE_UUID], appearance=_ADV_APPEARANCE_GENERIC_THERMOMETER)
33        self._advertise()
34
35    def _irq(self, event, data):
36        # Track connections so we can send notifications.
37        if event == _IRQ_CENTRAL_CONNECT:
38            conn_handle, _, _, = data
39            self._connections.add(conn_handle)
40        elif event == _IRQ_CENTRAL_DISCONNECT:
41            conn_handle, _, _, = data
42            self._connections.remove(conn_handle)
43            # Start advertising again to allow a new connection.
44            self._advertise()
45
46    def set_temperature(self, temp_deg_c, notify=False):
47        # Data is sint16 in degrees Celsius with a resolution of 0.01 degrees Celsius.
48        # Write the local value, ready for a central to read.
49        self._ble.gatts_write(self._handle, struct.pack('<h', int(temp_deg_c * 100)))
50        if notify:
51            for conn_handle in self._connections:
52                # Notify connected centrals to issue a read.
53                self._ble.gatts_notify(conn_handle, self._handle)
54
55    def _advertise(self, interval_us=500000):
56        self._ble.gap_advertise(interval_us, adv_data=self._payload)
57
58
59def demo():
60    ble = bluetooth.BLE()
61    temp = BLETemperature(ble)
62
63    t = 25
64    i = 0
65
66    while True:
67        # Write every second, notify every 10 seconds.
68        i = (i + 1) % 10
69        temp.set_temperature(t, notify=i == 0)
70        # Random walk the temperature.
71        t += random.uniform(-0.5, 0.5)
72        time.sleep_ms(1000)
73
74
75if __name__ == '__main__':
76    demo()