#!/usr/bin/python3
#
# Copyright (c) 2016 Qualcomm Atheros, Inc.
# Copyright (c) 2018,2020 The Linux Foundation. All rights reserved.
#
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

import os
import logging
import re
import argparse
import shutil
import sys
import filecmp
import functools
import subprocess
import email

# global variables

# Created at import time so that the classes and helpers below can log even
# when the module is imported (for example from unit tests) without main()
# having been run. main() configures the level and the output format.
logger = logging.getLogger('ath12k-fw-repo')

BRANCH_DEFAULT_PRIORITY = 1000
BRANCH_PRIORITY_FILE = '.priority'
WHENCE_FILE = 'WHENCE'
NOTICE_FILE = 'Notice.txt'
NOTICE_FILE_LEN_MIN = 5000
ATH12K_DIR = 'ath12k'
TESTING_BRANCH = 'testing'

FIRMWARE_BLACKLIST = [
]

BRANCH_BLACKLIST = [
    'msm',
]


@functools.total_ordering
class Hardware():
    """One hardware version directory, for example 'QCA6174 hw3.0'.

    Instances are compared and sorted by their name.
    """

    def get_path(self):
        """Return the '<hw>/<hw_ver>' path of this hardware version."""
        return os.path.join(self.hw, self.hw_ver)

    def __eq__(self, other):
        return self.name == other.name

    def __lt__(self, other):
        return self.name < other.name

    def __repr__(self):
        return self.__str__()

    def __str__(self):
        return 'Hardware(\'%s\'): %s %s' % (self.name,
                                            self.board_files,
                                            sorted(self.firmware_branches))

    def __init__(self, hw, hw_ver):
        # QCA6174
        self.hw = hw

        # hw3.0
        self.hw_ver = hw_ver

        self.name = '%s %s' % (hw, hw_ver)

        # FirmwareBranch and BoardFile objects, filled in by scan_hw_ver()
        self.firmware_branches = []
        self.board_files = []


@functools.total_ordering
class FirmwareBranch():
    """A firmware branch directory holding zero or more firmware releases.

    Branches are ordered by priority first and by name second. Branches
    whose name starts with 'testing' get priority 0 by default so that they
    sort below normal branches; a '.priority' file in the branch directory
    overrides the default.
    """

    # return the branch name without 'testing/' prefix
    def get_clean_name(self):
        if self.testing_branch:
            # also drop the path separator following 'testing', otherwise
            # 'testing/1.2' would become '/1.2'
            return self.name[len(TESTING_BRANCH):].lstrip('/' + os.sep)

        return self.name

    def __eq__(self, other):
        return self.priority == other.priority and \
            self.get_clean_name() == other.get_clean_name()

    def __lt__(self, other):
        # '.' is always of the lower priority; two '.' branches are not
        # less than each other
        if self.name == '.':
            return other.name != '.'

        if other.name == '.':
            return False

        if self.priority != other.priority:
            return self.priority < other.priority

        return self.get_clean_name() < other.get_clean_name()

    def __repr__(self):
        return self.__str__()

    def __str__(self):
        return 'FirmwareBranch(\'%s\'): %s' % (self.name,
                                               sorted(self.firmwares))

    def __init__(self, name, path=None):
        """Create a branch called name.

        If path is given and contains a '.priority' file, the integer in
        that file replaces the default priority. A file that cannot be read
        or parsed is logged as an error and the default priority is kept.
        """
        self.name = name
        self.firmwares = []

        if name.startswith(TESTING_BRANCH):
            self.testing_branch = True

            # testing branches use lower priority by default so that
            # they are ordered below normal branches
            self.priority = 0
        else:
            self.testing_branch = False
            self.priority = BRANCH_DEFAULT_PRIORITY

        if path:
            priority_path = os.path.join(path, BRANCH_PRIORITY_FILE)
            if os.path.isfile(priority_path):
                try:
                    with open(priority_path, 'r') as f:
                        self.priority = int(f.read())
                except (OSError, ValueError) as e:
                    logger.error('Failed to read %s: %s' % (priority_path, e))


