Source code for mtap.pipeline._pipeline_components

# Copyright 2023 Regents of the University of Minnesota.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Optional, Dict, Any

from mtap._events_client import EventsAddressLike
from mtap.processing import ProcessingComponent, EventProcessor, LocalRunner, \
    RemoteRunner


# Placeholder default for the ``component_id`` field of the descriptors below.
# ``LocalProcessor`` and ``RemoteProcessor`` replace it in ``__post_init__``
# with the processor's own name when the caller did not supply an id.
DEFAULT_COMPONENT_ID = 'pipeline_component'


class ComponentDescriptor(ABC):
    """Abstract description of a single pipeline component.

    A descriptor is configuration for either a local or a remote component,
    together with what the pipeline needs to do in order to call it.
    """
    __slots__ = ()

    @property
    @abstractmethod
    def component_id(self) -> str:
        """Identifier under which the processor's results are known locally.

        Will be modified to be unique if it is not unique relative to the
        other components in the pipeline.
        """

    @property
    @abstractmethod
    def enabled(self) -> bool:
        """Whether the processor is enabled and should be run in the
        pipeline.
        """

    @abstractmethod
    def create_pipeline_component(
            self,
            events_address: EventsAddressLike
    ) -> ProcessingComponent:
        """Turn this descriptor into a component usable during processing.

        Args:
            events_address: The events service address handed to the
                created component.

        Returns:
            The processing component built from this descriptor.
        """
@dataclass
class LocalProcessor(ComponentDescriptor):
    """A configuration of a locally-invoked processor.

    If ``component_id`` is left at ``DEFAULT_COMPONENT_ID`` it is replaced,
    right after initialization, with ``processor.metadata['name']``.
    """

    processor: EventProcessor
    """The processor instance to run with the pipeline."""

    component_id: str = DEFAULT_COMPONENT_ID
    """How the processor's results will be identified locally. Will be
    modified to be unique if it is not unique relative to other components
    in the pipeline.
    """

    params: Dict[str, Any] = field(default_factory=dict)
    """An optional parameter dictionary that will be passed to the processor
    as parameters with every event or document processed. Values should be
    json-serializable.
    """

    enabled: bool = True
    """Whether the processor is enabled and should be run in the pipeline."""

    def __post_init__(self) -> None:
        """Fall back to the processor's name when no component id was given."""
        # Compare by value rather than identity: whether two equal strings
        # are the same object depends on interpreter string interning, so an
        # ``is`` check would treat an equal id built at runtime (e.g. read
        # from configuration) differently from the literal default.
        if self.component_id == DEFAULT_COMPONENT_ID:
            self.component_id = self.processor.metadata['name']

    def create_pipeline_component(
            self,
            events_address: EventsAddressLike
    ) -> ProcessingComponent:
        """Build the ``LocalRunner`` that runs this processor.

        Args:
            events_address: The events service address passed to the runner.

        Returns:
            A ``LocalRunner`` wrapping ``processor``. It receives a deep copy
            of ``params`` so the runner does not share the descriptor's
            dictionary.

        Raises:
            ValueError: If this descriptor is not enabled.
        """
        if not self.enabled:
            raise ValueError("Cannot create pipeline component for processor that is not enabled.")
        return LocalRunner(
            processor=self.processor,
            events_address=events_address,
            component_id=self.component_id,
            params=copy.deepcopy(self.params),
        )
@dataclass
class RemoteProcessor(ComponentDescriptor):
    """A remote processor in a pipeline.

    If ``component_id`` is left at ``DEFAULT_COMPONENT_ID`` it is replaced,
    right after initialization, with ``name``.
    """

    name: str
    """The processor service name, currently used for health checking."""

    address: Optional[str] = None
    """An optional address for the remote processor service. It is passed
    through unchanged (including ``None``) to the runner.
    """

    component_id: str = DEFAULT_COMPONENT_ID
    """How the processor's results will be identified locally. Will be
    modified to be unique if it is not unique relative to other components
    in a pipeline.
    """

    params: Dict[str, Any] = field(default_factory=dict)
    """A parameter dictionary that will be passed to the processor as
    parameters with every event or document processed. Values should be
    json-serializable.
    """

    enable_proxy: bool = False
    """Whether the grpc channel used to connect to the service should respect
    the ``http_proxy`` environment variable.
    """

    enabled: bool = True
    """Whether the processor is enabled and should be run in the pipeline."""

    call_timeout: float = 10.0
    """A customizable timeout for calls to the remote processor."""

    def __post_init__(self) -> None:
        """Fall back to the service name when no component id was given."""
        # Compare by value rather than identity: whether two equal strings
        # are the same object depends on interpreter string interning, so an
        # ``is`` check would treat an equal id built at runtime (e.g. read
        # from configuration) differently from the literal default.
        if self.component_id == DEFAULT_COMPONENT_ID:
            self.component_id = self.name

    def create_pipeline_component(
            self,
            events_address: EventsAddressLike
    ) -> ProcessingComponent:
        """Build the ``RemoteRunner`` that calls this processor.

        Args:
            events_address: Accepted to satisfy the ``ComponentDescriptor``
                interface; it is not forwarded to the ``RemoteRunner``.

        Returns:
            A ``RemoteRunner`` configured from this descriptor. It receives a
            deep copy of ``params`` so the runner does not share the
            descriptor's dictionary.

        Raises:
            ValueError: If this descriptor is not enabled.
        """
        if not self.enabled:
            raise ValueError("Cannot create pipeline component for processor that is not enabled.")
        return RemoteRunner(
            processor_name=self.name,
            component_id=self.component_id,
            address=self.address,
            params=copy.deepcopy(self.params),
            enable_proxy=self.enable_proxy,
            call_timeout=self.call_timeout,
        )