# Copyright (c) Regents of the University of Minnesota.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import logging
import threading
import uuid
from concurrent.futures import thread
from logging.handlers import QueueListener
from typing import Any, Dict, Optional, Sequence, Tuple, Union
import grpc
from grpc_health.v1 import health, health_pb2_grpc
from grpc_status import rpc_status
from mtap import _config, _structs
from mtap._common import run_server_forever
from mtap._event import Event
from mtap.api.v1 import processing_pb2, processing_pb2_grpc
from mtap.processing import _runners, Processor, EventProcessor
from mtap.processing._exc import ProcessingException
from mtap.processing._processing_component import ProcessingComponent
from mtap.processing.results import add_times, create_timer_stats
logger = logging.getLogger('mtap.processing')
[docs]
def run_processor(proc: EventProcessor,
                  *,
                  options: Optional[argparse.Namespace] = None,
                  args: Optional[Sequence[str]] = None,
                  mp_context=None):
    """Runs the processor as a GRPC service, blocking until an interrupt signal
    is received.

    Args:
        proc: The processor to host.
        options: The parsed arguments from the parser returned by
            :func:`processor_parser`. When omitted, ``args`` is parsed with
            that parser instead.
        args: Arguments to parse server settings from if ``options`` was not
            supplied. ``None`` lets :mod:`argparse` read the command line.
        mp_context: A multiprocessing context that gets passed to
            :class:`MpProcessorRunner` when the ``--mp`` option is set.

    Raises:
        SystemExit: With status 1 if ``proc`` is not an
            :class:`EventProcessor`.

    Examples:
        Will automatically parse arguments:

        >>> run_processor(MyProcessor())

        Manual arguments:

        >>> run_processor(MyProcessor(), args=['-p', '8080'])
    """
    if not isinstance(proc, EventProcessor):
        print("Processor must be instance of EventProcessor class.")
        # ``exit`` is an interactive helper installed by ``site``; raising
        # SystemExit directly works in every interpreter configuration.
        raise SystemExit(1)

    if options is None:
        # The default ``add_help=True`` already supplies ``-h/--help``.
        processors_parser = argparse.ArgumentParser(
            parents=[processor_parser()]
        )
        options = processors_parser.parse_args(args)

    if options.write_address:
        logger.warning("The --write-address option is deprecated and does not do anything.")

    if options.log_level:
        logging.basicConfig(level=getattr(logging, options.log_level))

    # The option holds a comma-separated list of events service addresses.
    events_addresses = []
    if options.events_addresses is not None:
        events_addresses.extend(options.events_addresses.split(','))

    with _config.Config() as c:
        if options.mtap_config is not None:
            c.update_from_yaml(options.mtap_config)
        # instantiate runner
        name = options.name or proc.metadata['name']
        sid = options.sid

        enable_http_proxy = options.grpc_enable_http_proxy
        if enable_http_proxy is not None:
            c['grpc.events_channel_options.grpc.enable_http_proxy'] \
                = enable_http_proxy

        if options.mp:
            # ``--workers`` sizes the gRPC thread pool; the process pool has
            # its own ``--mp-processes`` setting. Namespaces that do not
            # define it keep the previous behavior of reusing ``workers``.
            mp_processes = getattr(options, 'mp_processes', None)
            if mp_processes is None:
                mp_processes = options.workers
            # NOTE(review): ``--mp-start-method`` is parsed but not forwarded
            # here; honoring its 'spawn' default would change the start
            # method currently used on fork-by-default platforms -- confirm
            # the intended behavior before wiring it through.
            runner = MpProcessorRunner(proc=proc,
                                       mp_processes=mp_processes,
                                       events_address=events_addresses,
                                       processor_name=name,
                                       mp_context=mp_context,
                                       log_level=options.log_level)
        else:
            runner = _runners.LocalRunner(proc,
                                          events_address=events_addresses)

        server = ProcessorServer(runner=runner,
                                 sid=sid,
                                 host=options.host,
                                 port=options.port,
                                 workers=options.workers)
        run_server_forever(server)
