mirror of
https://gitee.com/Vancouver2017/luban-lite.git
synced 2025-12-17 01:28:54 +00:00
125 lines
4.2 KiB
Python
125 lines
4.2 KiB
Python
#
|
|
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
#
|
|
|
|
import argparse
|
|
import asyncio
|
|
import hci_commands
|
|
import sys
|
|
import logging
|
|
import hci
|
|
import traceback
|
|
import util
|
|
import transport_factory
|
|
|
|
|
|
def parse_arguments():
|
|
parser = argparse.ArgumentParser(
|
|
description='Check HCI device address type and address',
|
|
epilog='How to run script: \
|
|
sudo python check_addr.py -i 0 1 2 \
|
|
-t path/to/custom_transport_dir')
|
|
parser.add_argument('-i', '--indexes', type=str, nargs='*',
|
|
help='specify hci adapters indexes', default=0)
|
|
parser.add_argument('-t', '--transport_directory', type=str, nargs='*',
|
|
help='specify hci transport directory path. \
|
|
Use for transport other than the default linux socket.',
|
|
default=["default"])
|
|
try:
|
|
args = parser.parse_args()
|
|
if (isinstance(args.transport_directory, list)):
|
|
args.transport_directory = args.transport_directory.pop()
|
|
else:
|
|
args.transport_directory = args.transport_directory
|
|
|
|
except Exception as e:
|
|
print(traceback.format_exc())
|
|
return args
|
|
|
|
|
|
async def main(dev: hci_commands.HCI_Commands):
|
|
result = tuple()
|
|
task = asyncio.create_task(dev.rx_buffer_q_wait())
|
|
await dev.cmd_reset()
|
|
await dev.cmd_read_bd_addr()
|
|
|
|
if hci.bdaddr != '00:00:00:00:00:00':
|
|
logging.info("Type public: %s, address: %s",
|
|
hci.PUBLIC_ADDRESS_TYPE, hci.bdaddr)
|
|
result = (0, hci.bdaddr)
|
|
print("Public address: ", result)
|
|
else:
|
|
await dev.cmd_vs_read_static_addr()
|
|
if hci.static_addr != '00:00:00:00:00:00':
|
|
logging.info("Type static random: %s, address: %s",
|
|
hci.STATIC_RANDOM_ADDRESS_TYPE, hci.static_addr)
|
|
result = (1, hci.static_addr)
|
|
print("Static random address: ", result)
|
|
else:
|
|
addr = hci.gen_static_rand_addr()
|
|
logging.info("Type static random: %s, generated address: %s",
|
|
hci.STATIC_RANDOM_ADDRESS_TYPE, addr)
|
|
result = (1, addr)
|
|
print("Generated static random address: ", result)
|
|
task.cancel()
|
|
return result
|
|
|
|
|
|
def check_addr(
|
|
device_indexes: list,
|
|
addresses: list,
|
|
transport_directory: str) -> list:
|
|
util.configure_logging(f"log/check_addr.log", clear_log_file=True)
|
|
|
|
logging.info(f"Devices indexes: {device_indexes}")
|
|
for index in device_indexes:
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
loop.set_debug(True)
|
|
|
|
transport = transport_factory.TransportFactory(device_index=str(
|
|
index), asyncio_loop=loop, transport_directory=transport_directory)
|
|
|
|
bt_dev = hci_commands.HCI_Commands(send=transport.send,
|
|
rx_buffer_q=transport.rx_buffer_q,
|
|
asyncio_loop=loop)
|
|
|
|
transport.start()
|
|
|
|
addresses.append(loop.run_until_complete(main(bt_dev)))
|
|
|
|
transport.stop()
|
|
loop.close()
|
|
|
|
logging.info(f"Finished: {addresses}")
|
|
return addresses
|
|
|
|
|
|
if __name__ == '__main__':
|
|
try:
|
|
args = parse_arguments()
|
|
print(args)
|
|
addresses = []
|
|
addresses = check_addr(args.indexes, addresses,
|
|
args.transport_directory)
|
|
print(addresses)
|
|
except Exception as e:
|
|
print(traceback.format_exc())
|
|
finally:
|
|
sys.exit()
|