ubluetooth — Bluetooth Low Energy

This module provides an interface to a Bluetooth Low Energy controller. It currently supports BLE in the central, peripheral, broadcaster and observer roles, and a device can operate in multiple roles simultaneously.

This API is designed to match the BLE protocol and provides building blocks for more advanced abstractions (such as specific device types).

Note

The module is still under development and its classes, functions, methods and constants may change.

BLE Class

Construct

class ubluetooth.BLE

Returns the BLE object.

Configure

BLE.active([active])

Optionally changes the active state of the BLE radio, and returns the current state.

The radio must be made active before using any other method of this class.

BLE.config('param')
BLE.config(param=value, ...)

Gets or sets configuration values of the BLE interface. To get a value, pass the parameter name as a quoted string; only one parameter can be queried at a time. To set values, use the keyword syntax; one or more parameters can be set at a time.

The currently supported values are:

  • 'mac': Returns the device MAC address. If the device has a fixed address (e.g. PYBD), it is returned. Otherwise (e.g. ESP32), a random address is generated when the BLE interface is made active.
  • 'rxbuf': Sets the size (in bytes) of the internal buffer used to store incoming events. This buffer is global to the entire BLE driver, so it handles incoming data for all events, including all characteristics. Increasing it allows better handling of bursty incoming data (for example, scan results) and lets a central receive larger characteristic values.

Event Handler

BLE.irq(handler, trigger=0xffff)

Registers a callback for events from the BLE stack. The handler takes two arguments: event (one of the event codes listed below) and data (an event-specific tuple of values).

The optional trigger parameter allows you to set a mask for events of interest to the program. The default is all events.

Note: The addr, adv_data and uuid items in the tuples are references to data managed by the ubluetooth module (i.e. the same instance is reused across multiple calls to the event handler). If your program wants to use this data outside of the handler, it must copy it first, for example using bytes(addr) or bluetooth.UUID(uuid).
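
The following sketch illustrates why the copy matters. It does not use ubluetooth: a reused bytearray stands in for the driver-managed data, and the names _shared, deliver and handler are hypothetical.

```python
# A single buffer reused for every event, standing in for driver-managed data.
_shared = bytearray(6)

saved_refs = []    # keeps the reference passed to the handler (unsafe)
saved_copies = []  # keeps an independent copy (safe)


def handler(addr):
    saved_refs.append(addr)
    saved_copies.append(bytes(addr))  # copy before the handler returns


def deliver(raw):
    """Overwrite the shared buffer in place, then call the handler."""
    for i, value in enumerate(raw):
        _shared[i] = value
    handler(memoryview(_shared))


deliver(b"\x01\x02\x03\x04\x05\x06")
deliver(b"\xaa\xbb\xcc\xdd\xee\xff")

# Every stored reference now shows the most recent address.
print([bytes(r) for r in saved_refs])
# The copies keep the value seen at the time of each event.
print(saved_copies)
```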

An event handler showing all possible events:

def bt_irq(event, data):
    if event == _IRQ_CENTRAL_CONNECT:
        # A central has connected to this peripheral.
        conn_handle, addr_type, addr = data
    elif event == _IRQ_CENTRAL_DISCONNECT:
        # A central has disconnected from this peripheral.
        conn_handle, addr_type, addr = data
    elif event == _IRQ_GATTS_WRITE:
        # A central has written to this characteristic or descriptor.
        conn_handle, attr_handle = data
    elif event == _IRQ_GATTS_READ_REQUEST:
        # A central has issued a read. Note: this is a hard IRQ.
        # Return None to deny the read.
        # Note: this event is not supported on ESP32.
        conn_handle, attr_handle = data
    elif event == _IRQ_SCAN_RESULT:
        # A single scan result.
        addr_type, addr, connectable, rssi, adv_data = data
    elif event == _IRQ_SCAN_COMPLETE:
        # The scan duration has elapsed or the scan was stopped manually.
        pass
    elif event == _IRQ_PERIPHERAL_CONNECT:
        # A gap_connect() call succeeded.
        conn_handle, addr_type, addr = data
    elif event == _IRQ_PERIPHERAL_DISCONNECT:
        # A connected peripheral has disconnected.
        conn_handle, addr_type, addr = data
    elif event == _IRQ_GATTC_SERVICE_RESULT:
        # Called for each service found by gattc_discover_services().
        conn_handle, start_handle, end_handle, uuid = data
    elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT:
        # Called for each characteristic found by gattc_discover_characteristics().
        conn_handle, def_handle, value_handle, properties, uuid = data
    elif event == _IRQ_GATTC_DESCRIPTOR_RESULT:
        # Called for each descriptor found by gattc_discover_descriptors().
        conn_handle, dsc_handle, uuid = data
    elif event == _IRQ_GATTC_READ_RESULT:
        # A gattc_read() has completed.
        conn_handle, value_handle, char_data = data
    elif event == _IRQ_GATTC_WRITE_STATUS:
        # A gattc_write() has completed.
        conn_handle, value_handle, status = data
    elif event == _IRQ_GATTC_NOTIFY:
        # A peripheral has sent a notification.
        conn_handle, value_handle, notify_data = data
    elif event == _IRQ_GATTC_INDICATE:
        # A peripheral has sent an indication.
        conn_handle, value_handle, notify_data = data

Event code:

from micropython import const
_IRQ_CENTRAL_CONNECT                 = const(1 << 0)
_IRQ_CENTRAL_DISCONNECT              = const(1 << 1)
_IRQ_GATTS_WRITE                     = const(1 << 2)
_IRQ_GATTS_READ_REQUEST              = const(1 << 3)
_IRQ_SCAN_RESULT                     = const(1 << 4)
_IRQ_SCAN_COMPLETE                   = const(1 << 5)
_IRQ_PERIPHERAL_CONNECT              = const(1 << 6)
_IRQ_PERIPHERAL_DISCONNECT           = const(1 << 7)
_IRQ_GATTC_SERVICE_RESULT            = const(1 << 8)
_IRQ_GATTC_CHARACTERISTIC_RESULT     = const(1 << 9)
_IRQ_GATTC_DESCRIPTOR_RESULT         = const(1 << 10)
_IRQ_GATTC_READ_RESULT               = const(1 << 11)
_IRQ_GATTC_WRITE_STATUS              = const(1 << 12)
_IRQ_GATTC_NOTIFY                    = const(1 << 13)
_IRQ_GATTC_INDICATE                  = const(1 << 14)

To save space in the firmware, these constants are not included in ubluetooth. Copy the ones you need from the list above into your program.

Advertiser

BLE.gap_advertise(interval_us, adv_data=None, resp_data=None, connectable=True)

Starts advertising at the specified interval (in microseconds). This interval will be rounded to the nearest 625 microseconds. To stop advertising, set interval_us to None.

adv_data and resp_data can be any buffer type (e.g. bytes, bytearray, str). adv_data is included in all broadcasts, and resp_data is sent in reply to an active scan.

Note: If adv_data (or resp_data) is None, the data passed to the previous call to gap_advertise is reused. This allows a broadcaster to resume advertising with just gap_advertise(interval_us). To clear the advertising payload, pass an empty bytes, i.e. b''.

Scanner

BLE.gap_scan(duration_ms[, interval_us][, window_us])
Run a scan operation that lasts for a specified time (in milliseconds).

To scan indefinitely, set duration_ms to 0. To stop scanning, set duration_ms to None.

Use interval_us and window_us to optionally configure the duty cycle. The scanner will run for window_us microseconds every interval_us microseconds, for a total of duration_ms milliseconds. The default interval and window are 1.28 seconds and 11.25 milliseconds, respectively.
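
As a worked example of the duty cycle: the fraction of time the radio spends listening is window_us divided by interval_us. The helper scan_duty_cycle below is hypothetical and only does the arithmetic; the default values are the ones quoted above.

```python
def scan_duty_cycle(interval_us, window_us):
    """Fraction of each scan interval spent listening (0.0 to 1.0)."""
    if interval_us <= 0 or not 0 <= window_us <= interval_us:
        raise ValueError("window_us must be between 0 and interval_us")
    return window_us / interval_us


DEFAULT_INTERVAL_US = 1280000  # 1.28 seconds
DEFAULT_WINDOW_US = 11250      # 11.25 milliseconds

duty = scan_duty_cycle(DEFAULT_INTERVAL_US, DEFAULT_WINDOW_US)
print("default duty cycle: {:.2f}%".format(duty * 100))  # 0.88%
```

Passing equal values for the interval and the window, for example gap_scan(2000, 30000, 30000), would correspond to a 100% duty cycle.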

For each scan result, the _IRQ_SCAN_RESULT event will be raised.

When the scan is stopped (because the duration elapsed or it was stopped explicitly), the _IRQ_SCAN_COMPLETE event will be raised.

GATT Server

A BLE peripheral has a set of registered services. Each service may contain characteristics, each of which has a value. Characteristics can also contain descriptors, which themselves have values.

These values are stored locally and are accessed through the "value handle" generated during service registration. They can also be read or written by a remote central. In addition, a peripheral can "notify" a characteristic to a connected central via a connection handle.

The default maximum size of a characteristic or descriptor value is 20 bytes. Anything written to it by a central will be truncated to this length. However, any local write will increase the maximum size, so if you want to allow larger writes from a central, use gatts_write after registration, for example gatts_write(char_handle, bytes(100)).

BLE.gatts_register_services(services_definition)

Configures the peripheral with the specified services, replacing any existing services.

services_definition is a list of services, where each service is a two-element tuple containing a UUID and a list of characteristics.

Each characteristic is a two- or three-element tuple containing a UUID, a flags value, and optionally a list of descriptors.

Each descriptor is a two-element tuple containing a UUID and a flags value.

The flags are a bitwise-OR combination of ubluetooth.FLAG_READ, ubluetooth.FLAG_WRITE and ubluetooth.FLAG_NOTIFY, which are listed under Constant below.

The return value is a list (one element per service) of tuples (each element is a value handle). Characteristic and descriptor handles are flattened into the same tuple, in the order in which they are defined.
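
To make the shape of the return value concrete, the sketch below counts how many value handles each service should yield: one per characteristic plus one per descriptor. It works on plain tuples and does not call ubluetooth. handles_per_service is a hypothetical helper, integers stand in for UUID objects, the flag values are placeholders, and the second service is made up for illustration.

```python
def handles_per_service(services_definition):
    """Return, for each service, the number of value handles expected."""
    counts = []
    for _service_uuid, characteristics in services_definition:
        n = 0
        for char in characteristics:
            n += 1  # the characteristic's own value handle
            if len(char) == 3:
                n += len(char[2])  # one handle per descriptor
        counts.append(n)
    return counts


# Placeholder flag values for this sketch only.
FLAG_READ, FLAG_WRITE, FLAG_NOTIFY = 0x02, 0x08, 0x10

services = (
    # One characteristic, no descriptors.
    (0x180D, ((0x2A37, FLAG_READ | FLAG_NOTIFY),)),
    # Two characteristics, the first with one descriptor.
    (0x1234, (
        (0x5678, FLAG_READ, ((0x2901, FLAG_READ),)),
        (0x9ABC, FLAG_WRITE),
    )),
)

print(handles_per_service(services))  # [1, 3]
```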

The following example registers two services (Heart Rate and Nordic UART):

import bluetooth

# The BLE object must be active before services are registered.
bt = bluetooth.BLE()
bt.active(True)

HR_UUID = bluetooth.UUID(0x180D)
HR_CHAR = (bluetooth.UUID(0x2A37), bluetooth.FLAG_READ | bluetooth.FLAG_NOTIFY,)
HR_SERVICE = (HR_UUID, (HR_CHAR,),)
UART_UUID = bluetooth.UUID('6E400001-B5A3-F393-E0A9-E50E24DCCA9E')
UART_TX = (bluetooth.UUID('6E400003-B5A3-F393-E0A9-E50E24DCCA9E'), bluetooth.FLAG_READ | bluetooth.FLAG_NOTIFY,)
UART_RX = (bluetooth.UUID('6E400002-B5A3-F393-E0A9-E50E24DCCA9E'), bluetooth.FLAG_WRITE,)
UART_SERVICE = (UART_UUID, (UART_TX, UART_RX,),)
SERVICES = (HR_SERVICE, UART_SERVICE,)
# One tuple of value handles is returned per service, in definition order.
( (hr,), (tx, rx,), ) = bt.gatts_register_services(SERVICES)

The three value handles (hr, tx, rx) can be used with gatts_read, gatts_write and gatts_notify.

Note: Advertising must be stopped before registering services.

BLE.gatts_read(value_handle)

Reads the local value for this handle (which has been written either by gatts_write or by a remote central).

BLE.gatts_write(value_handle, data)

Writes the local value for this handle, which can then be read by a central.

BLE.gatts_notify(conn_handle, value_handle[, data])

Notifies a connected central that this value has changed and that it should issue a read of the current value from this peripheral.

If data is specified, the value is sent to the central device as part of the notification, thereby avoiding the need for a separate read request. Please note that this will not update the stored local value.

BLE.gatts_set_buffer(value_handle, len, append=False)

Sets the internal buffer size for a value, in bytes. This limits the largest value that can be received. The default is 20. Setting append to True makes all remote writes append to, rather than replace, the current value. At most len bytes can be buffered in this way. When gatts_read is used, the value is cleared after reading. This feature is useful when implementing something like the Nordic UART Service.

GATT Client


BLE.gap_connect(addr_type, addr, scan_duration_ms=2000)

Connects to a peripheral. On success, the _IRQ_PERIPHERAL_CONNECT event will be raised.

BLE.gap_disconnect(conn_handle)

Disconnects the specified connection handle. On success, the _IRQ_PERIPHERAL_DISCONNECT event will be raised. Returns False if the connection handle was not connected, and True otherwise.

BLE.gattc_discover_services(conn_handle)

Query services of connected peripheral devices.

For each service discovered, the _IRQ_GATTC_SERVICE_RESULT event will be triggered.

BLE.gattc_discover_characteristics(conn_handle, start_handle, end_handle)

Query the characteristics within the specified range on the connected peripheral device.

For each characteristic discovered, the _IRQ_GATTC_CHARACTERISTIC_RESULT event will be raised.

BLE.gattc_discover_descriptors(conn_handle, start_handle, end_handle)

Query the descriptors in the specified range in the connected peripheral device.

For each descriptor discovered, the _IRQ_GATTC_DESCRIPTOR_RESULT event will be raised.

BLE.gattc_read(conn_handle, value_handle)

Send a remote read to the connected peripheral device to obtain the specified characteristic or descriptor handle.

On success, the _IRQ_GATTC_READ_RESULT event will be raised.

BLE.gattc_write(conn_handle, value_handle, data, mode=0)

Issues a remote write to a connected peripheral for the specified characteristic or descriptor handle.

  • mode

    • mode=0 (default) is a write without response: the write is sent to the remote peripheral, but no confirmation is returned and no event is raised.
    • mode=1 is a write with response: the remote peripheral is asked to send a response (acknowledgement) that it received the data.

If a response is received from the remote peripheral, the _IRQ_GATTC_WRITE_STATUS event will be raised.

UUID class

Create

class ubluetooth.UUID(value)

Creates a UUID instance with the specified value.

The value can be:

  • A 16-bit integer, e.g. 0x2908.
  • A 128-bit UUID string, e.g. '6E400001-B5A3-F393-E0A9-E50E24DCCA9E'.

Constant

ubluetooth.FLAG_READ
ubluetooth.FLAG_WRITE
ubluetooth.FLAG_NOTIFY

ble_advertising.py (BLE advertising)

# Helpers for generating BLE advertising payloads.

from micropython import const
import struct
import bluetooth

# Advertising payloads are repeated packets of the following form:
#   1 byte data length (N + 1)
#   1 byte type (see constants below)
#   N bytes type-specific data

_ADV_TYPE_FLAGS = const(0x01)
_ADV_TYPE_NAME = const(0x09)
_ADV_TYPE_UUID16_COMPLETE = const(0x3)
_ADV_TYPE_UUID32_COMPLETE = const(0x5)
_ADV_TYPE_UUID128_COMPLETE = const(0x7)
_ADV_TYPE_UUID16_MORE = const(0x2)
_ADV_TYPE_UUID32_MORE = const(0x4)
_ADV_TYPE_UUID128_MORE = const(0x6)
_ADV_TYPE_APPEARANCE = const(0x19)


# Generate a payload to be passed to gap_advertise(adv_data=...).
def advertising_payload(limited_disc=False, br_edr=False, name=None, services=None, appearance=0):
    payload = bytearray()

    def _append(adv_type, value):
        nonlocal payload
        payload += struct.pack('BB', len(value) + 1, adv_type) + value

    _append(_ADV_TYPE_FLAGS, struct.pack('B', (0x01 if limited_disc else 0x02) + (0x00 if br_edr else 0x04)))

    if name:
        _append(_ADV_TYPE_NAME, name)

    if services:
        for uuid in services:
            b = bytes(uuid)
            if len(b) == 2:
                _append(_ADV_TYPE_UUID16_COMPLETE, b)
            elif len(b) == 4:
                _append(_ADV_TYPE_UUID32_COMPLETE, b)
            elif len(b) == 16:
                _append(_ADV_TYPE_UUID128_COMPLETE, b)

    # See org.bluetooth.characteristic.gap.appearance.xml
    _append(_ADV_TYPE_APPEARANCE, struct.pack('<h', appearance))

    return payload


def decode_field(payload, adv_type):
    i = 0
    result = []
    while i + 1 < len(payload):
        if payload[i + 1] == adv_type:
            result.append(payload[i + 2:i + payload[i] + 1])
        i += 1 + payload[i]
    return result


def decode_name(payload):
    n = decode_field(payload, _ADV_TYPE_NAME)
    return str(n[0], 'utf-8') if n else ''


def decode_services(payload):
    services = []
    for u in decode_field(payload, _ADV_TYPE_UUID16_COMPLETE):
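        # 16-bit UUIDs are unsigned, so unpack with 'H' rather than 'h'.
        services.append(bluetooth.UUID(struct.unpack('<H', u)[0]))
    for u in decode_field(payload, _ADV_TYPE_UUID32_COMPLETE):
        # A 32-bit UUID is a 4-byte unsigned integer ('I'); 'd' is an 8-byte double.
        services.append(bluetooth.UUID(struct.unpack('<I', u)[0]))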
    for u in decode_field(payload, _ADV_TYPE_UUID128_COMPLETE):
        services.append(bluetooth.UUID(u))
    return services


def demo():
    payload = advertising_payload(name='micropython', services=[bluetooth.UUID(0x181A), bluetooth.UUID('6E400001-B5A3-F393-E0A9-E50E24DCCA9E')])
    print(payload)
    print(decode_name(payload))
    print(decode_services(payload))

if __name__ == '__main__':
    demo()
This example demonstrates a simple temperature sensor peripheral
# This example demonstrates a simple temperature sensor peripheral.
#
# The sensor's local value updates every second, and it will notify
# any connected central every 10 seconds.

import bluetooth
import random
import struct
import time
from ble_advertising import advertising_payload

from micropython import const
_IRQ_CENTRAL_CONNECT                 = const(1 << 0)
_IRQ_CENTRAL_DISCONNECT              = const(1 << 1)

# org.bluetooth.service.environmental_sensing
_ENV_SENSE_UUID = bluetooth.UUID(0x181A)
# org.bluetooth.characteristic.temperature
_TEMP_CHAR = (bluetooth.UUID(0x2A6E), bluetooth.FLAG_READ|bluetooth.FLAG_NOTIFY,)
_ENV_SENSE_SERVICE = (_ENV_SENSE_UUID, (_TEMP_CHAR,),)

# org.bluetooth.characteristic.gap.appearance.xml
_ADV_APPEARANCE_GENERIC_THERMOMETER = const(768)

class BLETemperature:
    def __init__(self, ble, name='mpy-temp'):
        self._ble = ble
        self._ble.active(True)
        self._ble.irq(handler=self._irq)
        ((self._handle,),) = self._ble.gatts_register_services((_ENV_SENSE_SERVICE,))
        self._connections = set()
        self._payload = advertising_payload(name=name, services=[_ENV_SENSE_UUID], appearance=_ADV_APPEARANCE_GENERIC_THERMOMETER)
        self._advertise()

    def _irq(self, event, data):
        # Track connections so we can send notifications.
        if event == _IRQ_CENTRAL_CONNECT:
            conn_handle, _, _, = data
            self._connections.add(conn_handle)
        elif event == _IRQ_CENTRAL_DISCONNECT:
            conn_handle, _, _, = data
            self._connections.remove(conn_handle)
            # Start advertising again to allow a new connection.
            self._advertise()

    def set_temperature(self, temp_deg_c, notify=False):
        # Data is sint16 in degrees Celsius with a resolution of 0.01 degrees Celsius.
        # Write the local value, ready for a central to read.
        self._ble.gatts_write(self._handle, struct.pack('<h', int(temp_deg_c * 100)))
        if notify:
            for conn_handle in self._connections:
                # Notify connected centrals to issue a read.
                self._ble.gatts_notify(conn_handle, self._handle)

    def _advertise(self, interval_us=500000):
        self._ble.gap_advertise(interval_us, adv_data=self._payload)


def demo():
    ble = bluetooth.BLE()
    temp = BLETemperature(ble)

    t = 25
    i = 0

    while True:
        # Write every second, notify every 10 seconds.
        i = (i + 1) % 10
        temp.set_temperature(t, notify=i == 0)
        # Random walk the temperature.
        t += random.uniform(-0.5, 0.5)
        time.sleep_ms(1000)


if __name__ == '__main__':
    demo()
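
As a rough sketch of the central's side, the value written by set_temperature above could be decoded as follows. It assumes only the encoding stated in the example's comments (a little-endian sint16 in units of 0.01 degrees Celsius). The function names are hypothetical, and the code runs without any BLE hardware.

```python
import struct


def encode_temperature(temp_deg_c):
    """Pack a temperature as little-endian sint16 in 0.01 degree steps."""
    return struct.pack('<h', int(temp_deg_c * 100))


def decode_temperature(data):
    """Inverse of encode_temperature; returns degrees Celsius as a float."""
    (raw,) = struct.unpack('<h', data)
    return raw / 100


payload = encode_temperature(25.5)
print(payload, decode_temperature(payload))
```

In a real central, data would be the char_data received with the _IRQ_GATTC_READ_RESULT event, or the notify_data received with _IRQ_GATTC_NOTIFY.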