# Per-process state for the multiprocessing worker functions in this module:
# ``_mp_initialize`` assigns both names and ``_mp_call_process`` reads them.
# ``...`` is only a placeholder until the initializer has run in the process.
_mp_processor = ... # EventProcessor
_mp_client = ... # EventClient
def _mp_initialize(proc: EventProcessor,
                   events_address,
                   config,
                   log_queue):
    """Prepares the module-level state of a worker process.

    Passed as the ``initializer`` of the pool created by
    :class:`MpProcessorRunner`.

    Args:
        proc: The processor stored in ``_mp_processor`` for later calls.
        events_address: Address handed to ``mtap.events_client`` to build the
            client stored in ``_mp_client``.
        config: Configuration values passed to ``_config.Config``.
        log_queue: Queue that receives every log record emitted through the
            root logger of this process.
    """
    global _mp_processor, _mp_client

    # Forward all records (DEBUG and up) from this process onto the queue.
    root_logger = logging.getLogger()
    root_logger.addHandler(logging.handlers.QueueHandler(log_queue))
    root_logger.setLevel(logging.DEBUG)

    # NOTE(review): the instance is discarded; presumably constructing it
    # installs ``config`` for this process -- confirm against _config.Config.
    _config.Config(config)

    _mp_processor = proc

    from mtap import events_client
    _mp_client = events_client(address=events_address)
def _mp_call_process(event_id, event_service_instance_id, params):
    """Runs the process-local processor on a single event.

    Uses the ``_mp_processor`` and ``_mp_client`` globals assigned by
    ``_mp_initialize``.

    Args:
        event_id: Identifier of the event to process.
        event_service_instance_id: Identifier of the events service instance
            that holds the event.
        params: Parameters passed through to the processor's ``process``.

    Returns:
        A tuple of the processor's result, the times collected by the
        processor context, and the event's ``created_indices``.
    """
    global _mp_processor, _mp_client

    with Processor.enter_context() as processor_context, \
            Event(
                event_id=event_id,
                event_service_instance_id=event_service_instance_id,
                client=_mp_client
            ) as event:
        # The stopwatch is closed before the collected times are read.
        with Processor.started_stopwatch('process_method'):
            outcome = _mp_processor.process(event, params)
        return outcome, processor_context.times, event.created_indices
class MpProcessorRunner:
    """Runs an event processor on a pool of worker processes.

    Log records from the workers are forwarded through a queue to a
    :class:`~logging.handlers.QueueListener` running in this process, which
    writes them to a :class:`~logging.StreamHandler`.

    Args:
        proc: The processor handed to every worker process.
        processor_name: The name the processor is hosted under.
        component_id: Identifier of this component, defaults to
            ``processor_name``.
        mp_processes: Number of worker processes in the pool.
        events_address: Address(es) of the events service that the workers
            connect to.
        mp_context: The multiprocessing context used to create the queue and
            the pool. Created from ``mp_start_method`` when omitted.
        mp_start_method: Start method used to create the context when
            ``mp_context`` is omitted. ``None`` (or the string ``'None'``,
            which the command line parser offers as a choice) selects the
            platform default.
        log_level: Level of the handler that receives the workers' log
            records, ``'INFO'`` when omitted.
    """
    __slots__ = (
        'pool',
        'metadata',
        'processor_name',
        'component_id',
        'log_listener',
    )

    def __init__(self,
                 proc: EventProcessor,
                 processor_name: str,
                 component_id: Optional[str] = None,
                 mp_processes: Optional[int] = 8,
                 events_address: Optional[Union[str, Sequence[str]]] = None,
                 mp_context=None,
                 mp_start_method=None,
                 log_level=None):
        if mp_context is None:
            import multiprocessing as mp
            # The CLI passes start methods as strings, including 'None';
            # get_context only understands the real ``None``.
            if mp_start_method == 'None':
                mp_start_method = None
            mp_context = mp.get_context(mp_start_method)
        config = _config.Config()
        log_queue = mp_context.Queue(-1)
        handler = logging.StreamHandler()
        log_level = 'INFO' if log_level is None else log_level
        handler.setLevel(log_level)
        self.log_listener = QueueListener(log_queue, handler)
        self.log_listener.start()
        try:
            self.pool = mp_context.Pool(
                mp_processes,
                initializer=_mp_initialize,
                initargs=(proc, events_address, dict(config), log_queue)
            )
        except BaseException:
            # Do not leave the listener thread running if the pool could not
            # be created; nobody would hold a reference to stop it.
            self.log_listener.stop()
            raise
        self.metadata = proc.metadata
        self.processor_name = processor_name
        self.component_id = component_id or processor_name

    def call_process(
            self,
            event_id: str,
            event_service_instance_id: str,
            params: Optional[Dict[str, Any]]
    ) -> Tuple[Dict, Dict, Dict]:
        """Processes one event on a worker process, blocking until it is done.

        Args:
            event_id: Identifier of the event to process.
            event_service_instance_id: Identifier of the events service
                instance that holds the event.
            params: Processing parameters, ``None`` is treated as empty.

        Returns:
            The tuple returned by ``_mp_call_process`` in the worker: the
            result, the collected times, and the created indices.
        """
        # Copy so the caller's mapping is never shared with the pool call.
        p = dict(params) if params is not None else {}
        return self.pool.apply(_mp_call_process,
                               args=(event_id, event_service_instance_id, p))

    def close(self):
        """Terminates the worker pool and stops the log listener.

        The listener is stopped even if shutting down the pool raises.
        """
        try:
            self.pool.terminate()
            self.pool.join()
        finally:
            self.log_listener.stop()
