ubluetooth
— 低功耗蓝牙¶
该模块提供低功耗蓝牙控制接口。当前,它在中央,外围设备,广播和观察者角色中支持蓝牙低功耗(BLE),并且设备可以同时在多个角色中运行。
此API旨在与低功耗蓝牙协议相匹配,并为更高级的抽象(如特定的设备类型)提供构建模块。
Note
该模块仍在开发中,其类,功能,方法和常量可能会发生变化。
BLE 类¶
配置¶
- BLE.active([active])¶
(可选)更改BLE无线电的活动状态,并返回当前状态。
在使用此类的任何其他方法之前,必须使无线电处于活动状态。
- BLE.config('param')¶
- BLE.config(param=value, ...)
获取或设置BLE接口的配置值。为了获得一个值,参数名称应该用字符串引号,并且一次只查询一个参数。要设置值,请使用关键字语法,一次可以设置一个或多个参数。
当前支持的值为:
'mac'
: 返回设备的MAC地址。如果设备具有固定地址(例如PYBD),则将其返回。否则(例如ESP32),当BLE接口处于活动状态时,将生成一个随机地址。'rxbuf'
: 设置用于存储传入事件的内部缓冲区的大小(以字节为单位)。该缓冲区是整个BLE驱动程序的全局缓冲区,因此可以处理所有事件(包括所有特征)的传入数据。增加此值可以更好地处理突发的传入数据(例如,扫描结果),并可以使中央设备接收较大的特征值。
事件处理¶
- BLE.irq(handler, trigger=0xffff)¶
为BLE堆栈中的事件注册回调。handler接收两个参数,
event
(看下文的事件代码)和data
(其是值的特定事件元组)。可选的 trigger 参数允许您设置程序感兴趣的事件的掩码。默认值为所有事件。
注:
addr
,adv_data
和uuid
元组中的项是引用的数据管理ubluetooth
模块(即相同的实例将被重新使用多次调用到事件处理程序)。 如果您的程序想在处理程序之外使用此数据,则它必须首先复制它们,例如使用bytes(addr)
orbluetooth.UUID(uuid)
。一个事件处理程序显示所有可能的事件:
def bt_irq(event, data): if event == _IRQ_CENTRAL_CONNECT: # 中央设备已经连接到这个外围设备 conn_handle, addr_type, addr = data elif event == _IRQ_CENTRAL_DISCONNECT: # 中央设备已与此外围设备断开 conn_handle, addr_type, addr = data elif event == _IRQ_GATTS_WRITE: # 中央设备已写入此特征或描述符 conn_handle, attr_handle = data elif event == _IRQ_GATTS_READ_REQUEST: # 中央设备已发出读请求. Note: 这是一个硬件IRQ # 返回None来拒绝读操作 # Note: 这事件不支持 ESP32. conn_handle, attr_handle = data elif event == _IRQ_SCAN_RESULT: # 一次扫描的结果 addr_type, addr, connectable, rssi, adv_data = data elif event == _IRQ_SCAN_COMPLETE: # 扫描持续时间已完成或手动停止 pass elif event == _IRQ_PERIPHERAL_CONNECT: # gap_connect()连接成功 conn_handle, addr_type, addr = data elif event == _IRQ_PERIPHERAL_DISCONNECT: # 已连接的外围设备已断开 conn_handle, addr_type, addr = data elif event == _IRQ_GATTC_SERVICE_RESULT: # 调用gattc_discover_services()找到的每个服务 conn_handle, start_handle, end_handle, uuid = data elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT: # 调用gattc_discover_services()找到的每个特征 conn_handle, def_handle, value_handle, properties, uuid = data elif event == _IRQ_GATTC_DESCRIPTOR_RESULT: # 调用gattc_discover_descriptors()找到的每个描述符 conn_handle, dsc_handle, uuid = data elif event == _IRQ_GATTC_READ_RESULT: # gattc_read() 已完成 conn_handle, value_handle, char_data = data elif event == _IRQ_GATTC_WRITE_STATUS: # gattc_write() 已完成 conn_handle, value_handle, status = data elif event == _IRQ_GATTC_NOTIFY: # 外围设备已发出通知请求 conn_handle, value_handle, notify_data = data elif event == _IRQ_GATTC_INDICATE: #外围设备发出指示请求 conn_handle, value_handle, notify_data = data
事件代码:
from micropython import const
_IRQ_CENTRAL_CONNECT = const(1 << 0)
_IRQ_CENTRAL_DISCONNECT = const(1 << 1)
_IRQ_GATTS_WRITE = const(1 << 2)
_IRQ_GATTS_READ_REQUEST = const(1 << 3)
_IRQ_SCAN_RESULT = const(1 << 4)
_IRQ_SCAN_COMPLETE = const(1 << 5)
_IRQ_PERIPHERAL_CONNECT = const(1 << 6)
_IRQ_PERIPHERAL_DISCONNECT = const(1 << 7)
_IRQ_GATTC_SERVICE_RESULT = const(1 << 8)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(1 << 9)
_IRQ_GATTC_DESCRIPTOR_RESULT = const(1 << 10)
_IRQ_GATTC_READ_RESULT = const(1 << 11)
_IRQ_GATTC_WRITE_STATUS = const(1 << 12)
_IRQ_GATTC_NOTIFY = const(1 << 13)
_IRQ_GATTC_INDICATE = const(1 << 14)
为了节省固件中的空间,这些常量不包括在 ubluetooth
。将您需要的从上面的列表中添加到您的程序中。
广播者(Advertiser)¶
- BLE.gap_advertise(interval_us, adv_data=None, resp_data=None, connectable=True)¶
以指定的时间间隔(以微秒为单位)开始广播。该间隔将四舍五入到最接近的625微妙。要停止广播,请将 interval_us 设置 为None。
adv_data 和 resp_data 可以是任何 buffer 类型 (例如
bytes
,bytearray
,str
)。 adv_data 包含在所有广播中,并发送 resp_data 以应答有效的扫描。注意:如果 adv_data (或 resp_data )为None,则将重用传递到上一个调用的数据
gap_advertise
。 这样一来,广播者就可以使用来恢复广播gap_advertise(interval_us)
。为了清除广播负载,传递一个空的bytes,即b’’。
观察者 (Scanner)¶
- BLE.gap_scan(duration_ms[, interval_us][, window_us])¶
运行持续指定时间(以毫秒为单位)的扫描操作。
要无限期扫描,请将 duration_ms 设置为
0
。要停止扫描,请将 duration_ms 设置为None
。使用 interval_us 和 window_us 可以选择配置占空比。 扫描器将每间隔一微秒运行一次 window_us 微秒,总计持续时间为毫秒。默认间隔和窗口分别为1.28秒和11.25毫秒。
对于每个扫描结果,_IRQ_SCAN_RESULT 将引发该事件。
停止扫描(由于持续时间结束或明确停止)时,_IRQ_SCAN_COMPLETE 将引发该事件。
外围设备 (GATT Server)¶
BLE外围设备具有一组注册服务。每个服务可能包含特性,每个特性都有一个值。特征也可以包含描述符,描述符本身具有值。
这些值存储在本地,并通过在服务注册过程中生成的“值柄”进行访问。它们也可以被远程的中央设备读取或写入。 此外,外围设备可以通过连接句柄将特征“通知”到已连接的中央设备。
特征和描述符的默认最大为20个字节。任何由中央设备写给它们的都会被截短到这个长度。但是,任何本地写操作都会增加最大大小,
所以,如果你写想更长的数据,请注册后使用 gatts_write
。例如, gatts_write(char_handle, bytes(100))
- BLE.gatts_register_services(services_definition)¶
使用指定的服务配置外围设备,替换所有现有服务。
services_definition 是一个服务的列表,其中每个服务都是一个包含UUID和特征列表的二元元组。
每个特征都是一个包含 UUID,flags 值以及一个可选的描述符列表的2或3元素元组。
每个描述符是一个包含UUID和一个flags值的二元元组。
flags是一个按位或组合的
ubluetooth.FLAG_READ
,ubluetooth.FLAG_WRITE
和ubluetooth.FLAG_NOTIFY
。如下文所定义的值:返回值是元组的列表(每个服务一个元素)(每个元素是一个值句柄)。特征和描述符句柄按照定义的顺序被展平到相同的元组中。
以下示例注册了两个服务 (Heart Rate, and Nordic UART):
HR_UUID = bluetooth.UUID(0x180D) HR_CHAR = (bluetooth.UUID(0x2A37), bluetooth.FLAG_READ | bluetooth.FLAG_NOTIFY,) HR_SERVICE = (HR_UUID, (HR_CHAR,),) UART_UUID = bluetooth.UUID('6E400001-B5A3-F393-E0A9-E50E24DCCA9E') UART_TX = (bluetooth.UUID('6E400003-B5A3-F393-E0A9-E50E24DCCA9E'), bluetooth.FLAG_READ | bluetooth.FLAG_NOTIFY,) UART_RX = (bluetooth.UUID('6E400002-B5A3-F393-E0A9-E50E24DCCA9E'), bluetooth.FLAG_WRITE,) UART_SERVICE = (UART_UUID, (UART_TX, UART_RX,),) SERVICES = (HR_SERVICE, UART_SERVICE,) ( (hr,), (tx, rx,), ) = bt.gatts_register_services(SERVICES)
这三个值柄(
hr
,tx
,rx
)可与使用gatts_read
,gatts_write
, 和gatts_notify
。注意:注册服务之前,必须停止广告。
- BLE.gatts_read(value_handle)¶
读取本地的值柄 (该值由
gatts_write
或远程的中央设备写入)。
- BLE.gatts_write(value_handle, data)¶
写入本地的值柄,该值可由中央设备读取。
- BLE.gatts_notify(conn_handle, value_handle[, data])¶
通知连接的中央设备此值已更改,并且应发出此外围设备的当前值的读取值。
如果指定了数据,则将该值作为通知的一部分发送到中央设备,从而避免了需要单独的读取请求的情况。请注意,这不会更新存储的本地值。
- BLE.gatts_set_buffer(value_handle, len, append=False)¶
设置内部缓冲区大小(以字节为单位)。这将限制可以接收的最大值。默认值为20。 将
append
设置为 True 会将所有远程写入追加到当前值,而不是替换当前值。这样最多可以缓冲len个字节。 使用时gatts_read
,将在读取后清除该值。这个功能在实现某些东西时很有用,比如Nordic UART服务。
中央设备 (GATT Client)¶
- BLE.gap_connect(addr_type, addr, scan_duration_ms=2000)¶
连接到外围设备。成功,将触发
_IRQ_PERIPHERAL_CONNECT
事件。
- BLE.gap_disconnect(conn_handle)¶
断开指定的连接句柄。成功,将触发
_IRQ_PERIPHERAL_DISCONNECT
事件。 如果未连接连接句柄,返回False
,否则返回True
。
- BLE.gattc_discover_services(conn_handle)¶
查询已连接的外围设备的服务。
对于发现的每个服务, 会触发
_IRQ_GATTC_SERVICE_RESULT
事件。
- BLE.gattc_discover_characteristics(conn_handle, start_handle, end_handle)¶
在已连接的外围设备上查询指定范围内的特征。 每次特征发现,会触发
_IRQ_GATTC_CHARACTERISTIC_RESULT
事件。
- BLE.gattc_discover_descriptors(conn_handle, start_handle, end_handle)¶
在连接的外围设备中查询指定范围内的描述符。
每次特征发现,会触发
_IRQ_GATTC_DESCRIPTOR_RESULT
事件。
- BLE.gattc_read(conn_handle, value_handle)¶
向连接的外围设备发出远程读取,以获取指定的特性或描述符句柄。
如果成功,会触发
_IRQ_GATTC_READ_RESULT
事件
- BLE.gattc_write(conn_handle, value_handle, data, mode=0)¶
针对指定的特征或描述符句柄向连接的外围设备发出远程写操作。
mode
mode=0
(默认)是无响应写操作:写操作将发送到远程外围设备,但不会返回确认信息,也不会引发任何事件。mode=1
i是响应写入:请求远程外围设备发送其已接收到数据的响应/确认。
如果从远程外围设备收到响应,
_IRQ_GATTC_WRITE_STATUS
事件将触发。
UUID 类¶
构建¶
- class ubluetooth.UUID(value)¶
用指定的值创建一个UUID实例。
该值可以是:
一个16位整数。例如
0x2908
.128位UUID字符串。例如
'6E400001-B5A3-F393-E0A9-E50E24DCCA9E'
.
常量¶
1# Helpers for generating BLE advertising payloads.
2
3from micropython import const
4import struct
5import bluetooth
6
7# Advertising payloads are repeated packets of the following form:
8# 1 byte data length (N + 1)
9# 1 byte type (see constants below)
10# N bytes type-specific data
11
12_ADV_TYPE_FLAGS = const(0x01)
13_ADV_TYPE_NAME = const(0x09)
14_ADV_TYPE_UUID16_COMPLETE = const(0x3)
15_ADV_TYPE_UUID32_COMPLETE = const(0x5)
16_ADV_TYPE_UUID128_COMPLETE = const(0x7)
17_ADV_TYPE_UUID16_MORE = const(0x2)
18_ADV_TYPE_UUID32_MORE = const(0x4)
19_ADV_TYPE_UUID128_MORE = const(0x6)
20_ADV_TYPE_APPEARANCE = const(0x19)
21
22
23# Generate a payload to be passed to gap_advertise(adv_data=...).
24def advertising_payload(limited_disc=False, br_edr=False, name=None, services=None, appearance=0):
25 payload = bytearray()
26
27 def _append(adv_type, value):
28 nonlocal payload
29 payload += struct.pack('BB', len(value) + 1, adv_type) + value
30
31 _append(_ADV_TYPE_FLAGS, struct.pack('B', (0x01 if limited_disc else 0x02) + (0x00 if br_edr else 0x04)))
32
33 if name:
34 _append(_ADV_TYPE_NAME, name)
35
36 if services:
37 for uuid in services:
38 b = bytes(uuid)
39 if len(b) == 2:
40 _append(_ADV_TYPE_UUID16_COMPLETE, b)
41 elif len(b) == 4:
42 _append(_ADV_TYPE_UUID32_COMPLETE, b)
43 elif len(b) == 16:
44 _append(_ADV_TYPE_UUID128_COMPLETE, b)
45
46 # See org.bluetooth.characteristic.gap.appearance.xml
47 _append(_ADV_TYPE_APPEARANCE, struct.pack('<h', appearance))
48
49 return payload
50
51
52def decode_field(payload, adv_type):
53 i = 0
54 result = []
55 while i + 1 < len(payload):
56 if payload[i + 1] == adv_type:
57 result.append(payload[i + 2:i + payload[i] + 1])
58 i += 1 + payload[i]
59 return result
60
61
62def decode_name(payload):
63 n = decode_field(payload, _ADV_TYPE_NAME)
64 return str(n[0], 'utf-8') if n else ''
65
66
67def decode_services(payload):
68 services = []
69 for u in decode_field(payload, _ADV_TYPE_UUID16_COMPLETE):
70 services.append(bluetooth.UUID(struct.unpack('<h', u)[0]))
71 for u in decode_field(payload, _ADV_TYPE_UUID32_COMPLETE):
72 services.append(bluetooth.UUID(struct.unpack('<d', u)[0]))
73 for u in decode_field(payload, _ADV_TYPE_UUID128_COMPLETE):
74 services.append(bluetooth.UUID(u))
75 return services
76
77
78def demo():
79 payload = advertising_payload(name='micropython', services=[bluetooth.UUID(0x181A), bluetooth.UUID('6E400001-B5A3-F393-E0A9-E50E24DCCA9E')])
80 print(payload)
81 print(decode_name(payload))
82 print(decode_services(payload))
83
84if __name__ == '__main__':
85 demo()
1# This example demonstrates a simple temperature sensor peripheral.
2#
3# The sensor's local value updates every second, and it will notify
4# any connected central every 10 seconds.
5
6import bluetooth
7import random
8import struct
9import time
10from ble_advertising import advertising_payload
11
12from micropython import const
13_IRQ_CENTRAL_CONNECT = const(1 << 0)
14_IRQ_CENTRAL_DISCONNECT = const(1 << 1)
15
16# org.bluetooth.service.environmental_sensing
17_ENV_SENSE_UUID = bluetooth.UUID(0x181A)
18# org.bluetooth.characteristic.temperature
19_TEMP_CHAR = (bluetooth.UUID(0x2A6E), bluetooth.FLAG_READ|bluetooth.FLAG_NOTIFY,)
20_ENV_SENSE_SERVICE = (_ENV_SENSE_UUID, (_TEMP_CHAR,),)
21
22# org.bluetooth.characteristic.gap.appearance.xml
23_ADV_APPEARANCE_GENERIC_THERMOMETER = const(768)
24
25class BLETemperature:
26 def __init__(self, ble, name='mpy-temp'):
27 self._ble = ble
28 self._ble.active(True)
29 self._ble.irq(handler=self._irq)
30 ((self._handle,),) = self._ble.gatts_register_services((_ENV_SENSE_SERVICE,))
31 self._connections = set()
32 self._payload = advertising_payload(name=name, services=[_ENV_SENSE_UUID], appearance=_ADV_APPEARANCE_GENERIC_THERMOMETER)
33 self._advertise()
34
35 def _irq(self, event, data):
36 # Track connections so we can send notifications.
37 if event == _IRQ_CENTRAL_CONNECT:
38 conn_handle, _, _, = data
39 self._connections.add(conn_handle)
40 elif event == _IRQ_CENTRAL_DISCONNECT:
41 conn_handle, _, _, = data
42 self._connections.remove(conn_handle)
43 # Start advertising again to allow a new connection.
44 self._advertise()
45
46 def set_temperature(self, temp_deg_c, notify=False):
47 # Data is sint16 in degrees Celsius with a resolution of 0.01 degrees Celsius.
48 # Write the local value, ready for a central to read.
49 self._ble.gatts_write(self._handle, struct.pack('<h', int(temp_deg_c * 100)))
50 if notify:
51 for conn_handle in self._connections:
52 # Notify connected centrals to issue a read.
53 self._ble.gatts_notify(conn_handle, self._handle)
54
55 def _advertise(self, interval_us=500000):
56 self._ble.gap_advertise(interval_us, adv_data=self._payload)
57
58
59def demo():
60 ble = bluetooth.BLE()
61 temp = BLETemperature(ble)
62
63 t = 25
64 i = 0
65
66 while True:
67 # Write every second, notify every 10 seconds.
68 i = (i + 1) % 10
69 temp.set_temperature(t, notify=i == 0)
70 # Random walk the temperature.
71 t += random.uniform(-0.5, 0.5)
72 time.sleep_ms(1000)
73
74
75if __name__ == '__main__':
76 demo()