I will provide my code. The primary purpose of this is vehicle identification. My only hypothesis is my laptop having multiple Bluetooth signals but it seems to happen for other devices as well. This is a pretty simple goal and the code itself may not even be the problem. From what I have found there isn't any known side effect of bleak that causes this. Here is the code any help will be appreciated:
# -*- coding: utf-8 -*-
"""
Source: https://bleak.readthedocs.io/en/latest/api/scanner.html
"""
import asyncio
from bleak import BleakScanner
import time
# identifier code for the scanning device
device_id = ""
while device_id not in ("1", "2"):
device_id = input("Enter a 1 or a 2 for either scanner 1 or scanner 2:")
# running dictionary of discovered devices with discovery count
discovered = {}
# when the device was first discoverd
first_discover = {}
# when the device was last discovered
last_discover = {}
# max gap between discovery
max_discover_gap = {}
maxcount = 0
maxtime = 0
#Dynamic File Names
timestamp_str = time.strftime("%Y-%m-%d_%H-%M")
TXT_FILE = "scanner_" + device_id + "_" + "ble_data_" + timestamp_str + ".txt"
async def main():
stop_event = asyncio.Event()
# initializing scanning recording file
# TODO: does not currently allow the program to run twice
# How we would handle the event where the program is interrupted and
# has to run again must be decided
f = open(TXT_FILE, "w", encoding="utf-8")
try:
f.write("[Device]\n" + device_id + "\n[BTLECAPT]\n")
f.flush()
# Asyncronous event detection for stopping the program
async def wait_for_enter():
print("Press Enter to stop scanning...")
# do more research on async event loops
await asyncio.get_event_loop().run_in_executor(None, input)
#stops the current loop
stop_event.set()
def callback(device, advertising_data):
global maxcount, maxtime
# todo: do something with incoming data
print(device.address, advertising_data.rssi)
discover_time = time.time()
# If undiscovered initialize discovery
if device.address not in discovered:
# begins discovery count
discovered[device.address] = 1
# Setting the first instance it was discovered
first_discover[device.address] = discover_time
# setting up the largest discovery gap between devices
max_discover_gap[device.address] = 0
else:
# increments discovery
discovered[device.address] = discovered[device.address] + 1
# calculate gap in discovery time
gap = discover_time - last_discover[device.address]
# if gap is greater than discover gap then update value
if gap > max_discover_gap[device.address]:
max_discover_gap[device.address] = gap
# set last discovered to new discover time
last_discover[device.address] = discover_time
time_diff = last_discover[device.address] - first_discover[device.address]
# random vairable probably for experimentation
'''
if time_diff > maxtime:
maxtime = time_diff
print("MAXTIME:" , device.address, round(maxtime,2), discovered[device.address], round(max_discover_gap[device.address],2))
'''
# writes scanned data into our file
# creating and adding the scan info to record file
f.write(str(round(discover_time,2))+ ","+ device.address[:8] + "," + device.address + "," + str(advertising_data.rssi) + "\n")
f.flush()
'''
print(device.address, advertising_data.rssi, discovered[device.address])
if discovered[device.address] > maxcount:
maxcount = discovered[device.address]
print("MAX:" , device.address, maxcount)
for i in advertising_data.manufacturer_data:
print(advertising_data.manufacturer_data[i])
if device.address not in discovered:
'''
# allows wait_for_enter to run concurrently to other subroutines
asyncio.create_task(wait_for_enter())
# call bleakscanner and bleakscanner delivers the device, advertising_data into the callback function
async with BleakScanner(callback) as scanner:
...
# Important! Wait for an event to trigger stop, otherwise scanner
# will stop immediately.
await stop_event.wait()
# scanner stops when block exits
...
# writing an ending line to scanned data
f.write("[BTLECAPT]\n")
f.flush()
finally:
f.close()
asyncio.run(main())