[docs]
def processor_parser() -> argparse.ArgumentParser:
    """An :class:`~argparse.ArgumentParser` that can be used to parse the
    settings for :func:`run_processor`.

    The parser is created with ``add_help=False`` so that it can be used as a
    parent of another parser.

    Returns:
        A parser containing server settings.

    Examples:
        Using this as a parent parser:

        >>> parser = ArgumentParser(parents=[processor_parser()])
        >>> parser.add_argument('--my-arg-1')
        >>> parser.add_argument('--my-arg-2')
        >>> args = parser.parse_args()
        >>> processor = MyProcessor(args.my_arg_1, args.my_arg_2)
        >>> run_processor(processor, options=args)
    """
    processors_parser = argparse.ArgumentParser(add_help=False)
    processors_parser.add_argument(
        '--host', '--address', '-a',
        default="127.0.0.1",
        metavar="HOST",
        help='the address to serve the service on'
    )
    processors_parser.add_argument(
        '--port', '-p',
        type=int,
        default=0,
        metavar="PORT",
        help='the port to serve the service on'
    )
    processors_parser.add_argument(
        '--workers', '-w',
        type=int,
        default=10,
        help='number of worker threads to handle requests'
    )
    processors_parser.add_argument(
        "--mtap-config",
        default=None,
        help="path to MTAP config file"
    )
    processors_parser.add_argument(
        '--events-addresses', '--events-address', '--events', '-e',
        default=None,
        help='address of the events service to use'
    )
    processors_parser.add_argument(
        '--name', '-n',
        help="Optional override service name, defaults to the processor "
             "annotation"
    )
    processors_parser.add_argument(
        '--sid',
        help="A unique identifier for this instance of a processor service."
    )
    # Still accepted so existing command lines keep working; run_processor
    # only logs a deprecation warning when it is set.
    processors_parser.add_argument(
        '--write-address', action='store_true',
        help="Deprecated, this option does not do anything."
    )
    processors_parser.add_argument(
        '--log-level',
        type=str,
        default='INFO',
        help="Sets the python log level."
    )
    # The default is None rather than False: run_processor only overrides the
    # configured value when this is not None, so an absent flag must not look
    # like an explicit "disabled".
    processors_parser.add_argument(
        '--grpc-enable-http-proxy',
        action='store_true',
        default=None,
        help="If set, will enable usage of http_proxy by grpc."
    )
    processors_parser.add_argument(
        '--mp', action='store_true',
        help="Whether to use the multiprocessing pool based processor server."
    )
    processors_parser.add_argument(
        '--mp-processes',
        type=int,
        default=2,
        help="If using multiprocessing host, the number of worker processes."
    )
    # NOTE(review): run_processor does not currently read this option, and
    # 'None' is delivered as a string, not the ``None`` object.
    processors_parser.add_argument(
        '--mp-start-method',
        default='spawn',
        choices=['spawn', 'fork', 'forkserver', 'None'],
        help="A multiprocessing.get_context start method to use."
    )
    return processors_parser
