# Copyright (c) Regents of the University of Minnesota.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Serialization, serializers, and helper methods for going to and from
flattened python dictionary representations of events.
"""
import logging
import os
from abc import abstractmethod
from base64 import standard_b64decode, standard_b64encode
from os import PathLike
from typing import Union, Dict, Any, Optional, TextIO, Type, ClassVar, IO, \
BinaryIO
from mtap._document import Document
from mtap._event import Event
from mtap._events_client import EventsClient
from mtap._label_adapters import GENERIC_ADAPTER
from mtap.descriptors import processor, parameter
from mtap.processing import EventProcessor, Processor
logger = logging.getLogger('mtap.serialization')
[docs]
def event_to_dict(event: Event, *,
include_label_text: bool = False,
include_binaries: bool = False) -> Dict:
"""A helper method that turns an event into a python dictionary.
Args:
event: The event object.
include_label_text: Whether to include the text labels cover with the
labels.
include_binaries: Whether to serialize and include the event's binaries.
Returns:
dict: A dictionary object suitable for serialization.
"""
d = {
'event_id': event.event_id,
'metadata': {},
'documents': {}
}
for k, v in event.metadata.items():
d['metadata'][k] = v
for doc in event.documents.values():
d['documents'][doc.document_name] = document_to_dict(
doc,
include_label_text=include_label_text
)
if include_binaries:
d['binaries'] = {}
for k, binary in event.binaries.items():
d[k] = standard_b64encode(binary).decode("utf-8")
return d
[docs]
def document_to_dict(document: Document, *,
include_label_text: bool = False) -> Dict:
"""A helper method that turns a document into a python dictionary.
Args:
document: The document object.
include_label_text: Whether to include the text labels cover with the
labels.
Returns:
A dictionary object suitable for serialization.
"""
d = {
'text': document.text,
'label_indices': {}
}
for index_name, index in document.labels.items():
adapter = index.adapter
if adapter is None:
adapter = GENERIC_ADAPTER
d['label_indices'][index_name] = adapter.pack(
index,
include_label_text=include_label_text
)
return d
[docs]
def dict_to_event(d: Dict, *,
client: Optional[EventsClient] = None) -> Event:
"""Turns a serialized dictionary into an Event.
Args:
d: A json-like (only ``str`` keys) dictionary representation of the
event.
client: If specified, will upload the event to this events service.
Otherwise, creates a local event.
Returns:
The deserialized event object.
"""
event = Event(event_id=d['event_id'], client=client)
for k, v in d.get('metadata', {}).items():
event.metadata[k] = v
for k, v in d.get('documents', {}).items():
dict_to_document(k, v, event=event)
for k, v in d.get('binaries', {}).items():
event.binaries[k] = standard_b64decode(str.encode(v, 'utf-8'))
return event
[docs]
def dict_to_document(document_name: str,
d: Dict,
*, event: Optional[Event] = None) -> Document:
"""Turns a serialized dictionary into a Document.
Args:
document_name: The name identifier of the document.
d: A json-like (only ``str`` keys) dictionary representation of the
document.
event: If specified, the function will add the document to this
event. Otherwise, it will create a stand-alone document.
Returns:
The deserialized Document object.
"""
document = Document(document_name=document_name, text=d['text'])
if event is not None:
event.add_document(document)
for k, v in d.get('label_indices', {}).items():
adapter = document.default_adapter(k)
index = adapter.unpack(v, k, document=document)
document.add_labels(k, index, distinct=index.distinct)
return document
[docs]
class Serializer:
"""Abstract base class for a serializer of MTAP events.
"""
_REGISTRY: ClassVar[Dict[str, Type['Serializer']]] = {}
def __init_subclass__(cls, **kwargs):
super().__init_subclass__(**kwargs)
Serializer._REGISTRY[cls.name()] = cls
@staticmethod
def get(name: str) -> Type['Serializer']:
return Serializer._REGISTRY[name]
@classmethod
def name(cls) -> str:
return cls.__name__.casefold()
[docs]
@classmethod
def extension(cls) -> str:
"""str: Filename extension, including period. Ex: ``'.json'``."""
...
[docs]
@classmethod
@abstractmethod
def event_to_file(
cls,
event: Event,
f: Union[str, bytes, PathLike, IO],
*, include_label_text: bool = False
):
"""Writes the event to a file.
Args:
event: The event object to serialize.
f: A file or a path to a file to write the event to.
include_label_text: Whether, when serializing, to include the text
that each label covers with the rest of the label.
"""
...
[docs]
@classmethod
@abstractmethod
def file_to_event(
cls,
f: Union[str, bytes, PathLike, IO],
*, client: Optional[EventsClient] = None
) -> Event:
"""Loads an event from a serialized file.
Args:
f: The file to load from.
client: The events service to load the event into.
Returns:
The loaded event object.
"""
...
[docs]
@processor(
'mtap-serializer',
description='Serializes events to a specific directory',
parameters=[
parameter(
'filename',
data_type='str',
description='Optional override for the filename '
'to write the document to.'
)
]
)
class SerializationProcessor(EventProcessor):
"""An MTAP :obj:`EventProcessor` that serializes events to a specific
directory.
Attributes:
serializer (~mtap.serialization.Serializer): The serializer to use.
output_dir: The output directory.
include_label_text: Whether to attach the covered text to labels.
"""
serializer: Serializer
output_dir: Union[str, bytes, PathLike]
include_label_text: bool
def __init__(
self,
serializer: Serializer,
output_dir: Union[str, bytes, PathLike],
include_label_text: bool = False
):
self.serializer = serializer
self.output_dir = os.path.abspath(output_dir)
self.include_label_text = include_label_text
os.makedirs(self.output_dir, exist_ok=True)
def process(self, event: Event, params: Dict[str, Any]):
name = params.get('filename',
event.event_id + self.serializer.extension())
path = os.path.join(self.output_dir, name)
self.serializer.event_to_file(
event,
path,
include_label_text=self.include_label_text
)
[docs]
class JsonSerializer(Serializer):
"""Serializer implementation that performs serialization to JSON.
"""
@classmethod
def name(cls) -> str:
return 'json'
@classmethod
def extension(cls) -> str:
return '.json'
@classmethod
def event_to_file(cls,
event: Event,
f: Union[str, bytes, PathLike, IO], *,
include_label_text: bool = False):
with Processor.started_stopwatch('transform'):
d = event_to_dict(event, include_label_text=include_label_text)
with Processor.started_stopwatch('io'):
_json_dump_file_or_path(d, f)
@classmethod
def file_to_event(cls,
f: Union[str, bytes, PathLike, IO],
client: Optional[EventsClient] = None) -> Event:
with Processor.started_stopwatch('io'):
d = _json_load_file_or_path(f)
with Processor.started_stopwatch('transform'):
return dict_to_event(d, client=client)
def _json_dump_file_or_path(d, f):
import json
try:
json.dump(d, f)
return
except (AttributeError, TypeError):
pass
os.makedirs(os.path.dirname(f), exist_ok=True)
with open(f, 'w') as f:
json.dump(d, f)
def _json_load_file_or_path(f):
import json
try:
return json.load(f)
except (AttributeError, TypeError):
pass
with open(f, 'r') as f:
return json.load(f)
[docs]
class YamlSerializer(Serializer):
"""Serializer implementation that performs serialization to YAML.
"""
@classmethod
def name(cls) -> str:
return 'yaml'
@classmethod
def extension(cls) -> str:
return '.yml'
@classmethod
def event_to_file(cls, event: Event,
f: Union[str, bytes, PathLike, IO], *,
include_label_text: bool = False):
with Processor.started_stopwatch('transform'):
d = event_to_dict(event, include_label_text=include_label_text)
with Processor.started_stopwatch('io'):
_yaml_dump_file_or_path(d, f)
@classmethod
def file_to_event(cls,
f: Union[str, bytes, PathLike, TextIO],
*, client: Optional[EventsClient] = None) -> Event:
with Processor.started_stopwatch('io'):
d = _yaml_load_file_or_path(f)
with Processor.started_stopwatch('transform'):
return dict_to_event(d, client=client)
def _yaml_dump_file_or_path(d, f):
import yaml
try:
from yaml import CDumper as Dumper
except ImportError:
from yaml import Dumper
try:
yaml.dump(d, f, Dumper=Dumper)
return
except (AttributeError, TypeError):
pass
os.makedirs(os.path.dirname(f), exist_ok=True)
with open(f, 'w') as f:
yaml.dump(d, f, Dumper=Dumper)
def _yaml_load_file_or_path(f):
import yaml
try:
from yaml import CLoader as Loader
except ImportError:
from yaml import Loader
try:
return yaml.load(f, Loader=Loader)
except (AttributeError, TypeError):
pass
with open(f) as f:
return yaml.load(f, Loader=Loader)
[docs]
class PickleSerializer(Serializer):
"""Serializer implementation that performs serialization to .pickle.
"""
@classmethod
def name(cls) -> str:
return 'pickle'
@classmethod
def extension(cls) -> str:
return '.pickle'
@classmethod
def event_to_file(cls,
event: Event,
f: Union[str, bytes, PathLike, BinaryIO], *,
include_label_text: bool = False):
with Processor.started_stopwatch('transform'):
d = event_to_dict(event, include_label_text=include_label_text)
with Processor.started_stopwatch('io'):
_pickle_dump_file_or_path(d, f)
@classmethod
def file_to_event(
cls,
f: Union[str, bytes, PathLike, BinaryIO], *,
client: Optional[EventsClient] = None
) -> Event:
with Processor.started_stopwatch('io'):
d = _pickle_load_file_or_path(f)
with Processor.started_stopwatch('transform'):
return dict_to_event(d, client=client)
def _pickle_dump_file_or_path(d, f):
import pickle
try:
pickle.dump(d, f)
return
except (AttributeError, TypeError):
pass
os.makedirs(os.path.dirname(f), exist_ok=True)
with open(f, 'wb') as f:
pickle.dump(d, f)
def _pickle_load_file_or_path(f):
import pickle
try:
return pickle.load(f)
except (AttributeError, TypeError):
pass
with open(f, 'rb') as f:
return pickle.load(f)