Kanjut SHELL
Server IP : 172.16.15.8  /  Your IP : 18.217.224.165
Web Server : Apache
System : Linux zeus.vwu.edu 4.18.0-553.27.1.el8_10.x86_64 #1 SMP Wed Nov 6 14:29:02 UTC 2024 x86_64
User : apache ( 48)
PHP Version : 7.2.24
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON
Directory (0555) :  /bin/

[  Home  ][  C0mmand  ][  Upload File  ]

Current File : //bin/cg_annotate
#!/usr/bin/python3.11
# pyright: strict

# --------------------------------------------------------------------
# --- Cachegrind's annotator.                       cg_annotate.in ---
# --------------------------------------------------------------------

# This file is part of Cachegrind, a high-precision tracing profiler
# built with Valgrind.
#
# Copyright (C) 2002-2023 Nicholas Nethercote
#    njn@valgrind.org
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
#
# The GNU General Public License is contained in the file COPYING.

# This script reads Cachegrind output files and produces human-readable output.
#
# Use `make pyann` to "build" this script with `auxprogs/pybuild.rs` every time
# it is changed. This runs the formatters, type-checkers, and linters on
# `cg_annotate.in` and then generates `cg_annotate`.

from __future__ import annotations

import filecmp
import os
import re
import sys
from argparse import ArgumentParser, BooleanOptionalAction, Namespace
from collections import defaultdict
from typing import Callable, DefaultDict, NoReturn, TextIO


def die(msg: str) -> NoReturn:
    print("cg_annotate: error:", msg, file=sys.stderr)
    sys.exit(1)


SearchAndReplace = Callable[[str], str]


# A typed wrapper for parsed args.
class Args(Namespace):
    # None of these fields are modified after arg parsing finishes.
    diff: bool
    mod_filename: SearchAndReplace
    mod_funcname: SearchAndReplace
    show: list[str]
    sort: list[str]
    threshold: float  # a percentage
    show_percs: bool
    annotate: bool
    context: int
    cgout_filename: list[str]

    @staticmethod
    def parse() -> Args:
        # We support Perl-style `s/old/new/flags` search-and-replace
        # expressions, because that's how this option was implemented in the
        # old Perl version of `cg_diff`. This requires conversion from
        # `s/old/new/` style to `re.sub`. The conversion isn't a perfect
        # emulation of Perl regexps (e.g. Python uses `\1` rather than `$1` for
        # using captures in the `new` part), but it should be close enough. The
        # only supported flags are `g` (global) and `i` (ignore case).
        def search_and_replace(regex: str | None) -> SearchAndReplace:
            if regex is None:
                return lambda s: s

            # Extract the parts of an `s/old/new/tail` regex. `(?<!\\)/` is an
            # example of negative lookbehind. It means "match a forward slash
            # unless preceded by a backslash".
            m = re.match(r"s/(.*)(?<!\\)/(.*)(?<!\\)/(g|i|gi|ig|)$", regex)
            if m is None:
                raise ValueError

            # Forward slashes must be escaped in an `s/old/new/` expression,
            # but we then must unescape them before using them with `re.sub`.
            pat = m.group(1).replace(r"\/", r"/")
            repl = m.group(2).replace(r"\/", r"/")
            tail = m.group(3)

            if "g" in tail:
                count = 0  # unlimited
            else:
                count = 1

            if "i" in tail:
                flags = re.IGNORECASE
            else:
                flags = re.RegexFlag(0)

            return lambda s: re.sub(re.compile(pat, flags=flags), repl, s, count=count)

        def comma_separated_list(values: str) -> list[str]:
            return values.split(",")

        def threshold(n: str) -> float:
            f = float(n)
            if 0 <= f <= 20:
                return f
            raise ValueError

        # Add a bool argument that defaults to true.
        #
        # Supports these forms: `--foo`, `--no-foo`, `--foo=yes`, `--foo=no`.
        # The latter two were the forms supported by the old Perl version of
        # `cg_annotate`, and are now deprecated.
        def add_bool_argument(
            p: ArgumentParser, new_name: str, old_name: str, help_: str
        ) -> None:
            new_flag = "--" + new_name
            old_flag = "--" + old_name
            dest = new_name.replace("-", "_")

            # Note: the default value is always printed with `BooleanOptionalAction`,
            # due to an argparse bug: https://github.com/python/cpython/issues/83137.
            p.add_argument(
                new_flag,
                default=True,
                action=BooleanOptionalAction,
                help=help_,
            )
            p.add_argument(
                f"{old_flag}=yes",
                dest=dest,
                action="store_true",
                help=f"(deprecated) same as --{new_name}",
            )
            p.add_argument(
                f"{old_flag}=no",
                dest=dest,
                action="store_false",
                help=f"(deprecated) same as --no-{new_name}",
            )

        p = ArgumentParser(description="Process one or more Cachegrind output files.")

        p.add_argument("--version", action="version", version="%(prog)s-3.22.0")
        p.add_argument(
            "--diff",
            default=False,
            action="store_true",
            help="perform a diff between two Cachegrind output files",
        )
        p.add_argument(
            "--mod-filename",
            type=search_and_replace,
            metavar="REGEX",
            default=search_and_replace(None),
            help="a search-and-replace regex applied to filenames, e.g. "
            "`s/prog[0-9]/progN/`",
        )
        p.add_argument(
            "--mod-funcname",
            type=search_and_replace,
            metavar="REGEX",
            default=search_and_replace(None),
            help="like --mod-filename, but for function names",
        )
        p.add_argument(
            "--show",
            type=comma_separated_list,
            metavar="A,B,C",
            help="only show figures for events A,B,C (default: all events)",
        )
        p.add_argument(
            "--sort",
            type=comma_separated_list,
            metavar="A,B,C",
            help="sort functions by events A,B,C (default: event column order)",
        )
        p.add_argument(
            "--threshold",
            type=threshold,
            default=0.1,
            metavar="N:[0,20]",
            help="only show file:function/function:file pairs with more than "
            "N%% of primary sort event counts (default: %(default)s)",
        )
        add_bool_argument(
            p,
            "show-percs",
            "show-percs",
            "show a percentage for each non-zero count",
        )
        add_bool_argument(
            p,
            "annotate",
            "auto",
            "annotate all source files containing functions that reached the "
            "event count threshold",
        )
        p.add_argument(
            "--context",
            type=int,
            default=8,
            metavar="N",
            help="print N lines of context before and after annotated lines "
            "(default: %(default)s)",
        )
        p.add_argument(
            "cgout_filename",
            nargs="+",
            metavar="cachegrind-out-file",
            help="file produced by Cachegrind",
        )

        # `args0` name used to avoid shadowing the global `args`, which pylint
        # doesn't like.
        args0 = p.parse_args(namespace=Args())
        if args0.diff and len(args0.cgout_filename) != 2:
            p.print_usage(file=sys.stderr)
            die("argument --diff: requires exactly two Cachegrind output files")

        return args0  # type: ignore [return-value]


