mtap.pipeline

Running a pipeline

class mtap.pipeline.Pipeline(*components: ComponentDescriptor, name: str | None = None, events_address: str | List[str] | None = None, mp_config: MpConfig | None = None, error_handlers: List[ProcessingErrorHandler] | None = None)[source]

Bases: List[ComponentDescriptor]

An object which can be used to build and run a pipeline of remote and local processors.

A pipeline is a MutableSequence containing one or more ComponentDescriptor objects, so it can be modified after creation using the standard sequence operations.

Parameters:

*components – Component descriptors created using RemoteProcessor or LocalProcessor.

name: str

The pipeline’s name.

events_address: str | List[str] | None

Optional events address. Required if using local processors.

mp_config: MpConfig

The multiprocessing configuration for the pipeline.

error_handlers: List[ProcessingErrorHandler]

The error handlers to use when running the pipeline.
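
A sketch of typical construction (the processor names and addresses here are hypothetical):

>>> from mtap.pipeline import Pipeline, RemoteProcessor
>>> pipeline = Pipeline(
>>>     RemoteProcessor('processor-1', address='localhost:50200'),
>>>     name='example-pipeline',
>>>     events_address='localhost:50100'
>>> )
>>> # Pipelines are mutable sequences, so components can be added later:
>>> pipeline.append(RemoteProcessor('processor-2', address='localhost:50201'))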

static from_yaml_file(conf_path: str | bytes | PathLike) Pipeline[source]

Creates a pipeline from a YAML pipeline configuration file.

Parameters:

conf_path – The path to the configuration file.

Returns:

Pipeline object from the configuration.
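
For example, given a configuration file whose keys mirror the constructor parameters (this layout is an illustrative sketch, not a guaranteed schema):

name: example-pipeline
events_address: localhost:50100
components:
- name: processor-1
  address: localhost:50200

>>> pipeline = Pipeline.from_yaml_file('pipeline.yaml')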

static from_dict(conf: Dict) Pipeline[source]

Creates a pipeline from a pipeline configuration dictionary.

Parameters:

conf – The pipeline configuration dictionary.

Returns:

Pipeline created from the configuration.
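
A minimal sketch, assuming the dictionary mirrors the YAML layout shown above:

>>> pipeline = Pipeline.from_dict({
>>>     'name': 'example-pipeline',
>>>     'events_address': 'localhost:50100',
>>>     'components': [
>>>         {'name': 'processor-1', 'address': 'localhost:50200'},
>>>     ],
>>> })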

run_multithread(source: Iterable[Event | Document] | ProcessingSource, *, total: int | None = None, callback: Callable[[Event, PipelineResult], Any] | None = None, **kwargs) PipelineTimes[source]

Runs this pipeline on a source which provides multiple documents / events.

Concurrency is per-event, with each event being provided a thread which runs it through the pipeline.

Parameters:
  • source – The processing source.

  • total – Total documents in the processing source.

  • callback – A callback that receives events and their associated results from the finished pipeline.

  • kwargs – Overrides for any of the MpConfig attributes.

Raises:

PipelineTerminated – If an error handler terminates the pipeline before it is completed.

Examples

>>> docs = list(pathlib.Path('abc/').glob('*.txt'))
>>>
>>> def document_source():
>>>     for path in docs:
>>>         with path.open('r') as f:
>>>             txt = f.read()
>>>         with Event(event_id=path.name,
>>>                    client=pipeline.events_client) as event:
>>>             doc = event.create_document('plaintext', txt)
>>>             yield doc
>>>
>>> pipeline.run_multithread(document_source(), total=len(docs))

run(target: Event | Document, *, params: Dict[str, Any] | None = None) PipelineResult[source]

Processes the event/document using all the processors in the pipeline.

Parameters:
  • target – Either an event or a document to process.

  • params – A JSON-compatible dictionary of params specific to processing this event; the existing params dictionary defined in add_processor() will be updated with the contents of this dict.

Returns:

The results of all the processors in the pipeline.

Examples

>>> e = mtap.Event()
>>> document = mtap.Document('plaintext', text="...", event=e)
>>> pipeline.run(document)

create_times() PipelineTimes[source]

Creates a timing object that can be used to store run times.

Returns:

A timing object.

Examples

>>> times = pipeline.create_times()
>>> result = pipeline.run(document)
>>> times.add_result_times(result)

class mtap.pipeline.MpConfig(show_progress: bool = False, params: ~typing.Dict[str, ~typing.Any] = <factory>, workers: int = 10, read_ahead: int = 10, close_events: bool = False, log_level: str = 'INFO', mp_start_method: str = 'spawn', mp_context: multiprocessing | None = None)[source]

Configuration object for pipeline multiprocessing.

show_progress: bool = False

Whether progress should be displayed in the console.

workers: int = 10

Number of workers to concurrently process events through the pipeline.

read_ahead: int = 10

The number of documents to read ahead onto the events service(s) and queue for processing.

close_events: bool = False

Whether any events passed from the source to the pipeline should be closed when the pipeline is completed.

log_level: str = 'INFO'

The log level to use.

mp_start_method: str = 'spawn'

The start method for multiprocessing processes; see multiprocessing.get_context().

mp_context: multiprocessing | None = None

An optional multiprocessing context. If set, it overrides the mp_start_method attribute. If not set, multiprocessing.get_context(mp_start_method) will be used to create the context.
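
A sketch of supplying a multiprocessing configuration and overriding part of it per run (the values are illustrative):

>>> from mtap.pipeline import Pipeline, MpConfig
>>> pipeline = Pipeline(
>>>     ...,
>>>     mp_config=MpConfig(workers=4, show_progress=True, close_events=True)
>>> )
>>> # run_multithread kwargs override individual MpConfig attributes:
>>> pipeline.run_multithread(source, workers=2)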

class mtap.pipeline.ComponentDescriptor[source]

A configuration which describes either a local or remote pipeline component and what the pipeline needs to do to call the component.

class mtap.pipeline.RemoteProcessor(name: str, address: str | None = None, component_id: str = 'pipeline_component', params: ~typing.Dict[str, ~typing.Any] = <factory>, enable_proxy: bool = False, enabled: bool = True, call_timeout: float = 10.0)[source]

Bases: ComponentDescriptor

A remote processor in a pipeline.

class mtap.pipeline.LocalProcessor(processor: ~mtap.processing._processor.EventProcessor, component_id: str = 'pipeline_component', params: ~typing.Dict[str, ~typing.Any] = <factory>, enabled: bool = True)[source]

Bases: ComponentDescriptor

A configuration of a locally-invoked processor.
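
A sketch of running an in-process processor, where MyProcessor stands in for a hypothetical EventProcessor subclass:

>>> from mtap.pipeline import Pipeline, LocalProcessor
>>> pipeline = Pipeline(
>>>     LocalProcessor(MyProcessor(), component_id='my_processor'),
>>>     name='local-pipeline',
>>>     events_address='localhost:50100'  # required for local processors
>>> )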

class mtap.pipeline.PipelineTerminated(*args)[source]

Raised after an error handler terminates the pipeline before completion.

Pipeline Document Sources

class mtap.pipeline.ProcessingSource[source]

Provides events or documents for the multithreaded pipeline runner. Also has functionality for receiving results.

property total: int | None

The total number of documents this source will provide.

Returns:

The total number of documents or events this source will provide, or None if not known.

abstract produce() Generator[Event | Document, None, None][source]

The method which provides documents or events to the pipeline to batch process.

Examples

Example implementation for processing text documents:

>>> ...
>>> def produce(self):
>>>     for file in Path(".").glob("*.txt"):
>>>         with file.open('r') as fio:
>>>             txt = fio.read()
>>>         with Event(client=client) as e:
>>>             doc = e.create_document('plaintext', txt)
>>>             yield doc

close()[source]

Optional method: called to clean up after processing is complete.

Default behavior is to do nothing.
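
A sketch of a complete source that implements both methods (the client attribute and file listing are illustrative):

>>> class DirectorySource(ProcessingSource):
>>>     def __init__(self, client, directory):
>>>         self.client = client
>>>         self.directory = Path(directory)
>>>
>>>     def produce(self):
>>>         for file in self.directory.glob('*.txt'):
>>>             txt = file.read_text()
>>>             with Event(event_id=file.name, client=self.client) as e:
>>>                 yield e.create_document('plaintext', txt)
>>>
>>>     def close(self):
>>>         # Release any resources held by this source; the default is a no-op.
>>>         pass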

class mtap.pipeline.FilesInDirectoryProcessingSource(client: EventsClient, directory: str | bytes | PathLike, *, document_name: str = 'plaintext', extension_glob: str = '*.txt', count_total: bool = True, errors: str | None = None)[source]

Bases: ProcessingSource

Processing source for pipelines which iterates over files in a directory.

Parameters:

count_total – Whether the total attribute should be populated by iterating through the directory once to count all matching files.

client

Create the events using this client.

directory

The path to the directory of files to process.

document_name

Creates documents with this name and adds the file’s plain text.

extension_glob

A glob used to filter documents from the directory.

total

The total number of documents.

errors

The errors argument for open().

Examples

>>> with Pipeline(...) as pipeline:
>>>     pipeline.run_multithread(
>>>         FilesInDirectoryProcessingSource(pipeline.events_client,
>>>                                          "/path/to/docs")
>>>     )

Pipeline Results

class mtap.pipeline.PipelineResult(component_results: List[ComponentResult], elapsed_time: timedelta)[source]

The result of processing an event or document in a pipeline.

component_results: List[ComponentResult]

The processing results for each individual component.

elapsed_time: timedelta

The elapsed time for the entire pipeline.

component_result(identifier: str) ComponentResult[source]

Returns the component result for a specific identifier.

Parameters:

identifier – The processor’s identifier in the pipeline.

Returns:

The result for the specified processor.

Return type:

ComponentResult
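
For example, to inspect a single component's result after a run (pipeline_component is the descriptors' default component_id; a real pipeline may assign its own identifiers):

>>> result = pipeline.run(document)
>>> component = result.component_result('pipeline_component')
>>> print(component.timing_info)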

class mtap.processing.results.ComponentResult(identifier: str, result_dict: Dict, timing_info: Dict[str, timedelta], created_indices: Dict[str, List[str]])[source]

The result of processing one document or event through a single processing component.

identifier: str

The id of the processor with respect to the pipeline.

result_dict: Dict

The json object returned by the processor as its results.

timing_info: Dict[str, timedelta]

A dictionary of the times taken processing this document.

created_indices: Dict[str, List[str]]

Any indices that have been added to documents by this processor.

class mtap.processing.results.TimerStats(mean: timedelta, std: timedelta, min: timedelta, max: timedelta, sum: timedelta)[source]

Statistics about a specific keyed measured duration recorded by a Stopwatch.

mean: timedelta

The sample mean of all measured durations.

std: timedelta

The sample standard deviation of all measured durations.

min: timedelta

The minimum of all measured durations.

max: timedelta

The maximum of all measured durations.

sum: timedelta

The sum of all measured durations.

class mtap.processing.results.AggregateTimingInfo(identifier: str, timing_info: Dict[str, TimerStats])[source]

Collection of all the timing info for a specific item / component.

identifier: str

The ID of the processor with respect to the pipeline.

timing_info: Dict[str, TimerStats]

A map from all the timer keys for the processor to the aggregated duration statistics.

print()[source]

Prints the aggregate timing info for all processing components using the built-in print() function.

static csv_header() str[source]

Returns the header for CSV formatted timing data.

Returns:

A string containing the column headers.

timing_csv() Generator[str, None, None][source]

Returns the timing data formatted as CSV, generating each row as a string.

Returns:

A generator of string rows for csv.
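
A sketch of dumping the timing data to a CSV file, given an AggregateTimingInfo instance agg; whether the header and rows carry trailing newlines is an assumption here:

>>> with open('times.csv', 'w') as f:
>>>     f.write(AggregateTimingInfo.csv_header())
>>>     for row in agg.timing_csv():
>>>         f.write(row)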

Pipeline Error Handling

Functionality for handling errors during pipeline processing.

Errors are handled by instances of the ProcessingErrorHandler class. Error handlers are configured through the pipeline's error_handlers attribute; the handlers in that list will be called sequentially when an error occurs during processing.

The default error handlers list is:

  1. SimpleErrorHandler (name simple).

  2. TerminationErrorHandler with max_failures set to 0 (name termination).

A number of additional built-in error handlers can also be used, including LoggingErrorHandler, ErrorsDirectoryErrorHandler, and SuppressAllErrorsHandler, all documented below.

In yaml pipeline configurations, error handlers can be specified using their registered name (shown above) and a dictionary of parameters that will be passed to the registered factory method (usually the constructor). An example of error_handlers configuration in the pipeline file is shown below.

error_handlers:
- name: logging
- name: to_directory
  params:
    output_directory: '/path/to/'
- name: termination
  params:
    max_failures: 0
components:

New error handlers can be registered for loading in this fashion by overriding the name() method or by using the module-qualified full class name.

class mtap.pipeline.ProcessingErrorHandler[source]

Base class for an error handler that is included in a pipeline to report and decide action for errors / exceptions that occur during processing.

Note that you should not store any state on the error handler since it may be reused across multiple runs of the same pipeline, instead store stateful information in the state dictionary.

static from_dict(conf: Dict[str, Any] | None) ProcessingErrorHandler[source]

Creates an error handler from its dictionary representation.

Parameters:

conf – The dictionary representation of the error handler. It should have, at minimum, a name key with a str value, and may also have a dictionary of params that will be passed to the constructor of the error handler.

Returns:

The instantiated error handler.

classmethod name()[source]

Optional method that returns the name under which the error handler should be registered; by default, the module-qualified full class name is used.

abstract handle_error(event: Event, error_info: ErrorInfo, state: Dict[Any, Any])[source]

Handles an error that was caught by processing logic and transformed into a ProcessingException.

Parameters:
  • event – The event that was being processed when the error was thrown.

  • error_info – The information about the error.

  • state – A dictionary that is preserved in-between calls to the error handler that the handler can use to store state local to the current run of processing.

Raises:
  • StopProcessing – if the pipeline should immediately stop processing.

  • SuppressError – if the error should be suppressed and not passed to any downstream error handlers (later in the pipeline’s list of error handlers).
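
A sketch of a custom error handler following these rules (the handler, its registered name, and its failure threshold are hypothetical):

>>> from mtap.pipeline import ProcessingErrorHandler, StopProcessing
>>>
>>> class CountingErrorHandler(ProcessingErrorHandler):
>>>     @classmethod
>>>     def name(cls):
>>>         return 'counting'
>>>
>>>     def handle_error(self, event, error_info, state):
>>>         # Keep state in the provided dict, not on the handler itself.
>>>         state['failures'] = state.get('failures', 0) + 1
>>>         if state['failures'] > 10:
>>>             raise StopProcessing()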

class mtap.processing.ErrorInfo(origin: ErrorOrigin, component_id: str, lang: str, error_type: str, error_repr: str, localized_msg: str, locale: str, stack_trace: List[str], address: str | None = None)[source]

Information about an error.

origin

The source of the error.

Type:

mtap.processing._exc.ErrorOrigin

component_id

The id of the processing component that the error occurred in.

Type:

str

lang

The programming language in which the error occurred.

Type:

str

error_type

The type of the error.

Type:

str

error_repr

The string representation of the error.

Type:

str

localized_msg

A localized, user-friendly message.

Type:

str

locale

The locale of the message.

Type:

str

stack_trace

The stack trace of the error.

Type:

List[str]

address

The remote address.

Type:

str | None

class mtap.processing.ErrorOrigin(value)[source]

Where the error occurred.

LOCAL = 1

Error occurred locally to this process.

REMOTE = 2

Error occurred on a remote component.

class mtap.pipeline.StopProcessing[source]

Raised by error handlers when the pipeline should immediately terminate.

class mtap.pipeline.SuppressError[source]

Raised by error handlers when the error should be suppressed and not handled by any downstream error handlers (later in the pipeline's list of error handlers).

class mtap.pipeline.SimpleErrorHandler(more_help: str | None = None)[source]

Bases: ProcessingErrorHandler

Prints a simple helpful explanation of the error info.

more_help: str

Additional help to be printed after every error.

class mtap.pipeline.TerminationErrorHandler(max_failures: int = 0)[source]

Bases: ProcessingErrorHandler

Terminates the pipeline after more than max_failures errors have occurred.

max_failures

The maximum number of errors to allow before immediately terminating the pipeline.

Type:

int
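
A sketch of loosening the default zero-failure tolerance (the threshold is illustrative):

>>> from mtap.pipeline import Pipeline, SimpleErrorHandler, TerminationErrorHandler
>>> pipeline = Pipeline(
>>>     ...,
>>>     error_handlers=[
>>>         SimpleErrorHandler(),
>>>         TerminationErrorHandler(max_failures=5),
>>>     ]
>>> )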

class mtap.pipeline.LoggingErrorHandler(logger: str | Logger | None = None, level='error')[source]

Bases: ProcessingErrorHandler

Logs errors to a specified logging.Logger object.

Parameters:

logger – Either the logger itself, the logger name, or None to use the mtap.processing logger.

logger

The logger to use.

class mtap.pipeline.ErrorsDirectoryErrorHandler(output_directory: str | bytes | PathLike | None = None, serializer: Type[Serializer] | str | None = None)[source]

Bases: ProcessingErrorHandler

Built-in error handler which handles failures in pipeline processing by writing files containing the error info, stack trace, and serialized event.

Parameters:

serializer – Either a serializer, a configuration dictionary that can be used to instantiate a serializer, or None to use the default json serializer.

output_directory

The directory to write the files to.

Type:

str | bytes | os.PathLike

serializer

The serializer to use to serialize the event.

Type:

Type[mtap.serialization._serialization.Serializer]

class mtap.pipeline.SuppressAllErrorsHandler[source]

Bases: ProcessingErrorHandler

Suppresses every error, preventing it from reaching any downstream error handlers.