Files
刘可亮 11c97ef399 v1.2.1
2025-07-22 11:15:46 +08:00

125 lines
4.2 KiB
Python

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import argparse
import asyncio
import hci_commands
import sys
import logging
import hci
import traceback
import util
import transport_factory
def parse_arguments():
parser = argparse.ArgumentParser(
description='Check HCI device address type and address',
epilog='How to run script: \
sudo python check_addr.py -i 0 1 2 \
-t path/to/custom_transport_dir')
parser.add_argument('-i', '--indexes', type=str, nargs='*',
help='specify hci adapters indexes', default=0)
parser.add_argument('-t', '--transport_directory', type=str, nargs='*',
help='specify hci transport directory path. \
Use for transport other than the default linux socket.',
default=["default"])
try:
args = parser.parse_args()
if (isinstance(args.transport_directory, list)):
args.transport_directory = args.transport_directory.pop()
else:
args.transport_directory = args.transport_directory
except Exception as e:
print(traceback.format_exc())
return args
async def main(dev: hci_commands.HCI_Commands):
result = tuple()
task = asyncio.create_task(dev.rx_buffer_q_wait())
await dev.cmd_reset()
await dev.cmd_read_bd_addr()
if hci.bdaddr != '00:00:00:00:00:00':
logging.info("Type public: %s, address: %s",
hci.PUBLIC_ADDRESS_TYPE, hci.bdaddr)
result = (0, hci.bdaddr)
print("Public address: ", result)
else:
await dev.cmd_vs_read_static_addr()
if hci.static_addr != '00:00:00:00:00:00':
logging.info("Type static random: %s, address: %s",
hci.STATIC_RANDOM_ADDRESS_TYPE, hci.static_addr)
result = (1, hci.static_addr)
print("Static random address: ", result)
else:
addr = hci.gen_static_rand_addr()
logging.info("Type static random: %s, generated address: %s",
hci.STATIC_RANDOM_ADDRESS_TYPE, addr)
result = (1, addr)
print("Generated static random address: ", result)
task.cancel()
return result
def check_addr(
device_indexes: list,
addresses: list,
transport_directory: str) -> list:
util.configure_logging(f"log/check_addr.log", clear_log_file=True)
logging.info(f"Devices indexes: {device_indexes}")
for index in device_indexes:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.set_debug(True)
transport = transport_factory.TransportFactory(device_index=str(
index), asyncio_loop=loop, transport_directory=transport_directory)
bt_dev = hci_commands.HCI_Commands(send=transport.send,
rx_buffer_q=transport.rx_buffer_q,
asyncio_loop=loop)
transport.start()
addresses.append(loop.run_until_complete(main(bt_dev)))
transport.stop()
loop.close()
logging.info(f"Finished: {addresses}")
return addresses
if __name__ == '__main__':
try:
args = parse_arguments()
print(args)
addresses = []
addresses = check_addr(args.indexes, addresses,
args.transport_directory)
print(addresses)
except Exception as e:
print(traceback.format_exc())
finally:
sys.exit()