Source code for mtap._event

# Copyright (c) Regents of the University of Minnesota.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import uuid
from typing import ContextManager, Dict, MutableMapping, Optional, Iterator, \
    List, Mapping, TYPE_CHECKING, cast

from mtap._document import Document
from mtap._events_client import EventsClient

if TYPE_CHECKING:
    from mtap._label_adapters import ProtoLabelAdapter


[docs] class Event(ContextManager['Event']): """An object for interacting with a specific event locally or on the events service. The Event object functions as a map from string document names to :obj:`Document` objects that can be used to access document data from the events server. To connect to the events service and load an existing event, all of ``event_id``, ``event_service_instance_id``, and ``client`` must be specified. Args: event_id: A globally-unique identifier for the event, or omit / none for a random UUID. client: If specified, connects to the events service with id ``event_service_instance_id`` and accesses either accesses the existing event with the ``event_id`` or creates a new one. only_create_new: Fails if the event already exists on the events service. Attributes: label_adapters: A mapping of string label index names to :class:`ProtoLabelAdapter` instances to perform custom mapping of label types. Examples: Creating a new event locally. >>> event = Event() Creating a new event remotely. >>> with EventsClient(address='localohost:50000') as c, \ >>> Event(event_id='id', client=c) as event: >>> # use event >>> ... Connecting to an existing event. >>> with EventsClient(address='localohost:50000') as c, \ >>> Event(event_id='id', >>> event_service_instance_id='events_sid', >>> client=c) as event: >>> >>> """ __slots__ = ('_event_id', '_client', '_lock', 'label_adapters', '_documents_cache', '_metadata_cache', '_binaries_cache', '_event_service_instance_id', '_leased') label_adapters: Mapping[str, 'ProtoLabelAdapter'] def __init__( self, event_id: Optional[str] = None, *, client: Optional['EventsClient'] = None, only_create_new: bool = False, label_adapters: Optional[Mapping[str, 'ProtoLabelAdapter']] = None, event_service_instance_id: Optional[str] = None, lease: bool = True ): self._event_id = event_id or str(uuid.uuid4()) self._event_service_instance_id = event_service_instance_id if label_adapters is not None: self.label_adapters = dict(label_adapters) else: self.label_adapters = {} self._client = client self._leased = lease if self._client is not None and self._leased: self._event_service_instance_id = self._client.open_event( event_service_instance_id, self._event_id, only_create_new=only_create_new ) self._documents_cache = None self._metadata_cache = None self._binaries_cache = None @property def client(self) -> Optional['EventsClient']: return self._client @property def event_id(self) -> str: """The globally unique identifier for this event.""" return self._event_id @property def event_service_instance_id(self) -> str: """The unique instance identifier for this event's paired event service.""" return self._event_service_instance_id @property def documents(self) -> Mapping[str, Document]: """A mutable mapping of strings to :obj:`Document` objects that can be used to query and add documents to the event.""" if self._documents_cache is None: self._documents_cache = _Documents(self) return self._documents_cache @property def metadata(self) -> MutableMapping[str, str]: """A mutable mapping of strings to strings that can be used to query and add metadata to the event.""" if self._metadata_cache is None: self._metadata_cache = _Metadata(self) return self._metadata_cache @property def binaries(self) -> MutableMapping[str, bytes]: """A mutable mapping of strings to bytes that can be used to query and add binary data to the event.""" if not self._binaries_cache: self._binaries_cache = _Binaries(self) return self._binaries_cache @property def created_indices(self) -> Dict[str, List[str]]: """A mapping of document names to a list of the names of all the label indices that have been added to that document""" try: return {document_name: document.created_indices for document_name, document in self._documents_cache.data.items()} except AttributeError: return {}
[docs] def close(self): """Closes this event. Lets the event service know that we are done with the event, allowing to clean up the event if no other clients have open leases to it.""" if self.client is not None and self._leased: self.release_lease()
[docs] def create_document(self, document_name: str, text: str) -> Document: """Adds a document to the event keyed by `document_name` and containing the specified `text`. Args: document_name: The event-unique identifier for the document, example: ``'plaintext'``. text: The content of the document. This is a required field, and the document text is final and immutable. Returns: The added document. Examples: >>> event = Event() >>> doc = event.create_document('plaintext', >>> text="The text of the document.") """ if not isinstance(text, str): raise ValueError('text is not string.') document = Document(document_name, text=text) self.add_document(document) return document
[docs] def add_document(self, document: Document): """Adds the document to this event, first uploading to events service if this event has a client connection to the events service. Args: document: The document to add to this event. Examples: >>> event = Event() >>> doc = Document('plaintext', >>> text="The text of the document.") >>> event.add_document(doc) """ if document.event is not None: raise ValueError('This document already has an event, it cannot ' 'be added to another.') document._event = self name = document.document_name if self.client is not None: self.client.add_document(*ids(self), name, document.text) cast(_Documents, self.documents).data[name] = document
def __exit__(self, exc_type, exc_val, exc_tb): if exc_type is not KeyboardInterrupt: self.close() def add_created_indices(self, created_indices): for k, v in created_indices.items(): try: doc = self.documents[k] doc.add_created_indices(v) except KeyError: pass def lease(self): if self.client is not None: self.client.open_event(self._event_service_instance_id, self.event_id, only_create_new=False) def release_lease(self): if self.client is not None: self.client.close_event(self.event_service_instance_id, self.event_id)
def ids(event): return event.event_service_instance_id, event.event_id class _Documents(Mapping[str, Document]): __slots__ = ('event', 'data') def __init__(self, event): self.event = event self.data = {} def __contains__(self, document_name: str) -> bool: if not isinstance(document_name, str): return False if document_name in self.data: return True self._refresh_documents() return document_name in self.data def __getitem__(self, document_name) -> Document: if not isinstance(document_name, str): raise KeyError try: return self.data[document_name] except KeyError: pass self._refresh_documents() return self.data[document_name] def __len__(self) -> int: self._refresh_documents() return len(self.data) def __iter__(self) -> Iterator[str]: self._refresh_documents() return iter(self.data) def _refresh_documents(self): if self.event.client is not None: document_names = self.event.client.get_all_document_names( *ids(self.event) ) for name in document_names: if name not in self.data: document = Document(name, event=self.event) self.data[name] = document class _Metadata(MutableMapping[str, str]): __slots__ = ('event', 'data') def __init__(self, event: Event): self.event = event self.data = {} def __contains__(self, key): if key in self.data: return True self._refresh_metadata() return key in self.data def __setitem__(self, key, value): if key in self: raise KeyError("Metadata already exists with key: " + key) self.data[key] = value if self.event.client is not None: if not self.event.client.add_metadata(*ids(self.event), key, value): raise ValueError() def __getitem__(self, key): try: return self.data[key] except KeyError: self._refresh_metadata() return self.data[key] def __delitem__(self, v) -> None: raise NotImplementedError def __iter__(self) -> Iterator[str]: self._refresh_metadata() return iter(self.data) def __len__(self) -> int: self._refresh_metadata() return len(self.data) def _refresh_metadata(self): if self.event.client is not None: response = self.event.client.get_all_metadata(*ids(self.event)) self.data.update(response) class _Binaries(MutableMapping[str, bytes]): __slots__ = ('event', 'names', 'data') def __init__(self, event: Event): self.event = event self.names = set() self.data = {} def __contains__(self, key): if key in self.names: return True self._refresh_binaries() return key in self.names def __setitem__(self, key, value): if key in self: raise KeyError("Binary already exists with name: " + key) self.names.add(key) self.data[key] = value if self.event.client is not None: if not self.event.client.add_binary_data(*ids(self.event), key, value): raise ValueError() def __getitem__(self, key): try: return self.data[key] except KeyError: pass if self.event.client is not None: b = self.event.client.get_binary_data(*ids(self.event), binary_data_name=key) self.names.add(b) self.data[key] = b return self.data[key] def __delitem__(self, v) -> None: raise NotImplementedError def __iter__(self) -> Iterator[str]: self._refresh_binaries() return iter(self.names) def __len__(self) -> int: self._refresh_binaries() return len(self.names) def _refresh_binaries(self): if self.event.client is not None: response = self.event.client.get_all_binary_data_names( *ids(self.event) ) self.names.update(response)