# Args are stored in a global for easy access.
args = Args.parse()


# A single instance of this class is constructed, from `args` and the `events:`
# line in the cgout file.
class Events:
    # The event names.
    events: list[str]

    # Equal to `len(self.events)`.
    num_events: int

    # The order in which we must traverse events for --show. Can be shorter
    # than `events`.
    show_events: list[str]

    # Like `show_events`, but indices into `events`, rather than names.
    show_indices: list[int]

    # The order in which we must traverse events for --sort. Can be shorter
    # than `events`.
    sort_events: list[str]

    # Like `sort_events`, but indices into `events`, rather than names.
    sort_indices: list[int]

    def __init__(self) -> None:
        # All fields are left uninitialized here, and set instead in `init`.
        pass

    def init(self, text: str) -> None:
        self.events = text.split()
        self.num_events = len(self.events)

        # A temporary dict mapping events to indices, [0, n-1].
        event_indices = {event: n for n, event in enumerate(self.events)}

        # If --show is given, check it is valid. If --show is not given,
        # default to all events in the standard order.
        if args.show:
            for event in args.show:
                if event not in event_indices:
                    die(f"--show event `{event}` did not appear in `events:` line")
            self.show_events = args.show
        else:
            self.show_events = self.events

        self.show_indices = [event_indices[event] for event in self.show_events]

        # Likewise for --sort.
        if args.sort:
            for event in args.sort:
                if event not in event_indices:
                    die(f"--sort event `{event}` did not appear in `events:` line")
            self.sort_events = args.sort
        else:
            self.sort_events = self.events

        self.sort_indices = [event_indices[event] for event in self.sort_events]

    # Raises a `ValueError` exception on syntax error.
    def mk_cc(self, str_counts: list[str]) -> Cc:
        # This is slightly faster than a list comprehension.
        counts = list(map(int, str_counts))

        if len(counts) == self.num_events:
            pass
        elif len(counts) < self.num_events:
            # Add zeroes at the end for any missing numbers.
            counts.extend([0] * (self.num_events - len(counts)))
        else:
            raise ValueError

        return counts

    def mk_empty_cc(self) -> Cc:
        # This is much faster than a list comprehension.
        return [0] * self.num_events

    def mk_empty_dcc(self) -> Dcc:
        return Dcc(self.mk_empty_cc(), defaultdict(self.mk_empty_cc))


# A "cost centre", which is a dumb container for counts. Always the same length
# as `Events.events`, but it doesn't even know event names. `Events.mk_cc` and
# `Events.mk_empty_cc` are used for construction.
#
# This used to be a class with a single field `counts: list[int]`, but this
# type is very hot and just using a type alias is much faster.
Cc = list[int]


# Add the counts in `a_cc` to `b_cc`.
def add_cc_to_cc(a_cc: Cc, b_cc: Cc) -> None:
    for i, a_count in enumerate(a_cc):
        b_cc[i] += a_count


# Subtract the counts in `a_cc` from `b_cc`.
def sub_cc_from_cc(a_cc: Cc, b_cc: Cc) -> None:
    for i, a_count in enumerate(a_cc):
        b_cc[i] -= a_count


# Unrolled version of `add_cc_to_cc`, for speed.
def add_cc_to_ccs(
    a_cc: Cc, b_cc1: Cc, b_cc2: Cc, b_cc3: Cc, b_cc4: Cc, b_cc5: Cc, total_cc: Cc
) -> None:
    for i, a_count in enumerate(a_cc):
        b_cc1[i] += a_count
        b_cc2[i] += a_count
        b_cc3[i] += a_count
        b_cc4[i] += a_count
        b_cc5[i] += a_count
        total_cc[i] += a_count


