# Copyright (c) Regents of the University of Minnesota.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module for deploying a set of processing services and the events server all
at once.
See python/mtap/examples/exampleDeploymentConfiguration.yml for an example
of the yaml deployment configuration
which can be loaded using
:py:meth:`~mtap.deployment.Deployment.from_yaml_file`
"""
import argparse
import logging
import os
import pathlib
import shutil
import signal
import subprocess
import sys
import threading
import uuid
from contextlib import contextmanager
from dataclasses import dataclass
from os import PathLike
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, Union
import grpc
from importlib_resources import files, as_file
from mtap import _config
# Names exported by ``from <this module> import *``.
__all__ = [
    'Deployment',
    'GlobalSettings',
    'SharedProcessorConfig',
    'EventsDeployment',
    'ProcessorDeployment',
    'main',
    'deployment_parser',
    'ServiceDeploymentException',
]
# Module-level logger used for deployment diagnostics.
logger = logging.getLogger('mtap.deployment')
# Path of the interpreter running this module; it is the executable used in
# the launch commands built for the events service and for Python processors.
PYTHON_EXE = sys.executable
def _get_java() -> str:
    """Resolves the ``java`` executable used to launch Java processors.

    Returns:
        ``<JAVA_HOME>/bin/java`` when the ``JAVA_HOME`` environment variable
        is set, otherwise the bare command name ``'java'``.
    """
    java_home = os.environ.get('JAVA_HOME')
    if java_home is None:
        return 'java'
    return str(pathlib.Path(java_home) / 'bin' / 'java')


# Resolved once at import time; later changes to JAVA_HOME are not picked up.
JAVA_EXE = _get_java()
def _listen(process: subprocess.Popen, name: str) -> int:
    """Echoes a subprocess's piped output to stdout until the pipe closes.

    Every line read from ``process.stdout`` is printed prefixed with
    ``name`` and flushed immediately.

    Args:
        process: The subprocess to listen to; its ``stdout`` must be a
            binary pipe.
        name: The prefix printed in front of each output line.

    Returns:
        The exit code of the process, obtained by waiting on it after its
        output has been exhausted.
    """
    for line in process.stdout:
        # Decode leniently: output that is not valid UTF-8 must not raise
        # here, because this loop is what drains the pipe and an exception
        # would end it while the service is still writing.
        text = line.decode(errors='replace')
        print(f'{name}: {text}', end='', flush=True)
    return process.wait()
class ServiceDeploymentException(Exception):
    """Error signalling that a deployed service could not be launched.

    Attributes:
        service_name: The name of the service that failed.
        msg: A description of the failure.
    """

    def __init__(self, service_name: str, msg: str):
        # Both values are also forwarded to ``Exception`` so they appear in
        # ``args`` and in the default string representation.
        self.service_name = service_name
        self.msg = msg
        super().__init__(service_name, msg)
[docs]
@dataclass
class GlobalSettings:
    """Settings shared by event service and all processors.
    """
    host: Optional[str] = None
    """The global host override, forces all services to use a specific host
    name."""
    mtap_config: Optional[str] = None
    """The path to an MTAP config file to load for all services."""
    log_level: Optional[str] = None
    """A python logging level to pass to all services."""

    @staticmethod
    def from_dict(conf: Optional[Dict[str, Any]]) -> 'GlobalSettings':
        """Creates a global settings object from a configuration dictionary.

        The deprecated ``register`` key is accepted and ignored (a notice is
        printed). The dictionary passed in is not modified.

        Args:
            conf: The configuration dictionary, or ``None`` for all defaults.

        Returns:
            The global settings object.

        Raises:
            TypeError: If ``conf`` contains a key that is not a field of
                this class.
        """
        # Copy first: dropping the deprecated key must not alter the
        # caller's dictionary.
        settings = dict(conf or {})
        if "register" in settings:
            del settings["register"]
            print("The register setting has been deprecated from global deployment configuration, and currently does nothing.")
        return GlobalSettings(**settings)
[docs]
@dataclass
class SharedProcessorConfig:
    """Settings applied across every processor service in a deployment.
    """
    events_addresses: Optional[List[str]] = None
    """GRPC-compatible targets of the events service(s) that all processors
    should use, if any.
    """
    workers: Optional[int] = None
    """Default count of worker threads performing processing."""
    additional_args: Optional[List[str]] = None
    """Extra arguments appended to the command of every processor."""
    jvm_args: Optional[List[str]] = None
    """JVM arguments used for all Java processors."""
    java_classpath: Optional[str] = None
    """Classpath string handed to all Java processors."""
    java_additional_args: Optional[List[str]] = None
    """Extra arguments appended only to Java processors."""
    startup_timeout: float = 30
    """Default startup timeout for processors."""
    mp_spawn_method: Optional[str] = None
    """A :meth:`multiprocessing.get_context` argument to create the
    multiprocessing context.
    """

    @staticmethod
    def from_dict(conf: Optional[Dict[str, Any]]) -> 'SharedProcessorConfig':
        """Creates the shared configuration from its dictionary form.

        Args:
            conf: The configuration dictionary; ``None`` or an empty
                dictionary yields the defaults.

        Returns:
            A shared processor config object.
        """
        if not conf:
            return SharedProcessorConfig()
        return SharedProcessorConfig(**conf)
@dataclass
class _ServiceDeployment:
    """Per-service settings used to build the common service arguments
    (host, port, sid, workers, config file, log level) of a launch command.
    """
    workers: Optional[int] = None
    mtap_config: Optional[str] = None
    log_level: Optional[str] = None

    @staticmethod
    def _find_free_port() -> int:
        """Asks the operating system for a currently unused TCP port."""
        # Local import: only needed when no port was configured.
        import socket
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.bind(('', 0))
            return sock.getsockname()[1]

    def service_args(
            self,
            host: Optional[str] = None,
            port: Optional[int] = None,
            sid: Optional[str] = None,
            global_host: Optional[str] = None,
            workers_default: Optional[int] = None,
            mtap_config_default: Optional[str] = None,
            log_level_default: Optional[str] = None
    ) -> Tuple[List[str], int]:
        """Builds the service portion of a launch command.

        Args:
            host: The host for this service.
            port: The port for this service. When ``None`` a free port is
                allocated so that the returned port is always usable.
            sid: The service instance identifier; a random UUID is generated
                when it is not given.
            global_host: A host that takes precedence over ``host``.
            workers_default: Used when this object has no ``workers``.
            mtap_config_default: Used when this object has no
                ``mtap_config``.
            log_level_default: Used when this object has no ``log_level``.

        Returns:
            A tuple of the argument list and the port placed in it. A port
            that was passed in is returned unchanged.
        """
        call = []
        host = global_host or host
        if host is not None:
            call.extend(['--host', str(host)])
        if port is None:
            # The port returned here is what the launcher later connects to
            # in order to check readiness, so it must be a real port.
            port = self._find_free_port()
        call.extend(['--port', str(port)])
        sid = sid or str(uuid.uuid4())
        call.extend(["--sid", sid])
        workers = self.workers or workers_default
        if workers is not None:
            call.extend(['--workers', str(workers)])
        mtap_config = self.mtap_config or mtap_config_default
        if mtap_config is not None:
            # str() keeps every command element a string, as for host/port.
            call.extend(['--mtap-config', str(mtap_config)])
        log_level = self.log_level or log_level_default
        if log_level is not None:
            call.extend(['--log-level', str(log_level)])
        return call, port
[docs]
@dataclass
class EventsDeployment:
    """Deployment configuration for the events service.
    """
    enabled: bool = False
    """Whether an events service should be created."""
    address: Optional[str] = None
    """The host address of one events service to launch."""
    addresses: Optional[Sequence[str]] = None
    """The host addresses of multiple events services to launch."""
    workers: Optional[int] = None
    """The number of worker threads the events service should use."""
    mtap_config: Optional[str] = None
    """Path to an mtap configuration file."""
    log_level: Optional[str] = None
    """The log level for the events service."""
    startup_timeout: int = 10
    """The startup timeout for events deployment."""

    @staticmethod
    def from_dict(conf: Optional[Dict]) -> 'EventsDeployment':
        """Creates the EventsDeployment configuration option from a
        configuration dictionary.

        The deprecated ``register`` key is accepted and ignored (a notice is
        printed). The dictionary passed in is not modified.

        Args:
            conf: The configuration dictionary, or ``None`` for all defaults.

        Returns:
            The EventsDeployment built from the configuration dictionary.

        Raises:
            TypeError: If ``conf`` contains a key that is not a field of
                this class.
        """
        # Copy first: dropping the deprecated key must not alter the
        # caller's dictionary.
        settings = dict(conf or {})
        if "register" in settings:
            del settings["register"]
            print("The register setting has been deprecated from events service deployment configuration, and currently does nothing.")
        return EventsDeployment(**settings)
def _create_events_calls(
        events: EventsDeployment,
        global_settings: GlobalSettings
) -> Iterable[Tuple[List[str], str]]:
    """Generates one launch command per events service to deploy.

    Args:
        events: The events deployment configuration.
        global_settings: Settings supplying the host override and the
            default config file and log level.

    Yields:
        Tuples of the command (an argument list) and the port returned by
        the service argument builder.

    Raises:
        ValueError: On first iteration, if both ``address`` and
            ``addresses`` are set.
    """
    single, multiple = events.address, events.addresses
    if single is not None and multiple is not None:
        raise ValueError(
            'Only one of `address` and `addresses` can be specified.'
        )
    settings = _ServiceDeployment(
        workers=events.workers,
        mtap_config=events.mtap_config,
        log_level=events.log_level
    )
    targets = [single] if multiple is None else multiple
    for target in targets:
        host, port = None, None
        if target:
            parts = target.split(':')
            if len(parts) != 2:
                # Anything other than exactly "host:port" keeps only the
                # text before the first colon as the host.
                host = parts[0]
            else:
                host, port = parts
                # ":port" means no explicit host.
                host = host or None
        args, port = settings.service_args(
            host=host,
            port=port,
            global_host=global_settings.host,
            mtap_config_default=global_settings.mtap_config,
            log_level_default=global_settings.log_level
        )
        yield [PYTHON_EXE, '-m', 'mtap', 'events', *args], port
[docs]
@dataclass
class ProcessorDeployment:
    """Deployment configuration for an MTAP processor.

    Used to construct the command for launching the processor. The processor
    should be a Java Class with a main method or a Python module with a main
    block. It should accept the standard MTAP processor deployment arguments
    and launch an MTAP processor using :func:`mtap.run_processor` or the
    equivalent Java method.
    """
    implementation: str
    """Either "java" or "python"."""
    entry_point: str
    """Either the java main class, or the python main module."""
    enabled: bool = True
    """Whether the processor should be launched as part of deployment."""
    instances: int = 1
    """The number of instances of the processor to launch."""
    host: Optional[str] = None
    """The listening host for the processor service."""
    port: Optional[int] = None
    """The listening port for the processor service."""
    workers: Optional[int] = None
    """The number of worker threads per instance."""
    mtap_config: Optional[str] = None
    """Path to the MTAP configuration file."""
    log_level: Optional[str] = None
    """The log level for the processor."""
    name: Optional[str] = None
    """An optional service name override to use for registration."""
    sid: Union[str, List[str], None] = None
    """An optional service instance unique identifier. If instances is 1 this
    should be a string, if instances is more than 1 this should be a list of
    strings, one for each instance.
    """
    pre_args: Optional[List[str]] = None
    """Arguments that occur prior to the MTAP service arguments (like host,
    port, etc).
    """
    additional_args: Optional[List[str]] = None
    """Arguments that occur after the MTAP service arguments."""
    startup_timeout: Optional[float] = None
    """Optional override startup timeout."""
    mp: Optional[bool] = None
    """Whether to use the multiprocessing pool based processor server."""
    mp_processes: Optional[int] = None
    """If using multiprocessing host, the number of worker processes."""
    mp_start_method: Optional[str] = None
    """A multiprocessing.get_context start method to use."""

    @staticmethod
    def from_dict(conf: Dict) -> 'ProcessorDeployment':
        """Creates an MTAP processor deployment configuration from a
        configuration dictionary.

        The deprecated ``register`` key is accepted and ignored (a notice is
        printed). The dictionary passed in is not modified.

        Args:
            conf: The configuration dictionary.

        Returns:
            ProcessorDeployment object that can be used to construct the call
            for the processor.

        Raises:
            TypeError: If a required field is missing from ``conf`` or it
                contains a key that is not a field of this class.
        """
        # Copy first: dropping the deprecated key must not alter the
        # caller's dictionary.
        settings = dict(conf)
        if "register" in settings:
            del settings["register"]
            print("The register setting has been deprecated from processor deployment configuration, and currently does nothing.")
        return ProcessorDeployment(**settings)
def _create_processor_calls(
        depl: ProcessorDeployment,
        global_settings: GlobalSettings,
        shared_config: SharedProcessorConfig
) -> Iterable[Tuple[List[str], str]]:
    """Generates the launch command of every instance of a processor.

    The instances are determined by the port configuration: a list of ports
    gives one instance per entry, no port gives ``depl.instances`` instances
    without a fixed port, and a single port gives ``depl.instances``
    instances on consecutive ports starting at it.

    Args:
        depl: The processor deployment configuration.
        global_settings: Settings shared by all services.
        shared_config: Settings shared by all processors.

    Yields:
        Tuples of the command (an argument list) and the instance's port.

    Raises:
        ValueError: On first iteration, if ``depl.sid`` is a list whose
            length differs from the number of instances.
    """
    # Local import so the block does not depend on a new file-level import.
    from dataclasses import replace

    service_deployment = _ServiceDeployment(
        workers=depl.workers,
        mtap_config=depl.mtap_config,
        log_level=depl.log_level
    )
    if isinstance(depl.port, list):
        ports = depl.port
    elif depl.port is None:
        ports = [None] * depl.instances
    else:
        ports = list(range(depl.port, depl.port + depl.instances))

    if isinstance(depl.sid, (list, tuple)):
        # A list of sids means one identifier per instance. Passing the
        # whole list on would place a list inside the command arguments.
        sids = list(depl.sid)
        if len(sids) != len(ports):
            raise ValueError(
                f'Processor {depl.entry_point!r} has {len(ports)} '
                f'instance(s) but {len(sids)} sid value(s).'
            )
        instances = [replace(depl, sid=sid) for sid in sids]
    else:
        # A single (or absent) sid is handed unchanged to every instance.
        instances = [depl] * len(ports)

    for instance, port in zip(instances, ports):
        yield from _create_processor_call(instance, global_settings, port,
                                          service_deployment, shared_config)
def _create_processor_call(depl, global_settings, port, service_deployment,
                           shared_config):
    """Generates the launch command for a single processor instance.

    The command consists of, in order: the implementation launcher, the
    processor's ``pre_args``, the service arguments, ``--name``,
    ``--events``, the processor's ``additional_args``, the multiprocessing
    options, the shared ``additional_args`` and, for Java processors, the
    shared ``java_additional_args``.

    Args:
        depl: The processor deployment configuration.
        global_settings: Settings shared by all services.
        port: The port for this instance, or ``None``.
        service_deployment: The builder of the common service arguments.
        shared_config: Settings shared by all processors.

    Yields:
        Exactly one tuple of the command (an argument list) and the port
        returned by the service argument builder.
    """
    call = _implementation_args(depl, shared_config)
    if depl.pre_args is not None:
        call.extend(depl.pre_args)
    service_args, port = service_deployment.service_args(
        host=depl.host,
        port=port,
        sid=depl.sid,
        global_host=global_settings.host,
        mtap_config_default=global_settings.mtap_config,
        log_level_default=global_settings.log_level,
        workers_default=shared_config.workers
    )
    call.extend(service_args)
    if depl.name is not None:
        call.extend(['--name', depl.name])
    events_addresses = shared_config.events_addresses
    # An empty list is skipped as well; joining it would pass an empty
    # string as the events target.
    if events_addresses:
        call.extend(['--events', ','.join(events_addresses)])
    if depl.additional_args is not None:
        call.extend(depl.additional_args)
    if depl.mp:
        call.append('--mp')
        # NOTE(review): the two option names below are assumed to match the
        # MTAP processor command line -- confirm against its argument parser.
        if depl.mp_processes is not None:
            call.extend(['--mp-processes', str(depl.mp_processes)])
        if depl.mp_start_method is not None:
            call.extend(['--mp-start-method', str(depl.mp_start_method)])
    if shared_config.additional_args is not None:
        call.extend(shared_config.additional_args)
    if (depl.implementation == 'java'
            and shared_config.java_additional_args is not None):
        call.extend(shared_config.java_additional_args)
    yield call, port
def _implementation_args(depl, shared_config):
    """Builds the leading part of a processor command: the launcher and the
    entry point.

    Args:
        depl: The processor deployment; its ``implementation`` selects the
            launcher and its ``entry_point`` is what gets launched.
        shared_config: Supplies the JVM arguments and classpath used for
            Java processors.

    Returns:
        A new list holding the beginning of the command.

    Raises:
        ValueError: If the implementation is neither ``'python'`` nor
            ``'java'``.
    """
    impl = depl.implementation
    if impl == 'python':
        return [PYTHON_EXE, '-m', depl.entry_point]
    if impl != 'java':
        raise ValueError(
            'Unrecognized implementation: ' + impl)
    command = [str(JAVA_EXE)]
    jvm_args = shared_config.jvm_args
    if jvm_args is not None:
        command.extend(jvm_args)
    classpath = shared_config.java_classpath
    if classpath is not None:
        command.extend(['-cp', classpath])
    command.append(depl.entry_point)
    return command
[docs]
@dataclass
class Deployment:
    """An automatic deployment configuration which launches a configurable set
    of MTAP services.
    """
    global_settings: Optional[GlobalSettings] = None
    """Settings shared among all services."""
    events_deployment: Optional[EventsDeployment] = None
    """Deployment settings for the events service."""
    shared_processor_config: Optional[SharedProcessorConfig] = None
    """Shared configuration settings for all processors."""
    processors: Optional[List[ProcessorDeployment]] = None
    """Configurations for individual processors."""

    @staticmethod
    def from_dict(conf: Dict) -> 'Deployment':
        """Creates a deployment object from a configuration dictionary.

        The sections read are ``global``, ``events_service``,
        ``shared_processor_config`` and ``processors``; all are optional.

        Args:
            conf: The configuration dictionary. ``None`` (for example the
                result of loading an empty file) is treated as empty.

        Returns:
            Deployment object created.
        """
        conf = conf or {}
        global_settings = GlobalSettings.from_dict(conf.get('global'))
        events = EventsDeployment.from_dict(conf.get('events_service'))
        shared_processor_config = SharedProcessorConfig.from_dict(
            conf.get('shared_processor_config')
        )
        # ``or []`` also covers a ``processors`` key that is present but null.
        proc_confs = conf.get('processors') or []
        processors = [ProcessorDeployment.from_dict(c) for c in proc_confs]
        return Deployment(
            global_settings,
            events,
            shared_processor_config,
            processors
        )

    @staticmethod
    def from_yaml_file(conf_path: Union[str, bytes, PathLike]) -> 'Deployment':
        """Loads a deployment configuration from a yaml file.

        Args:
            conf_path: The path to the yaml configuration file.

        Returns:
            Deployment object created from the configuration.
        """
        conf_path = pathlib.Path(conf_path)
        from yaml import load
        try:
            from yaml import CLoader as Loader
        except ImportError:
            from yaml import Loader
        with conf_path.open('rb') as f:
            # NOTE(security): the full (non-safe) YAML loader can construct
            # arbitrary Python objects. Only load deployment files from
            # trusted sources, or consider switching to the safe loader.
            conf = load(f, Loader=Loader)
        return Deployment.from_dict(conf)

    @contextmanager
    def run_servers(self) -> Tuple[List[str], List[List[str]]]:
        """A context manager that starts all the configured services in
        subprocesses and returns.

        On entry it provides a tuple of the events service addresses and,
        per enabled processor, the list of its instance addresses. The
        services are shut down when the context exits.

        Raises:
            ServiceDeploymentException: If one or more of the services fails
                to launch.

        Examples:
            >>> deploy = Deployment.from_yaml_file('deploy_config.yml')
            >>> with deploy.run_servers():
            ...     # do something that requires the servers.
            ...     pass
            >>> # servers are automatically shutdown / terminated
        """
        with _ActiveDeployment() as depl:
            addresses = self._do_launch_all_processors(depl)
            yield addresses

    def run_servers_and_wait(self):
        """Starts the specified servers and blocks until KeyboardInterrupt,
        SIGINT, or SIGTERM are received.

        The SIGINT and SIGTERM handlers that were installed before the call
        are put back once the servers have been shut down.

        Raises:
            ServiceDeploymentException: If one or more of the services fails
                to launch.
        """
        e = threading.Event()

        def _stop(*_):
            e.set()

        previous = {
            sig: signal.signal(sig, _stop)
            for sig in (signal.SIGINT, signal.SIGTERM)
        }
        try:
            with self.run_servers():
                try:
                    e.wait()
                except KeyboardInterrupt:
                    pass
        finally:
            for sig, handler in previous.items():
                # ``None`` means the old handler was not installed from
                # Python and therefore cannot be re-installed from here.
                if handler is not None:
                    signal.signal(sig, handler)

    def _do_launch_all_processors(self, depl: '_ActiveDeployment'):
        """Launches the events service(s) and then every enabled processor.

        Args:
            depl: The active deployment that starts and tracks the
                subprocesses.

        Returns:
            A tuple of the events service addresses and a list holding, for
            each enabled processor, the list of its instance addresses.
        """
        c = _config.Config()
        enable_proxy = c.get('grpc.enable_proxy', False)

        # Every section defaults to None on this class; fall back to default
        # settings instead of failing on attribute access.
        global_settings = self.global_settings
        if global_settings is None:
            global_settings = GlobalSettings()
        events_deployment = self.events_deployment
        if events_deployment is None:
            events_deployment = EventsDeployment()
        if self.shared_processor_config is None:
            self.shared_processor_config = SharedProcessorConfig()
        shared_config = self.shared_processor_config

        events_addresses = []
        # deploy events service
        if events_deployment.enabled:
            calls = _create_events_calls(events_deployment, global_settings)
            for call, port in calls:
                events_address = depl.start_subprocess(
                    call,
                    "events",
                    port,
                    events_deployment.startup_timeout,
                    enable_proxy
                )
                events_addresses.append(events_address)
            # Point the processors at the services just launched. Addresses
            # configured by the user are kept when none was launched here.
            shared_config.events_addresses = events_addresses

        # deploy processors
        all_processor_addresses = []
        for processor_deployment in self.processors or []:
            if not processor_deployment.enabled:
                continue
            calls = _create_processor_calls(
                processor_deployment,
                global_settings,
                shared_config,
            )
            startup_timeout = (
                processor_deployment.startup_timeout
                or shared_config.startup_timeout
            )
            # The display name defaults to the last dotted component of the
            # entry point.
            name = processor_deployment.name
            if name is None:
                entry_point = processor_deployment.entry_point
                last_period = entry_point.rfind('.')
                name = (entry_point[last_period + 1:]
                        if last_period > 0 else entry_point)
            processor_addresses = []
            for call, port in calls:
                logger.debug('Launching processor with call: %s', call)
                address = depl.start_subprocess(
                    call,
                    name,
                    port,
                    startup_timeout,
                    enable_proxy
                )
                processor_addresses.append(address)
            all_processor_addresses.append(processor_addresses)
        print('Done deploying all servers.', flush=True)
        return events_addresses, all_processor_addresses
class _ActiveDeployment:
    """Tracks the subprocesses of a running deployment and shuts them down
    when used as a context manager.
    """

    def __init__(self):
        # (process, listener thread) pairs of everything launched so far.
        self._processor_listeners = []

    def __enter__(self):
        return self

    def __exit__(self, __exc_type, __exc_value, __traceback):
        self.shutdown()

    def start_subprocess(
            self,
            call: List[str],
            name: Any,
            port: int,
            startup_timeout: float,
            enable_proxy: bool = False
    ) -> str:
        """Starts a service subprocess and waits until it accepts
        connections.

        The process is launched in a new session with stderr merged into
        stdout, and a thread is started that echoes its output.

        Args:
            call: The command to run.
            name: The name used as the output prefix and in error messages.
            port: The port on which the service will listen.
            startup_timeout: Seconds to wait for the service to become
                reachable.
            enable_proxy: Value of the ``grpc.enable_http_proxy`` option of
                the channel used for the readiness check.

        Returns:
            The ``localhost:<port>`` address of the service.

        Raises:
            ServiceDeploymentException: If no port is given, or if the
                service is not reachable within ``startup_timeout``.
        """
        if port is None:
            # Without a port the readiness check below could never succeed;
            # fail immediately instead of after the whole timeout.
            raise ServiceDeploymentException(
                name,
                f'Failed to launch, no port to connect to: {name}'
            )
        p = subprocess.Popen(call,
                             start_new_session=True,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT)
        logger_name = f'{name}'
        listener = threading.Thread(target=_listen, args=(p, logger_name))
        listener.start()
        self._processor_listeners.append(
            (p, listener)
        )  # Adding early so it gets cleaned up on failure
        address = f'localhost:{port}'
        with grpc.insecure_channel(
                address,
                options=[('grpc.enable_http_proxy', enable_proxy)]
        ) as channel:
            future = grpc.channel_ready_future(channel)
            try:
                future.result(timeout=startup_timeout)
            except grpc.FutureTimeoutError:
                raise ServiceDeploymentException(
                    name,
                    f'Failed to launch, unresponsive: {name}'
                )
        return address

    def shutdown(self):
        """Terminates every launched subprocess.

        Each process is asked to terminate and is killed if its listener
        thread is still running after 15 seconds. Failures are reported and
        do not prevent the remaining processes from being shut down.
        """
        print("Shutting down all processors")
        for p, listener in self._processor_listeners:
            try:
                p.terminate()
                listener.join(timeout=15.0)
                if listener.is_alive():
                    print(
                        f'Unsuccessfully terminated processor {p.args}'
                        f' ... killing.')
                    p.kill()
                    listener.join()
                # The listener has finished reading, so the pipe can be
                # released.
                if p.stdout is not None:
                    p.stdout.close()
            except Exception as e:
                # Best effort: report the cause and continue with the rest.
                print(
                    f"Failed to properly shutdown processor {p.args}: {e!r}"
                )
def main(args: Optional[Sequence[str]] = None,
         conf: Optional[argparse.Namespace] = None):
    """Entry point of the deployment script.

    In ``run_servers`` mode the deployment configuration is loaded and its
    services are run until the process is interrupted. In ``write_example``
    mode the example deployment configuration is copied into the current
    working directory.

    Args:
        args: Command line arguments to parse when ``conf`` is not given;
            ``None`` lets the parser read the process arguments.
        conf: An already parsed namespace, as produced by the parser from
            :func:`deployment_parser`.
    """
    if conf is None:
        conf = deployment_parser().parse_args(args)
    if conf.log_level is not None:
        logging.basicConfig(level=conf.log_level)
    # The parser sets ``mode`` only when a sub-command was given.
    mode = getattr(conf, 'mode', None)
    if mode is None:
        deployment_parser().error(
            'a mode is required: run_servers or write_example'
        )
    if mode == 'run_servers':
        deployment = Deployment.from_yaml_file(conf.deploy_config)
        # ``run_servers`` only builds a context manager; the blocking
        # variant is what launches the services and keeps them alive.
        deployment.run_servers_and_wait()
    if mode == 'write_example':
        example = files(
            "mtap.examples"
        ).joinpath(
            "exampleDeploymentConfiguration.yml"
        )
        with as_file(example) as path:
            shutil.copyfile(os.fspath(path),
                            "exampleDeploymentConfiguration.yml")
        print('Writing "exampleDeploymentConfiguration.yml" to ' + str(
            pathlib.Path.cwd()))
def deployment_parser() -> argparse.ArgumentParser:
    """Builds the argument parser for the deployment script.

    The parser has no ``--help`` option of its own, accepts ``--log-level``
    and defines two sub-commands: ``run_servers``, which takes the path to a
    deployment configuration file, and ``write_example``. The chosen
    sub-command is stored as ``mode`` on the parsed namespace.

    Returns:
        The argument parser object that will create a namespace that can
        be passed to :func:`main`.
    """
    parser = argparse.ArgumentParser(add_help=False)
    parser.add_argument(
        '--log-level',
        metavar='LEVEL',
        help="The log level to use for the deployment script."
    )
    modes = parser.add_subparsers(title='mode')

    run_cmd = modes.add_parser('run_servers')
    run_cmd.set_defaults(mode='run_servers')
    run_cmd.add_argument(
        'deploy_config',
        metavar='CONFIG_FILE',
        type=pathlib.Path,
        help="A path to the deployment configuration to deploy."
    )

    example_cmd = modes.add_parser('write_example')
    example_cmd.set_defaults(mode='write_example')
    return parser
# Run the deployment command line interface when executed as a script.
if __name__ == '__main__':
    main()