# Copyright (c) Regents of the University of Minnesota.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Functionality for handling errors during pipeline processing.
Errors are handled by instances of the :class:`ProcessingErrorHandler`
class. To set up error handlers, the pipeline has an
:attr:`~mtap.processing.Pipeline.error_handlers` attribute. The
error handlers in this attribute will be called sequentially when an
error occurs during processing.
The default error handlers list is:
#. :class:`SimpleErrorHandler` (name ``simple``).
#. :class:`TerminationErrorHandler` with
:attr:`~TerminationErrorHandler.max_failures` set to 0
(name ``termination``).
There are a number of additional built-in error handlers that can be used,
including:
* :class:`LoggingErrorHandler` which logs to a specified
:class:`logging.Logger` object (name ``logging``).
* :class:`ErrorsDirectoryErrorHandler` which for all errors that occur
writes a set of files to disk containing the serialized event, the stack
trace, and the error information (name ``to_directory``).
* :class:`SuppressAllErrorsHandler` which suppresses all errors
(name ``suppress``).
In YAML pipeline configurations, error handlers can be specified using
their registered name (shown above) and a dictionary of parameters that
will be passed to the registered factory method (usually the constructor).
An example of an ``error_handlers`` configuration in a pipeline file is
shown below.

.. code-block:: text

    error_handlers:
      - name: logging
      - name: to_directory
        params:
          output_directory: '/path/to/'
      - name: termination
        params:
          max_failures: 0
    components:
      ...

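New error handlers are registered automatically when they subclass
:class:`ProcessingErrorHandler`. They can be referenced in this fashion either
by the name returned from an overridden ``name`` class method or, by default,
by their module-qualified full class name.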
"""
import dataclasses
import logging
import os.path
from abc import abstractmethod
from os import PathLike
from typing import (
Any,
Callable,
Dict,
Optional,
Union,
ClassVar, Type, Tuple, Iterable
)
import mtap
from mtap._event import Event
from mtap.processing import ErrorInfo
from mtap.serialization import Serializer
# Logger for the ``mtap.processing`` logging namespace.
LOGGER = logging.getLogger('mtap.processing')
# Type of the entries in the error handler registry: any callable (in practice
# a ProcessingErrorHandler subclass) that returns an error handler instance.
ErrorHandlerFactory = Callable[..., 'ProcessingErrorHandler']
def handle_error(error_handlers: Iterable[Tuple['ProcessingErrorHandler', Dict]],
                 event: mtap.Event,
                 error: ErrorInfo):
    """Passes an error to each configured error handler in order.

    Args:
        error_handlers: Pairs of an error handler and the state dictionary
            that handler uses for the current run of processing.
        event: The event that was being processed when the error occurred.
        error: Information about the error.

    Raises:
        StopProcessing: Propagated unchanged from any handler that raises it.
            Only :class:`SuppressError` is intercepted here.
    """
    for pair in error_handlers:
        handler, handler_state = pair
        try:
            handler.handle_error(event, error, handler_state)
        except SuppressError:
            # The handler asked that no later handler see this error.
            return
[docs]
class StopProcessing(Exception):
    """Raised by an error handler to signal that the pipeline should stop
    processing immediately."""
[docs]
class SuppressError(Exception):
    """Raised by an error handler when the error should be suppressed.

    A suppressed error is not passed to any downstream error handlers (those
    later in the pipeline's list of error handlers).
    """
[docs]
class ProcessingErrorHandler:
    """Base class for an error handler that is included in a pipeline to
    report and decide action for errors / exceptions that occur during
    processing.

    Every subclass is automatically registered under the name returned by its
    :meth:`name` class method, so it can be created from configuration with
    :meth:`from_dict`.

    Note that you should not store any state on the error handler since it may
    be reused across multiple runs of the same pipeline, instead store stateful
    information in the state dictionary.
    """
    _REGISTRY: ClassVar[Dict[str, ErrorHandlerFactory]] = {}

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # Register each subclass under its (possibly overridden) name so it can
        # be referenced by that name in pipeline configuration.
        ProcessingErrorHandler._REGISTRY[cls.name()] = cls

    @staticmethod
    def from_dict(conf: Optional[Dict[str, Any]]) -> 'ProcessingErrorHandler':
        """Creates an error handler from its dictionary representation.

        Args:
            conf: The dictionary representation of the error handler. Should
                have at minimum a ``name`` key with a ``str`` value. It can also
                have a ``params`` dictionary whose items are passed as keyword
                arguments to the registered factory (usually the error handler's
                constructor).

        Returns:
            The instantiated error handler.

        Raises:
            TypeError: If ``conf`` is ``None``.
            KeyError: If ``conf`` has no ``name`` key, or no error handler is
                registered under that name.
        """
        if conf is None:
            raise TypeError("An error handler configuration dictionary is required, got None.")
        name = conf['name']
        try:
            factory = ProcessingErrorHandler._REGISTRY[name]
        except KeyError:
            raise KeyError(
                f"No error handler is registered with the name {name!r}. "
                f"Registered names: {sorted(ProcessingErrorHandler._REGISTRY)}"
            ) from None
        # In YAML, ``params:`` with no value loads as None; treat that as "no
        # parameters" rather than failing on ``**None``.
        params = conf.get('params') or {}
        return factory(**params)

    @classmethod
    def name(cls):
        """Optional method that returns the name the error handler should be
        loaded into the registry with, by default will just use the full path
        name.
        """
        return '.'.join([cls.__module__, cls.__name__])

    # NOTE: this class does not use ABCMeta, so @abstractmethod is not enforced
    # at instantiation; the NotImplementedError below guards direct calls.
    @abstractmethod
    def handle_error(
            self,
            event: Event,
            error_info: ErrorInfo,
            state: Dict[Any, Any]
    ):
        """Handles an error that was caught by processing logic and
        transformed into a :class:`~mtap.processing.ProcessingException`.

        Args:
            event:
                The event that was being processed when the error was thrown.
            error_info:
                The information about the error.
            state:
                A dictionary that is preserved in-between calls to the
                error handler that the handler can use to store state local
                to the current run of processing.

        Raises:
            StopProcessing: if the pipeline should immediately stop processing.
            SuppressError: if the error should be suppressed and not passed to
                any downstream error handlers (later in the pipeline's list of
                error handlers).
        """
        raise NotImplementedError()
[docs]
class SimpleErrorHandler(ProcessingErrorHandler):
    """Prints a short, human-readable explanation of each error to standard
    output.

    Errors whose representation is
    ``ValueError('Cannot invoke RPC on closed channel!')`` are ignored.

    Args:
        more_help: Extra guidance printed after every error. If not provided
            (or empty), a default hint about enabling enhanced debug behavior
            is used.
    """

    def __init__(self, more_help: Optional[str] = None):
        fallback_help = ("Check the configuration settings to enable enhanced "
                         "debug behavior.")
        #: Additional help to be printed after every error.
        self.more_help: str = more_help if more_help else fallback_help

    @classmethod
    def name(cls):
        return 'simple'

    def handle_error(self, event, error_info, state):
        from mtap.processing import ErrorOrigin
        closed_channel_repr = "ValueError('Cannot invoke RPC on closed channel!')"
        if error_info.error_repr == closed_channel_repr:
            # Calls made on an already-closed channel are silently ignored.
            return

        if error_info.origin != ErrorOrigin.REMOTE:
            # Local errors: a single multi-line report.
            print(
                f"An error occurred while processing an event with id "
                f"'{event.event_id}' through the component "
                f"'{error_info.component_id}': '{error_info.error_repr}'\n"
                f"It had the following message: {error_info.localized_msg}\n"
                f"{self.more_help}")
            return

        # Remote errors report the gRPC status code and, if present, the
        # localized message on its own line.
        print(
            f"An error occurred while processing an event with id "
            f"'{event.event_id}' through the remote component "
            f"'{error_info.component_id}' at address "
            f"'{error_info.address}': {error_info.grpc_code}")
        if error_info.localized_msg:
            print(
                f"It had the following message: {error_info.localized_msg}"
            )
        print(f"{self.more_help}", flush=True)
[docs]
class TerminationErrorHandler(ProcessingErrorHandler):
    """Stops the pipeline once the number of errors in the current run
    exceeds :attr:`max_failures`.

    The running error count is kept under the ``'failures'`` key of the
    per-run state dictionary.

    Attributes:
        max_failures: The maximum number of errors to allow before
            immediately terminating the pipeline.
    """
    max_failures: int

    def __init__(self, max_failures: int = 0):
        self.max_failures = max_failures

    @classmethod
    def name(cls):
        return 'termination'

    def handle_error(self, _1, _2, state):
        failure_count = state.get('failures', 0) + 1
        state['failures'] = failure_count
        if failure_count <= self.max_failures:
            return
        print(f"Pipeline exceeded the maximum number "
              f"of allowed failures ({self.max_failures}) "
              f"and is terminating.")
        raise StopProcessing()
[docs]
class LoggingErrorHandler(ProcessingErrorHandler):
    """Logs errors to a specified :class:`logging.Logger` object.

    Args:
        logger: Either the logger itself, the logger name, or ``None`` to use
            ``mtap.processing``.
        level: Name of the logger method used to emit messages, for example
            ``'error'`` (the default) or ``'warning'``. Matched
            case-insensitively, so ``'ERROR'`` also works.

    Attributes:
        logger: The bound logging method (for example ``Logger.error``) that
            messages are emitted with.
    """

    def __init__(self, logger: Union[str, logging.Logger, None] = None, level='error'):
        if isinstance(logger, str):
            logger = logging.getLogger(logger)
        # Level names are conventionally upper-case in logging configuration,
        # but the Logger methods are lower-case.
        self.logger = getattr(logger or logging.getLogger('mtap.processing'), level.lower())

    @classmethod
    def name(cls):
        return 'logging'

    def handle_error(self, event, error_info, state):
        from mtap.processing import ErrorOrigin
        if error_info.origin == ErrorOrigin.REMOTE:
            # Each part ends with a newline so the message, the optional
            # localized message, and the stack trace do not run together.
            msg = (f"An error occurred while processing an event with id "
                   f"'{event.event_id}' through the remote component "
                   f"'{error_info.component_id}' at address "
                   f"'{error_info.address}': {error_info.error_repr}\n")
            if error_info.localized_msg:
                msg += f"It had the following message: {error_info.localized_msg}\n"
        else:
            msg = (f"An error occurred while processing an event with id "
                   f"'{event.event_id}' through the component "
                   f"'{error_info.component_id}': '{error_info.error_repr}'\n"
                   f"It had the following message: {error_info.localized_msg}\n")
        # The stack trace lines already carry their own line breaks.
        msg += ''.join(error_info.stack_trace)
        self.logger(msg)
[docs]
class ErrorsDirectoryErrorHandler(ProcessingErrorHandler):
    """Built-in Error Handler which handles failures in pipeline processing by
    writing files containing the error info, stack trace, and serialized event.

    For each failing event a directory named after the event id is created
    inside :attr:`output_directory`. It contains ``info.json`` (the error
    info), ``event`` plus the serializer's extension (the serialized event),
    and ``trace.txt`` (the stack trace).

    Args:
        output_directory: The directory to write the files to. Defaults to
            ``'errors'`` (relative to the working directory).
        serializer: Either a :class:`~mtap.serialization.Serializer` subclass,
            a value to look up with ``Serializer.get`` (such as a registered
            name like ``'json'``), or ``None`` to use the default json
            serializer.

    Attributes:
        output_directory: The directory to write the files to.
        serializer: The serializer to use to serialize the event.
    """
    output_directory: Union[str, bytes, PathLike]
    serializer: Type[Serializer]

    def __init__(self,
                 output_directory: Union[str, bytes, PathLike, None] = None,
                 serializer: Union[Type[Serializer], str, None] = None):
        self.output_directory = 'errors' if output_directory is None else output_directory
        if serializer is None:
            serializer = 'json'
        # issubclass() raises TypeError on non-class arguments such as a
        # serializer name, so check for a class before calling it.
        if not (isinstance(serializer, type) and issubclass(serializer, Serializer)):
            serializer = Serializer.get(serializer)
        self.serializer = serializer

    @classmethod
    def name(cls):
        return 'to_directory'

    def handle_error(self, event, error_info, _):
        import json
        # fsdecode normalizes str / bytes / PathLike output directories to str
        # so they can be joined with the str event id (bytes + str would raise).
        # NOTE(review): event ids containing path separators or '..' produce
        # nested or escaping paths; consider sanitizing if ids are untrusted.
        d = os.path.join(os.fsdecode(self.output_directory), event.event_id)
        os.makedirs(d, exist_ok=True)
        print(f"Writing error information to: "
              f"'{os.path.abspath(d)}'")
        with open(os.path.join(d, 'info.json'), 'w', encoding='utf-8') as f:
            json.dump(dataclasses.asdict(error_info), f, default=str)
        ser_path = os.path.join(d, 'event' + self.serializer.extension())
        self.serializer.event_to_file(event, ser_path)
        with open(os.path.join(d, 'trace.txt'), 'w', encoding='utf-8') as f:
            # The lines already have builtin line breaks.
            f.writelines(error_info.stack_trace)
[docs]
class SuppressAllErrorsHandler(ProcessingErrorHandler):
    """Suppresses every error, so that no error handler later in the
    pipeline's list of error handlers receives it."""

    def handle_error(self, *_args, **_kwargs):
        # Unconditionally stop propagation to downstream handlers.
        raise SuppressError()

    @classmethod
    def name(cls):
        return 'suppress'