class BoardFile():
    """A board file, either 'board.bin' or 'board-<api>.bin'."""

    @staticmethod
    def create_from_path(path):
        """Return a BoardFile for path, or None if it is not a board file.

        Only the beginning of the filename is matched, so a name with
        extra characters after '.bin' is still accepted.
        """
        filename = os.path.basename(path)

        match = re.search(r'^board-(\d+)\.bin', filename)
        if match is not None:
            return BoardFile(path, match.group(1))

        if re.search(r'^board\.bin', filename) is not None:
            # board.bin carries no board api version
            return BoardFile(path, None)

        return None

    def get_basename(self):
        return os.path.basename(self.path)

    def __repr__(self):
        return self.__str__()

    def __str__(self):
        return '%s' % (self.get_basename())

    def __init__(self, path, bd_api):
        # full path to the board file, including directories and filename
        self.path = path

        # board api version as a string, eg. '2' in board-2.bin, or None
        # for board.bin
        self.bd_api = bd_api


class Firmware():
    """One firmware release directory.

    Equality is based on the version string, ordering on the version
    string split into numeric and textual components.
    """

    @staticmethod
    def create_from_path(path):
        """Create a Firmware from a release directory.

        The directory name is used as the firmware version. Raises
        Exception if path is not a directory.
        """
        if not os.path.isdir(path):
            raise Exception('Firmware path %s is not a directory' % (path))

        fw_ver = os.path.basename(path)

        return Firmware(fw_ver, path)

    def get_files_with_path(self):
        """Return the firmware files (without notice file) with full path."""
        return [os.path.join(self.path, filename)
                for filename in self.filenames]

    def get_notice_path(self):
        return os.path.join(self.path, self.notice_filename)

    # Sort key for the version string. Numeric components sort before
    # textual ones so that comparing a number with text never raises
    # TypeError.
    #
    # FIXME: An ugly hack that to make the comparison easier to
    # implement. Just to get some sort of simple sorting working
    # replace '-' with '.' in version string. But now for example
    # '10.2.4.70.2 > 10.2.4.70-2' is not compared correctly.
    def _version_key(self):
        key = []
        for token in self.fw_ver.replace('-', '.').split('.'):
            try:
                key.append((0, int(token), ''))
            except ValueError:
                key.append((1, 0, token))

        return key

    def __eq__(self, other):
        return self.fw_ver == other.fw_ver

    def __ne__(self, other):
        return not self.__eq__(other)

    # FIXME: firmware-5.bin_10.4-3.2-00080 and
    # firmware-5.bin_10.4-3.2.1-00028 are sorted incorrectly
    def __lt__(self, other):
        # list comparison: the first differing component decides, and if
        # one version is a prefix of the other the shorter one is smaller
        return self._version_key() < other._version_key()

    def __le__(self, other):
        return self.__lt__(other) or self.__eq__(other)

    def __gt__(self, other):
        return other.__lt__(self)

    def __ge__(self, other):
        return self.__gt__(other) or self.__eq__(other)

    def __repr__(self):
        return self.__str__()

    def __str__(self):
        return '%s' % (self.fw_ver)

    # path can be None with unittests
    def __init__(self, fw_ver, path=None):
        """Create a firmware release and, if path is given, scan it.

        The notice file is validated while scanning: if it is missing, is
        not valid UTF-8 or is shorter than NOTICE_FILE_LEN_MIN a message is
        printed and notice_filename is left as None.
        """
        # path to the release directory, no filenames
        self.path = path

        # filenames of all firmware files, excluding notice file
        self.filenames = []

        # filename of the notice file, excluding path
        self.notice_filename = None

        # firmware version
        self.fw_ver = fw_ver

        if not path:
            return

        for filename in sorted(os.listdir(path)):
            if filename == NOTICE_FILE:
                logger.debug('%s: %s' % (self.fw_ver, filename))
                self.notice_filename = filename
                continue

            self.filenames.append(filename)

        logger.debug('%s: %s' % (self.fw_ver, self.filenames))

        # check notice file
        if self.notice_filename is None:
            print('%s: missing %s' % (self.path, NOTICE_FILE))
            return

        notice_path = os.path.join(self.path, self.notice_filename)

        try:
            # decode explicitly as UTF-8 so that the check does not depend
            # on the locale of the user running the script
            with open(notice_path, 'r', encoding='utf-8') as f:
                buf = f.read()
        except UnicodeDecodeError as e:
            print('%s: invalid utf-8: %s' % (notice_path, e))
            self.notice_filename = None
            return

        if len(buf) < NOTICE_FILE_LEN_MIN:
            print('%s: too short: %d B' % (notice_path, len(buf)))
            self.notice_filename = None
            return


