# Copyright 2019 Regents of the University of Minnesota.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import overload, Optional, Mapping, Iterator, \
Iterable, Sequence, TypeVar, List, TYPE_CHECKING
from mtap._labeler import Labeler
from mtap._labels import Label
if TYPE_CHECKING:
from mtap import Event, GenericLabel
from mtap.types import EventsClient, ProtoLabelAdapter, LabelIndex
L = TypeVar('L', bound=Label)
LabelAdapters = Mapping[str, 'ProtoLabelAdapter']
[docs]
class Document:
"""An object for interacting with text and labels stored on an
:class:`Event`.
Documents are keyed by their name, and pipelines can store different
pieces of related text on a single processing event using multiple
documents. An example would be storing the text of one language on one
document, and a translation on another, or storing the rtf or html
encoding on one document (or as a binary in :meth:`Event.binaries`), and
the parsed plaintext on another document.
Both the document text and any added label indices are immutable. This is
to enable parallelization and distribution of processing, and because other
label indices might be downstream dependents on the earlier created labels.
Args:
document_name: The document name identifier.
text: The document text, can be omitted if this is an existing
document and text needs to be retrieved from the events service.
event: The parent event of this document. If the event has a client,
then that client will be used to share changes to this document
with all other clients of the Events service. In that case, text
should only be specified if it is the known existing text of the
document.
Examples:
Local document:
>>> document = Document('plaintext', text='Some document text.')
Existing distributed object:
>>> with EventsClient(address='localhost:8080') as client, \
>>> Event(event_id='1',
>>> event_service_instance_id='events_sid',
>>> client=client) as event:
>>> document = event.documents['plaintext']
>>> document.text
'Some document text fetched from the server.'
New distributed object:
>>> with EventsClient(address='localhost:8080') as client, \
>>> Event(event_id='1', client=client) as event:
>>> document = Document('plaintext', text='Some document text.')
>>> event.add_document(document)
or
>>> with EventsClient(address='localhost:8080') as client, \
>>> Event(event_id='1', client=client) as event:
>>> document = event.create_document('plaintext',
>>> text='Some document text.')
"""
__slots__ = (
'_document_name',
'_event',
'_text',
'_labelers',
'_created_indices',
'_waiting_indices',
'label_adapters',
'_labels',
)
@overload
def __init__(
self,
document_name: str,
*, text: str,
label_adapters: Optional[LabelAdapters] = None
):
...
@overload
def __init__(
self,
document_name: str,
*, event: 'Event',
label_adapters: Optional[LabelAdapters] = None
):
pass
@overload
def __init__(
self,
document_name: str,
*, text: str,
event: 'Event',
label_adapters: Optional[LabelAdapters] = None
):
pass
def __init__(
self,
document_name: str,
*, text: Optional[str] = None,
event: Optional['Event'] = None,
label_adapters: Optional[LabelAdapters] = None
):
if not isinstance(document_name, str):
raise TypeError('Document name is not string.')
self._document_name = document_name
self._event = event
self._text = text
self._labelers = []
self._created_indices = []
self._waiting_indices = []
self.label_adapters = label_adapters or {}
if self.event is not None and self.event.client is None and text is None:
raise ValueError(
'Document must either be an existing document (event.client '
'is not None) or must have the text parameter specified.'
)
@property
def event(self) -> Optional['Event']:
"""The parent event of this document."""
return self._event
@property
def document_name(self) -> str:
"""The unique identifier for this document on the event."""
return self._document_name
@property
def text(self) -> str:
"""The document text."""
if self._text is None and event_client(self) is not None:
self._text = event_client(self).get_document_text(*ids(self))
return self._text
@property
def created_indices(self) -> List[str]:
"""A list of all the label index names that have been created on this
document using a labeler either locally or by remote pipeline
components invoked on this document."""
return list(self._created_indices)
@property
def labels(self) -> Mapping[str, 'LabelIndex']:
"""A mapping from label index names to their label index.
Items will be fetched from the events service if they are not cached
locally when the document has an event with a client.
"""
return self._label_indices
@property
def _label_indices(self) -> '_LabelIndices':
try:
return self._labels
except AttributeError:
self._labels = _LabelIndices(self)
return self._labels
[docs]
def get_labeler(
self,
label_index_name: str,
*, distinct: Optional[bool] = None
) -> Labeler['GenericLabel']:
"""Alias for :meth:`labeler`
"""
return self.labeler(label_index_name, distinct=distinct)
[docs]
def labeler(
self,
label_index_name: str,
*, distinct: Optional[bool] = None
) -> Labeler['GenericLabel']:
"""Creates a function that can be used to add labels to a label index.
Args:
label_index_name: An identifying name for the label index.
distinct: Optional, if using generic labels, whether to use
distinct generic labels or non-distinct generic labels, will
default to False. Distinct labels are non-overlapping and
can use faster binary search indices.
Returns:
A callable when used in conjunction with the 'with' keyword will
automatically handle uploading any added labels to the server.
Examples:
>>> with document.get_labeler('sentences',
>>> distinct=True) as labeler:
>>> labeler(0, 25, sentence_type='STANDARD')
>>> sentence = labeler(26, 34)
>>> sentence.sentence_type = 'FRAGMENT'
"""
if label_index_name in self._labelers:
raise KeyError("Labeler already in use: " + label_index_name)
label_adapter = self.default_adapter(label_index_name, distinct)
labeler = Labeler(self, label_index_name, label_adapter)
self._labelers.append(label_index_name)
return labeler
[docs]
def add_labels(self,
label_index_name: str,
labels: Sequence[L],
*, distinct: Optional[bool] = None,
label_adapter: Optional['ProtoLabelAdapter'] = None):
"""Skips using a labeler and adds the sequence of labels as a new
label index.
Args:
label_index_name: The name of the label index.
labels: The labels to add.
distinct: Whether the index is distinct or non-distinct.
label_adapter: A label adapter to use.
Returns:
The new label index created from the labels.
"""
if label_index_name in self.labels:
raise KeyError(
"Label index already exists with name: " + label_index_name)
if label_adapter is None:
if distinct is None:
distinct = False
label_adapter = self.default_adapter(label_index_name,
distinct)
labels = sorted(labels, key=lambda x: x.location)
waiting_on = set()
for i, lbl in enumerate(labels):
lbl.document = self
lbl.identifier = i
lbl.label_index_name = label_index_name
for lbl in labels:
lbl.collect_floating_references(waiting_on)
if len(self._waiting_indices) > 0:
label_ids = {id(label) for label in labels}
self.check_waiting_indices(label_ids)
if len(waiting_on) > 0:
self._waiting_indices.append(
(label_index_name, labels, label_adapter, waiting_on))
else:
self.finalize_labels(label_adapter, label_index_name, labels)
def check_waiting_indices(self, updated_ids):
waiting_indices = []
for (label_index_name,
labels,
label_adapter,
waiting_on) in self._waiting_indices:
still_waiting = waiting_on.difference(updated_ids)
if len(still_waiting) == 0:
self.finalize_labels(label_adapter, label_index_name, labels)
else:
waiting_indices.append(
(label_index_name, labels, label_adapter, still_waiting))
self._waiting_indices = waiting_indices
def finalize_labels(self, label_adapter, label_index_name, labels):
label_adapter.store_references(labels)
if event_client(self) is not None:
event_client(self).add_labels(*ids(self),
index_name=label_index_name,
labels=labels,
adapter=label_adapter)
self._created_indices.append(label_index_name)
index = label_adapter.create_index(labels)
self._label_indices.add_to_cache(label_index_name, index)
return index
def default_adapter(
self,
label_index_name: str,
distinct: Optional[bool] = None
) -> Optional['ProtoLabelAdapter']:
try:
return self.label_adapters[label_index_name]
except KeyError:
pass
try:
return self.event.label_adapters[label_index_name]
except (AttributeError, KeyError):
from mtap._label_adapters import DISTINCT_GENERIC_ADAPTER
from mtap._label_adapters import GENERIC_ADAPTER
return DISTINCT_GENERIC_ADAPTER if distinct else GENERIC_ADAPTER
def add_created_indices(self, created_indices: Iterable[str]):
return self._created_indices.extend(created_indices)
class _LabelIndices(Mapping[str, 'LabelIndex']):
__slots__ = (
'document',
'cache',
'names_cache'
)
def __init__(self, document: Document):
self.document = document
self.cache = {}
self.names_cache = set()
def __getitem__(self, k: str) -> 'LabelIndex':
try:
return self.cache[k]
except KeyError:
pass
if k not in self:
raise KeyError
if event_client(self.document) is not None:
label_adapter = self.document.default_adapter(k)
if label_adapter is None:
from mtap._label_adapters import GENERIC_ADAPTER
label_adapter = GENERIC_ADAPTER
index = event_client(self.document).get_labels(
*ids(self.document),
k,
adapter=label_adapter
)
for label in index:
label.label_index_name = k
label.document = self.document
self.cache[k] = index
self.names_cache.add(k)
return index
else:
raise KeyError('Document does not have label index: ' + k)
def __contains__(self, item):
self.refresh_names()
return item in self.names_cache
def __len__(self) -> int:
self.refresh_names()
return len(self.names_cache)
def __iter__(self) -> Iterator[str]:
self.refresh_names()
return iter(self.names_cache)
def refresh_names(self):
if event_client(self.document) is not None:
infos = event_client(self.document).get_label_index_info(
*ids(self.document)
)
for info in infos:
self.names_cache.add(info.index_name)
def add_to_cache(self, label_index_name, index):
self.cache[label_index_name] = index
self.names_cache.add(label_index_name)
def ids(document: 'Document'):
return (
document.event.event_service_instance_id,
document.event.event_id,
document.document_name
)
def event_client(document: 'Document') -> Optional['EventsClient']:
if document.event is None:
return None
return document.event.client