# Source code for mtap.pipeline._results

# Copyright 2023 Regents of the University of Minnesota.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
from dataclasses import field, dataclass
from datetime import timedelta
from typing import Callable, Any, NamedTuple, List, overload

from mtap._event import Event
from mtap.processing.results import (
    add_times,
    create_timer_stats,
    AggregateTimingInfo,
    ComponentResult,
    TimesMap,
)

# Callable type taking an ``Event`` and a ``PipelineResult``; its return value
# is typed ``Any``. ``PipelineResult`` is written as a string forward reference
# because that class is defined further down in this module.
PipelineCallback = Callable[
    [Event, "PipelineResult"],
    Any,
]


class PipelineResult(NamedTuple):
    """The result of processing an event or document in a pipeline."""

    component_results: List[ComponentResult]
    """The processing results for each individual component"""

    elapsed_time: timedelta
    """The elapsed time for the entire pipeline."""

    def component_result(self, identifier: str) -> ComponentResult:
        """Returns the component result for a specific identifier.

        If several component results share the identifier, the first one in
        ``component_results`` order is returned.

        Args:
            identifier: The processor's identifier in the pipeline.

        Returns:
            ComponentResult: The result for the specified processor.

        Raises:
            KeyError: If no component result has the given identifier.
        """
        # Use next()'s default instead of catching StopIteration so the
        # KeyError is not chained onto an unrelated StopIteration traceback.
        match = next(
            (result for result in self.component_results
             if result.identifier == identifier),
            None,
        )
        if match is None:
            raise KeyError(f'No result for identifier: {identifier}')
        return match
def add_result_times(times_map: TimesMap, result: PipelineResult,
                     pipeline_name: str):
    """Adds the times from a pipeline result to a times map.

    Each component timing entry is keyed as ``'<component_id>:<timer name>'``.
    The pipeline's elapsed time is keyed as ``'<pipeline_name>:total'``. The
    collected dict is passed to :func:`add_times` together with ``times_map``.

    Args:
        times_map: The map of timer aggregates to add the times to.
        result: The pipeline result whose times should be recorded.
        pipeline_name: The name used to key the pipeline's total time.
    """
    times = {}
    # NOTE(review): relies on ComponentResult unpacking into exactly four
    # fields, the first being the identifier and the third the timing dict.
    for component_id, _, component_times, _ in result.component_results:
        times.update({component_id + ':' + k: v
                      for k, v in component_times.items()})
    times[pipeline_name + ':total'] = result.elapsed_time
    add_times(times_map, times)


@dataclass
class PipelineTimes:
    """Accumulated timing information for a pipeline and its components.

    Attributes:
        name: The pipeline name; its total time is keyed ``'<name>:total'``.
        component_ids: The component identifiers, in pipeline order.
    """
    name: str
    component_ids: List[str]
    # Timer aggregates keyed by '<identifier>:<timer name>'.
    _times_map: TimesMap = field(default_factory=dict)

    def add_other(self, other: 'PipelineTimes'):
        """Merges all the times from the other times into this.

        Aggregates whose keys appear in both objects are merged. Aggregates
        present only in ``other`` are shallow-copied in. Aggregates present only
        in this object are kept unchanged.

        Args:
            other: Another pipeline times.

        Raises:
            ValueError: If the two objects do not have the same component ids.
        """
        if self.component_ids != other.component_ids:
            raise ValueError("component_ids need to match")
        for key, theirs in other._times_map.items():
            if key not in self._times_map:
                # Shallow-copy rather than share the aggregate with ``other``.
                self._times_map[key] = copy.copy(theirs)
                continue
            mine = self._times_map[key]
            merged = mine.merge(theirs)
            # NOTE(review): ``merge`` is defined elsewhere. If it mutates in
            # place and returns None, keep the (now merged) original aggregate.
            self._times_map[key] = mine if merged is None else merged

    @overload
    def processor_timer_stats(self) -> List[AggregateTimingInfo]:
        """Returns the timing information for all processors.

        Returns:
            A list of timing info objects, one for each processing component,
            in the same order as in the pipeline.
        """
        ...

    @overload
    def processor_timer_stats(
            self,
            identifier: str
    ) -> AggregateTimingInfo:
        """Returns the timing info for one processor.

        Args:
            identifier: The pipeline component_id for the processor to return
                timing info.

        Returns:
            The timing info for the specified processor.
        """
        ...

    def processor_timer_stats(self, identifier=None):
        """Returns timing info for one processor, or for every processor.

        With ``identifier`` given, returns a single ``AggregateTimingInfo``.
        Otherwise, returns a list with one entry per ``component_ids`` element,
        in that order.
        """
        if identifier is not None:
            return self._timing_info(identifier)
        return [self._timing_info(component_id)
                for component_id in self.component_ids]

    def _timing_info(self, identifier: str) -> AggregateTimingInfo:
        """Builds timing info from the timers keyed ``'<identifier>:...'``."""
        # Including the ':' separator keeps ids that merely start with
        # ``identifier`` (e.g. 'foo' vs 'foobar') from being swept in.
        prefix = identifier + ':'
        # NOTE(review): assumes create_timer_stats selects entries whose keys
        # start with ``prefix`` and leaves the keys unstripped.
        aggregates = create_timer_stats(self._times_map, prefix)
        aggregates = {k[len(prefix):]: v for k, v in aggregates.items()}
        return AggregateTimingInfo(identifier=identifier,
                                   timing_info=aggregates)

    def aggregate_timer_stats(self) -> AggregateTimingInfo:
        """The aggregated statistics for the global runtime of the pipeline.

        Returns:
            AggregateTimingInfo: The timing stats for the global runtime of the
            pipeline.
        """
        # Previously this filtered on ``self.name`` without the ':' separator,
        # which also matched component ids beginning with the pipeline name.
        return self._timing_info(self.name)

    def print(self):
        """Prints the pipeline's aggregate timing info, then each processor's.

        Delegates to ``AggregateTimingInfo.print`` for every entry.
        """
        self.aggregate_timer_stats().print()
        for timing_info in self.processor_timer_stats():
            timing_info.print()

    def add_result_times(self, result: PipelineResult):
        """Adds the times from the result to these times.

        Args:
            result: A pipeline result.
        """
        # Resolves to the module-level function, not this method.
        add_result_times(self._times_map, result, self.name)