def scan_branch_dir(path):
    """Return a Firmware for every release directory found under path.

    Entries which are not directories, and releases listed in
    FIRMWARE_BLACKLIST, are skipped.
    """
    fw_list = []

    for name in sorted(os.listdir(path)):
        f_path = os.path.join(path, name)

        if not os.path.isdir(f_path):
            continue

        firmware = Firmware.create_from_path(f_path)

        if firmware.fw_ver in FIRMWARE_BLACKLIST:
            logger.debug('Blacklisted firmware release: %s' % (firmware.fw_ver))
            continue

        logger.debug('Found firmware release: %s' % (firmware.fw_ver))
        fw_list.append(firmware)

    return fw_list


# QCA988X/hw2.0
def scan_hw_ver(hw):
    """Fill hw.firmware_branches and hw.board_files from hw's directory."""
    path = hw.get_path()
    entries = sorted(os.listdir(path))

    for entry in entries:
        if entry == TESTING_BRANCH:
            testing_path = os.path.join(path, entry)
            if not os.path.isdir(testing_path):
                continue

            # scan all directories under testing branch, eg. testing/1.2.3.4
            branch_names = [os.path.join(TESTING_BRANCH, d)
                            for d in sorted(os.listdir(testing_path))]
        else:
            branch_names = [entry]

        for branch_name in branch_names:
            branch_path = os.path.join(path, branch_name)

            if not os.path.isdir(branch_path):
                continue

            if os.path.basename(branch_path) in BRANCH_BLACKLIST:
                logger.debug('Blacklisted firmware branch: %s' % (branch_path))
                continue

            logger.debug('Found firmware branch: %s' % (branch_name))

            fb = FirmwareBranch(branch_name, branch_path)
            hw.firmware_branches.append(fb)

            fb.firmwares += scan_branch_dir(branch_path)

    # board files live directly in the hardware version directory
    for entry in entries:
        boardfile = BoardFile.create_from_path(os.path.join(path, entry))

        if boardfile:
            logger.debug('Found board file: %s' % (entry))
            hw.board_files.append(boardfile)


# QCA98XX
def scan_hw(path):
    """Return a Hardware for every hardware version directory under path.

    Symbolic links and hardware versions without any firmware branch are
    skipped.
    """
    hws = []

    for hw_ver in sorted(os.listdir(path)):
        hw_ver_path = os.path.join(path, hw_ver)

        if not os.path.isdir(hw_ver_path):
            continue

        # skip symbolic links, for example WCN6855 hw2.1
        if os.path.islink(hw_ver_path):
            continue

        logger.debug('Found hw version: %s' % (hw_ver))

        hw = Hardware(path, hw_ver)
        scan_hw_ver(hw)

        if len(hw.firmware_branches) == 0:
            logger.debug('Skipping due to no firmware branches found: %s' % (hw.name))
            continue

        hws.append(hw)

    return hws


def scan_repository(directory):
    """Scan directory and return a dict mapping hardware name to Hardware.

    The path of each hardware directory becomes part of the hardware name,
    so for plain names like 'QCA6390 hw2.0' the script has to be run from
    the top of the firmware repository with directory set to '.'.
    """
    hws = {}

    for hw_name in sorted(os.listdir(directory)):
        # skip hidden directories
        if hw_name.startswith('.'):
            continue

        # normpath() keeps the path equal to the plain directory name when
        # scanning '.'
        hw_path = os.path.normpath(os.path.join(directory, hw_name))

        if not os.path.isdir(hw_path):
            continue

        logger.debug('Found hw: %s' % (hw_name))

        for hw in scan_hw(hw_path):
            hws[hw.name] = hw

    return hws


