Proto Bioengineering


tion_">callback</span>(<span class="hljs-params">sender, data</span>):
    <span class="hljs-built_in">print</span>(<span class="hljs-string">f"<span class="hljs-subst">{sender}</span> -- <span class="hljs-subst">{data}</span>"</span>)</pre></div><p id="3375"><b>Note:</b> We use <a href="https://realpython.com/python-f-strings/">f-strings</a> (<code>f"{sender}"</code>) to format our output. They let us put {variables} directly inside strings.</p><h2 id="6c72">The Full Script at This Point</h2><p id="eb41">Our code, <code>stream.py</code>, will now look like this:</p><div id="8cb8"><pre><span class="hljs-comment"># Connect to and subscribe to notifications for multiple Movella DOTs</span>

<span class="hljs-keyword">import</span> asyncio
<span class="hljs-keyword">from</span> bleak <span class="hljs-keyword">import</span> BleakClient

short_payload_char_uuid = <span class="hljs-string">"15172004-4947-11e9-8646-d663bd873d93"</span>
addresses = [
    <span class="hljs-string">"509808FF-ECFE-895D-C1FE-BE5AC5DB6204"</span>,
    <span class="hljs-string">"338312FA-C3D1-183F-325A-0726AFDBEB78"</span>
]

<span class="hljs-keyword">def</span> <span class="hljs-title function_">callback</span>(<span class="hljs-params">sender, data</span>):
    <span class="hljs-built_in">print</span>(<span class="hljs-string">f"<span class="hljs-subst">{sender}</span> -- <span class="hljs-subst">{data}</span>"</span>)

<span class="hljs-keyword">async</span> <span class="hljs-keyword">def</span> <span class="hljs-title function_">connect</span>(<span class="hljs-params">address</span>):
    <span class="hljs-keyword">async</span> <span class="hljs-keyword">with</span> BleakClient(address) <span class="hljs-keyword">as</span> client:
        <span class="hljs-built_in">print</span>(<span class="hljs-string">f"Client connection to <span class="hljs-subst">{client.address}</span>: <span class="hljs-subst">{client.is_connected}</span>"</span>)

        <span class="hljs-keyword">await</span> client.start_notify(short_payload_char_uuid, callback)

<span class="hljs-keyword">async</span> <span class="hljs-keyword">def</span> <span class="hljs-title function_">main</span>():
    <span class="hljs-keyword">await</span> asyncio.gather(*(connect(addr) <span class="hljs-keyword">for</span> addr <span class="hljs-keyword">in</span> addresses))

<span class="hljs-keyword">if</span> __name__ == <span class="hljs-string">"__main__"</span>:
    asyncio.run(main())</pre></div><p id="9de8">Now, for every address in <code>addresses</code>, there is a <code>connect()</code> function running, which connects to the DOT and then subscribes to notifications. We have 2 devices for this example, so 2 <code>connect()</code> calls are running. If we had 5 addresses, <code>gather()</code> would kick off 5 <code>connect()</code> calls.</p><p id="2da5"><b>We don’t have any data yet though, because we need to turn on measurement in Step 4 and “listen” for data in Step 5.</b></p><h1 id="5090">Step 4: Set and Turn on Measurement Mode</h1><p id="7922">We’ll do these in one step, since they’re both done with a single function from Bleak, <code>write_gatt_char()</code>.</p><p id="da67">All we have to do is send a short byte string to the <code>Measurement Characteristic</code>. This string tells the DOT <b>which of the 18 measurement modes</b> we want it to use and tells it to <b>turn on</b>.</p><p id="d9f7">For this tutorial, we’ll use <b>mode #6</b> below: <b>Free Acceleration</b>.</p><figure id="d355"><img src="https://cdn-images-1.readmedium.com/v2/resize:fit:800/1*BW8Tti2UHUKN3dDOOGv6AQ.png"><figcaption></figcaption></figure><p id="c831">And the string we send will look something like:</p><div id="43e2"><pre><span class="hljs-attr">binary_message</span> = b<span class="hljs-string">"\x01\x01\x06"</span></pre></div><p id="0f34">The <code>binary_message</code> has 3 bytes, written as hex numbers, that specify:</p><ul><li><code>\x01</code>: a default value that the <a href="https://www.movella.com/support/software-documentation">DOT BLE docs</a> say is required</li><li><code>\x01</code>: whether measurement is on or off. <code>01</code> = on. <code>00</code> = off.</li><li><code>\x06</code>: specifies that we want <b>Free Acceleration</b> (option <code>6</code> from the measurement modes list)</li></ul><p id="c7bc">For this step,<b> we only have to add 3 lines to our script. The following 2 go inside <code>connect()</code>:</b></p><div id="095c"><pre>binary_message = b<span class="hljs-string">"\x01\x01\x06"</span>
<span class="hljs-keyword">await</span> client.write_gatt_char(measurement_char_uuid, binary_message)</pre></div><p id="ed9f">And add this near the top of the file:</p><div id="57ee"><pre><span class="hljs-attr">measurement_char_uuid</span> = <span class="hljs-string">"15172001-4947-11e9-8646-d663bd873d93"</span></pre></div><p id="9407">Now, the full script is:</p><div id="d48b"><pre><span class="hljs-comment"># Turn on Free Acceleration mode for multiple Movella DOTs</span>

<span class="hljs-keyword">import</span> asyncio
<span class="hljs-keyword">from</span> bleak <span class="hljs-keyword">import</span> BleakClient

measurement_char_uuid = <span class="hljs-string">"15172001-4947-11e9-8646-d663bd873d93"</span>
short_payload_char_uuid = <span class="hljs-string">"15172004-4947-11e9-8646-d663bd873d93"</span>
addresses = [
    <span class="hljs-string">"509808FF-ECFE-895D-C1FE-BE5AC5DB6204"</span>,
    <span class="hljs-string">"338312FA-C3D1-183F-325A-0726AFDBEB78"</span>
]

<span class="hljs-keyword">def</span> <span class="hljs-title function_">callback</span>(<span class="hljs-params">sender, data</span>):
    <span class="hljs-built_in">print</span>(<span class="hljs-string">f"<span class="hljs-subst">{sender}</span> -- <span class="hljs-subst">{data}</span>"</span>)

<span class="hljs-keyword">async</span> <span class="hljs-keyword">def</span> <span class="hljs-title function_">connect</span>(<span class="hljs-params">address</span>):
    <span class="hljs-comment"># Connect to the DOT and stream data</span>
    <span class="hljs-keyword">async</span> <span class="hljs-keyword">with</span> BleakClient(address) <span class="hljs-keyword">as</span> client:
        <span class="hljs-built_in">print</span>(<span class="hljs-string">f"Client connection to <span class="hljs-subst">{client.address}</span>: <span class="hljs-subst">{client.is_connected}</span>"</span>)

        <span class="hljs-comment"># Subscribe to data notifications</span>
        <span class="hljs-keyword">await</span> client.start_notify(short_payload_char_uuid, callback)

        <span class="hljs-comment"># Set and turn on measurement mode</span>
        binary_message = <span class="hljs-string">b"\x01\x01\x06"</span>
        <span class="hljs-keyword">await</span> client.write_gatt_char(measurement_char_uuid, binary_message)

<span class="hljs-keyword">async</span> <span class="hljs-keyword">def</span> <span class="hljs-title function_">main</span>():
    <span class="hljs-keyword">await</span> asyncio.gather(*(connect(addr) <span class="hljs-keyword">for</span> addr <span class="hljs-keyword">in</span> addresses))

<span class="hljs-keyword">if</span> __name__ == <span class="hljs-string">"__main__"</span>:
    asyncio.run(main())</pre></div><p id="d59f">Measurement mode is now on. Now, <b>we only need one more line of code in the next step to get data.</b></p><h1 id="67eb">Step 5: “Listen” for Data</h1><p id="478a">When we run the script from the last step, <b>it quits without giving us any data.</b> <code>connect()</code> returns right after the write, and leaving the <code>async with</code> block disconnects from the DOT.</p><p id="135f"><b>To fix this, we need to tell our script to wait,</b> by using <code>asyncio.sleep()</code>:</p><div id="c17a"><pre><span class="hljs-keyword">await</span> asyncio.sleep(<span class="hljs-number">10.0</span>)  <span class="hljs-comment"># Stream data for 10.0 seconds.</span></pre></div><p id="2194">This will tell our code to <b>wait for Bluetooth messages</b> to come in from the DOT, which carry our accelerometer data.</p><p id="a514">Add this below <code>write_gatt_char()</code> as:</p><div id="1cf4"><pre><span class="hljs-comment"># Set and turn on measurement mode</span>
binary_message = <span class="hljs-string">b"\x01\x01\x06"</span>
<span class="hljs-keyword">await</span> client.write_gatt_char(measurement_char_uuid, binary_message)

<span class="hljs-comment"># Stream data for 10 seconds</span>
<span class="hljs-keyword">await</span> asyncio.sleep(<span class="hljs-number">10.0</span>)</pre></div><p id="a21c"><b>The full script that can get raw data is:</b></p><div id="6020"><pre><span class="hljs-comment"># Stream Free Acceleration data from multiple Movella DOTs</span>

<span class="hljs-keyword">import</span> asyncio
<span class="hljs-keyword">from</span> bleak <span class="hljs-keyword">import</span> BleakClient

measurement_char_uuid = <span class="hljs-string">"15172001-4947-11e9-8646-d663bd873d93"</span>
short_payload_char_uuid = <span class="hljs-string">"15172004-4947-11e9-8646-d663bd873d93"</span>
addresses = [
    <span class="hljs-string">"509808FF-ECFE-895D-C1FE-BE5AC5DB6204"</span>,
    <span class="hljs-string">"338312FA-C3D1-183F-325A-0726AFDBEB78"</span>
]

<span class="hljs-keyword">def</span> <span class="hljs-title function_">callback</span>(<span class="hljs-params">sender, data</span>):
    <span class="hljs-built_in">print</span>(<span class="hljs-string">f"<span class="hljs-subst">{sender}</span> -- <span class="hljs-subst">{data}</span>"</span>)

<span class="hljs-keyword">async</span> <span class="hljs-keyword">def</span> <span class="hljs-title function_">connect</span>(<span class="hljs-params">address</span>):
    <span class="hljs-comment"># Connect to the DOT and stream data</span>
    <span class="hljs-keyword">async</span> <span class="hljs-keyword">with</span> BleakClient(address) <span class="hljs-keyword">as</span> client:
        <span class="hljs-built_in">print</span>(<span class="hljs-string">f"Client connection to <span class="hljs-subst">{client.address}</span>: <span class="hljs-subst">{client.is_connected}</span>"</span>)

        <span class="hljs-comment"># Subscribe to data notifications</span>
        <span class="hljs-keyword">await</span> client.start_notify(short_payload_char_uuid, callback)

        <span class="hljs-comment"># Set and turn on measurement mode</span>
        binary_message = <span class="hljs-string">b"\x01\x01\x06"</span>
        <span class="hljs-keyword">await</span> client.write_gatt_char(measurement_char_uuid, binary_message, response=<span class="hljs-literal">True</span>)

        <span class="hljs-comment"># Stream data for 10 seconds</span>
        <span class="hljs-keyword">await</span> asyncio.sleep(<span class="hljs-number">10.0</span>)

<span class="hljs-keyword">async</span> <span class="hljs-keyword">def</span> <span class="hljs-title function_">main</span>():
    <span class="hljs-keyword">await</span> asyncio.gather(*(connect(addr) <span class="hljs-keyword">for</span> addr <span class="hljs-keyword">in</span> addresses))

<span class="hljs-keyword">if</span> __name__ == <span class="hljs-string">"__main__"</span>:
    asyncio.run(main())</pre></div><p id="7a65"><b>If we run this, now we get data:</b></p><figure id="374a"><img src="https://cdn-images-1.readmedium.com/v2/resize:fit:800/1*kGR-eHAnllb4tqXxlKj2Mg.png"><figcaption></figcaption></figure><p id="f63f"><b><i>Troubleshooting:</i></b><i> if you don’t see any sensor data like above, make sure <code>response=True</code> is passed as the 3rd argument to <code>write_gatt_char()</code></i>:</p><div id="7c67"><pre><span class="hljs-keyword">await</span> client.write_gatt_char(measurement_char_uuid, binary_message, response=<span class="hljs-literal">True</span>)</pre></div><p id="9cb7">However, the data above is in a <code>bytearray</code>, which is just raw bytes shown as hex escapes like <code>\x8d\x00</code>. We need to convert it to real numbers.</p><h1 id="05fe">Step 6: Convert the Data to Real Numbers with NumPy</h1><p id="d049">To convert the <code>bytearray</code>s to real numbers, we’ll use <a href="https://numpy.org/">NumPy</a>. NumPy is the master of working with math, bits, and bytes in Python.</p><p id="ec3c">According to the <b>Measurement Service</b> section (Section 3) of the <a href="https://www.movella.com/support/software-documentation">DOT BLE Specification documentation</a>, each Bluetooth notification for “Free Acceleration” is 20 bytes. It has 16 bytes of actual data, made up of a <b>timestamp</b> and <b>X, Y, Z free acceleration</b> data, and 4 bytes of zeros for padding.</p><p id="c73e"><b>Section 3.5 Measurement Data</b> also shows that the actual numbers (X, Y, Z) in free acceleration data are 12 bytes total, meaning that the data is:</p><ul><li>a 4-byte <b>timestamp</b></li><li>a 4-byte <b>X</b> accelerometer value</li><li>a 4-byte <b>Y</b> accelerometer value</li><li>a 4-byte <b>Z</b> accelerometer value</li><li>4 bytes of <b>zeroes</b></li></ul><p id="6ce6">We’ll use this function to convert the <code>bytearray</code>s to real numbers:</p><div id="6d19"><pre><span class="hljs-keyword">import</span> numpy as np
...
def encode_free_acceleration(bytes_):
    data_segments = np.dtype([
        (<span class="hljs-string">'timestamp'</span>, np.<span class="hljs-type">uint32</span>),
        (<span class="hljs-string">'x'</span>, np.<span class="hljs-type">float32</span>),
        (<span class="hljs-string">'y'</span>, np.<span class="hljs-type">float32</span>),
        (<span class="hljs-string">'z'</span>, np.<span class="hljs-type">float32</span>),
        (<span class="hljs-string">'zero_padding'</span>, np.<span class="hljs-type">uint32</span>)
    ])
    formatted_data = np.frombuffer(bytes_, dtype=data_segments)
    <span class="hljs-keyword">return</span> formatted_data</pre></div><p id="c5c9"><b>You don’t have to understand it yet to use it in your code.</b> Just know that it uses <code>data_segments</code> as a template to slice up the <code>bytes</code> into neat chunks and convert them to real numbers.</p><p id="2e9b">Let’s make sure we use this new function in our callback function, so that we print real numbers:</p><div id="dbb9"><pre>def <span class="hljs-built_in">callback</span>(sender, data):
    <span class="hljs-built_in">print</span>(<span class="hljs-built_in">encode_free_acceleration</span>(data))</pre></div><p id="3410">Note that the data has to be chopped up differently for each of the DOT’s 18 measurement modes.</p><p id="90a0">If we run the full code:</p><div id="92e9"><pre><span class="hljs-comment"># Stream Free Acceleration data from multiple Movella DOTs</span>

<span class="hljs-keyword">import</span> numpy <span class="hljs-keyword">as</span> np
<span class="hljs-keyword">import</span> asyncio
<span class="hljs-keyword">from</span> bleak <span class="hljs-keyword">import</span> BleakClient

measurement_char_uuid = <span class="hljs-string">"15172001-4947-11e9-8646-d663bd873d93"</span>
short_payload_char_uuid = <span class="hljs-string">"15172004-4947-11e9-8646-d663bd873d93"</span>
addresses = [
    <span class="hljs-string">"509808FF-ECFE-895D-C1FE-BE5AC5DB6204"</span>,
    <span class="hljs-string">"338312FA-C3D1-183F-325A-0726AFDBEB78"</span>
]

<span class="hljs-keyword">def</span> <span class="hljs-title function_">callback</span>(<span class="hljs-params">sender, data</span>):
    <span class="hljs-built_in">print</span>(encode_free_acceleration(data))

<span class="hljs-keyword">def</span> <span class="hljs-title function_">encode_free_acceleration</span>(<span class="hljs-params">bytes_</span>):
    data_segments = np.dtype([
        (<span class="hljs-string">'timestamp'</span>, np.uint32),
        (<span class="hljs-string">'x'</span>, np.float32),
        (<span class="hljs-string">'y'</span>, np.float32),
        (<span class="hljs-string">'z'</span>, np.float32),
        (<span class="hljs-string">'zero_padding'</span>, np.uint32)
    ])
    formatted_data = np.frombuffer(bytes_, dtype=data_segments)
    <span class="hljs-keyword">return</span> formatted_data

<span class="hljs-keyword">async</span> <span class="hljs-keyword">def</span> <span class="hljs-title function_">connect</span>(<span class="hljs-params">address</span>):
    <span class="hljs-comment"># Connect to the DOT and stream data</span>
    <span class="hljs-keyword">async</span> <span class="hljs-keyword">with</span> BleakClient(address) <span class="hljs-keyword">as</span> client:
        <span class="hljs-built_in">print</span>(<span class="hljs-string">f"Client connection to <span class="hljs-subst">{client.address}</span>: <span class="hljs-subst">{client.is_connected}</span>"</span>)

        <span class="hljs-comment"># Subscribe to data notifications</span>
        <span class="hljs-keyword">await</span> client.start_notify(short_payload_char_uuid, callback)

        <span class="hljs-comment"># Set and turn on measurement mode</span>
        binary_message = <span class="hljs-string">b"\x01\x01\x06"</span>
        <span class="hljs-keyword">await</span> client.write_gatt_char(measurement_char_uuid, binary_message, response=<span class="hljs-literal">True</span>)

        <span class="hljs-comment"># Stream data for 10 seconds</span>
        <span class="hljs-keyword">await</span> asyncio.sleep(<span class="hljs-number">10.0</span>)

<span class="hljs-keyword">async</span> <span class="hljs-keyword">def</span> <span class="hljs-title function_">main</span>():
    <span class="hljs-keyword">await</span> asyncio.gather(*(connect(addr) <span class="hljs-keyword">for</span> addr <span class="hljs-keyword">in</span> addresses))

<span class="hljs-keyword">if</span> __name__ == <span class="hljs-string">"__main__"</span>:
    asyncio.run(main())</pre></div><p id="4777">We’ll get real data!</p><figure id="4cc8"><img src="https://cdn-images-1.readmedium.com/v2/resize:fit:800/1*brY46qzHUv6sGPA9zPWYGg.png"><figcaption></figcaption></figure><p id="dcd6">The only problem is that <b>we don’t know which DOT</b> each line came from. <b>We’ll fix that</b> by writing a short <code>class</code> that prints the device’s address.</p><h1 id="366e">Step 7: Tag Each Data Line with Its Device’s Address</h1><p id="37ed">We can’t give the callback function an argument directly, since the <code>(sender, data)</code> arguments are fixed (<i>unless we use <a href="https://docs.python.org/3/library/functools.html">functools and partials</a></i>). Instead, we’ll move our <code>callback()</code> into a <code>class</code> that holds the device address.</p><div id="3cb6"><pre><span class="hljs-keyword">class</span> <span class="hljs-title class_">NotificationHandler</span>:
    <span class="hljs-string">'''This class allows us to add the DOT's UUID address to the data that gets printed'''</span>
    <span class="hljs-keyword">def</span> <span class="hljs-title function_">__init__</span>(<span class="hljs-params">self, device_address</span>):
        self.device_address = device_address

    <span class="hljs-keyword">def</span> <span class="hljs-title function_">callback</span>(<span class="hljs-params">self, sender, data</span>):
        <span class="hljs-built_in">print</span>(<span class="hljs-string">f"<span class="hljs-subst">{self.device_address}</span> -- <span class="hljs-subst">{encode_free_acceleration(data)}</span>"</span>)</pre></div><p id="21f5">This way, we can create an instance of the class for each DOT device with the device’s UUID address, <b>then we can incorporate that address into the printed data</b> via the class’s <code>self.device_address</code> variable.</p><p id="a6ba">We’ll instantiate the class for each DOT device with <code>nh = NotificationHandler(address)</code>, then pass <code>nh.callback</code> as the new callback function:</p><div id="74ab"><pre>...

<span class="hljs-keyword">async</span> <span class="hljs-keyword">def</span> <span class="hljs-title function_">connect</span>(<span class="hljs-params">address</span>):
    nh = NotificationHandler(address)
    <span class="hljs-keyword">async</span> <span class="hljs-keyword">with</span> BleakClient(address, timeout=<span class="hljs-number">10.0</span>) <span class="hljs-keyword">as</span> client:
        <span class="hljs-comment"># Check if connection was successful</span>
        <span class="hljs-built_in">print</span>(<span class="hljs-string">f"Client connection: <span class="hljs-subst">{client.is_connected}</span>"</span>) <span class="hljs-comment"># prints True or False</span>

        <span class="hljs-comment"># Subscribe to notifications from the Short Payload Characteristic</span>
        <span class="hljs-keyword">await</span> client.start_notify(short_payload_char_uuid, nh.callback)

...</pre></div><p id="3a6b">Now, <code>start_notify()</code>’s callback can access the “<code>self</code>” variables from <code>NotificationHandler</code>, which include the DOT’s address.</p><p id="f6da"><b>The full script that prints accelerometer data with UUID addresses:</b></p><div id="9a6e"><pre><span class="hljs-comment"># Stream Free Acceleration data from multiple Movella DOTs</span>

<span class="hljs-keyword">import</span> numpy <span class="hljs-keyword">as</span> np
<span class="hljs-keyword">import</span> asyncio
<span class="hljs-keyword">from</span> bleak <span class="hljs-keyword">import</span> BleakClient

measurement_char_uuid = <span class="hljs-string">"15172001-4947-11e9-8646-d663bd873d93"</span>
short_payload_char_uuid = <span class="hljs-string">"15172004-4947-11e9-8646-d663bd873d93"</span>
addresses = [
    <span class="hljs-string">"509808FF-ECFE-895D-C1FE-BE5AC5DB6204"</span>,
    <span class="hljs-string">"338312FA-C3D1-183F-325A-0726AFDBEB78"</span>
]

<span class="hljs-keyword">class</span> <span class="hljs-title class_">NotificationHandler</span>:
    <span class="hljs-string">'''This class allows us to add the DOT's UUID address to the data that gets printed'''</span>
    <span class="hljs-keyword">def</span> <span class="hljs-title function_">__init__</span>(<span class="hljs-params">self, device_address</span>):
        self.device_address = device_address

    <span class="hljs-keyword">def</span> <span class="hljs-title function_">callback</span>(<span class="hljs-params">self, sender, data</span>):
        <span class="hljs-built_in">print</span>(<span class="hljs-string">f"<span class="hljs-subst">{self.device_address}</span> -- <span class="hljs-subst">{encode_free_acceleration(data)}</span>"</span>)

<span class="hljs-keyword">def</span> <span class="hljs-title function_">encode_free_acceleration</span>(<span class="hljs-params">bytes_</span>):
    data_segments = np.dtype([
        (<span class="hljs-string">'timestamp'</span>, np.uint32),
        (<span class="hljs-string">'x'</span>, np.float32),
        (<span class="hljs-string">'y'</span>, np.float32),
        (<span class="hljs-string">'z'</span>, np.float32),
        (<span class="hljs-string">'zero_padding'</span>, np.uint32)
    ])
    formatted_data = np.frombuffer(bytes_, dtype=data_segments)
    <span class="hljs-keyword">return</span> formatted_data

<span class="hljs-keyword">async</span> <span class="hljs-keyword">def</span> <span class="hljs-title function_">connect</span>(<span class="hljs-params">address</span>):
    nh = NotificationHandler(address)
    <span class="hljs-comment"># Connect to the DOT and stream data</span>
    <span class="hljs-keyword">async</span> <span class="hljs-keyword">with</span> BleakClient(address) <span class="hljs-keyword">as</span> client:
        <span class="hljs-built_in">print</span>(<span class="hljs-string">f"Client connection to <span class="hljs-subst">{client.address}</span>: <span class="hljs-subst">{client.is_connected}</span>"</span>)

        <span class="hljs-comment"># Subscribe to data notifications</span>
        <span class="hljs-keyword">await</span> client.start_notify(short_payload_char_uuid, nh.callback)

        <span class="hljs-comment"># Set and turn on measurement mode</span>
        binary_message = <span class="hljs-string">b"\x01\x01\x06"</span>
        <span class="hljs-keyword">await</span> client.write_gatt_char(measurement_char_uuid, binary_message, response=<span class="hljs-literal">True</span>)

        <span class="hljs-comment"># Stream data for 10 seconds</span>
        <span class="hljs-keyword">await</span> asyncio.sleep(<span class="hljs-number">10.0</span>)

<span class="hljs-keyword">async</span> <span class="hljs-keyword">def</span> <span class="hljs-title function_">main</span>():
    <span class="hljs-keyword">await</span> asyncio.gather(*(connect(addr) <span class="hljs-keyword">for</span> addr <span class="hljs-keyword">in</span> addresses))

<span class="hljs-keyword">if</span> __name__ == <span class="hljs-string">"__main__"</span>:
    asyncio.run(main())</pre></div><h1 id="b707">Step 8: Extra Formatting (Optional)</h1><p id="c8be">We can get rid of the <code>[(brackets)]</code> around the data by changing the callback to:</p><div id="9035"><pre><span class="hljs-keyword">def</span> <span class="hljs-title function_">callback</span>(<span class="hljs-params">self, sender, data</span>):
    free_acceleration = encode_free_acceleration(data)[<span class="hljs-number">0</span>]
    free_acceleration = <span class="hljs-built_in">str</span>(free_acceleration)[<span class="hljs-number">1</span>:-<span class="hljs-number">1</span>]
    <span class="hljs-built_in">print</span>(<span class="hljs-string">f"<span class="hljs-subst">{self.device_address}</span> -- <span class="hljs-subst">{free_acceleration}</span>"</span>)</pre></div><p id="e010">Then the data will look like:</p><figure id="bf36"><img src="https://cdn-images-1.readmedium.com/v2/resize:fit:800/1*aKDakqw428_mwObayBFNEw.png"><figcaption></figcaption></figure><p id="290d">Above is the <b>DOT’s address</b>, the <b>timestamp</b>, and the <b>X, Y, and Z</b> of the accelerometer (and a zero <code>0</code> for padding).</p><p id="78b1">You now have real data from your Movella DOTs. Check out our next article on <a href="https://readmedium.com/how-to-save-movella-dot-data-to-a-sqlite-database-7159c09543c0">how to save this data to a SQLite database</a>.</p><h1 id="c788">Troubleshooting on Linux and Windows</h1><p id="8167">If you are on a Linux system or any other system that has trouble connecting to Bluetooth devices in quick succession, <a href="https://gist.github.com/protobioengineering/1e32932ed613bee6f304f38588f87ffd">try this workaround code on our GitHub</a>. This code separates the device-finding and device-connecting parts to avoid errors like <code>bleak.exc.BleakDBusError: Operation already in progress</code>.</p><h1 id="45ba">The Full Script</h1><p id="612d">This is <b>all the code</b> we need to get Free Acceleration data from two Movella DOTs:</p><div id="fc38"><pre><span class="hljs-comment"># Stream Free Acceleration data from multiple Movella DOTs</span>

<span class="hljs-keyword">import</span> numpy <span class="hljs-keyword">as</span> np
<span class="hljs-keyword">import</span> asyncio
<span class="hljs-keyword">from</span> bleak <span class="hljs-keyword">import</span> BleakClient

measurement_char_uuid = <span class="hljs-string">"15172001-4947-11e9-8646-d663bd873d93"</span>
short_payload_char_uuid = <span class="hljs-string">"15172004-4947-11e9-8646-d663bd873d93"</span>

<span class="hljs-comment"># Replace this with a list of your DOTs' UUID addresses</span>
<span class="hljs-comment"># (or "MAC addresses" for Windows and Linux users)</span>
addresses = [
    <span class="hljs-string">"509808FF-ECFE-895D-C1FE-BE5AC5DB6204"</span>,
    <span class="hljs-string">"338312FA-C3D1-183F-325A-0726AFDBEB78"</span>
]

<span class="hljs-keyword">class</span> <span class="hljs-title class_">NotificationHandler</span>:
    <span class="hljs-string">'''This class allows us to add the DOT's UUID address to the data that gets printed'''</span>
    <span class="hljs-keyword">def</span> <span class="hljs-title function_">__init__</span>(<span class="hljs-params">self, device_address</span>):
        self.device_address = device_address

    <span class="hljs-keyword">def</span> <span class="hljs-title function_">callback</span>(<span class="hljs-params">self, sender, data</span>):
        free_acceleration = encode_free_acceleration(data)[<span class="hljs-number">0</span>]
        free_acceleration = <span class="hljs-built_in">str</span>(free_acceleration)[<span class="hljs-number">1</span>:-<span class="hljs-number">1</span>]
        <span class="hljs-built_in">print</span>(<span class="hljs-string">f"<span class="hljs-subst">{self.device_address}</span> -- <span class="hljs-subst">{free_acceleration}</span>"</span>)

<span class="hljs-keyword">def</span> <span class="hljs-title function_">encode_free_acceleration</span>(<span class="hljs-params">bytes_</span>):
    data_segments = np.dtype([
        (<span class="hljs-string">'timestamp'</span>, np.uint32),
        (<span class="hljs-string">'x'</span>, np.float32),
        (<span class="hljs-string">'y'</span>, np.float32),
        (<span class="hljs-string">'z'</span>, np.float32),
        (<span class="hljs-string">'zero_padding'</span>, np.uint32)
    ])
    formatted_data = np.frombuffer(bytes_, dtype=data_segments)
    <span class="hljs-keyword">return</span> formatted_data

<span class="hljs-keyword">async</span> <span class="hljs-keyword">def</span> <span class="hljs-title function_">connect</span>(<span class="hljs-params">address</span>):
    nh = NotificationHandler(address)
    <span class="hljs-comment"># Connect to the DOT and stream data</span>
    <span class="hljs-keyword">async</span> <span class="hljs-keyword">with</span> BleakClient(address) <span class="hljs-keyword">as</span> client:
        <span class="hljs-built_in">print</span>(<span class="hljs-string">f"Client connection to <span class="hljs-subst">{client.address}</span>: <span class="hljs-subst">{client.is_connected}</span>"</span>)

        <span class="hljs-comment"># Subscribe to data notifications</span>
        <span class="hljs-keyword">await</span> client.start_notify(short_payload_char_uuid, nh.callback)

        <span class="hljs-comment"># Set and turn on measurement mode</span>
        binary_message = <span class="hljs-string">b"\x01\x01\x06"</span>
        <span class="hljs-keyword">await</span> client.write_gatt_char(measurement_char_uuid, binary_message, response=<span class="hljs-literal">True</span>)

        <span class="hljs-comment"># Stream data for 10 seconds</span>
        <span class="hljs-keyword">await</span> asyncio.sleep(<span class="hljs-number">10.0</span>)

<span class="hljs-keyword">async</span> <span class="hljs-keyword">def</span> <span class="hljs-title function_">main</span>():
    <span class="hljs-keyword">await</span> asyncio.gather(*(connect(addr) <span class="hljs-keyword">for</span> addr <span class="hljs-keyword">in</span> addresses))

<span class="hljs-keyword">if</span> __name__ == <span class="hljs-string">"__main__"</span>:
    asyncio.run(main())</pre></div><p id="c6c6">If you’re getting errors (like <code>BleakDBusError</code>), try <a href="https://gist.github.com/protobioengineering/1e32932ed613bee6f304f38588f87ffd">this version of the script from our GitHub</a>.</p><p id="b538">Output:</p><figure id="37cd"><img src="https://cdn-images-1.readmedium.com/v2/resize:fit:800/1*aKDakqw428_mwObayBFNEw.png"><figcaption>Above is the DOT’s address, the timestamp, and the X, Y, and Z of the accelerometer (and a zero <code>0</code> for padding).</figcaption></figure><h1 id="f4c8">Questions and Feedback</h1><p id="50ec">If you have questions or feedback, email us at [email protected] or message us on <a href="http://instagram.com/protobioengineering">Instagram (@protobioengineering)</a>.</p><p id="d465">If you liked this article, consider supporting us by <a href="https://ko-fi.com/protobio">donating a coffee</a>.</p><h1 id="52d1">Related Resources</h1><ul><li><a href="https://base.xsens.com/s/xsens-dot-landing-page">Movella Support and Knowledge Base</a></li><li><a href="https://www.movella.com/support/software-documentation">Movella SDKs and Bluetooth Documentation</a></li><li><a href="https://realpython.com/async-io-python/">AsyncIO in Python: A Complete Walkthrough</a></li></ul><h1 id="5852">Other Bluetooth and Python Stuff</h1><ul><li><a href="https://readmedium.com/how-to-stream-data-from-a-movella-dot-wearable-sensor-with-a-mac-and-python-5822e76fb43e">How to Stream Data from a Single Movella DOT with a Mac and Python</a></li><li><a href="https://readmedium.com/how-to-make-a-detailed-bluetooth-le-scanner-with-a-macbook-and-python-8e2c7dccfd39?postPublishedType=repub">How to Make a Detailed Bluetooth LE Scanner with a MacBook and Python</a></li><li><a href="https://readmedium.com/how-to-make-a-simple-bluetooth-scanner-on-windows-with-python-245b58b53793">How to Make a Simple Bluetooth Scanner on Windows with Python</a></li></ul></article></body>

How to Stream Data from Multiple Movella DOTs with a Mac and Python

Track your body movements with Movella wearable sensors, Python, and a MacBook.

Photo by Alessio Soggetti on Unsplash

Movella DOTs (formerly known as “Xsens” DOTs) are wearable devices that you can wear on a wrist, arm, or ankle to track your movement.

Movella makes SDKs so that you can write Python code for the DOTs on Windows or Linux. However, there is no such SDK for macOS. Therefore, we’ll write our own code to stream data from some DOTs using Python 3 and Bleak.

This code will be much like our code for streaming data from a single Movella DOT, except that we’ll use AsyncIO’s gather() function to talk to multiple DOTs at once.

The full script is available at the end of this article.

The final output of this script: the DOT’s address, a timestamp, and the X, Y, and Z of the accelerometer.

Prerequisite Knowledge

We’ll be using Movella’s DOT BLE Services Specifications documentation to know how to interact with the DOTs.

You should know:

But if you don’t, the full script is available at the end of this article anyway.

Check out our other articles if you want to start simple:

Requirements

  • a Mac computer (Windows/Linux might work)
  • Python 3
  • Bleak (a Python Bluetooth LE library)
  • NumPy (a Python math library)
  • 2+ Movella DOT sensors

Steps

In this tutorial, we will:

  0. Install the Bleak and NumPy Python libraries
  1. “Scan” for the DOTs’ Bluetooth addresses
  2. Connect to the Movella DOTs in Python
  3. “Subscribe” to measurement data notifications
  4. Set and turn on the measurement mode
  5. “Listen” for data and print it
  6. Format the data with NumPy
  7. Tag the data with its device’s address

The final script is at the bottom of this article.

Let’s get started.

Step 0: Download and Install Bleak and NumPy

Bleak is the most popular Bluetooth LE library for Python, and it works on Mac, Windows, and Linux.

NumPy helps with math operations on big data in Python. It will make reading the data from the DOTs easy.

To install Bleak and NumPy, run these in Terminal:

pip install bleak

pip install numpy

For more assistance, check out Bleak and NumPy’s installation help pages.

If you’re new to Pip, it’s the most popular package manager for Python and makes installing libraries for any project easy. Pip comes included with most Python installs. If you don’t have it, install Pip here.

Step 1: Scan for the DOTs’ Addresses

Just like every website has a URL and every house has a street address, Bluetooth devices have wireless addresses.

There are some quirks to how these addresses work on Mac computers, but in short, each Bluetooth device will get a UUID, kind of like:

12345678-e89b-12d3-a456-426614174000
OR
ABCDEF01-2222-3333-4444-56789AAABBBC

This is how your Mac differentiates the DOTs from every other Bluetooth device near you.

Note: For Windows and Linux, your device’s address will be a MAC address, which looks like ab:cd:ef:11:22:33, but functions the same as a UUID.
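To make the two address formats concrete, here is a minimal sketch that tells them apart. The helper name address_kind is hypothetical and not part of Bleak; it only uses Python’s standard re and uuid modules.

```python
import re
import uuid

# Six colon-separated hex pairs, e.g. ab:cd:ef:11:22:33
MAC_PATTERN = re.compile(r"^([0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}$")

def address_kind(address):
    """Return "mac" for a MAC address, "uuid" for a macOS-style UUID, else "unknown"."""
    if MAC_PATTERN.match(address):
        return "mac"
    try:
        uuid.UUID(address)  # raises ValueError if the string is not a valid UUID
        return "uuid"
    except ValueError:
        return "unknown"

print(address_kind("509808FF-ECFE-895D-C1FE-BE5AC5DB6204"))  # uuid
print(address_kind("ab:cd:ef:11:22:33"))                      # mac
```

Either kind of address is passed to Bleak the same way, so a check like this is only useful for catching copy-and-paste mistakes early.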

We’ll use the Bluetooth scanner from our previous tutorial on How to Write a Simple Bluetooth Scanner with a MacBook and Python (Windows version here).

Scanner code:

# scanner.py

import asyncio
from bleak import BleakScanner

async def main():
    devices = await BleakScanner.discover()
    for device in devices:
        print(device)

asyncio.run(main())

Put this in a file called scanner.py. Before you run it, make sure your computer’s Bluetooth is on and each DOT is on and blinking.

Run the scanner. It will run silently for about 5 seconds (the default scan timeout of BleakScanner.discover()), then a list of devices with their UUID addresses will show up on the screen. Your DOTs will appear with the name Xsens DOT or Movella DOT.
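If many Bluetooth devices are nearby, the scan results can be filtered by name. The sketch below assumes the DOTs advertise a name containing “Xsens DOT” or “Movella DOT”, as described above. The helper names is_dot and find_dots are our own and not part of Bleak.

```python
DOT_NAMES = ("Xsens DOT", "Movella DOT")  # names the DOTs show up with in the scan

def is_dot(name):
    """Return True if a scanned device name looks like a DOT (name may be None)."""
    return name is not None and any(dot_name in name for dot_name in DOT_NAMES)

async def find_dots():
    """Scan once and return only the devices whose name looks like a DOT."""
    # Imported here so is_dot() can be tried without Bleak or Bluetooth hardware
    from bleak import BleakScanner

    devices = await BleakScanner.discover()
    return [device for device in devices if is_dot(device.name)]

print(is_dot("Movella DOT"))  # True
print(is_dot(None))           # False
```

Running asyncio.run(find_dots()) (after import asyncio) should return a list of devices whose address attribute holds the UUID or MAC address to copy into the next script.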

We have two DOTs at our office, so this gives us 2 UUID addresses:

  • 509808FF-ECFE-895D-C1FE-BE5AC5DB6204
  • 338312FA-C3D1-183F-325A-0726AFDBEB78

We’ll plug these into a different Python file in the next steps, which will connect to and stream data from the DOTs.

For more info on Bluetooth LE, asyncio, and how device connection works, check out our article, How to Connect to a Bluetooth Device with a MacBook and Python.

Step 2: Connect to the DOTs

Start a new Python file called stream.py. The rest of the code will go in this file. We don’t need scanner.py anymore, since we only needed it to get our DOTs’ Bluetooth addresses.

Like our tutorial on connecting to a single Movella DOT, we’ll use Bleak and asyncio. The difference is that we’ll tell asyncio to kick off multiple, separate asynchronous functions at once for all of our devices by using asyncio.gather().

While the code for connecting to a single DOT looks like this:

# Connects to a single Movella DOT

import asyncio
from bleak import BleakClient

async def main():
    address = "338312FA-C3D1-183F-325A-0726AFDBEB78" # Movella DOT UUID

    async with BleakClient(address) as client:
        # prints True if connection was successful
        print(f"Client connection to {client.address}: {client.is_connected}")

if __name__ == "__main__":
    asyncio.run(main())

We add gather() to connect to multiple DOTs like this:

# Connects to multiple Movella DOTs

import asyncio
from bleak import BleakClient

addresses = [
    "509808FF-ECFE-895D-C1FE-BE5AC5DB6204",
    "338312FA-C3D1-183F-325A-0726AFDBEB78"
]

async def connect(address):
    async with BleakClient(address) as client:
        # prints True if connection was successful
        print(f"Client connection to {client.address}: {client.is_connected}")

async def main():
    await asyncio.gather(*(connect(addr) for addr in addresses))


if __name__ == "__main__":
    asyncio.run(main())

We moved everything a single DOT needs to do from main() into a new function, connect(). Now we pass one connect() call per DOT UUID address to asyncio.gather(), and asyncio runs them all concurrently.

Without the * notation, we could also write gather() as:

await asyncio.gather(connect(addresses[0]), connect(addresses[1]))
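To see what gather() does without any Bluetooth hardware, here is a small sketch with a stand-in coroutine. The function fake_connect and the placeholder addresses are made up for illustration; asyncio.sleep() plays the role of the time a real connection takes.

```python
import asyncio
import time

async def fake_connect(address, delay):
    """Stand-in for connect(): pretend the connection takes `delay` seconds."""
    await asyncio.sleep(delay)
    return f"{address} connected"

async def main():
    addresses = ["DOT-A", "DOT-B", "DOT-C"]  # placeholder addresses
    start = time.perf_counter()
    # Each coroutine waits 0.2 seconds, but they all wait at the same time
    results = await asyncio.gather(*(fake_connect(addr, 0.2) for addr in addresses))
    elapsed = time.perf_counter() - start
    return results, elapsed

results, elapsed = asyncio.run(main())
print(results)                     # same order as the addresses list
print(f"{elapsed:.2f} seconds")    # roughly one delay, not three
```

The total time stays close to a single delay because the three coroutines overlap, which is the behavior we rely on to talk to several DOTs at once.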

If we test the multi-DOT connection code above, each DOT should print a “Client connection to” line ending in True once it connects.

Step 3: “Subscribe” to Data Notifications

In this step, we’ll tell the DOT that we’re going to be listening for movement data. Our computer has to “subscribe” to the messages that the DOTs send out (AKA “notifications”) in order to be ready to hear them. Then in Step 5, we’ll “listen” for the data.

This is counterintuitive, since our code/laptop is not “asking” the device for its data. Instead, we’re listening for the data. Asking for the data 30–60 times a second would be a waste of energy. It’s faster and less energy intensive to tell the DOT to send data once at the beginning.

To subscribe to notifications, we’ll use Bleak’s start_notify().

To use start_notify(), we give it 2 arguments:

  • The “characteristic” we want it to listen for data from
  • The function to use whenever our computer “hears” a Bluetooth notification message (i.e. gets the measurement data). This is known as a “callback function.”

Like so:

def our_callback_function(sender, data):
    # Print the measurement data or whatever we want here
    print(data)
...
characteristic_uuid = "15172004-4947-11e9-8646-d663bd873d93"
...
await client.start_notify(characteristic_uuid, our_callback_function)

A characteristic is basically a thing that a Bluetooth device can do. A device can have:

  • a battery percentage characteristic
  • a heart rate characteristic
  • an accelerometer measuring characteristic

And more. We interact with characteristics by reading them, writing them, or subscribing to them (in the case of live data streaming).

Above, the characteristic_uuid is the UUID of the Short Payload Characteristic, which is the characteristic that we have to subscribe to. We got this UUID from the Movella DOT BLE Services Specifications documentation.

For whichever measurement mode we pick (the DOT has 18 modes), we’ll need to both:

  • turn on the measurement mode (which we do in the next step)
  • subscribe to the corresponding Short/Medium/Long Payload Characteristic

It’s counterintuitive, but this is how Bluetooth LE works.

The Callback Function

We can put whatever we want in our callback() function. We’ll have it print the characteristic that it got data from (sender) as well as the data itself.

def callback(sender, data):
    print(f"{sender} -- {data}")

Note: We use f-strings (f"{sender}") to format our output. They allow the use of {variables} directly inside of strings.

The Full Script at This Point

Our code, stream.py, will now look like this:

# Connect to and subscribe to notification for multiple Movella DOTs

import asyncio
from bleak import BleakClient


short_payload_char_uuid = "15172004-4947-11e9-8646-d663bd873d93"
addresses = [
    "509808FF-ECFE-895D-C1FE-BE5AC5DB6204",
    "338312FA-C3D1-183F-325A-0726AFDBEB78"
]

def callback(sender, data):
    print(f"{sender} -- {data}")

async def connect(address):
    async with BleakClient(address) as client:
        print(f"Client connection to {client.address}: {client.is_connected}")

        await client.start_notify(short_payload_char_uuid, callback)


async def main():
    await asyncio.gather(*(connect(addr) for addr in addresses))


if __name__ == "__main__":
    asyncio.run(main())

Now, for every address in addresses, there is a connect() function running, which connects to the DOT and then subscribes to notifications. We have 2 devices for this example, so 2 connect() calls are running. If we had 5 addresses, gather() would kick off 5 connect() calls.

We don’t have any data yet though, because we need to turn on measurement in Step 4 and “listen” for data in Step 5.

Step 4: Set and Turn on Measurement Mode

We’ll do these in one step, since they’re both done with a single function from Bleak, write_gatt_char().

All we have to do is send a short string of numbers to the Measurement Characteristic. This string will tell the DOT which of the 18 measurement modes we want it to use and tell it to turn on.

For this tutorial, we’ll use mode #6 below: Free Acceleration.

And the string we send will look something like:

binary_message = b"\x01\x01\x06"

The binary_message has 3 hex numbers that specify:

  • \x01 — A default value that the DOT BLE docs say is required
  • \x01 — whether measurement is on or not. 01 = on. 00 = off.
  • \x06 — specifies that we want Free Acceleration (option 6 from the measurement modes list)
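The three bytes can also be assembled from their parts, which makes it easier to switch modes or turn measurement off. This is a minimal sketch; build_measurement_message is a hypothetical helper, and the “off” message assumes the mode byte is simply left unchanged.

```python
FREE_ACCELERATION_MODE = 6  # the mode number used in this tutorial

def build_measurement_message(turn_on, mode):
    """Build the 3-byte message: required first byte, on/off flag, mode number."""
    return bytes([0x01, 0x01 if turn_on else 0x00, mode])

start_message = build_measurement_message(True, FREE_ACCELERATION_MODE)
stop_message = build_measurement_message(False, FREE_ACCELERATION_MODE)
print(start_message)  # b'\x01\x01\x06'
print(stop_message)   # b'\x01\x00\x06'
```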

For this step, we only have to add 3 lines to our script. The following 2:

binary_message = b"\x01\x01\x06"
await client.write_gatt_char(measurement_char_uuid, binary_message)

And add this near the top of the file:

measurement_char_uuid = "15172001-4947-11e9-8646-d663bd873d93"
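The two characteristic UUIDs in this article differ only in four hex digits (2001 and 2004). The sketch below builds them from that short number. The pattern is inferred from these two UUIDs only, so treat it as an assumption and check any other characteristic against the DOT BLE Services Specifications. The names DOT_UUID_TEMPLATE and dot_uuid are our own.

```python
# Template inferred from the two UUIDs used in this article (an assumption)
DOT_UUID_TEMPLATE = "1517{:04x}-4947-11e9-8646-d663bd873d93"

def dot_uuid(short_id):
    """Fill the 4-hex-digit short ID into the shared UUID template."""
    return DOT_UUID_TEMPLATE.format(short_id)

measurement_char_uuid = dot_uuid(0x2001)
short_payload_char_uuid = dot_uuid(0x2004)
print(measurement_char_uuid)    # 15172001-4947-11e9-8646-d663bd873d93
print(short_payload_char_uuid)  # 15172004-4947-11e9-8646-d663bd873d93
```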

Now, the full script is:

# Turn on Free Acceleration mode for multiple Movella DOTs

import asyncio
from bleak import BleakClient

measurement_char_uuid = "15172001-4947-11e9-8646-d663bd873d93"
short_payload_char_uuid = "15172004-4947-11e9-8646-d663bd873d93"
addresses = [
    "509808FF-ECFE-895D-C1FE-BE5AC5DB6204",
    "338312FA-C3D1-183F-325A-0726AFDBEB78"
]

def callback(sender, data):
    print(f"{sender} -- {data}")

async def connect(address):
    # Connect to the DOT and stream data
    async with BleakClient(address) as client:
        print(f"Client connection to {client.address}: {client.is_connected}")
    
        # Subscribe to data notifications
        await client.start_notify(short_payload_char_uuid, callback)

        # Set and turn on measurement mode
        binary_message = b"\x01\x01\x06"
        await client.write_gatt_char(measurement_char_uuid, binary_message)


async def main():
    await asyncio.gather(*(connect(addr) for addr in addresses))


if __name__ == "__main__":
    asyncio.run(main())

Measurement mode is now on. Now, we only need one more line of code in the next step to get data.

Step 5: “Listen” for Data

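When we run the script from the last step, it quits without giving us any data. That’s because connect() reaches the end of the async with block right after turning measurement on, and leaving that block disconnects from the DOT.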

To fix this, we need to tell our script to wait, by using asyncio.sleep():

await asyncio.sleep(10.0) # Stream data for 10.0 seconds.

This will tell our code to wait for Bluetooth messages to come in from the DOT, which have our accelerometer data.

Add this below write_gatt_char() as:

# Set and turn on measurement mode
binary_message = b"\x01\x01\x06"
await client.write_gatt_char(measurement_char_uuid, binary_message)

# Stream data for 10 seconds
await asyncio.sleep(10.0)
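Why sleeping is enough can be shown without hardware. In the sketch below, fake_dot is a made-up stand-in for a DOT that calls our callback every 10 milliseconds. The callbacks only get a chance to run while stream_for() is waiting in asyncio.sleep().

```python
import asyncio

received = []

def callback(sender, data):
    # Collect notifications instead of printing them
    received.append((sender, data))

async def fake_dot(notify, interval=0.01):
    """Stand-in for a DOT: call `notify` with one dummy byte every `interval` seconds."""
    counter = 0
    while True:
        notify("fake-characteristic", bytes([counter % 256]))
        counter += 1
        await asyncio.sleep(interval)

async def stream_for(seconds):
    device = asyncio.create_task(fake_dot(callback))
    try:
        await asyncio.sleep(seconds)  # callbacks fire while we wait here
    finally:
        device.cancel()               # comparable to leaving the async with block
    return len(received)

count = asyncio.run(stream_for(0.1))
print(f"Received {count} notifications in 0.1 seconds")
```

With a real DOT, Bleak delivers the notifications in the background in a similar way, so the longer connect() sleeps, the more data arrives.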

The full script that can get raw data is:

# Stream Free Acceleration data from multiple Movella DOTs

import asyncio
from bleak import BleakClient

measurement_char_uuid = "15172001-4947-11e9-8646-d663bd873d93"
short_payload_char_uuid = "15172004-4947-11e9-8646-d663bd873d93"
addresses = [
    "509808FF-ECFE-895D-C1FE-BE5AC5DB6204",
    "338312FA-C3D1-183F-325A-0726AFDBEB78"
]

def callback(sender, data):
    print(f"{sender} -- {data}")

async def connect(address):
    # Connect to the DOT and stream data
    async with BleakClient(address) as client:
        print(f"Client connection to {client.address}: {client.is_connected}")
    
        # Subscribe to data notifications
        await client.start_notify(short_payload_char_uuid, callback)

        # Set and turn on measurement mode
        binary_message = b"\x01\x01\x06"
        await client.write_gatt_char(measurement_char_uuid, binary_message, response=True)

        # Stream data for 10 seconds
        await asyncio.sleep(10.0)

async def main():
    await asyncio.gather(*(connect(addr) for addr in addresses))

if __name__ == "__main__":
    asyncio.run(main())

If we run this, now we get data:

Troubleshooting: if you don’t see any sensor data like above, make sure write_gatt_char() is called with response=True as a 3rd argument, as in the full script above:

await client.write_gatt_char(measurement_char_uuid, binary_message, response=True)

However, the data above is in a bytearray, which is just a sequence of raw bytes that print as hex values like \x8d\x00. We need to convert it to real numbers.

Step 6: Convert the Data to Real Numbers with NumPy

To convert the bytearrays to real numbers, we’ll use NumPy. NumPy is the master of working with math, bits, and bytes in Python.

According to the Measurement Service section (Section 3) of the DOT BLE Specification documentation, each Bluetooth notification for “Free Acceleration” is 20 bytes. It has 16 bytes of actual data, made up of a timestamp and X, Y, Z free acceleration data, and 4 bytes of zeros for padding.

Section 3.5 Measurement Data also shows that the actual numbers (X, Y, Z) in free acceleration data are 12 bytes total, meaning that the data is:

  • a 4-byte timestamp
  • a 4-byte X accelerometer value
  • a 4-byte Y accelerometer value
  • a 4-byte Z accelerometer value
  • 4 bytes of zeroes

We’ll use this function to convert the bytearrays to real numbers:

import numpy as np
...
def encode_free_acceleration(bytes_):
    data_segments = np.dtype([
        ('timestamp', np.uint32),
        ('x', np.float32),
        ('y', np.float32),
        ('z', np.float32),
        ('zero_padding', np.uint32)
        ])
    formatted_data = np.frombuffer(bytes_, dtype=data_segments)
    return formatted_data

You don’t have to understand it yet to use it in your code. Just know that it’s using data_segments as a template to slice up the bytes into neat chunks and converting them to real numbers.

Let’s make sure we use this new function in our callback function, so that we print real numbers:

def callback(sender, data):
    print(encode_free_acceleration(data))

Note that the data has to be chopped up differently for each of the DOT’s 18 measurement modes.
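As a cross-check of the 20-byte layout described above, the same payload can be unpacked with Python’s built-in struct module. This is a sketch, not a replacement for the NumPy function. It assumes the bytes are little-endian, and the sample payload is made up rather than recorded from a DOT.

```python
import struct

# "<" = little-endian (an assumption), I = 4-byte unsigned int, f = 4-byte float
FREE_ACCELERATION_FORMAT = "<IfffI"

def unpack_free_acceleration(payload):
    """Split a 20-byte payload into timestamp, x, y, z (the padding is dropped)."""
    timestamp, x, y, z, _padding = struct.unpack(FREE_ACCELERATION_FORMAT, payload)
    return timestamp, x, y, z

# Build a made-up 20-byte payload to try it out
sample = struct.pack(FREE_ACCELERATION_FORMAT, 123456, 0.5, -1.25, 9.75, 0)
print(len(sample))                       # 20
print(unpack_free_acceleration(sample))  # (123456, 0.5, -1.25, 9.75)
```

The format string plays the same role as data_segments in the NumPy version: it is a template that says how many bytes belong to each value and how to interpret them.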

If we run the full code:

# Stream Free Acceleration data from multiple Movella DOTs

import numpy as np
import asyncio
from bleak import BleakClient

measurement_char_uuid = "15172001-4947-11e9-8646-d663bd873d93"
short_payload_char_uuid = "15172004-4947-11e9-8646-d663bd873d93"
addresses = [
    "509808FF-ECFE-895D-C1FE-BE5AC5DB6204",
    "338312FA-C3D1-183F-325A-0726AFDBEB78"
]

def callback(sender, data):
    print(encode_free_acceleration(data))

def encode_free_acceleration(bytes_):
    data_segments = np.dtype([
        ('timestamp', np.uint32),
        ('x', np.float32),
        ('y', np.float32),
        ('z', np.float32),
        ('zero_padding', np.uint32)
        ])
    formatted_data = np.frombuffer(bytes_, dtype=data_segments)
    return formatted_data

async def connect(address):
    # Connect to the DOT and stream data
    async with BleakClient(address) as client:
        print(f"Client connection to {client.address}: {client.is_connected}")
    
        # Subscribe to data notifications
        await client.start_notify(short_payload_char_uuid, callback)

        # Set and turn on measurement mode
        binary_message = b"\x01\x01\x06"
        await client.write_gatt_char(measurement_char_uuid, binary_message, response=True)

        # Stream data for 10 seconds
        await asyncio.sleep(10.0)


async def main():
    await asyncio.gather(*(connect(addr) for addr in addresses))


if __name__ == "__main__":
    asyncio.run(main())

We’ll get real data!

The only problem is that we don’t know which DOT each line came from. We’ll fix that by writing a short class that prints the device’s address.

Step 7: Tag Each Data Line with Its Device’s Address

We can’t give the callback function an argument directly, since the (sender, data) arguments are fixed (unless we use functools and partials). Instead, we’ll move our callback() into a class that holds the device address.

class NotificationHandler:
    '''This class allows us to add the DOT's UUID address to the data that gets printed'''
    def __init__(self, device_address):
        self.device_address = device_address

    def callback(self, sender, data):
        print(f"{self.device_address} -- {encode_free_acceleration(data)}")

This way, we can create an instance of the class for each DOT device with the device’s UUID address, then we can incorporate that address into the printed data via the class’s self.device_address variable.

We’ll instantiate the class for each DOT device with nh = NotificationHandler(address), then give nh.callback (without parentheses, since we’re passing the function itself) as the new callback function:

...
async def connect(address):
    nh = NotificationHandler(address)
    async with BleakClient(address, timeout=10.0) as client:
        # Check if connection was successful
        print(f"Client connection: {client.is_connected}") # prints True or False

        # Subscribe to notifications from the Short Payload Characteristic
        await client.start_notify(short_payload_char_uuid, nh.callback)
...

Now, start_notify()’s callback can access the “self” variables from NotificationHandler, which includes the DOT’s address.
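This step mentions functools partials as an alternative to the class. Here is a minimal sketch of that approach. The function tagged_callback is a hypothetical name, and the call at the end uses dummy values in place of a real notification.

```python
from functools import partial

def tagged_callback(device_address, sender, data):
    """Same job as NotificationHandler.callback, with the address as first argument."""
    print(f"{device_address} -- {data}")

address = "509808FF-ECFE-895D-C1FE-BE5AC5DB6204"

# partial() pre-fills device_address, leaving a function that takes (sender, data)
callback = partial(tagged_callback, address)

# Bleak would call callback(sender, data); here we call it by hand with dummy values
callback("fake-sender", b"\x01\x02")
```

In connect(), the pre-filled function would be passed as start_notify(short_payload_char_uuid, partial(tagged_callback, address)). The class version is easier to extend if the handler later needs to keep more state per device.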

The full script that prints accelerometer data with UUID addresses:

# Stream Free Acceleration data from multiple Movella DOTs

import numpy as np
import asyncio
from bleak import BleakClient


measurement_char_uuid = "15172001-4947-11e9-8646-d663bd873d93"
short_payload_char_uuid = "15172004-4947-11e9-8646-d663bd873d93"
addresses = [
    "509808FF-ECFE-895D-C1FE-BE5AC5DB6204",
    "338312FA-C3D1-183F-325A-0726AFDBEB78"
]

class NotificationHandler:
    '''This class allows us to add the DOT's UUID address to the data that gets printed'''
    def __init__(self, device_address):
        self.device_address = device_address

    def callback(self, sender, data):
        print(f"{self.device_address} -- {encode_free_acceleration(data)}")

def encode_free_acceleration(bytes_):
    data_segments = np.dtype([
        ('timestamp', np.uint32),
        ('x', np.float32),
        ('y', np.float32),
        ('z', np.float32),
        ('zero_padding', np.uint32)
        ])
    formatted_data = np.frombuffer(bytes_, dtype=data_segments)
    return formatted_data

async def connect(address):
    nh = NotificationHandler(address)
    # Connect to the DOT and stream data
    async with BleakClient(address) as client:
        print(f"Client connection to {client.address}: {client.is_connected}")
    
        # Subscribe to data notifications
        await client.start_notify(short_payload_char_uuid, nh.callback)

        # Set and turn on measurement mode
        binary_message = b"\x01\x01\x06"
        await client.write_gatt_char(measurement_char_uuid, binary_message, response=True)

        # Stream data for 10 seconds
        await asyncio.sleep(10.0)


async def main():
    await asyncio.gather(*(connect(addr) for addr in addresses))


if __name__ == "__main__":
    asyncio.run(main())

Step 8: Extra Formatting (Optional)

We can get rid of the [(brackets)] around the data by changing the callback to:

def callback(self, sender, data):
    free_acceleration = encode_free_acceleration(data)[0]
    free_acceleration = str(free_acceleration)[1:-1]
    print(f"{self.device_address} -- {free_acceleration}")

Then the data will look like:

Above is the DOT’s address, the timestamp, and the X, Y, and Z of the accelerometer (and a zero 0 for padding).
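Instead of trimming the brackets off a string, each value can also be formatted on its own, which gives control over the number of decimals. This is a sketch with made-up sample values; format_sample is a hypothetical helper.

```python
def format_sample(device_address, timestamp, x, y, z):
    """Format one reading as a single line with 4 decimals per axis."""
    return f"{device_address} -- {timestamp}, {x:.4f}, {y:.4f}, {z:.4f}"

line = format_sample("509808FF-ECFE-895D-C1FE-BE5AC5DB6204", 123456, 0.5, -1.25, 9.75)
print(line)  # 509808FF-ECFE-895D-C1FE-BE5AC5DB6204 -- 123456, 0.5000, -1.2500, 9.7500
```

With the NumPy record from encode_free_acceleration(data)[0], the individual fields should be readable by name, for example sample["timestamp"] and sample["x"], and can be passed to a function like this one.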

You now have real data from your Movella DOTs. Check out our next article on how to save this data to a SQLite database.

Troubleshooting on Linux and Windows

If you are on a Linux system or any other system that has trouble connecting to Bluetooth devices in quick succession, try this workaround code on our GitHub. This code separates the device-finding and device-connecting parts to avoid errors like bleak.exc.BleakDBusError: Operation already in progress.
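One general way to avoid connecting to several devices at the same instant is to guard the connection phase with an asyncio.Lock. The sketch below is our own illustration of that idea with stand-in sleeps instead of real Bluetooth calls. It is not the workaround code from the GitHub link, and connect_one_at_a_time is a hypothetical name.

```python
import asyncio

async def connect_one_at_a_time(address, connect_lock, log):
    # Only one coroutine at a time may be in the "connecting" phase
    async with connect_lock:
        log.append(f"connecting {address}")
        await asyncio.sleep(0.01)  # stand-in for the real Bluetooth connection
        log.append(f"connected {address}")
    # After connecting, the devices can stream concurrently as before
    await asyncio.sleep(0.02)      # stand-in for streaming data

async def main(addresses):
    connect_lock = asyncio.Lock()
    log = []
    await asyncio.gather(
        *(connect_one_at_a_time(addr, connect_lock, log) for addr in addresses)
    )
    return log

log = asyncio.run(main(["DOT-A", "DOT-B"]))
print(log)
```

The log shows that the second device only starts connecting after the first one has finished, while the streaming parts still overlap.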

The Full Script

This is all the code we need to get Free Acceleration data from two Movella DOTs:

# Stream Free Acceleration data from multiple Movella DOTs

import numpy as np
import asyncio
from bleak import BleakClient

measurement_char_uuid = "15172001-4947-11e9-8646-d663bd873d93"
short_payload_char_uuid = "15172004-4947-11e9-8646-d663bd873d93"

# Replace this with a list of your DOTs' UUID addresses 
# (or "MAC addresses" for Windows and Linux users)
addresses = [
    "509808FF-ECFE-895D-C1FE-BE5AC5DB6204",
    "338312FA-C3D1-183F-325A-0726AFDBEB78"
]

class NotificationHandler:
    '''This class allows us to add the DOT's UUID address to the data that gets printed'''
    def __init__(self, device_address):
        self.device_address = device_address

    def callback(self, sender, data):
        free_acceleration = encode_free_acceleration(data)[0]
        free_acceleration = str(free_acceleration)[1:-1]
        print(f"{self.device_address} -- {free_acceleration}")

def encode_free_acceleration(bytes_):
    data_segments = np.dtype([
        ('timestamp', np.uint32),
        ('x', np.float32),
        ('y', np.float32),
        ('z', np.float32),
        ('zero_padding', np.uint32)
        ])
    formatted_data = np.frombuffer(bytes_, dtype=data_segments)
    return formatted_data

async def connect(address):
    nh = NotificationHandler(address)
    # Connect to the DOT and stream data
    async with BleakClient(address) as client:
        print(f"Client connection to {client.address}: {client.is_connected}")
    
        # Subscribe to data notifications
        await client.start_notify(short_payload_char_uuid, nh.callback)

        # Set and turn on measurement mode
        binary_message = b"\x01\x01\x06"
        await client.write_gatt_char(measurement_char_uuid, binary_message, response=True)

        # Stream data for 10 seconds
        await asyncio.sleep(10.0)

async def main():
    await asyncio.gather(*(connect(addr) for addr in addresses))

if __name__ == "__main__":
    asyncio.run(main())

If you’re getting errors (like BleakDBusError), try this version of the script from our GitHub.

Output:

Above is the DOT’s address, the timestamp, and the X, Y, and Z of the accelerometer (and a zero 0 for padding).

Questions and Feedback

If you have questions or feedback, email us at [email protected] or message us on Instagram (@protobioengineering).

If you liked this article, consider supporting us by donating a coffee.

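Related Resources

  • Movella Support and Knowledge Base
  • Movella SDKs and Bluetooth Documentation
  • AsyncIO in Python: A Complete Walkthrough

Other Bluetooth and Python Stuff

  • How to Stream Data from a Single Movella DOT with a Mac and Python
  • How to Make a Detailed Bluetooth LE Scanner with a MacBook and Python
  • How to Make a Simple Bluetooth Scanner on Windows with Python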