def _label_index_meta_to_proto(d, message):
    """Copies label index metadata from a dictionary onto a message object.

    Falsy text values (such as ``None``) are written as empty strings.

    Args:
        d: Metadata with the keys ``name``, ``name_from_parameter``,
            ``description`` and ``properties``, and optionally ``optional``
            (treated as ``False`` when missing). Every entry of
            ``properties`` has the keys ``name``, ``description``,
            ``data_type`` and ``nullable``.
        message: The object to fill. One entry is appended through
            ``message.properties.add()`` per property.
    """
    def text(mapping, key):
        # Replace falsy values by the empty string.
        return mapping[key] or ''

    message.name = text(d, 'name')
    message.name_from_parameter = text(d, 'name_from_parameter')
    message.optional = d.get('optional', False)
    message.description = text(d, 'description')

    for prop in d['properties']:
        target = message.properties.add()
        target.name = text(prop, 'name')
        target.description = text(prop, 'description')
        target.data_type = text(prop, 'data_type')
        target.nullable = prop['nullable']
class _ProcessorServicer(processing_pb2_grpc.ProcessorServicer):
    """gRPC servicer that forwards processing requests to a runner.

    Args:
        runner: The processing component that performs the actual work.

    Attributes:
        processed: Number of ``Process`` requests received.
        failure_count: Number of ``Process`` requests that raised.
    """

    def __init__(self, runner: ProcessingComponent):
        self._runner = runner
        self._times_map = {}
        self.processed = 0
        self.failure_count = 0
        # Requests are handled on the server's thread pool, so the counter
        # updates (read-modify-write) are guarded by a lock.
        self._stats_lock = threading.Lock()

    def _record_failure(self):
        """Counts one failed ``Process`` request."""
        with self._stats_lock:
            self.failure_count += 1

    def Process(self, request, context=None):
        """Processes the event named by ``request`` and returns the response.

        A :class:`ProcessingException` is logged and reported to the client
        by aborting ``context`` with the exception's rpc status; without a
        context it is re-raised. Any other exception propagates unchanged.
        """
        event_id = request.event_id
        event_service_instance_id = request.event_service_instance_id
        logger.debug(
            '%s received process request on event: (%s, %s)',
            self._runner.processor_name, event_id, event_service_instance_id)
        with self._stats_lock:
            self.processed += 1
        params = {}
        _structs.copy_struct_to_dict(request.params, params)
        try:
            response = processing_pb2.ProcessResponse()
            result, times, added_indices = self._runner.call_process(
                event_id,
                event_service_instance_id,
                params
            )
            if result is not None:
                _structs.copy_dict_to_struct(result, response.result, [])

            # Accumulate the times for GetStats and report them to the caller.
            add_times(self._times_map, times)
            for key, duration in times.items():
                response.timing_info[key].FromTimedelta(duration)

            for document_name, index_names in added_indices.items():
                for index_name in index_names:
                    created_index = response.created_indices.add()
                    created_index.document_name = document_name
                    created_index.index_name = index_name
            return response
        except ProcessingException as e:
            self._record_failure()
            logger.error(e, exc_info=True)
            if context is None:
                # No RPC context to abort (direct call); surface the error.
                raise
            context.abort_with_status(rpc_status.to_status(e.to_rpc_status()))
        except Exception:
            self._record_failure()
            raise

    def GetStats(self, request, context):
        """Returns the request counters and the aggregated timing stats."""
        with self._stats_lock:
            processed = self.processed
            failures = self.failure_count
        r = processing_pb2.GetStatsResponse(processed=processed,
                                            failures=failures)
        tsd = create_timer_stats(self._times_map,
                                 self._runner.component_id)
        for k, v in tsd.items():
            ts = r.timing_stats[k]
            ts.mean.FromTimedelta(v.mean)
            ts.std.FromTimedelta(v.std)
            ts.max.FromTimedelta(v.max)
            ts.min.FromTimedelta(v.min)
            ts.sum.FromTimedelta(v.sum)
        return r

    def GetInfo(self, request, context):
        """Returns the runner's metadata."""
        response = processing_pb2.GetInfoResponse()
        _structs.copy_dict_to_struct(self._runner.metadata, response.metadata)
        return response