# srcpath: full pathname (directory + filename) where copy from
def install_file(args, srcpath, destdir, destfilename):
    """Copy srcpath to destdir/destfilename and return the destination path.

    With --dry-run nothing is created or copied, but the destination path
    is still returned so that callers can report what would be installed.
    """
    logger.debug('install_file(%s, %s, %s)' % (srcpath, destdir, destfilename))

    destpath = os.path.join(destdir, destfilename)

    if args.dry_run:
        return destpath

    destdir = os.path.dirname(destpath)
    if not os.path.isdir(destdir):
        os.makedirs(destdir)

    logger.info('\t%s -> %s' % (srcpath, destpath))
    shutil.copyfile(srcpath, destpath)

    return destpath


def get_firmware_version(path):
    """Return the 'FirmwareVersion' field printed by ath12k-fwencoder --info."""
    cmd = ['ath12k-fwencoder', '--info', path]
    info = subprocess.check_output(cmd, universal_newlines=True)

    # the output is parsed as 'Name: value' header lines
    msg = email.message_from_string(info)
    return msg['FirmwareVersion']


def get_board_crc32(path):
    """Return the stripped output of ath12k-fwencoder --crc32 for path."""
    cmd = ['ath12k-fwencoder', '--crc32', path]
    return subprocess.check_output(cmd, universal_newlines=True).strip()


# print indent
def pi(level, msg):
    print('%s%s' % (level * '\t', msg))


# Return the content of the WHENCE file in linux_firmware, or None if the
# file does not exist.
def _whence_read(linux_firmware):
    whencepath = os.path.join(linux_firmware, WHENCE_FILE)

    if not os.path.exists(whencepath):
        return None

    with open(whencepath, 'r') as f:
        return f.read()


# Build the 'File:' and 'Version:' lines for filepaths. Paths are written
# relative to linux_firmware.
def _whence_entries(linux_firmware, filepaths, version):
    lines = []
    license_relpath = None

    for filepath in filepaths:
        relpath = os.path.relpath(filepath, linux_firmware)

        if relpath.endswith(NOTICE_FILE):
            license_relpath = relpath
            continue

        lines.append('File: %s\n' % (relpath))

        if version is not None:
            lines.append('Version: %s\n' % (version))

    # license (or notice.txt to be exact) needs to be last
    if license_relpath is not None:
        lines.append('File: %s\n' % (license_relpath))

    return ''.join(lines)


# The WHENCE file update is implemented by using board-2.bin entry as
# an "anchor". All entries (including File, Version and License) for
# that hardware directory will be replaced by the new ones. As the
# filepaths are always sorted the changes visible in git-diff will be
# the actually changed files.
#
# Only called during firmware updates. Board file updates don't need
# changes in WHENCE and that's why this function doesn't support board
# file changes.
def whence_update(linux_firmware, filepaths, version):
    """Replace the WHENCE entries of one hardware directory.

    Returns the path of the WHENCE file, or None if the file does not
    exist or the anchor entry was not found exactly once.
    """
    whencepath = os.path.join(linux_firmware, WHENCE_FILE)

    buf = _whence_read(linux_firmware)
    if buf is None:
        return None

    dirname = os.path.dirname(os.path.relpath(filepaths[0], linux_firmware))

    # the directory name is data, not a regular expression
    escaped = re.escape(dirname)
    pattern = r'(File: %s/board-\d+\.bin\n)(.*%s.*?\n)+' % (escaped, escaped)

    entries = _whence_entries(linux_firmware, filepaths, version)

    # a function is used as the replacement so that paths and versions are
    # inserted literally and never interpreted as a replacement template
    (buf, sub_count) = re.subn(pattern,
                               lambda match: match.group(1) + entries,
                               buf,
                               flags=re.MULTILINE | re.DOTALL)

    if sub_count != 1:
        logger.error('Failed to update %s in WHENCE: %d' % (version, sub_count))
        return None

    with open(whencepath, 'w') as f:
        f.write(buf)

    return whencepath


