#!/usr/bin/python
#
# Copyright (C) 2021 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Reports on merge status of Java files in a package based on four
repositories:

baseline - upstream baseline used for previous Android release
release  - files in previous Android release
current  - target for merge
upstream - new upstream being merged

Example output:
$ tools/upstream/pkg-status java.security.spec
AlgorithmParameterSpec.java: Unchanged, Done
DSAGenParameterSpec.java: Added, TO DO
DSAParameterSpec.java: Unchanged, Done
DSAPrivateKeySpec.java: Unchanged, Done
DSAPublicKeySpec.java: Unchanged, Done
ECField.java: Unchanged, Done
ECFieldF2m.java: Unchanged, Done
ECFieldFp.java: Unchanged, Done
ECGenParameterSpec.java: Updated, TO DO
[...]
"""

import argparse
import hashlib
import os
import os.path
import sys
from enum import Enum
from pathlib import Path

# ANSI escape sequences used to colour terminal output.
RED = '\u001b[31m'
GREEN = "\u001b[32m"
YELLOW = "\u001b[33m"
RESET = "\u001b[0m"


def colourise(colour, string):
    """Wrap a string with an ANSI colour code, followed by a reset code."""
    return "%s%s%s" % (colour, string, RESET)


def red(string):
    """Wrap a string with a red ANSI colour code"""
    return colourise(RED, string)


def green(string):
    """Wrap a string with a green ANSI colour code"""
    return colourise(GREEN, string)


def yellow(string):
    """Wrap a string with a yellow ANSI colour code"""
    return colourise(YELLOW, string)


class WorkStatus(Enum):
    """
    Enum for a file's work completion status.

    Each member's value is a (description, colourfunc) pair: the text shown
    in the report and the function used to colour a report line.
    """
    UNKNOWN = ('Unknown', red)
    TODO = ('TO DO', yellow)
    DONE = ('Done', green)
    PROBABLY_DONE = ('Probably done', green)
    ERROR = ('Error', red)

    def __init__(self, description, colourfunc):
        self.description = description
        self.colourfunc = colourfunc

    def colourise(self, string):
        """Colourise a string using the method for this enum value"""
        return self.colourfunc(string)


class MergeStatus(Enum):
    """
    Enum for a file's merge status.

    Each member's value is the description shown in the report.
    """
    UNKNOWN = 'Unknown!'
    MISSING = 'Missing'
    ADDED = 'Added'
    DELETED = 'Deleted or moved'
    UNCHANGED = 'Unchanged'
    UPDATED = 'Updated'

    def __init__(self, description):
        self.description = description


class MergeConfig:
    """
    Configuration for an upstream merge.

    Encapsulates the paths to each of the required code repositories.
    """

    def __init__(self, baseline, release, current, upstream) -> None:
        """
        Store the four repository names and read the root directories from
        the environment.

        Exits the process with a message if ANDROID_BUILD_TOP or
        OJLUNI_UPSTREAMS is not set.
        """
        self.baseline = baseline
        self.release = release
        self.current = current
        self.upstream = upstream
        try:
            # Root of checked-out Android sources, set by the "lunch" command.
            self.android_build_top = os.environ['ANDROID_BUILD_TOP']
            # Root of repository snapshots.
            self.ojluni_upstreams = os.environ['OJLUNI_UPSTREAMS']
        except KeyError:
            sys.exit('`lunch` and set OJLUNI_UPSTREAMS first.')

    def java_dir(self, repo, pkg):
        """
        Returns the directory holding the Java sources of a package within
        a repository.

        The current repository lives under libcore in the Android tree;
        every other repository is a snapshot under OJLUNI_UPSTREAMS.
        """
        relpath = pkg.replace('.', '/')
        if repo == self.current:
            return '%s/libcore/%s/src/main/java/%s' % (
                self.android_build_top, self.current, relpath)
        return '%s/%s/%s' % (self.ojluni_upstreams, repo, relpath)

    def baseline_dir(self, pkg):
        """Returns the package's directory in the baseline repository."""
        return self.java_dir(self.baseline, pkg)

    def release_dir(self, pkg):
        """Returns the package's directory in the release repository."""
        return self.java_dir(self.release, pkg)

    def current_dir(self, pkg):
        """Returns the package's directory in the current repository."""
        return self.java_dir(self.current, pkg)

    def upstream_dir(self, pkg):
        """Returns the package's directory in the upstream repository."""
        return self.java_dir(self.upstream, pkg)


