0

I will provide my code. The primary purpose of this is vehicle identification. My only hypothesis is my laptop having multiple Bluetooth signals but it seems to happen for other devices as well. This is a pretty simple goal and the code itself may not even be the problem. From what I have found there isn't any known side effect of bleak that causes this. Here is the code any help will be appreciated:

# -*- coding: utf-8 -*-
"""
Source: https://bleak.readthedocs.io/en/latest/api/scanner.html
"""

import asyncio
from bleak import BleakScanner
import time


# identifier code for the scanning device
device_id = ""
while device_id not in ("1", "2"):
    device_id = input("Enter a 1 or a 2 for either scanner 1 or scanner 2:")
# running dictionary of discovered devices with discovery count
discovered = {}
# when the device was first discoverd
first_discover = {}
# when the device was last discovered
last_discover = {}
# max gap between discovery
max_discover_gap = {}

maxcount = 0
maxtime = 0

#Dynamic File Names
timestamp_str = time.strftime("%Y-%m-%d_%H-%M")
TXT_FILE = "scanner_" + device_id + "_" + "ble_data_" + timestamp_str + ".txt"

async def main():

    stop_event = asyncio.Event()
    # initializing scanning recording file
    # TODO: does not currently allow the program to run twice
    # How we would handle the event where the program is interrupted and
    # has to run again must be decided

    f = open(TXT_FILE, "w", encoding="utf-8")
    try:
        f.write("[Device]\n" + device_id + "\n[BTLECAPT]\n")
        f.flush()
    
        # Asyncronous event detection for stopping the program
        async def wait_for_enter():
            print("Press Enter to stop scanning...")
            # do more research on async event loops
            await asyncio.get_event_loop().run_in_executor(None, input)
            #stops the current loop
            stop_event.set()
        

        def callback(device, advertising_data):

            global maxcount, maxtime
            # todo: do something with incoming data
            print(device.address, advertising_data.rssi)

            discover_time = time.time()

            # If undiscovered initialize discovery
            if device.address not in discovered:
                # begins discovery count
                discovered[device.address] = 1
                # Setting the first instance  it was discovered
                first_discover[device.address] = discover_time
                # setting up the largest discovery gap between devices
                max_discover_gap[device.address] = 0
            
            else:
                # increments discovery
                discovered[device.address] = discovered[device.address] + 1
            
                # calculate gap in discovery time
                gap = discover_time - last_discover[device.address]

                # if gap is greater than discover gap then update value
                if gap > max_discover_gap[device.address]:
                    max_discover_gap[device.address] = gap

            # set last discovered to new discover time
            last_discover[device.address] = discover_time
            time_diff = last_discover[device.address] - first_discover[device.address]

            # random vairable probably for experimentation
            '''
            if time_diff > maxtime:
                maxtime = time_diff
                print("MAXTIME:" , device.address, round(maxtime,2), discovered[device.address], round(max_discover_gap[device.address],2))
            '''

            # writes scanned data into our file
                # creating and adding the scan info to record file
            f.write(str(round(discover_time,2))+ ","+ device.address[:8] + "," + device.address + "," + str(advertising_data.rssi)  + "\n")
            f.flush()
            
            
            '''
            print(device.address, advertising_data.rssi, discovered[device.address])
            if discovered[device.address] > maxcount:
                maxcount = discovered[device.address]
                print("MAX:" , device.address, maxcount)
            for i in advertising_data.manufacturer_data:
                print(advertising_data.manufacturer_data[i])
                if device.address not in discovered:
            '''

        # allows wait_for_enter to run concurrently to other subroutines
        asyncio.create_task(wait_for_enter())

        # call bleakscanner and bleakscanner delivers the device, advertising_data into the callback function
        async with BleakScanner(callback) as scanner:
            ...
            # Important! Wait for an event to trigger stop, otherwise scanner
            # will stop immediately.
            await stop_event.wait()

        # scanner stops when block exits
        ...

        # writing an ending line to scanned data
        f.write("[BTLECAPT]\n")
        f.flush()
        
    finally:
        f.close()

asyncio.run(main())
2
  • 2
    There's no question here. Please describe the expected result and what you're getting instead. And if you could simplify the code to a minimal reproducible example, that would be great. Commented Oct 28 at 16:19
  • describe problem in question's body, not in title. Commented Oct 29 at 21:31

0

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.