def whence_add(linux_firmware, filepaths, version=None):
    """Append entries for filepaths to the ath12k section of WHENCE.

    Returns the path of the WHENCE file, or None if the file does not
    exist or the ath12k section was not found exactly once.
    """
    whencepath = os.path.join(linux_firmware, WHENCE_FILE)

    buf = _whence_read(linux_firmware)
    if buf is None:
        return None

    pattern = r'(Driver: ath12k.*?\n\n.*?)\n\n'

    # the trailing empty line goes before the 'Licence: Redistributable.'
    # line
    entries = '\n' + _whence_entries(linux_firmware, filepaths, version) + '\n'

    # a function is used as the replacement so that paths and versions are
    # inserted literally and never interpreted as a replacement template
    (buf, sub_count) = re.subn(pattern,
                               lambda match: match.group(1) + entries,
                               buf,
                               flags=re.MULTILINE | re.DOTALL)

    if sub_count != 1:
        logger.error('Failed to add %s to WHENCE: %d' % (version, sub_count))
        return None

    with open(whencepath, 'w') as f:
        f.write(buf)

    return whencepath


# Run 'git -C <repodir> <git_args> <files>' when --commit is used and
# --dry-run is not. As 'git -C' changes the directory before handling the
# paths, the files are given as absolute paths so that paths relative to
# the current directory keep working.
def _git_run(args, repodir, git_args, files):
    if not args.commit or getattr(args, 'dry_run', False):
        # nothing to do
        return

    cmd = ['git', '-C', repodir] + git_args + \
        [os.path.abspath(f) for f in files]

    logger.debug('Running: %r' % (cmd))

    subprocess.check_call(cmd)


def git_commit(args, msg, repodir, files):
    """Commit files in repodir with a signed-off commit message msg."""
    _git_run(args, repodir,
             ['commit', '--quiet', '--signoff', '-m', msg], files)


def git_add(args, repodir, files):
    """Stage files in repodir."""
    _git_run(args, repodir, ['add'], files)


def git_rm(args, repodir, files):
    """Remove files from the index of repodir."""
    _git_run(args, repodir, ['rm', '--quiet'], files)


def cmd_check(args):
    # scanning prints a message for every invalid notice file
    scan_repository('.')


def cmd_list(args):
    """Print all hardware versions with their board files and releases."""
    level = 0

    hws = scan_repository('.')
    for hw in sorted(hws.values()):
        pi(level, '%s:' % (hw.name))
        level += 1

        # print board files
        if len(hw.board_files) > 0:
            pi(level, 'board')
            level += 1

            # BoardFile has no ordering of its own, sort by filename
            for board_file in sorted(hw.board_files,
                                     key=BoardFile.get_basename):
                pi(level, board_file)

            level -= 1

        # print firmware branches
        for branch in sorted(hw.firmware_branches):
            if len(branch.firmwares) == 0:
                # don't print empty branches
                continue

            pi(level, '%s' % (branch.name))
            level += 1

            for fw in sorted(branch.firmwares):
                pi(level, fw.fw_ver)

            level -= 1

        level -= 1


def cmd_list_hardware(args):
    hws = scan_repository('.')
    for hw in sorted(hws.values()):
        print(hw.name)


def cmd_list_branches(args):
    hw_name = args.list_branches[0]
    hw_ver = args.list_branches[1]

    hws = scan_repository('.')
    for hw in sorted(hws.values()):
        if hw.name == '%s %s' % (hw_name, hw_ver):
            for branch in sorted(hw.firmware_branches):
                print(branch.name)

            return


def cmd_list_releases(args):
    hw_name = args.list_releases[0]
    hw_ver = args.list_releases[1]
    fw_branch = args.list_releases[2]

    hws = scan_repository('.')
    for hw in sorted(hws.values()):
        if hw.name == '%s %s' % (hw_name, hw_ver):
            for branch in sorted(hw.firmware_branches):
                if fw_branch == branch.name:
                    for fw in branch.firmwares:
                        print(fw.fw_ver)

                    return