# Unrolled version of `sub_cc_from_cc`, for speed. Note that the last one,
# `total_cc`, is added.
def sub_cc_from_ccs(
    a_cc: Cc, b_cc1: Cc, b_cc2: Cc, b_cc3: Cc, b_cc4: Cc, b_cc5: Cc, total_cc: Cc
) -> None:
    for i, a_count in enumerate(a_cc):
        b_cc1[i] -= a_count
        b_cc2[i] -= a_count
        b_cc3[i] -= a_count
        b_cc4[i] -= a_count
        b_cc5[i] -= a_count
        total_cc[i] += a_count


# Update `min_cc` and `max_cc` with `self`.
def update_cc_extremes(self: Cc, min_cc: Cc, max_cc: Cc) -> None:
    for i, count in enumerate(self):
        if count > max_cc[i]:
            max_cc[i] = count
        elif count < min_cc[i]:
            min_cc[i] = count


# Note: some abbrevations used below:
# - Ofl/ofl: original filename, as mentioned in a cgout file.
# - Ofn/ofn: original function name, as mentioned in a cgout file.
# - Mfl/mfl: modified filename, the result of passing an Ofl through
#   `--mod-filename`.
# - Mfn/mfn: modified function name, the result of passing an Ofn through
#   `--mod-funcname`.
# - Mname/mname: modified name, used for what could be an Mfl or an Mfn.


# A deep cost centre with a dict for the inner mnames and CCs.
class Dcc:
    outer_cc: Cc
    inner_dict_mname_cc: DictMnameCc

    def __init__(self, outer_cc: Cc, inner_dict_mname_cc: DictMnameCc) -> None:
        self.outer_cc = outer_cc
        self.inner_dict_mname_cc = inner_dict_mname_cc


# A deep cost centre with a list for the inner mnames and CCs. Used during
# filtering and sorting.
class Lcc:
    outer_cc: Cc
    inner_list_mname_cc: ListMnameCc

    def __init__(self, outer_cc: Cc, inner_list_mname_cc: ListMnameCc) -> None:
        self.outer_cc = outer_cc
        self.inner_list_mname_cc = inner_list_mname_cc


# Per-Mfl/Mfn CCs. The list version is used during filtering and sorting.
DictMnameCc = DefaultDict[str, Cc]
ListMnameCc = list[tuple[str, Cc]]

# Per-Mfl/Mfn DCCs. The outer Mnames are Mfls and the inner Mnames are Mfns, or
# vice versa. The list version is used during filtering and sorting.
DictMnameDcc = DefaultDict[str, Dcc]
ListMnameLcc = list[tuple[str, Lcc]]

# Per-line CCs, organised by Mfl and line number.
DictLineCc = DefaultDict[int, Cc]
DictMflDictLineCc = DefaultDict[str, DictLineCc]

# A dictionary tracking how Ofls get mapped to Mfls by `--mod-filename`. If
# `--mod-filename` isn't used, each entry will be the identity mapping: ("foo"
# -> set(["foo"])).
DictMflOfls = DefaultDict[str, set[str]]