class JavaPackage:
    """
    Encapsulates information about a single Java package, notably paths
    to it within each repository.
    """

    def __init__(self, config, name) -> None:
        self.name = name
        self.baseline_dir = config.baseline_dir(name)
        self.release_dir = config.release_dir(name)
        self.current_dir = config.current_dir(name)
        self.upstream_dir = config.upstream_dir(name)

    @staticmethod
    def list_candidate_files(path):
        """
        Returns a list of all the Java filenames in a directory.

        package-info.java is excluded.  Raises OSError (from os.listdir)
        if the directory cannot be listed.
        """
        return [
            entry for entry in os.listdir(path)
            if entry.endswith('.java') and entry != 'package-info.java'
        ]

    def all_files(self):
        """
        Returns the sorted union of all the Java filenames in all
        repositories.
        """
        files = set(self.list_candidate_files(self.baseline_dir))
        files.update(self.list_candidate_files(self.release_dir))
        files.update(self.list_candidate_files(self.upstream_dir))
        files.update(self.list_candidate_files(self.current_dir))
        return sorted(files)

    def java_files(self):
        """Returns an iterable of JavaFiles corresponding to all filenames."""
        return (JavaFile(self, filename) for filename in self.all_files())

    def baseline_path(self, filename):
        """Returns the Path of a file in the baseline repository."""
        return Path(self.baseline_dir) / filename

    def release_path(self, filename):
        """Returns the Path of a file in the release repository."""
        return Path(self.release_dir) / filename

    def current_path(self, filename):
        """Returns the Path of a file in the current repository."""
        return Path(self.current_dir) / filename

    def upstream_path(self, filename):
        """Returns the Path of a file in the upstream repository."""
        return Path(self.upstream_dir) / filename

    def report_merge_status(self):
        """
        Report on the merge status of this package.

        Prints one coloured line per file.  For files whose work status is
        ERROR, also prints the checksum and path of the file in each
        repository to help diagnose the problem.
        """
        for file in self.java_files():
            merge_status, work_status = file.status()
            text = '%s: %s, %s' % (
                file.name,
                merge_status.description,
                work_status.description)
            print(work_status.colourise(text))
            if work_status == WorkStatus.ERROR:
                print(file.baseline_sum, file.baseline)
                print(file.release_sum, file.release)
                print(file.current_sum, file.current)
                print(file.upstream_sum, file.upstream)