def cmd_list_lib_dir(args):
    """List firmware and board files found under <dir>/ath12k.

    Firmware files are shown with their version and board files with their
    crc32, both obtained by running ath12k-fwencoder.
    """
    fw_dir = args.list_lib_dir[0]
    ath12k_dir = os.path.join(fw_dir, ATH12K_DIR)

    if not os.path.exists(ath12k_dir):
        logger.error('directory %s does not exist, aborting' % (ath12k_dir))
        sys.exit(1)

    if not os.path.isdir(ath12k_dir):
        logger.error('%s is not a directory, aborting' % (ath12k_dir))
        sys.exit(1)

    # sort the results based on dirpath
    for (dirpath, dirnames, filenames) in sorted(os.walk(ath12k_dir)):
        found = []
        for filename in sorted(filenames):
            path = os.path.join(dirpath, filename)

            if re.match(r'firmware.*\.bin', filename) is not None:
                # this is a firmware file
                found.append('%s\t%s' % (filename, get_firmware_version(path)))

            if re.match(r'board.*\.bin', filename) is not None:
                # this is a board file
                found.append('%s\t%s' % (filename, get_board_crc32(path)))

        if len(found) > 0:
            # Just show QCA1234/hw1.0 directories. I would have liked
            # to use os.path functions here but just could not find
            # anything sensible there.
            pi(0, '%s:' % ('/'.join(dirpath.split('/')[-2:])))
            for line in found:
                pi(1, line)


def cmd_get_latest_in_branch(args):
    """Print the path of the latest release in one firmware branch.

    Exits with status 1 if the hardware or the branch is not found and
    with status 0, without output, if the branch has no releases.
    """
    # As this command is mostly for scripts to parse, don't show
    # warnings etc to clutter the output, unless we are debugging of
    # course.
    if not args.debug:
        logger.setLevel(logging.ERROR)

    hws = scan_repository('.')

    args_hw = args.get_latest_in_branch[0]
    args_hwver = args.get_latest_in_branch[1]
    args_fwbranch = args.get_latest_in_branch[2]

    # TODO: hw is always in uppercase and hwver lower case, check that
    hw_name = '%s %s' % (args_hw, args_hwver)

    if hw_name not in hws:
        logger.error('Did not find hardware: %s' % (hw_name))
        sys.exit(1)

    hw = hws[hw_name]

    fw_branch = None

    for b in hw.firmware_branches:
        if b.name == args_fwbranch:
            fw_branch = b
            break

    if fw_branch is None:
        logger.error('Did not find firmware branch: %s' % (args_fwbranch))
        sys.exit(1)

    if len(fw_branch.firmwares) == 0:
        # no firmware images in this branch, just use return value 0 with no output
        sys.exit(0)

    print(sorted(fw_branch.firmwares)[-1].path)

    sys.exit(0)


def cmd_get_latest_in_hw(args):
    """Print the path of the latest release of a hardware version.

    The release is taken from the highest ordered branch which is not
    empty. Exits with status 1 if the hardware is not found.
    """
    # As this command is mostly for scripts to parse, don't show
    # warnings etc to clutter the output, unless we are debugging of
    # course.
    if not args.debug:
        logger.setLevel(logging.ERROR)

    hws = scan_repository('.')

    args_hw = args.get_latest[0]
    args_hwver = args.get_latest[1]

    # TODO: hw is always in uppercase and hwver lower case, check that
    hw_name = '%s %s' % (args_hw, args_hwver)

    if hw_name not in hws:
        logger.error('Did not find hardware: %s' % (hw_name))
        sys.exit(1)

    hw = hws[hw_name]

    for branch in sorted(hw.firmware_branches, reverse=True):
        if len(branch.firmwares) == 0:
            # ignore an empty branch
            continue

        print(sorted(branch.firmwares)[-1].path)
        break

    sys.exit(0)