def read_cgout_file(
    cgout_filename: str,
    is_first_file: bool,
    descs: list[str],
    cmds: list[str],
    events: Events,
    dict_mfl_ofls: DictMflOfls,
    dict_mfl_dcc: DictMnameDcc,
    dict_mfn_dcc: DictMnameDcc,
    dict_mfl_dict_line_cc: DictMflDictLineCc,
    summary_cc: Cc,
) -> None:
    # The file format is described in Cachegrind's manual.
    try:
        cgout_file = open(cgout_filename, "r", encoding="utf-8")
    except OSError as err:
        die(f"{err}")

    with cgout_file:
        cgout_line_num = 0

        def parse_die(msg: str) -> NoReturn:
            die(f"{cgout_file.name}:{cgout_line_num}: {msg}")

        def readline() -> str:
            nonlocal cgout_line_num
            cgout_line_num += 1
            return cgout_file.readline()

        # Read "desc:" lines.
        desc = ""
        while line := readline():
            if m := re.match(r"desc:\s+(.*)", line):
                desc += m.group(1) + "\n"
            else:
                break
        descs.append(desc)

        # Read "cmd:" line. (`line` is already set from the "desc:" loop.)
        if m := re.match(r"cmd:\s+(.*)", line):
            cmds.append(m.group(1))
        else:
            parse_die("missing a `command:` line")

        # Read "events:" line.
        line = readline()
        if m := re.match(r"events:\s+(.*)", line):
            if is_first_file:
                events.init(m.group(1))
            else:
                events2 = Events()
                events2.init(m.group(1))
                if events.events != events2.events:
                    die("events in data files don't match")
        else:
            parse_die("missing an `events:` line")

        def mk_empty_dict_line_cc() -> DictLineCc:
            return defaultdict(events.mk_empty_cc)

        # The current Mfl and Mfn.
        mfl = ""
        mfn = ""

        # These values are passed in by reference and are modified by this
        # function. But they can't be properly initialized until the `events:`
        # line of the first file is read and the number of events is known. So
        # we initialize them in an invalid state in `main`, and then
        # reinitialize them properly here, before their first use.
        if is_first_file:
            dict_mfl_dcc.default_factory = events.mk_empty_dcc
            dict_mfn_dcc.default_factory = events.mk_empty_dcc
            dict_mfl_dict_line_cc.default_factory = mk_empty_dict_line_cc
            summary_cc.extend(events.mk_empty_cc())

        # These are refs into the dicts above, used to avoid repeated lookups.
        # They are all overwritten before first use.
        mfl_dcc = events.mk_empty_dcc()
        mfn_dcc = events.mk_empty_dcc()
        mfl_dcc_inner_mfn_cc = events.mk_empty_cc()
        mfn_dcc_inner_mfl_cc = events.mk_empty_cc()
        dict_line_cc = mk_empty_dict_line_cc()
        total_cc = events.mk_empty_cc()

        # When diffing, we negate the first cgout file's counts to effectively
        # achieve `cgout2 - cgout1`.
        if args.diff and is_first_file:
            combine_cc_with_cc = sub_cc_from_cc
            combine_cc_with_ccs = sub_cc_from_ccs
        else:
            combine_cc_with_cc = add_cc_to_cc
            combine_cc_with_ccs = add_cc_to_ccs

        summary_cc_present = False

        # Line matching is done in order of pattern frequency, for speed.
        while line := readline():
            if line[0].isdigit():
                split_line = line.split()
                try:
                    line_num = int(split_line[0])
                    cc = events.mk_cc(split_line[1:])
                except ValueError:
                    parse_die("malformed or too many event counts")

                # Record this CC at various levels.
                combine_cc_with_ccs(
                    cc,
                    mfl_dcc.outer_cc,
                    mfn_dcc.outer_cc,
                    mfl_dcc_inner_mfn_cc,
                    mfn_dcc_inner_mfl_cc,
                    dict_line_cc[line_num],
                    total_cc,
                )

            elif line.startswith("fn="):
                ofn = line[3:-1]
                mfn = args.mod_funcname(ofn)
                # `mfl_dcc` is unchanged.
                mfn_dcc = dict_mfn_dcc[mfn]
                mfl_dcc_inner_mfn_cc = mfl_dcc.inner_dict_mname_cc[mfn]
                mfn_dcc_inner_mfl_cc = mfn_dcc.inner_dict_mname_cc[mfl]

            elif line.startswith("fl="):
                ofl = line[3:-1]
                mfl = args.mod_filename(ofl)
                dict_mfl_ofls[mfl].add(ofl)
                # A `fn=` line should follow, overwriting the function name.
                mfn = "<unspecified>"
                mfl_dcc = dict_mfl_dcc[mfl]
                mfn_dcc = dict_mfn_dcc[mfn]
                mfl_dcc_inner_mfn_cc = mfl_dcc.inner_dict_mname_cc[mfn]
                mfn_dcc_inner_mfl_cc = mfn_dcc.inner_dict_mname_cc[mfl]
                dict_line_cc = dict_mfl_dict_line_cc[mfl]

            elif m := re.match(r"summary:\s+(.*)", line):
                summary_cc_present = True
                try:
                    this_summary_cc = events.mk_cc(m.group(1).split())
                    combine_cc_with_cc(this_summary_cc, summary_cc)

                    # Check summary is correct. Note that `total_cc` doesn't
                    # get negated for the first file in a diff, unlike the
                    # other CCs, because it's only used here as a sanity check.
                    if this_summary_cc != total_cc:
                        msg = (
                            "`summary:` line doesn't match computed total\n"
                            f"- summary:  {this_summary_cc}\n"
                            f"- computed: {total_cc}"
                        )
                        parse_die(msg)

                except ValueError:
                    parse_die("malformed or too many event counts")

            elif line == "\n" or line.startswith("#"):
                # Skip empty lines and comment lines.
                pass

            else:
                parse_die(f"malformed line: {line[:-1]}")

    # Check if summary line was present.
    if not summary_cc_present:
        parse_die("missing `summary:` line, aborting")


# The width of a column, in three parts.
class Width:
    # Width of the widest commified event count.
    count: int

    # Width of the widest first percentage, of the form ` (n.n%)` or ` (n.n%,`.
    perc1: int

    # Width of the widest second percentage, of the form ` n.n%)`.
    perc2: int

    def __init__(self, count: int, perc1: int, perc2: int) -> None:
        self.count = count
        self.perc1 = perc1
        self.perc2 = perc2


