| # Parse log profiling data and produce a graph showing encode times in time bins, |
| # with a breakdown per message type. |
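#
# Expected CSV columns, after a one-line header (see parse_csv_file):
#   channel_name, channel_type, encode_duration_ns, encoding_start_time_ns,
#   message_time_s
#
# Example invocation (the script and CSV file names are placeholders):
#   python3 plot_encode_times.py --log_file_path encode_profile.csv \
#       --num_bins 40 --top_n 10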
| |
| import argparse |
| import csv |
| import math |
| import os |
| import webbrowser |
| from dataclasses import dataclass, field |
| from pathlib import Path |
| from typing import Dict, List, Optional, Tuple |
| |
| import numpy as np |
from bokeh.models import ColumnDataSource, Legend, LegendItem
from bokeh.palettes import Category20
from bokeh.plotting import figure, output_file, save
| from tabulate import tabulate |
| |
| |
| @dataclass |
class SeriesDetail:
    """Per-message-type series of event loop timestamps and encode times."""
    event_loop_times_s: List[float]
    encode_times_ms: List[float]
| |
| |
| def parse_csv_file(filepath: Path, max_lines: Optional[int], |
| start_time: Optional[float], |
| end_time: Optional[float]) -> Dict[str, SeriesDetail]: |
| """Parses the CSV file to extract needed data, respecting the maximum number of lines if provided.""" |
| data_by_type: Dict[str, SeriesDetail] = {} |
| |
| with open(filepath, 'r') as file: |
| reader = csv.reader(file) |
| next(reader) # Skip the header line |
| |
| line_count = 0 |
| for line in reader: |
| if max_lines is not None and line_count >= max_lines: |
| break |
| |
| line_count += 1 |
| |
            assert len(line) == 5, f"Expected 5 columns, got: {line}"
| |
            # channel_name and encoding_start_time_ns are currently unused
            # here, but they make the raw CSV easier to inspect by hand.
| channel_name, channel_type, encode_duration_ns, encoding_start_time_ns, message_time_s = line |
| |
            # Convert the encode duration from nanoseconds to milliseconds.
| encode_duration_ms = float(encode_duration_ns) * 1e-6 |
| message_time_s = float(message_time_s) |
| |
            if start_time is not None and message_time_s < start_time:
                continue
            if end_time is not None and message_time_s > end_time:
                continue
| |
            series = data_by_type.setdefault(
                channel_type,
                SeriesDetail(encode_times_ms=[], event_loop_times_s=[]))
            series.encode_times_ms.append(encode_duration_ms)
            series.event_loop_times_s.append(message_time_s)
| |
| return data_by_type |
| |
| |
| @dataclass |
class DataBin:
    """A single time bin: its label and summed encode time per message type."""
    bin_range: str
    message_encode_times: Dict[str, float]
| |
| |
| @dataclass |
class BinnedData:
    """All time bins plus the names of the top message types being tracked."""
    bins: List[DataBin] = field(default_factory=list)
    top_type_names: List[str] = field(default_factory=list)
| |
| |
| def create_binned_data(data_by_type: Dict[str, SeriesDetail], |
| num_bins: int = 25, |
                       top_n: int = 5) -> BinnedData:
    """Sums encode times into num_bins equal-width time bins, tracking the
    top_n message types individually and lumping the rest into 'other'."""
| # Calculate total encoding times for each message type across the entire file. |
| total_encode_times: Dict[str, float] = { |
| message_type: sum(detail.encode_times_ms) |
| for message_type, detail in data_by_type.items() |
| } |
| |
| # Determine the top N message types based on total encoding times. |
| top_types: List[Tuple[str, float]] = sorted(total_encode_times.items(), |
| key=lambda item: item[1], |
| reverse=True)[:top_n] |
| print(f"{top_types=}") |
| top_type_names: List[str] = [type_name for type_name, _ in top_types] |
| |
    # Find the global minimum and maximum times to establish bin edges.
    # Use min()/max() rather than the first/last samples so the result does
    # not depend on the rows being sorted by time.
    min_time: float = min(
        min(detail.event_loop_times_s) for detail in data_by_type.values())
    max_time: float = max(
        max(detail.event_loop_times_s) for detail in data_by_type.values())
| |
| # Create bins. |
| bins = np.linspace(min_time, max_time, num_bins + 1) |
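    # For example, min_time=0.0, max_time=9.0 and num_bins=2 give the edges
    # [0.0, 4.5, 9.0]: num_bins + 1 edges delimiting num_bins bins.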
| |
| # Initialize the list of DataBin instances with the correct number of bins. |
| binned_data = BinnedData(top_type_names=top_type_names) |
| for i in range(num_bins): |
| bin_range = f"{bins[i]:.2f} - {bins[i+1]:.2f}" |
| binned_data.bins.append( |
| DataBin(bin_range=bin_range, |
| message_encode_times={ |
                        name: 0.0
| for name in top_type_names + ['other'] |
| })) |
| |
| # Populate binned_data with message encode times. |
| for message_type, details in data_by_type.items(): |
| binned_indices = np.digitize(details.event_loop_times_s, bins) - 1 |
        # Clamp indices for samples that fall exactly on the maximum edge,
        # which np.digitize would otherwise place one past the last bin.
| binned_indices = np.minimum(binned_indices, num_bins - 1) |
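        # Worked example with edges [0.0, 4.5, 9.0] (num_bins=2):
        #   np.digitize([0.0, 1.0, 9.0], edges) -> [1, 1, 3]
        # Subtracting 1 gives [0, 0, 2]; the clamp maps the last sample
        # (exactly on the maximum edge) into bin index 1.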
| |
| for idx, encode_time in enumerate(details.encode_times_ms): |
| bin_index = binned_indices[idx] |
| current_bin = binned_data.bins[bin_index] |
| if message_type in top_type_names: |
| current_bin.message_encode_times[message_type] += encode_time |
| else: |
| current_bin.message_encode_times['other'] += encode_time |
| |
| return binned_data |
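
# Illustrative example of create_binned_data (synthetic data; the message type
# 'foo.Bar' is hypothetical):
#   data = {'foo.Bar': SeriesDetail(event_loop_times_s=[0.0, 1.0, 9.0],
#                                   encode_times_ms=[2.0, 3.0, 5.0])}
#   create_binned_data(data, num_bins=2, top_n=1) yields the bins
#   '0.00 - 4.50' (2.0 + 3.0 = 5.0 ms) and '4.50 - 9.00' (5.0 ms) for 'foo.Bar'.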
| |
| |
def print_binned_data(binned_data: BinnedData) -> None:
    """Prints the binned encode times as a text table with per-row totals."""
    # Build the table headers: wrap each message type name at '.' boundaries
    # so long names span multiple header lines, and append a 'Total' column.
| headers = ['Bin Range'] + [ |
| key.replace('.', '\n.') |
| for key in binned_data.top_type_names + ['other'] |
| ] + ['Total'] |
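    # For example, a hypothetical 'foo.bar.Baz' becomes 'foo\n.bar\n.Baz',
    # which tabulate's grid format renders as a three-line header cell,
    # keeping the column narrow.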
| |
| # Initialize the table data list. |
| table_data = [] |
| |
| # Populate the table data with the values from each DataBin instance and calculate totals. |
| for data_bin in binned_data.bins: |
| # Initialize a row with the bin range. |
| row = [data_bin.bin_range] |
| # Add the total message encode times for each message type. |
| encode_times = [ |
| data_bin.message_encode_times[message_type] |
| for message_type in binned_data.top_type_names |
| ] |
| other_time = data_bin.message_encode_times['other'] |
| row += encode_times + [other_time] |
| # Calculate the total encode time for the row and append it. |
| total_encode_time = sum(encode_times) + other_time |
| row.append(total_encode_time) |
| # Append the row to the table data. |
| table_data.append(row) |
| |
| # Print the table using tabulate with 'grid' format for better structure. |
| print( |
| tabulate(table_data, headers=headers, tablefmt='grid', floatfmt=".2f")) |
| |
| |
def plot_bar(binned_data: BinnedData) -> None:
    """Renders the binned encode times as a stacked bar chart saved as HTML."""
    filename = "plot.html"
| output_file(filename, title="Message Encode Time Plot Stacked Bar Graph") |
| |
| # Adjust width based on bin count for readability. |
| plot_width = max(1200, 50 * len(binned_data.bins)) |
| |
    p = figure(x_range=[data_bin.bin_range for data_bin in binned_data.bins],
| title='Message Encode Time by Type over Event Loop Time', |
| x_axis_label='Event Loop Time Bins', |
| y_axis_label='Total Message Encode Time (ms)', |
| width=plot_width, |
| height=600, |
| tools="") |
| |
    source_data = {
        'bin_edges': [data_bin.bin_range for data_bin in binned_data.bins]
    }
    for message_type in binned_data.top_type_names + ['other']:
        source_data[message_type] = [
            data_bin.message_encode_times.get(message_type, 0.0)
            for data_bin in binned_data.bins
        ]
| |
| source = ColumnDataSource(data=source_data) |
| |
| # Calculate totals and sort in descending order. |
| totals = { |
| message_type: sum(source_data[message_type]) |
| for message_type in source_data if message_type != 'bin_edges' |
| } |
| sorted_message_types = sorted(totals, key=totals.get, reverse=True) |
| |
    # vbar_stack draws stackers bottom-up in list order, so reverse the
    # descending sort to put the largest contributors at the top of each bar.
    sorted_message_types.reverse()
| |
| num_types = len(sorted_message_types) |
| if num_types > 20: |
| raise ValueError( |
| f"Number of types ({num_types}) exceeds the number of available colors in Category20." |
| ) |
| colors = Category20[20][:num_types] |
| |
    # One renderer is returned per stacker, in stacking order (bottom to top).
| renderers = p.vbar_stack(sorted_message_types, |
| x='bin_edges', |
| width=0.7, |
| color=colors, |
| source=source) |
| |
| # Change the orientation of the x-axis labels to a 45-degree angle. |
| p.xaxis.major_label_orientation = math.pi / 4 |
| |
| # Create a custom legend, maintaining the reversed order for visual consistency. |
| legend_items = [ |
| LegendItem(label=mt.replace('.', ' '), renderers=[renderers[i]]) |
| for i, mt in enumerate(sorted_message_types) |
| ] |
| legend = Legend(items=legend_items, location=(0, -30)) |
| p.add_layout(legend, 'right') |
| |
| p.y_range.start = 0 |
| p.x_range.range_padding = 0.05 |
| p.xgrid.grid_line_color = None |
| p.axis.minor_tick_line_color = None |
| p.outline_line_color = None |
| |
    # output_file() only sets the destination; save() actually writes the HTML.
    save(p)

    file_path = os.path.realpath(filename)
    print()
    print(f"Plot saved to '{file_path}'")
    webbrowser.open('file://' + file_path)
| |
| |
| def main(): |
| parser = argparse.ArgumentParser( |
| description= |
| 'Process log files to extract and plot message encode times.') |
| parser.add_argument('--log_file_path', |
| type=Path, |
| help='Path to the log file', |
| required=True) |
| parser.add_argument( |
| '--max_lines', |
| type=int, |
| default=None, |
| help='Maximum number of lines to read from the log file') |
| parser.add_argument('--num_bins', |
| type=int, |
| default=40, |
| help='Number of bins to use') |
| parser.add_argument('--top_n', |
| type=int, |
| default=10, |
| help='Number of top message types to plot') |
| parser.add_argument('--start_time', |
| type=float, |
| default=None, |
| help='Start time in seconds') |
| parser.add_argument('--end_time', |
| type=float, |
| default=None, |
| help='End time in seconds') |
| |
| args = parser.parse_args() |
| |
| data_by_type = parse_csv_file(filepath=args.log_file_path, |
| max_lines=args.max_lines, |
| start_time=args.start_time, |
| end_time=args.end_time) |
| binned_data = create_binned_data(data_by_type, |
| num_bins=args.num_bins, |
| top_n=args.top_n) |
| print_binned_data(binned_data) |
| plot_bar(binned_data) |
| print(f"{os.path.basename(__file__)} Finished.") |
| |
| |
| if __name__ == '__main__': |
| main() |