# Install the board files of hw to destdir, one git commit per board file.
def _install_board_files(args, hw, linux_firmware, destdir):
    for bd in hw.board_files:
        dest = os.path.join(destdir, bd.get_basename())
        exists = os.path.exists(dest)

        if exists and filecmp.cmp(bd.path, dest):
            logger.debug('No update needed for %s' % (bd.path))
            continue

        action = 'update' if exists else 'add'

        logger.info('Installing board file %s' % (bd.path))
        installed = [install_file(args, bd.path, destdir, bd.get_basename())]

        # WHENCE is a real file, leave it alone with --dry-run
        if action == 'add' and not args.dry_run:
            whencepath = whence_add(linux_firmware, installed)
            if whencepath is not None:
                installed.append(whencepath)

        git_add(args, linux_firmware, installed)

        msg = 'ath12k: %s: %s %s' % (hw.name, action, bd.get_basename())
        git_commit(args, msg, linux_firmware, installed)


# Install firmware release fw of hw to destdir and remove files which are
# no longer part of the release, all in one git commit.
def _install_firmware(args, hw, fw, linux_firmware, destdir):
    to_add = []
    to_update = []
    to_remove = []

    # Files which must never be removed from the destination: the notice
    # file, all board files (they are installed separately and are not part
    # of a firmware release) and regdb.bin, which is kept on the target.
    keep = set([NOTICE_FILE, 'board-2.bin', 'regdb.bin'])
    keep.update(bd.get_basename() for bd in hw.board_files)

    if os.path.exists(destdir):
        to_remove = [filename for filename in os.listdir(destdir)
                     if filename not in keep]

    # investigate what changes are needed
    for filepath in fw.get_files_with_path():
        filename = os.path.basename(filepath)
        dest = os.path.join(destdir, filename)

        if not os.path.exists(dest):
            to_add.append(filename)
            continue

        if not filecmp.cmp(filepath, dest):
            to_update.append(filename)

        # the file is part of this release so it stays
        if filename in to_remove:
            to_remove.remove(filename)

    if not (to_add or to_update or to_remove):
        logger.debug('No update needed in %s for %s' % (hw.name, fw.fw_ver))
        return

    if to_update or to_remove:
        action = 'update'
    else:
        action = 'add'

    logger.info('Installing %s to %s' % (fw.fw_ver, destdir))

    installed = [install_file(args, filepath, destdir,
                              os.path.basename(filepath))
                 for filepath in fw.get_files_with_path()]

    # install notice file (every release must have a notice file)
    installed.append(install_file(args, fw.get_notice_path(), destdir,
                                  fw.notice_filename))

    # TODO: whence is not working with ath12k
    # WHENCE is a real file, leave it alone with --dry-run
    if not args.dry_run:
        if action == 'update':
            # updating an existing firmware file
            whencepath = whence_update(linux_firmware, installed, fw.fw_ver)
        else:
            # adding a new firmware file
            whencepath = whence_add(linux_firmware, installed, fw.fw_ver)

        if whencepath is not None:
            installed.append(whencepath)

    git_add(args, linux_firmware, installed)

    for filename in to_remove:
        filepath = os.path.join(destdir, filename)

        logger.info('\trm %s' % (filepath))

        # even git_rm() removes the file need to remove the
        # file separately in case --commit is not used
        if not args.dry_run:
            os.remove(filepath)

        git_rm(args, linux_firmware, [filepath])
        installed.append(filepath)

    # "ath12k: QCA6390 hw2.0: update to WLAN.HST.1.0.1-01740-QCAHSTSWPLZ_V2_TO_X86-1"
    msg = 'ath12k: %s: %s to %s' % (hw.name, action, fw.fw_ver)
    git_commit(args, msg, linux_firmware, installed)