class CcPrinter:
    # Note: every `CcPrinter` gets the same `Events` object.
    events: Events

    # Note: every `CcPrinter` gets the same summary CC.
    summary_cc: Cc

    # String to print before the event names.
    events_prefix: str

    # The widths of each event column. For simplicity, its length matches
    # `events.events`, even though not all events are necessarily shown.
    widths: list[Width]

    # Text of a missing CC, which can be computed in advance.
    missing_cc_str: str

    # Must call `init_ccs` or `init_list_mname_lcc` after this.
    def __init__(self, events: Events, summary_cc: Cc) -> None:
        self.events = events
        self.summary_cc = summary_cc
        # Other fields initialized in `init_*`.

    def init_ccs(self, ccs: list[Cc]) -> None:
        self.events_prefix = ""

        # Find min and max count for each event. One of them will be the widest
        # value.
        min_cc = self.events.mk_empty_cc()
        max_cc = self.events.mk_empty_cc()
        for cc in ccs:
            update_cc_extremes(cc, min_cc, max_cc)

        self.init_widths(min_cc, max_cc, None, None)

    def init_list_mname_lcc(self, list_mname_lcc: ListMnameLcc) -> None:
        self.events_prefix = "  "

        cumul_cc = self.events.mk_empty_cc()

        # Find min and max value for each event. One of them will be the widest
        # value. Likewise for the cumulative counts.
        min_cc = self.events.mk_empty_cc()
        max_cc = self.events.mk_empty_cc()
        min_cumul_cc = self.events.mk_empty_cc()
        max_cumul_cc = self.events.mk_empty_cc()
        for _, lcc in list_mname_lcc:
            # Consider both outer and inner CCs for `count` and `perc1`.
            update_cc_extremes(lcc.outer_cc, min_cc, max_cc)
            for _, inner_cc in lcc.inner_list_mname_cc:
                update_cc_extremes(inner_cc, min_cc, max_cc)

            # Consider only outer CCs for `perc2`.
            add_cc_to_cc(lcc.outer_cc, cumul_cc)
            update_cc_extremes(cumul_cc, min_cumul_cc, max_cumul_cc)

        self.init_widths(min_cc, max_cc, min_cumul_cc, max_cumul_cc)

    def init_widths(
        self, min_cc1: Cc, max_cc1: Cc, min_cc2: Cc | None, max_cc2: Cc | None
    ) -> None:
        self.widths = [Width(0, 0, 0)] * self.events.num_events
        for i in range(len(self.events.events)):
            # Get count and percs widths of the min and max CCs.
            (min_count, min_perc1, min_perc2) = self.count_and_percs_strs(
                min_cc1, min_cc2, i
            )
            (max_count, max_perc1, max_perc2) = self.count_and_percs_strs(
                max_cc1, max_cc2, i
            )
            self.widths[i] = Width(
                max(len(min_count), len(max_count)),
                max(len(min_perc1), len(max_perc1)),
                max(len(min_perc2), len(max_perc2)),
            )

        self.missing_cc_str = ""
        for i in self.events.show_indices:
            self.missing_cc_str += self.count_and_percs_str(i, ".", "", "")

    # Get the count and perc string for `cc1[i]` and the perc string for
    # `cc2[i]`. (Unless `cc2` is `None`, in which case `perc2` will be "".)
    def count_and_percs_strs(
        self, cc1: Cc, cc2: Cc | None, i: int
    ) -> tuple[str, str, str]:
        count = f"{cc1[i]:,d}"  # commify
        if args.show_percs:
            summary_count = self.summary_cc[i]
            if cc2 is None:
                # A plain or inner CC, with a single percentage.
                if cc1[i] == 0:
                    # Don't show percentages for "0" entries, it's just clutter.
                    perc1 = ""
                elif summary_count == 0:
                    # Avoid dividing by zero.
                    perc1 = " (n/a)"
                else:
                    perc1 = f" ({cc1[i] * 100 / summary_count:.1f}%)"
                perc2 = ""
            else:
                # An outer CC, with two percentages.
                if summary_count == 0:
                    # Avoid dividing by zero.
                    perc1 = " (n/a,"
                    perc2 = " n/a)"
                else:
                    perc1 = f" ({cc1[i] * 100 / summary_count:.1f}%,"
                    perc2 = f" {cc2[i] * 100 / summary_count:.1f}%)"
        else:
            perc1 = ""
            perc2 = ""

        return (count, perc1, perc2)

    def count_and_percs_str(self, i: int, count: str, perc1: str, perc2: str) -> str:
        event_w = len(self.events.events[i])
        count_w = self.widths[i].count
        perc1_w = self.widths[i].perc1
        perc2_w = self.widths[i].perc2
        pre_w = max(0, event_w - count_w - perc1_w - perc2_w)
        return f"{'':>{pre_w}}{count:>{count_w}}{perc1:>{perc1_w}}{perc2:>{perc2_w}} "

    def print_events(self, suffix: str) -> None:
        print(self.events_prefix, end="")
        for i in self.events.show_indices:
            event = self.events.events[i]
            event_w = len(event)
            count_w = self.widths[i].count
            perc1_w = self.widths[i].perc1
            perc2_w = self.widths[i].perc2
            print(f"{event:_<{max(event_w, count_w + perc1_w + perc2_w)}} ", end="")

        print(suffix)

    def print_lcc(self, indent: str, lcc: Lcc, outer_mname: str, cumul_cc: Cc) -> None:
        print(indent, end="")
        if (
            len(lcc.inner_list_mname_cc) == 1
            and lcc.outer_cc == lcc.inner_list_mname_cc[0][1]
        ):
            # There is only one inner CC, it met the threshold, and it is equal
            # to the outer CC. Print the inner CC and outer CC in a single
            # line, because they are the same.
            inner_mname = lcc.inner_list_mname_cc[0][0]
            self.print_cc(lcc.outer_cc, cumul_cc, f"{outer_mname}:{inner_mname}")
        else:
            # There are multiple inner CCs, and at least one met the threshold.
            # Print the outer CC and then the inner CCs, indented.
            self.print_cc(lcc.outer_cc, cumul_cc, f"{outer_mname}:")
            for inner_mname, inner_cc in lcc.inner_list_mname_cc:
                print("  ", end="")
                self.print_cc(inner_cc, None, f"  {inner_mname}")
        print()

    # If `cc2` is `None`, it's a vanilla CC or inner CC. Otherwise, it's an
    # outer CC.
    def print_cc(self, cc: Cc, cc2: Cc | None, suffix: str) -> None:
        for i in self.events.show_indices:
            (count, perc1, perc2) = self.count_and_percs_strs(cc, cc2, i)
            print(self.count_and_percs_str(i, count, perc1, perc2), end="")

        print("", suffix)

    def print_missing_cc(self, suffix: str) -> None:
        print(self.missing_cc_str, suffix)