class ProcessorServer:
    """Host a MTAP processor as a service.

    Normally should not need to use this class as :func:`run_processor` should
    be sufficient for ordinary needs.

    Args:
        runner: A processing component runner to host.
        host: The address / hostname / IP to host the server on.
        port: The port to host the server on, or 0 to use a random port.
        sid: The service instance unique identifier, a random UUID is
            generated when omitted.
        workers: The number of workers that should handle requests. Defaults
            to 10.
        write_address: Deprecated, only triggers a warning.

    Attributes:
        host: The address / hostname / IP to host the server on.
        processor_name: The processor's service name.
        sid: The service instance unique identifier for the processor.

    Raises:
        ValueError: If a specific port was requested and could not be bound.
    """
    host: str
    processor_name: str
    sid: str

    def __init__(self,
                 runner: ProcessingComponent,
                 host: str,
                 port: int = 0,
                 *,
                 sid: Optional[str] = None,
                 workers: Optional[int] = None,
                 write_address: bool = False):
        self.host = host
        self.processor_name = runner.processor_name
        self.sid = sid if sid else str(uuid.uuid4())
        if write_address:
            logger.warning("The write_address option is deprecated and does not do anything")

        # Report both the overall server and the named service as serving.
        health_servicer = health.HealthServicer()
        for service_name in ('', self.processor_name):
            health_servicer.set(service_name, 'SERVING')

        self._runner = runner
        max_workers = workers if workers else 10
        self._thread_pool = thread.ThreadPoolExecutor(max_workers=max_workers)

        grpc_options = _config.Config().get("grpc.processor_options", {})
        self._server = grpc.server(self._thread_pool,
                                   options=list(grpc_options.items()))
        health_pb2_grpc.add_HealthServicer_to_server(health_servicer,
                                                     self._server)
        processing_pb2_grpc.add_ProcessorServicer_to_server(
            _ProcessorServicer(runner=runner),
            self._server
        )

        bound_port = self._server.add_insecure_port(f"{self.host}:{port}")
        self._port = bound_port
        # Port 0 asks for any free port; otherwise the bound port must match.
        if port != 0 and bound_port != port:
            raise ValueError(f"Unable to bind on port {port}, likely in use.")
        self._stopped_event = threading.Event()
        self._address_file = None

    @property
    def port(self) -> int:
        """Port the hosted server is bound to.
        """
        return self._port

    def start(self):
        """Starts the service.
        """
        self._server.start()
        logger.info(
            'Started processor server with name: "%s" on address: "%s:%d"',
            self.processor_name, self.host, self.port)

    def stop(self, *, grace: Optional[float] = None) -> threading.Event:
        """Stops accepting new requests and completely shuts down after a specified
        grace time.

        During the grace period the server will continue to process existing
        requests, but it will not accept any new requests. This function is
        idempotent, multiple calls will shut down the server after the soonest
        grace to expire, calling the shutdown event for all calls to
        this function.

        Args:
            grace: The grace period that the server should continue processing
                requests or ``None`` if it should shut down immediately.

        Returns:
            A shutdown event for the server.
        """
        message = (
            f'Shutting down processor server with name: '
            f'"{self.processor_name}" on address: "{self.host}:{self.port}"'
        )
        print(message)

        # ``_address_file`` is only ever set to None by this class.
        address_file = self._address_file
        if address_file is not None:
            address_file.unlink()

        # NOTE(review): the executor is shut down before the gRPC server is
        # asked to stop, so ``grace`` only comes into play afterwards --
        # confirm this ordering is intended.
        self._thread_pool.shutdown()
        shutdown_event = self._server.stop(grace)
        return shutdown_event