def cmd_install(args):
    """Install board files and the latest firmware of every hardware.

    Files are installed under <DESTINATION>/ath12k. With --commit the
    changes are also committed to git. With --dry-run nothing is written,
    removed or committed.
    """
    hws = scan_repository('.')

    linux_firmware = args.install[0]
    ath12kdir = os.path.join(linux_firmware, ATH12K_DIR)

    if not os.path.exists(ath12kdir):
        if args.dry_run:
            logger.debug('Would create directory %s' % (ath12kdir))
        else:
            os.makedirs(ath12kdir)
    elif not os.path.isdir(ath12kdir):
        logger.error('%s is not a directory' % (ath12kdir))
        sys.exit(1)

    logger.debug('Installing to directory %s' % (ath12kdir))

    for hw in sorted(hws.values()):
        # scan_hw() only returns hardware with at least one firmware
        # branch so no need to check the length
        fw_list = sorted(sorted(hw.firmware_branches)[-1].firmwares)

        if len(fw_list) == 0:
            logger.debug('no firmware images found for %s' % (hw))
            continue

        destdir = os.path.join(ath12kdir, hw.get_path())

        # install board files first as that's used as an "anchor" for
        # firmware files WHENCE updates
        _install_board_files(args, hw, linux_firmware, destdir)

        # install latest firmware
        fw = fw_list[-1]

        if fw.notice_filename is None:
            # every release must have a valid notice file
            logger.error('%s: no valid %s in %s, not installing' %
                         (hw.name, NOTICE_FILE, fw.fw_ver))
            continue

        _install_firmware(args, hw, fw, linux_firmware, destdir)


def main():
    """Parse the command line and run the selected command."""
    global logger

    logger = logging.getLogger('ath12k-fw-repo')

    parser = argparse.ArgumentParser(
        description='Install firmware images from the ath12k-firmware git repository. '
        'Run it from the top directory of the working tree.')

    parser.add_argument('--debug', action='store_true',
                        help='Enable debug messages.')
    parser.add_argument('--dry-run', action='store_true',
                        help='Do not run any actual commands.')

    parser.add_argument('--check', action='store_true',
                        help='Check the ath12k-firmware repository content for validity.')
    parser.add_argument('--list', action='store_true',
                        help='List all files found from the ath12k-firmware repository.')

    parser.add_argument('--list-hardware', action='store_true',
                        help='List all possible hardware versions found from the ath12k-firmware repository.')

    parser.add_argument('--list-branches', action='store', nargs=2,
                        help='List all firmware branches for this hardware version.')

    parser.add_argument('--list-releases', action='store', nargs=3,
                        help='List all releases from a firmware branch.')

    parser.add_argument('--list-lib-dir', action='store', nargs=1,
                        metavar='LIB_FIRMWARE_DIRECTORY',
                        help='List all files found from the specified directory, which can either be a linux-firmware repository or /lib/firmware directory.')

    parser.add_argument('--install', action='store', nargs=1, metavar='DESTINATION',
                        help='Install all ath12k firmware images to DESTINATION folder, for example /lib/firmware.')

    parser.add_argument('--commit', action='store_true',
                        help='When installing files also git commit them, for example when updating linux-firmware.git.')

    parser.add_argument('--get-latest-in-branch', action='store', nargs=3,
                        metavar=('HW', 'HWVER', 'BRANCH'),
                        help='Show latest firmware version from a firmware branch. '
                        'Just outputs the version for easy parsing in scripts.')

    parser.add_argument('--get-latest', action='store', nargs=2,
                        metavar=('HW', 'HWVER'),
                        help='Show latest firmware version for hardware version. '
                        'Just outputs the version for easy parsing in scripts.')

    args = parser.parse_args()

    if args.debug:
        logging.basicConfig(format='%(levelname)s: %(message)s')
        logger.setLevel(logging.DEBUG)
    else:
        logging.basicConfig(format='%(message)s')
        logger.setLevel(logging.INFO)

    # commands
    if args.check:
        cmd_check(args)
    elif args.list:
        cmd_list(args)
    elif args.list_hardware:
        cmd_list_hardware(args)
    elif args.list_branches:
        cmd_list_branches(args)
    elif args.list_releases:
        cmd_list_releases(args)
    elif args.list_lib_dir:
        cmd_list_lib_dir(args)
    elif args.install:
        cmd_install(args)
    elif args.get_latest_in_branch:
        cmd_get_latest_in_branch(args)
    elif args.get_latest:
        cmd_get_latest_in_hw(args)
    else:
        logger.error('No command defined')
        parser.print_usage()


if __name__ == "__main__":
    main()