class JavaFile:
    """
    Encapsulates information about a single Java file in a package across
    all of the repositories involved in a merge.
    """

    def __init__(self, package, name):
        self.package = package
        self.name = name
        # Paths for this file in each repository
        self.baseline = package.baseline_path(name)
        self.release = package.release_path(name)
        self.upstream = package.upstream_path(name)
        self.current = package.current_path(name)
        # Checksums for this file in each repository, or None if absent
        self.baseline_sum = self.checksum(self.baseline)
        self.release_sum = self.checksum(self.release)
        self.upstream_sum = self.checksum(self.upstream)
        self.current_sum = self.checksum(self.current)
        # List of methods for determining file's merge status.
        # Order matters - see merge_status() for details
        self.merge_status_methods = [
            (self.check_for_missing, MergeStatus.MISSING),
            (self.check_for_unchanged, MergeStatus.UNCHANGED),
            (self.check_for_added_upstream, MergeStatus.ADDED),
            (self.check_for_removed_upstream, MergeStatus.DELETED),
            (self.check_for_changed_upstream, MergeStatus.UPDATED),
        ]
        # Map of methods from merge status to determine work status
        self.work_status_methods = {
            MergeStatus.MISSING: self.calculate_missing_work_status,
            MergeStatus.UNCHANGED: self.calculate_unchanged_work_status,
            MergeStatus.ADDED: self.calculate_added_work_status,
            MergeStatus.DELETED: self.calculate_deleted_work_status,
            MergeStatus.UPDATED: self.calculate_updated_work_status,
        }

    def is_android_changed(self):
        """
        Returns true if the file was changed between the baseline and Android
        release.
        """
        return self.is_in_release() and self.baseline_sum != self.release_sum

    def is_android_unchanged(self):
        """
        Returns true if the file is in the Android release and is unchanged.
        """
        return self.is_in_release() and self.baseline_sum == self.release_sum

    def check_for_changed_upstream(self):
        """Returns true if the file is changed upstream since the baseline."""
        return self.baseline_sum != self.upstream_sum

    def is_in_baseline(self):
        """Returns true if the file is present in the baseline."""
        return self.baseline_sum is not None

    def is_in_release(self):
        """Returns true if the file is present in the baseline and release."""
        return self.is_in_baseline() and self.release_sum is not None

    def is_in_current(self):
        """Returns true if the file is in current, release and baseline."""
        return self.is_in_release() and self.current_sum is not None

    def is_in_upstream(self):
        """Returns true if the file is present upstream."""
        return self.upstream_sum is not None

    def check_for_missing(self):
        """
        Returns true if the file is expected to be in current, but isn't.
        """
        return self.is_in_release() and self.is_in_upstream() \
               and not self.is_in_current()

    def removed_in_release(self):
        """Returns true if the file was removed by Android in the release."""
        return self.is_in_baseline() and not self.is_in_release()

    def check_for_removed_upstream(self):
        """Returns true if the file was removed upstream since the baseline."""
        return self.is_in_baseline() and not self.is_in_upstream()

    def check_for_added_upstream(self):
        """Returns true if the file was added upstream since the baseline."""
        return self.is_in_upstream() and not self.is_in_baseline()

    def check_for_unchanged(self):
        """Returns true if the file is unchanged upstream since the baseline."""
        return not self.check_for_changed_upstream()

    def merge_status(self):
        """
        Returns the merge status for this file, or UNKNOWN.
        Tries each merge_status_method in turn, and if one returns true
        then this method returns the associated merge status.
        """
        for (method, status) in self.merge_status_methods:
            if method():
                return status
        return MergeStatus.UNKNOWN

    def work_status(self):
        """
        Returns the work status for this file.
        Looks up a status method based on the merge status and uses that to
        determine the work status.  Returns ERROR if the merge status has no
        associated method.
        """
        status = self.merge_status()
        if status in self.work_status_methods:
            return self.work_status_methods[status]()
        return WorkStatus.ERROR

    @staticmethod
    def calculate_missing_work_status():
        """Missing files are always an error."""
        return WorkStatus.ERROR

    def calculate_unchanged_work_status(self):
        """
        File is unchanged upstream, so should be unchanged between release
        and current.
        """
        if self.current_sum == self.release_sum:
            return WorkStatus.DONE
        return WorkStatus.UNKNOWN

    def calculate_added_work_status(self):
        """File was added upstream so needs to be added to current."""
        if self.current_sum is None:
            return WorkStatus.TODO
        if self.current_sum == self.upstream_sum:
            return WorkStatus.DONE
        #  XXX check for change markers if android changed
        return WorkStatus.UNKNOWN

    def calculate_deleted_work_status(self):
        """File was removed upstream so needs to be removed from current."""
        if self.is_in_current():
            return WorkStatus.TODO
        return WorkStatus.DONE

    def calculate_updated_work_status(self):
        """File was updated upstream."""
        if self.current_sum == self.upstream_sum:
            if self.is_android_unchanged():
                return WorkStatus.DONE
            # Looks like Android changes are missing in current
            return WorkStatus.ERROR
        if self.is_android_unchanged():
            return WorkStatus.TODO
        # If we get here there are upstream and Android changes that need
        # to be merged.  If possible use the file copyright date as a
        # heuristic to determine if upstream has been merged into current
        release_copyright = self.get_copyright(self.release)
        current_copyright = self.get_copyright(self.current)
        upstream_copyright = self.get_copyright(self.upstream)
        if release_copyright == upstream_copyright:
            # Upstream copyright same as last release, so can't infer anything
            return WorkStatus.UNKNOWN
        if current_copyright == upstream_copyright:
            return WorkStatus.PROBABLY_DONE
        if current_copyright == release_copyright:
            return WorkStatus.TODO
        # Give up
        return WorkStatus.UNKNOWN

    def status(self):
        """Returns a (merge status, work status) tuple for this file."""
        return self.merge_status(), self.work_status()

    @staticmethod
    def checksum(path):
        """
        Returns a checksum string for a file, SHA256 as a hex string.

        Returns None if the file cannot be opened or read, which is how an
        absent file is represented.
        """
        try:
            with open(path, 'rb') as file:
                contents = file.read()
        except OSError:
            # Only I/O failures mean "absent"; anything else is a real bug
            # and should propagate.
            return None
        return hashlib.sha256(contents).hexdigest()

    @staticmethod
    def get_copyright(file):
        """
        Returns the upstream copyright line for a file.

        Only the first five lines are examined.  The result is the first
        line which starts with ' * Copyright' and does not mention
        'Android', or None if there is no such line or the file cannot be
        read as text.
        """
        try:
            with open(file, 'r') as source:
                for _ in range(5):
                    line = source.readline()
                    if line.startswith(
                            ' * Copyright') and 'Android' not in line:
                        return line
        except (OSError, UnicodeDecodeError):
            # Unreadable or undecodable file: no copyright line available.
            return None
        return None


def main():
    """
    Parse the command line and report on the merge status of each package.

    A failure while processing one package is reported in red and does not
    prevent the remaining packages from being processed.
    """
    parser = argparse.ArgumentParser(
        description='Report on merge status of Java packages',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)

    # TODO(prb): Add help for available repositories
    parser.add_argument('-b', '--baseline', default='expected',
                        help='Baseline repo')
    parser.add_argument('-r', '--release', default='sc-release',
                        help='Last released repo')
    parser.add_argument('-u', '--upstream', default='11+28',
                        help='Upstream repo.')
    parser.add_argument('-c', '--current', default='ojluni',
                        help='Current repo.')
    parser.add_argument('pkgs', nargs="+",
                        help='Packages to report on')

    args = parser.parse_args()
    config = MergeConfig(args.baseline, args.release, args.current,
                         args.upstream)

    for pkg_name in args.pkgs:
        try:
            package = JavaPackage(config, pkg_name)
            package.report_merge_status()
        except Exception as e:
            # Format the exception explicitly: concatenating it directly to
            # a str raises TypeError and would hide the original failure.
            print(red('ERROR: Unable to process package %s: %s'
                      % (pkg_name, e)))


if __name__ == "__main__":
    main()