# Source code for mtap._events_client

# Copyright 2019 Regents of the University of Minnesota.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from abc import abstractmethod, ABC
from typing import overload, Iterable, ContextManager, Union, List, cast

# Accepted forms of an events service address: a single target string, a list
# of target strings, or None (in which case ``events_client`` falls back to
# service discovery).
EventsAddressLike = Union[str, List[str], None]

# Module-level logger, registered under the "mtap.events_client" name.
logger = logging.getLogger("mtap.events_client")


@overload
def events_client(address: str) -> 'EventsClient':
    """Builds a client that talks to a single events service target.

    Args:
        address: The events service target, for example ``'localhost:9090'``.

    Returns:
        A client for the events service at ``address``.

    Examples:
        >>> with events_client('localhost:9090') as client, \\
        >>>      Event(event_id='1', client=client) as event:
        >>>     document = event.create_document(
        >>>         document_name='plaintext',
        >>>         text='The quick brown fox jumps over the lazy dog.'
        >>>     )
    """
    ...


@overload
def events_client(address: Iterable[str]) -> 'EventsClient':
    """Creates a client backed by a pool of events service addresses.

    Args:
        address: The events service targets to pool, for example
            ``['localhost:9090', 'localhost:9091']``. The parameter is named
            ``address`` (not ``addresses``) so that it matches the
            implementation's signature and keyword calls that type-check
            also work at runtime.

    Returns:
        A client that distributes work over one client per given address.
    """
    ...


@overload
def events_client() -> 'EventsClient':
    """Creates an events service client without an explicit address.

    With no argument, the address (or addresses) of the events service are
    looked up through service discovery.

    Returns:
        A client for the discovered events service.
    """
    ...


def events_client(address: EventsAddressLike = None) -> 'EventsClient':
    """Creates a client object for interacting with the events service.

    Args:
        address: Where to find the events service. One of:

            * a single target string such as ``'localhost:9090'``;
            * an iterable of target strings, which are pooled (a
              single-element iterable still produces a pool);
            * ``None`` or an empty iterable, in which case the address(es)
              are obtained through service discovery.

    Returns:
        A single-address client when one target string is given or
        discovered, otherwise a pooled client over all targets.

    Raises:
        ValueError: If service discovery returns no addresses.
        TypeError: If ``address`` is not ``None``, a string, or an iterable.
    """
    logger.debug(f"Creating events client with address: {address}")
    # Imported lazily, as in the original code, so that importing this module
    # does not pull in the gRPC client implementation.
    from mtap._events_client_grpc import GrpcEventsClient

    if address is not None and not isinstance(address, str):
        if not isinstance(address, Iterable):
            raise TypeError("Unsupported address type.", type(address))
        # Materialize once: generators and other unsized iterables are valid
        # input but support neither ``len()`` nor a second pass.
        address = list(address)

    if address is None or (not isinstance(address, str) and len(address) == 0):
        from mtap.discovery import DiscoveryMechanism
        discovery = DiscoveryMechanism()
        address = discovery.discover_events_service('v1')
        if len(address) == 1:
            # A single discovered address does not need a pool.
            address = address[0]
        elif len(address) == 0:
            raise ValueError(
                "Did not discover any addresses for the events service."
            )

    if isinstance(address, str):
        return GrpcEventsClient(address)
    if isinstance(address, Iterable):
        from mtap._events_client_pool import EventsClientPool
        pooled_clients = [GrpcEventsClient(target) for target in address]
        return cast(EventsClient, EventsClientPool(pooled_clients))
    raise TypeError("Unsupported address type.", type(address))


class EventsClient(ContextManager['EventsClient'], ABC):
    """Communicates with the events service.

    This is the abstract interface; concrete clients implement every
    abstract member below. Instances are context managers: entering the
    ``with`` block yields the client itself (inherited from
    ``ContextManager``) and leaving it calls :meth:`close`.

    NOTE(review): the one-line summaries on the abstract methods are derived
    from their names and signatures only; the exact semantics, return values
    and raised errors are defined by the concrete implementations, which are
    not part of this module.
    """

    @property
    @abstractmethod
    def instance_id(self):
        """Identifier of the events service instance this client targets."""

    @abstractmethod
    def close(self):
        """Closes the events client"""

    @abstractmethod
    def ensure_open(self, instance_id):
        """Ensures the client is open for the given service instance."""

    @abstractmethod
    def open_event(self, instance_id, event_id, only_create_new):
        """Opens the event ``event_id`` on the service.

        ``only_create_new`` presumably makes opening an already-existing
        event an error -- confirm against the concrete clients.
        """

    @abstractmethod
    def close_event(self, instance_id, event_id):
        """Closes the event ``event_id`` on the service."""

    @abstractmethod
    def get_all_metadata(self, instance_id, event_id):
        """Retrieves all metadata of the event."""

    @abstractmethod
    def add_metadata(self, instance_id, event_id, key, value):
        """Adds a ``key``/``value`` metadata entry to the event."""

    @abstractmethod
    def get_all_binary_data_names(self, instance_id, event_id):
        """Retrieves the names of all binary data stored on the event."""

    @abstractmethod
    def add_binary_data(self, instance_id, event_id, binary_data_name,
                        binary_data):
        """Stores ``binary_data`` on the event under ``binary_data_name``."""

    @abstractmethod
    def get_binary_data(self, instance_id, event_id, binary_data_name):
        """Retrieves the binary data stored under ``binary_data_name``."""

    @abstractmethod
    def get_all_document_names(self, instance_id, event_id):
        """Retrieves the names of all documents on the event."""

    @abstractmethod
    def add_document(self, instance_id, event_id, document_name, text):
        """Adds a document named ``document_name`` with ``text`` to the event."""

    @abstractmethod
    def get_document_text(self, instance_id, event_id, document_name):
        """Retrieves the text of the document ``document_name``."""

    @abstractmethod
    def get_label_index_info(self, instance_id, event_id, document_name):
        """Retrieves information about the label indices of a document."""

    @abstractmethod
    def add_labels(self, instance_id, event_id, document_name, index_name,
                   labels, adapter):
        """Adds ``labels`` to the label index ``index_name`` of a document.

        ``adapter`` presumably handles conversion of the labels for
        transport -- confirm against the concrete clients.
        """

    @abstractmethod
    def get_labels(self, instance_id, event_id, document_name, index_name,
                   adapter):
        """Retrieves the labels of the label index ``index_name``.

        ``adapter`` presumably handles conversion of the transported labels
        -- confirm against the concrete clients.
        """

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Closes the client when the ``with`` block is left.

        Returns ``None`` so that an exception raised inside the block is
        never suppressed.
        """
        # Previously a no-op, which left the client open after ``with``
        # unless a subclass overrode ``__exit__``.
        self.close()