# Used in various places in the output.
def print_fancy(text: str) -> None:
    fancy = "-" * 80
    print(fancy)
    print("--", text)
    print(fancy)


def print_metadata(descs: list[str], cmds: list[str], events: Events) -> None:
    print_fancy("Metadata")

    def all_the_same(strs: list[str]) -> bool:
        for i in range(len(strs) - 1):
            if strs[i] != strs[i + 1]:
                return False

        return True

    print("Invocation:      ", *sys.argv)

    # When there are multiple descriptions, they are usually all the same. Only
    # print the description once in that case.
    if all_the_same(descs):
        print(descs[0], end="")
    else:
        for i, desc in enumerate(descs):
            print(f"Description {i+1}:")
            print(desc, end="")

    # Commands are sometimes the same, sometimes not. Always print them
    # individually, but refer to the previous one when appropriate.
    if len(cmds) == 1:
        print("Command:         ", cmds[0])
    else:
        for i, cmd in enumerate(cmds):
            if i > 0 and cmds[i - 1] == cmd:
                print(f"Command {i+1}:        (same as Command {i})")
            else:
                print(f"Command {i+1}:       ", cmd)

    print("Events recorded: ", *events.events)
    print("Events shown:    ", *events.show_events)
    print("Event sort order:", *events.sort_events)
    print("Threshold:        ", args.threshold, "%", sep="")
    print("Annotation:      ", "on" if args.annotate else "off")
    print()


def print_summary(events: Events, summary_cc: Cc) -> None:
    printer = CcPrinter(events, summary_cc)
    printer.init_ccs([summary_cc])
    print_fancy("Summary")
    printer.print_events("")
    print()
    printer.print_cc(summary_cc, None, "PROGRAM TOTALS")
    print()


def print_mname_summary(
    kind: str, indent: str, events: Events, dict_mname_dcc: DictMnameDcc, summary_cc: Cc
) -> set[str]:
    # The primary sort event is used for the threshold.
    threshold_index = events.sort_indices[0]

    # Convert the threshold from a percentage to an event count.
    threshold = args.threshold * abs(summary_cc[threshold_index]) / 100

    def meets_threshold(mname_and_cc: tuple[str, Cc]) -> bool:
        cc = mname_and_cc[1]
        return abs(cc[threshold_index]) >= threshold

    # Create a list with the outer CC counts in sort order, so that
    # left-to-right list comparison does the right thing. Plus the outer name
    # at the end for deterministic output when all the event counts are
    # identical in two CCs.
    def key_mname_and_lcc(mname_and_lcc: tuple[str, Lcc]) -> tuple[list[int], str]:
        (outer_mname, lcc) = mname_and_lcc
        return (
            [abs(lcc.outer_cc[i]) for i in events.sort_indices],
            outer_mname,
        )

    # Similar to `key_mname_and_lcc`.
    def key_mname_and_cc(mname_and_cc: tuple[str, Cc]) -> tuple[list[int], str]:
        (mname, cc) = mname_and_cc
        return ([abs(cc[i]) for i in events.sort_indices], mname)

    # This is a `filter_map` operation, which Python doesn't directly support.
    list_mname_lcc: ListMnameLcc = []
    for outer_mname, dcc in dict_mname_dcc.items():
        # Filter out inner CCs for which the primary sort event count is below the
        # threshold, and sort the remainder.
        inner_list_mname_cc = sorted(
            filter(meets_threshold, dcc.inner_dict_mname_cc.items()),
            key=key_mname_and_cc,
            reverse=True,
        )

        # If no inner CCs meet the threshold, ignore the entire DCC, even if
        # the outer CC meets the threshold.
        if len(inner_list_mname_cc) == 0:
            continue

        list_mname_lcc.append((outer_mname, Lcc(dcc.outer_cc, inner_list_mname_cc)))

    list_mname_lcc = sorted(list_mname_lcc, key=key_mname_and_lcc, reverse=True)

    printer = CcPrinter(events, summary_cc)
    printer.init_list_mname_lcc(list_mname_lcc)
    print_fancy(kind + " summary")
    printer.print_events(" " + kind.lower())
    print()

    # Print LCCs.
    threshold_mnames: set[str] = set([])
    cumul_cc = events.mk_empty_cc()
    for mname, lcc in list_mname_lcc:
        add_cc_to_cc(lcc.outer_cc, cumul_cc)
        printer.print_lcc(indent, lcc, mname, cumul_cc)
        threshold_mnames.add(mname)

    return threshold_mnames


