mtap.pipeline
Running a pipeline
- class mtap.pipeline.Pipeline(*components: ComponentDescriptor, name: str | None = None, events_address: str | List[str] | None = None, mp_config: MpConfig | None = None, error_handlers: List[ProcessingErrorHandler] | None = None)[source]
Bases: List[ComponentDescriptor]
An object which can be used to build and run a pipeline of remote and local processors.
Pipelines are a MutableSequence containing one or more ComponentDescriptor objects, so a pipeline can be modified after creation using the standard list operations.
- Parameters:
*components – Component descriptors created using RemoteProcessor or LocalProcessor.
- events_address: str | List[str] | None
Optional events address. Required if using local processors.
- error_handlers: List[ProcessingErrorHandler]
The error handlers to use when running the pipeline.
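Because Pipeline derives from List[ComponentDescriptor], components can be appended, inserted, or deleted like list items after the pipeline is built. The sketch below illustrates only that list behavior. ToyPipeline and ToyDescriptor are hypothetical stand-ins, not mtap classes, so the example runs without mtap installed.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class ToyDescriptor:
    """Hypothetical stand-in for a ComponentDescriptor (not the mtap class)."""
    component_id: str
    params: Dict[str, Any] = field(default_factory=dict)


class ToyPipeline(list):
    """Hypothetical stand-in: like mtap's Pipeline, it is a list of descriptors."""

    def __init__(self, *components: ToyDescriptor, name: str = "toy"):
        super().__init__(components)
        self.name = name

    def component_ids(self) -> List[str]:
        return [c.component_id for c in self]


pipeline = ToyPipeline(ToyDescriptor("tokenizer"), ToyDescriptor("tagger"))
# Ordinary list operations modify the pipeline after creation.
pipeline.append(ToyDescriptor("parser"))
pipeline.insert(0, ToyDescriptor("sentences"))
del pipeline[2]  # removes "tagger"
print(pipeline.component_ids())  # ['sentences', 'tokenizer', 'parser']
```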
- static from_yaml_file(conf_path: str | bytes | PathLike) Pipeline [source]
Creates a pipeline from a YAML pipeline configuration file.
- Parameters:
conf_path – The path to the configuration file.
- Returns:
Pipeline object from the configuration.
- static from_dict(conf: Dict) Pipeline [source]
Creates a pipeline from a pipeline configuration dictionary.
- Parameters:
conf – The pipeline configuration dictionary.
- Returns:
Pipeline created from the configuration.
- run_multithread(source: Iterable[Event | Document] | ProcessingSource, *, total: int | None = None, callback: Callable[[Event, PipelineResult], Any] | None = None, **kwargs) PipelineTimes [source]
Runs this pipeline on a source which provides multiple documents / events.
Concurrency is per-event, with each event being provided a thread which runs it through the pipeline.
- Parameters:
source – The processing source.
total – Total documents in the processing source.
callback – A callback that receives events and their associated results from the finished pipeline.
kwargs – Overrides for any of the MpConfig attributes.
- Raises:
PipelineTerminated – If an error handler terminates the pipeline before it is completed.
Examples
>>> import pathlib
>>> from mtap import Event
>>> docs = list(pathlib.Path('abc/').glob('*.txt'))
>>> def document_source():
...     for path in docs:
...         with path.open('r') as f:
...             txt = f.read()
...         with Event(event_id=path.name,
...                    client=pipeline.events_client) as event:
...             doc = event.create_document('plaintext', txt)
...             yield doc
...
>>> pipeline.run_multithread(document_source(), total=len(docs))
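The per-event concurrency described above can be approximated with the standard library. This sketch is not mtap's implementation: process_event is a hypothetical placeholder for running one event through every component, and workers, read_ahead, and callback only mirror the documented names. Treating read_ahead as "extra items allowed in flight beyond the worker count" is an assumption.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Iterable, List, Optional


def process_event(event: str) -> int:
    """Hypothetical placeholder for running one event through every component."""
    return len(event)


def run_multithread_sketch(
    source: Iterable[str],
    *,
    callback: Optional[Callable[[str, Any], Any]] = None,
    workers: int = 10,
    read_ahead: int = 10,
) -> List[Any]:
    results: List[Any] = []
    lock = threading.Lock()
    # Caps how many items have been read from the source but not yet finished.
    slots = threading.BoundedSemaphore(workers + read_ahead)

    def task(event: str) -> None:
        try:
            result = process_event(event)
            with lock:
                results.append(result)
            if callback is not None:
                callback(event, result)
        finally:
            slots.release()  # free a slot so the reader can pull another item

    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = []
        for event in source:
            slots.acquire()  # blocks while too many items are in flight
            futures.append(executor.submit(task, event))
        for future in futures:
            future.result()  # re-raises any exception from a worker thread
    return results


seen = []
out = run_multithread_sketch(["a", "bb", "ccc"],
                             callback=lambda event, result: seen.append((event, result)))
print(sorted(out))  # [1, 2, 3]
```

Results and callbacks arrive in completion order, not source order, which is why the usage sorts them.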
- run(target: Event | Document, *, params: Dict[str, Any] | None = None) PipelineResult [source]
Processes the event/document using all the processors in the pipeline.
- Parameters:
target – Either an event or a document to process.
params – JSON object containing params specific to processing this event. The existing params dictionary defined in add_processor() will be updated with the contents of this dict.
- Returns:
The results of all the processors in the pipeline.
Examples
>>> import mtap
>>> e = mtap.Event()
>>> document = mtap.Document('plaintext', text="...", event=e)
>>> pipeline.run(document)
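The params override described for run() follows dict.update semantics: keys supplied for this call replace the configured values, and other keys are kept. A minimal sketch, where effective_params is a hypothetical helper. Whether mtap copies the configured dictionary before updating it is not stated, so the copy here is an assumption.

```python
from typing import Any, Dict, Optional


def effective_params(component_params: Dict[str, Any],
                     run_params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Hypothetical helper: per-run params override configured params (shallow merge)."""
    merged = dict(component_params)  # copy so the configured params stay intact
    merged.update(run_params or {})
    return merged


configured = {"threshold": 0.5, "model": "default"}
print(effective_params(configured, {"threshold": 0.9}))
# {'threshold': 0.9, 'model': 'default'}
```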
- class mtap.pipeline.MpConfig(show_progress: bool = False, params: ~typing.Dict[str, ~typing.Any] = <factory>, workers: int = 10, read_ahead: int = 10, close_events: bool = False, log_level: str = 'INFO', mp_start_method: str = 'spawn', mp_context: multiprocessing | None = None)[source]
Configuration object for pipeline multiprocessing.
- read_ahead: int = 10
The number of documents to read onto the events service(s) to queue for processing.
- close_events: bool = False
Whether any events passed from the source to the pipeline should be closed when the pipeline is completed.
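run_multithread accepts keyword overrides for MpConfig attributes. One way to picture this is dataclasses.replace applied to a configuration object. MpConfigSketch is a hypothetical stand-in that mirrors some of the documented defaults, and applying overrides exactly this way is an assumption about mtap's behavior.

```python
import dataclasses
from typing import Any, Dict


@dataclasses.dataclass(frozen=True)
class MpConfigSketch:
    """Hypothetical stand-in mirroring some documented MpConfig defaults."""
    show_progress: bool = False
    params: Dict[str, Any] = dataclasses.field(default_factory=dict)
    workers: int = 10
    read_ahead: int = 10
    close_events: bool = False
    log_level: str = 'INFO'
    mp_start_method: str = 'spawn'


def with_overrides(base: MpConfigSketch, **kwargs: Any) -> MpConfigSketch:
    # dataclasses.replace raises TypeError for names that are not fields,
    # which catches misspelled overrides early.
    return dataclasses.replace(base, **kwargs)


base = MpConfigSketch()
tuned = with_overrides(base, workers=4, read_ahead=2)
print(tuned.workers, tuned.read_ahead, base.workers)  # 4 2 10
```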
- class mtap.pipeline.ComponentDescriptor[source]
A configuration which describes either a local or remote pipeline component and what the pipeline needs to do to call the component.
- class mtap.pipeline.RemoteProcessor(name: str, address: str | None = None, component_id: str = 'pipeline_component', params: ~typing.Dict[str, ~typing.Any] = <factory>, enable_proxy: bool = False, enabled: bool = True, call_timeout: float = 10.0)[source]
Bases: ComponentDescriptor
A remote processor in a pipeline.
- class mtap.pipeline.LocalProcessor(processor: ~mtap.processing._processor.EventProcessor, component_id: str = 'pipeline_component', params: ~typing.Dict[str, ~typing.Any] = <factory>, enabled: bool = True)[source]
Bases: ComponentDescriptor
A configuration of a locally-invoked processor.
Pipeline Document Sources
- class mtap.pipeline.ProcessingSource[source]
Provides events or documents for the multithreaded pipeline runner. Also has functionality for receiving results.
- property total: int | None
The total number of documents this source will provide.
- Returns:
Total number of events this source will provide, or None if not known.
- abstract produce() Generator[Event | Document, None, None] [source]
The method which provides documents or events to the pipeline to batch process.
Examples
Example implementation for processing text documents:
>>> ...
>>> def produce(self):
...     for file in Path(".").glob("*.txt"):
...         with file.open('r') as fio:
...             txt = fio.read()
...         with Event(client=client) as event:
...             doc = event.create_document('plaintext', txt)
...             yield doc
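The ProcessingSource contract, an optional total plus a produce() generator, can be expressed with the abc module. The classes below are hypothetical stand-ins that yield plain strings instead of mtap Event or Document objects, so the sketch runs without an events service.

```python
from abc import ABC, abstractmethod
from typing import Generator, List, Optional


class SourceSketch(ABC):
    """Hypothetical stand-in for ProcessingSource (not the mtap class)."""

    @property
    def total(self) -> Optional[int]:
        # None signals that the number of items is not known in advance.
        return None

    @abstractmethod
    def produce(self) -> Generator[str, None, None]:
        """Yield the items the pipeline should process."""


class ListSource(SourceSketch):
    def __init__(self, texts: List[str]):
        self._texts = texts

    @property
    def total(self) -> Optional[int]:
        return len(self._texts)

    def produce(self) -> Generator[str, None, None]:
        for text in self._texts:
            yield text


source = ListSource(["first document", "second document"])
print(source.total, list(source.produce()))
# 2 ['first document', 'second document']
```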
- class mtap.pipeline.FilesInDirectoryProcessingSource(client: EventsClient, directory: str | bytes | PathLike, *, document_name: str = 'plaintext', extension_glob: str = '*.txt', count_total: bool = True, errors: str | None = None)[source]
Bases: ProcessingSource
Processing source for pipelines which iterates over files in a directory.
- Parameters:
count_total – Whether the total attribute should be populated by iterating through the directory once to count all matching files.
- client
Create the events using this client.
- directory
The path to the directory of files to process.
- document_name
Creates documents with this name and adds the file’s plain text.
- extension_glob
A glob used to filter documents from the directory.
- total
The total number of documents.
Examples
>>> with Pipeline(...) as pipeline:
...     pipeline.run_multithread(
...         FilesInDirectoryProcessingSource(pipeline.events_client, "/path/to/docs")
...     )
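The directory iteration behind FilesInDirectoryProcessingSource can be sketched with pathlib. This is an illustration only: DirectorySourceSketch is hypothetical, it yields (document_name, text) pairs instead of creating events through an EventsClient, the glob is assumed to be non-recursive, and forwarding errors to open() is an assumption about what that parameter controls.

```python
import os
import tempfile
from pathlib import Path
from typing import Generator, Optional, Tuple, Union


class DirectorySourceSketch:
    """Hypothetical stand-in for FilesInDirectoryProcessingSource."""

    def __init__(self, directory: Union[str, os.PathLike], *,
                 document_name: str = 'plaintext',
                 extension_glob: str = '*.txt',
                 count_total: bool = True,
                 errors: Optional[str] = None):
        self.directory = Path(directory)
        self.document_name = document_name
        self.extension_glob = extension_glob
        self.errors = errors  # assumption: forwarded to open() for decoding errors
        # Optionally walk the directory once up front to count matching files.
        self.total: Optional[int] = (
            sum(1 for _ in self.directory.glob(extension_glob)) if count_total else None
        )

    def produce(self) -> Generator[Tuple[str, str], None, None]:
        for path in sorted(self.directory.glob(self.extension_glob)):
            with path.open('r', errors=self.errors) as f:
                # A real source would create an event and a document here.
                yield self.document_name, f.read()


with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, 'a.txt').write_text('alpha')
    Path(tmp, 'b.txt').write_text('beta')
    Path(tmp, 'notes.md').write_text('skipped')
    source = DirectorySourceSketch(tmp)
    print(source.total, list(source.produce()))
    # 2 [('plaintext', 'alpha'), ('plaintext', 'beta')]
```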
Pipeline Results
- class mtap.pipeline.PipelineResult(component_results: List[ComponentResult], elapsed_time: timedelta)[source]
The result of processing an event or document in a pipeline.
- component_results: List[ComponentResult]
The processing results for each individual component.
- component_result(identifier: str) ComponentResult [source]
Returns the component result for a specific identifier.
- Parameters:
identifier – The processor’s identifier in the pipeline.
- Returns:
The result for the specified processor.
- Return type:
ComponentResult
- class mtap.processing.results.ComponentResult(identifier: str, result_dict: Dict, timing_info: Dict[str, timedelta], created_indices: Dict[str, List[str]])[source]
The result of processing one document or event through a single processing component.
- class mtap.processing.results.TimerStats(mean: timedelta, std: timedelta, min: timedelta, max: timedelta, sum: timedelta)[source]
Statistics about a specific keyed measured duration recorded by a Stopwatch.
- class mtap.processing.results.AggregateTimingInfo(identifier: str, timing_info: Dict[str, TimerStats])[source]
Collection of all the timing info for a specific item / component.
- timing_info: Dict[str, TimerStats]
A map from all the timer keys for the processor to the aggregated duration statistics.
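TimerStats summarizes many measurements of one timer key. A minimal sketch of how such statistics could be computed from a list of timedelta durations. TimerStatsSketch and summarize are hypothetical names, and using the population standard deviation (statistics.pstdev) is an assumption, since the documentation does not say which variant mtap reports.

```python
import statistics
from dataclasses import dataclass
from datetime import timedelta
from typing import List


@dataclass
class TimerStatsSketch:
    """Hypothetical stand-in mirroring the TimerStats fields."""
    mean: timedelta
    std: timedelta
    min: timedelta
    max: timedelta
    sum: timedelta


def summarize(durations: List[timedelta]) -> TimerStatsSketch:
    seconds = [d.total_seconds() for d in durations]
    total = sum(durations, timedelta())  # sum() needs a timedelta start value
    return TimerStatsSketch(
        mean=total / len(durations),
        std=timedelta(seconds=statistics.pstdev(seconds)),
        min=min(durations),
        max=max(durations),
        sum=total,
    )


stats = summarize([timedelta(milliseconds=ms) for ms in (10, 20, 30)])
print(stats.mean, stats.sum)  # 0:00:00.020000 0:00:00.060000
```

An AggregateTimingInfo-style map would then be one such summary per timer key, e.g. {key: summarize(values) for key, values in measurements.items()}.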
Pipeline Error Handling
Functionality for handling errors during pipeline processing.
Errors are handled by instances of the ProcessingErrorHandler class. To set up error handlers, use the pipeline's error_handlers attribute. The error handlers in this attribute are called sequentially when an error occurs during processing.
The default error handlers list is:
- SimpleErrorHandler (name simple).
- TerminationErrorHandler with max_failures set to 0 (name termination).
There are a number of additional built-in error handlers that can be used, including:
- LoggingErrorHandler, which logs to a specified logging.Logger object (name logging).
- ErrorsDirectoryErrorHandler, which, for every error that occurs, writes a set of files to disk containing the serialized event, the stack trace, and the error information (name to_directory).
- SuppressAllErrorsHandler, which suppresses all errors (name suppress).
In YAML pipeline configurations, error handlers can be specified using their registered name (shown above) and a dictionary of parameters that will be passed to the registered factory method (usually the constructor). An example error_handlers configuration in a pipeline file is shown below.
```yaml
error_handlers:
  - name: logging
  - name: to_directory
    params:
      output_directory: '/path/to/'
  - name: termination
    params:
      max_failures: 0
components:
  …
```
New error handlers can be registered for loading in this fashion by overriding the name method; otherwise they are registered under their module-qualified full class name.
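The name-based loading described above amounts to a registry keyed by name. The following sketch is hypothetical throughout (REGISTRY, register, HandlerSketch, and handler_from_dict are not mtap APIs). It shows a handler registered under the short name returned by name(), another falling back to its full class name, and a configuration dictionary whose params are passed to the constructor.

```python
from typing import Any, Callable, Dict, Type

# Hypothetical registry mapping a handler name to its factory (usually the class).
REGISTRY: Dict[str, Callable[..., Any]] = {}


class HandlerSketch:
    """Hypothetical base class standing in for ProcessingErrorHandler."""

    @classmethod
    def name(cls) -> str:
        # Default registry key: the module-qualified full class name.
        return f"{cls.__module__}.{cls.__qualname__}"


def register(cls: Type[HandlerSketch]) -> Type[HandlerSketch]:
    REGISTRY[cls.name()] = cls
    return cls


@register
class TerminationSketch(HandlerSketch):
    def __init__(self, max_failures: int = 0):
        self.max_failures = max_failures

    @classmethod
    def name(cls) -> str:
        return 'termination'  # short name usable in YAML configurations


@register
class UnnamedSketch(HandlerSketch):
    """Keeps the default name(), so it is registered under its full class name."""


def handler_from_dict(conf: Dict[str, Any]) -> Any:
    factory = REGISTRY[conf['name']]
    # A missing or empty params entry means "use the constructor defaults".
    return factory(**(conf.get('params') or {}))


handler = handler_from_dict({'name': 'termination', 'params': {'max_failures': 3}})
print(handler.max_failures)  # 3
```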
- class mtap.pipeline.ProcessingErrorHandler[source]
Base class for an error handler that is included in a pipeline to report and decide action for errors / exceptions that occur during processing.
Note that you should not store any state on the error handler, since it may be reused across multiple runs of the same pipeline. Instead, store stateful information in the state dictionary.
- static from_dict(conf: Dict[str, Any] | None) ProcessingErrorHandler [source]
Creates an error handler from its dictionary representation.
- Parameters:
conf –
The dictionary representation of the error handler. It should have at minimum a name key with a str value, and can also have a params dictionary that will be passed to the constructor of the error handler.
- Returns:
The instantiated error handler.
- classmethod name()[source]
Optional method that returns the name the error handler should be registered under. By default, the handler's full path name (its module-qualified class name) is used.
- abstract handle_error(event: Event, error_info: ErrorInfo, state: Dict[Any, Any])[source]
Handles an error that was caught by processing logic and transformed into a ProcessingException.
- Parameters:
event – The event that was being processed when the error was thrown.
error_info – The information about the error.
state – A dictionary that is preserved in-between calls to the error handler that the handler can use to store state local to the current run of processing.
- Raises:
StopProcessing – if the pipeline should immediately stop processing.
SuppressError – if the error should be suppressed and not passed to any downstream error handlers (later in the pipeline’s list of error handlers).
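The sequential semantics of handle_error, StopProcessing, and SuppressError can be illustrated with a small dispatcher. Everything below is a hypothetical, self-contained stand-in: the exception classes, the handlers, and dispatch_error are not imported from mtap, the error is a plain string instead of an ErrorInfo, and keeping one state dictionary per handler is an assumption.

```python
from typing import Any, Dict, List


class StopProcessingSketch(Exception):
    """Stand-in for StopProcessing: the pipeline should stop immediately."""


class SuppressErrorSketch(Exception):
    """Stand-in for SuppressError: skip the remaining handlers for this error."""


class RecordingHandler:
    def __init__(self, log: List[str], label: str):
        self.log = log
        self.label = label

    def handle_error(self, event: Any, error_info: str, state: Dict[Any, Any]) -> None:
        self.log.append(f"{self.label}: {error_info}")


class SuppressAll:
    def handle_error(self, event: Any, error_info: str, state: Dict[Any, Any]) -> None:
        raise SuppressErrorSketch()


class StopAlways:
    def handle_error(self, event: Any, error_info: str, state: Dict[Any, Any]) -> None:
        raise StopProcessingSketch()


def dispatch_error(handlers: List[Any], event: Any, error_info: str,
                   states: List[Dict[Any, Any]]) -> bool:
    """Call handlers in list order; return False if processing should stop."""
    for handler, state in zip(handlers, states):
        try:
            handler.handle_error(event, error_info, state)
        except SuppressErrorSketch:
            return True  # later handlers never see this error
        except StopProcessingSketch:
            return False
    return True


log: List[str] = []
handlers = [RecordingHandler(log, "first"), SuppressAll(), RecordingHandler(log, "never")]
states: List[Dict[Any, Any]] = [{} for _ in handlers]  # one state dict per handler per run
print(dispatch_error(handlers, None, "boom", states), log)  # True ['first: boom']
```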
- class mtap.processing.ErrorInfo(origin: ErrorOrigin, component_id: str, lang: str, error_type: str, error_repr: str, localized_msg: str, locale: str, stack_trace: List[str], address: str | None = None)[source]
Information about an error.
- origin
The source of the error.
- class mtap.processing.ErrorOrigin(value)[source]
Where the error occurred.
- LOCAL = 1
Error occurred locally to this process.
- REMOTE = 2
Error occurred on a remote component.
- class mtap.pipeline.StopProcessing[source]
Thrown by error handlers when the pipeline should immediately terminate.
- class mtap.pipeline.SuppressError[source]
Thrown by error handlers when the error should be suppressed and not handled by any downstream error handlers (later in the pipeline’s list of error handlers).
- class mtap.pipeline.SimpleErrorHandler(more_help: str | None = None)[source]
Bases: ProcessingErrorHandler
Prints a simple helpful explanation of the error info.
- class mtap.pipeline.TerminationErrorHandler(max_failures: int = 0)[source]
Bases: ProcessingErrorHandler
Terminates the pipeline after more than max_failures errors have occurred.
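The termination rule, stop once more than max_failures errors have occurred, needs a counter that survives between calls, and the ProcessingErrorHandler documentation says such state belongs in the state dictionary rather than on the handler. A hypothetical sketch: TerminationSketch and StopProcessingSketch are stand-ins, not mtap classes, and the 'failures' key is an invented name.

```python
from typing import Any, Dict


class StopProcessingSketch(Exception):
    """Stand-in for mtap's StopProcessing."""


class TerminationSketch:
    def __init__(self, max_failures: int = 0):
        self.max_failures = max_failures  # configuration only, no per-run state

    def handle_error(self, event: Any, error_info: Any, state: Dict[Any, Any]) -> None:
        # The per-run counter lives in the state dict, so the handler is reusable.
        state['failures'] = state.get('failures', 0) + 1
        if state['failures'] > self.max_failures:
            raise StopProcessingSketch()


handler = TerminationSketch(max_failures=1)
state: Dict[Any, Any] = {}
handler.handle_error(None, "first error", state)  # 1 failure: tolerated
try:
    handler.handle_error(None, "second error", state)  # 2 > 1: stop
except StopProcessingSketch:
    print("terminated after", state['failures'], "failures")  # terminated after 2 failures
```

With the default max_failures of 0, the first error already terminates the run.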
- class mtap.pipeline.LoggingErrorHandler(logger: str | Logger | None = None, level='error')[source]
Bases: ProcessingErrorHandler
Logs errors to a specified logging.Logger object.
- Parameters:
logger – Either the logger itself, the logger name, or None to use the mtap.processing logger.
- logger
The logger to use.
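A sketch of how the three accepted logger forms (a Logger, a logger name, or None) could be resolved with the standard logging module. LoggingSketch is a hypothetical stand-in, not the mtap class. It assumes None maps to logging.getLogger('mtap.processing') and that level names a Logger method such as 'error' or 'warning'.

```python
import logging
from typing import Any, Dict, List, Union


class LoggingSketch:
    """Hypothetical stand-in for LoggingErrorHandler (not the mtap class)."""

    def __init__(self, logger: Union[str, logging.Logger, None] = None,
                 level: str = 'error'):
        if logger is None:
            logger = logging.getLogger('mtap.processing')  # assumed default name
        elif isinstance(logger, str):
            logger = logging.getLogger(logger)
        self.logger = logger
        # Assumption: level names a Logger method, e.g. 'error' -> logger.error.
        self._log = getattr(self.logger, level)

    def handle_error(self, event: Any, error_info: Any, state: Dict[Any, Any]) -> None:
        self._log("Error while processing event %s: %s", event, error_info)


class ListHandler(logging.Handler):
    """Collects log records in memory so the output can be inspected."""

    def __init__(self) -> None:
        super().__init__()
        self.records: List[logging.LogRecord] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append(record)


capture = ListHandler()
logging.getLogger('mtap.processing').addHandler(capture)
LoggingSketch().handle_error('event-1', 'boom', {})
print(capture.records[0].getMessage())  # Error while processing event event-1: boom
```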
- class mtap.pipeline.ErrorsDirectoryErrorHandler(output_directory: str | bytes | PathLike | None = None, serializer: Type[Serializer] | str | None = None)[source]
Bases: ProcessingErrorHandler
Built-in error handler which handles failures in pipeline processing by writing files containing the error info, stack trace, and serialized event.
- Parameters:
serializer – Either a serializer, a configuration dictionary that can be used to instantiate a serializer, or None to use the default JSON serializer.
- output_directory
The directory to write the files to.
- Type:
str | bytes | os.PathLike
- serializer
The serializer to use to serialize the event.
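A sketch of the files-on-disk behavior using only the standard library. It assumes a per-event subdirectory holding three files (error info, stack trace, serialized event); the helper name, file names, layout, and use of json as the serializer are all assumptions for illustration, not what mtap writes. The event is represented by a plain dict.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Any, Dict, List, Union


def write_error_files(output_directory: Union[str, os.PathLike],
                      event_id: str,
                      event_data: Dict[str, Any],
                      error_info: Dict[str, Any],
                      stack_trace: List[str]) -> Path:
    """Hypothetical helper: write one failure's details into its own folder."""
    target = Path(output_directory) / event_id
    target.mkdir(parents=True, exist_ok=True)
    (target / 'error.json').write_text(json.dumps(error_info, indent=2))
    # Assumes each entry already ends with a newline, like traceback.format_exception.
    (target / 'trace.txt').write_text(''.join(stack_trace))
    (target / 'event.json').write_text(json.dumps(event_data, indent=2))
    return target


with tempfile.TemporaryDirectory() as tmp:
    folder = write_error_files(
        tmp, 'doc-1',
        event_data={'event_id': 'doc-1', 'documents': {}},
        error_info={'error_type': 'ValueError', 'error_repr': "ValueError('bad')"},
        stack_trace=['Traceback (most recent call last):\n', 'ValueError: bad\n'],
    )
    print(sorted(p.name for p in folder.iterdir()))
    # ['error.json', 'event.json', 'trace.txt']
```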
- class mtap.pipeline.SuppressAllErrorsHandler[source]
Bases: ProcessingErrorHandler
Suppresses all errors.