class AnnotatedCcs:
    line_nums_known_cc: Cc
    line_nums_unknown_cc: Cc
    non_identical_cc: Cc
    unreadable_cc: Cc
    below_threshold_cc: Cc
    files_unknown_cc: Cc

    labels = [
        "  annotated: files known & above threshold & readable, line numbers known",
        "  annotated: files known & above threshold & readable, line numbers unknown",
        "unannotated: files known & above threshold & two or more non-identical",
        "unannotated: files known & above threshold & unreadable ",
        "unannotated: files known & below threshold",
        "unannotated: files unknown",
    ]

    def __init__(self, events: Events) -> None:
        self.line_nums_known_cc = events.mk_empty_cc()
        self.line_nums_unknown_cc = events.mk_empty_cc()
        self.non_identical_cc = events.mk_empty_cc()
        self.unreadable_cc = events.mk_empty_cc()
        self.below_threshold_cc = events.mk_empty_cc()
        self.files_unknown_cc = events.mk_empty_cc()

    def ccs(self) -> list[Cc]:
        return [
            self.line_nums_known_cc,
            self.line_nums_unknown_cc,
            self.non_identical_cc,
            self.unreadable_cc,
            self.below_threshold_cc,
            self.files_unknown_cc,
        ]


def mk_warning(msg: str) -> str:
    return f"""\
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
@@ WARNING @@ WARNING @@ WARNING @@ WARNING @@ WARNING @@ WARNING @@ WARNING @@
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
{msg}\
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
"""


def warn_ofls_are_all_newer(ofls: list[str], cgout_filename: str) -> None:
    s = "".join([f"@ - {ofl}\n" for ofl in ofls])
    msg = f"""\
@ Original source files are all newer than data file '{cgout_filename}':
{s}@ Annotations may not be correct.
"""
    print(mk_warning(msg))


def warn_bogus_lines(src_filename: str) -> None:
    msg = f"""\
@@ Information recorded about lines past the end of '{src_filename}'.
"""
    print(mk_warning(msg), end="")


def print_annotated_src_file(
    events: Events,
    dict_line_cc: DictLineCc,
    src_file: TextIO,
    annotated_ccs: AnnotatedCcs,
    summary_cc: Cc,
) -> None:
    printer = CcPrinter(events, summary_cc)
    printer.init_ccs(list(dict_line_cc.values()))
    # The starting fancy has already been printed by the caller.
    printer.print_events("")
    print()

    # The CC for line 0 is special, holding counts attributed to the source
    # file but not to any particular line (due to incomplete debug info).
    # Annotate the start of the file with this info, if present.
    line0_cc = dict_line_cc.pop(0, None)
    if line0_cc:
        suffix = "<unknown (line 0)>"
        printer.print_cc(line0_cc, None, suffix)
        add_cc_to_cc(line0_cc, annotated_ccs.line_nums_unknown_cc)
        print()

    # Find interesting line ranges: all lines with a CC, and all lines within
    # `args.context` lines of a line with a CC.
    line_nums = list(sorted(dict_line_cc.keys()))
    pairs: list[tuple[int, int]] = []
    n = len(line_nums)
    i = 0
    context = args.context
    while i < n:
        lo = max(line_nums[i] - context, 1)  # `max` to prevent negatives
        while i < n - 1 and line_nums[i] + 2 * context >= line_nums[i + 1]:
            i += 1
        hi = line_nums[i] + context
        pairs.append((lo, hi))
        i += 1

    def print_lines(pairs: list[tuple[int, int]]) -> None:
        line_num = 0
        while pairs:
            src_line = ""
            (lo, hi) = pairs.pop(0)
            while line_num < lo - 1:
                src_line = src_file.readline()
                line_num += 1
                if not src_line:
                    return  # EOF

            # Print line number, unless start of file.
            if lo != 1:
                print("-- line", lo, "-" * 40)

            while line_num < hi:
                src_line = src_file.readline()
                line_num += 1
                if not src_line:
                    return  # EOF
                if line_nums and line_num == line_nums[0]:
                    printer.print_cc(dict_line_cc[line_num], None, src_line[:-1])
                    add_cc_to_cc(
                        dict_line_cc[line_num], annotated_ccs.line_nums_known_cc
                    )
                    del line_nums[0]
                else:
                    printer.print_missing_cc(src_line[:-1])

            # Print line number.
            print("-- line", hi, "-" * 40)

    # Annotate chosen lines, tracking total annotated counts.
    if pairs:
        print_lines(pairs)

        # If there was info on lines past the end of the file, warn.
        if line_nums:
            print()
            for line_num in line_nums:
                printer.print_cc(
                    dict_line_cc[line_num], None, f"<bogus line {line_num}>"
                )
                add_cc_to_cc(dict_line_cc[line_num], annotated_ccs.line_nums_known_cc)

            print()
            warn_bogus_lines(src_file.name)

        print()


# This partially consumes `dict_mfl_dict_line_cc`, and fully consumes
# `dict_mfl_olfs`.
def print_annotated_src_files(
    ann_mfls: set[str],
    events: Events,
    dict_mfl_ofls: DictMflOfls,
    dict_mfl_dict_line_cc: DictMflDictLineCc,
    summary_cc: Cc,
) -> AnnotatedCcs:
    annotated_ccs = AnnotatedCcs(events)

    def add_dict_line_cc_to_cc(dict_line_cc: DictLineCc, accum_cc: Cc) -> None:
        for line_cc in dict_line_cc.values():
            add_cc_to_cc(line_cc, accum_cc)

    # Exclude the unknown ("???") file, which is unannotatable.
    ann_mfls.discard("???")
    if "???" in dict_mfl_dict_line_cc:
        dict_line_cc = dict_mfl_dict_line_cc.pop("???")
        add_dict_line_cc_to_cc(dict_line_cc, annotated_ccs.files_unknown_cc)

    def print_ann_fancy(mfl: str) -> None:
        print_fancy(f"Annotated source file: {mfl}")

    # This can raise an `OSError`.
    def all_ofl_contents_identical(ofls: list[str]) -> bool:
        for i in range(len(ofls) - 1):
            if not filecmp.cmp(ofls[i], ofls[i + 1], shallow=False):
                return False

        return True

    for mfl in sorted(ann_mfls):
        ofls = sorted(dict_mfl_ofls.pop(mfl))
        first_ofl = ofls[0]

        try:
            if all_ofl_contents_identical(ofls):
                # All the Ofls that map to this Mfl are identical, which means we
                # can annotate, and it doesn't matter which Ofl we use.
                with open(first_ofl, "r", encoding="utf-8") as src_file:
                    dict_line_cc = dict_mfl_dict_line_cc.pop(mfl)
                    print_ann_fancy(mfl)

                    # Because all the Ofls are identical, we can treat their
                    # mtimes as if they are all as early as the earliest one.
                    # Therefore, we warn only if the earliest source file is
                    # more recent than the cgout file.
                    min_ofl_st_mtime_ns = min(os.stat(ofl).st_mtime_ns for ofl in ofls)

                    for cgout_filename in args.cgout_filename:
                        if min_ofl_st_mtime_ns > os.stat(cgout_filename).st_mtime_ns:
                            warn_ofls_are_all_newer(ofls, cgout_filename)

                    print_annotated_src_file(
                        events,
                        dict_line_cc,
                        src_file,
                        annotated_ccs,
                        summary_cc,
                    )
            else:
                dict_line_cc = dict_mfl_dict_line_cc.pop(mfl)
                add_dict_line_cc_to_cc(dict_line_cc, annotated_ccs.non_identical_cc)

                # We could potentially do better here.
                # - Annotate until the first line where the src files diverge.
                # - Also, heuristic resyncing, e.g. by looking for matching
                #   lines (of sufficient complexity) after a divergence.
                print_ann_fancy(mfl)
                print(
                    "Unannotated because two or more of these original files are not "
                    "identical:",
                    *ofls,
                    sep="\n- ",
                )
                print()
        except OSError:
            dict_line_cc = dict_mfl_dict_line_cc.pop(mfl)
            add_dict_line_cc_to_cc(dict_line_cc, annotated_ccs.unreadable_cc)

            print_ann_fancy(mfl)
            print(
                "Unannotated because one or more of these original files are "
                "unreadable:",
                *ofls,
                sep="\n- ",
            )
            print()

    # Sum the CCs remaining in `dict_mfl_dict_line_cc`, which are all in files
    # below the threshold.
    for dict_line_cc in dict_mfl_dict_line_cc.values():
        add_dict_line_cc_to_cc(dict_line_cc, annotated_ccs.below_threshold_cc)

    return annotated_ccs


def print_annotation_summary(
    events: Events,
    annotated_ccs: AnnotatedCcs,
    summary_cc: Cc,
) -> None:
    # Show how many events were covered by annotated lines above.
    printer = CcPrinter(events, summary_cc)
    printer.init_ccs(annotated_ccs.ccs())
    print_fancy("Annotation summary")
    printer.print_events("")
    print()

    total_cc = events.mk_empty_cc()
    for cc, label in zip(annotated_ccs.ccs(), AnnotatedCcs.labels):
        printer.print_cc(cc, None, label)
        add_cc_to_cc(cc, total_cc)

    print()

    # Internal sanity check.
    if summary_cc != total_cc:
        msg = (
            "`summary:` line doesn't match computed annotated counts\n"
            f"- summary:   {summary_cc}\n"
            f"- annotated: {total_cc}"
        )
        die(msg)


def main() -> None:
    # Metadata, initialized to empty states.
    descs: list[str] = []
    cmds: list[str] = []
    events = Events()

    # For tracking original filenames to modified filenames.
    dict_mfl_ofls: DictMflOfls = defaultdict(set)

    # Different places where we accumulate CC data. Initialized to invalid
    # states prior to the number of events being known.
    dict_mfl_dcc: DictMnameDcc = defaultdict(None)
    dict_mfn_dcc: DictMnameDcc = defaultdict(None)
    dict_mfl_dict_line_cc: DictMflDictLineCc = defaultdict(None)
    summary_cc: Cc = []

    for n, filename in enumerate(args.cgout_filename):
        is_first_file = n == 0
        read_cgout_file(
            filename,
            is_first_file,
            descs,
            cmds,
            events,
            dict_mfl_ofls,
            dict_mfl_dcc,
            dict_mfn_dcc,
            dict_mfl_dict_line_cc,
            summary_cc,
        )

    # Each of the following calls prints a section of the output.
    print_metadata(descs, cmds, events)
    print_summary(events, summary_cc)
    ann_mfls = print_mname_summary(
        "File:function", "< ", events, dict_mfl_dcc, summary_cc
    )
    print_mname_summary("Function:file", "> ", events, dict_mfn_dcc, summary_cc)
    if args.annotate:
        annotated_ccs = print_annotated_src_files(
            ann_mfls, events, dict_mfl_ofls, dict_mfl_dict_line_cc, summary_cc
        )

        print_annotation_summary(events, annotated_ccs, summary_cc)


if __name__ == "__main__":
    main()

Stv3n404 - 2023