Source code for mtap.pipeline._sources
# Copyright (c) Regents of the University of Minnesota.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from abc import ABC, abstractmethod
from os import PathLike
from pathlib import Path
from typing import Union, Optional, ContextManager, Generator, Iterable, Collection
from mtap._event import Event
from mtap._events_client import EventsClient
from mtap.pipeline._common import EventLike
[docs]
class ProcessingSource(ContextManager, ABC):
"""Provides events or documents for the multithreaded pipeline runner.
Also has functionality for receiving results.
"""
__slots__ = ()
_total: Optional[int] = None
@property
def total(self) -> Optional[int]:
"""The total number of documents this source will provide.
Returns:
Total number of events this source will provide or ``None`` if not
known.
"""
return self._total
@total.setter
def total(self, count: Optional[int]):
self._total = count
[docs]
@abstractmethod
def produce(self) -> Generator[EventLike, None, None]:
"""The method which provides documents or events to the pipeline
to batch process.
Examples:
Example implementation for processing text documents:
>>> ...
>>> def produce(self, consume):
>>> for file in Path(".").glob("*.txt"):
>>> with file.open('r') as fio:
>>> txt = fio.read()
>>> with Event(client=client) as e:
>>> doc = event.create_document('plaintext', txt)
>>> consume(doc)
"""
...
[docs]
def close(self):
"""Optional method: called to clean up after processing is complete.
Default behavior is to do nothing.
"""
pass
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
return None
class IterableProcessingSource(ProcessingSource):
"""Wraps an iterable in a ProcessingSource for the multi-thread processor.
"""
__slots__ = ('it',)
def __init__(self, source: Union[Iterable[EventLike], Collection[EventLike]]):
# We use an iterator here to can ensure that it gets closed on
# unexpected / early termination and any caller context managers are
# exited before their client gets shut down.
# Using a for-in loop we're not guaranteed, which can cause zombie
# unclosed events on the event service.
self.it = iter(source)
try:
self.total = len(source)
except (AttributeError, TypeError):
pass
def produce(self):
while True:
try:
target = next(self.it)
except StopIteration:
break
yield target
def close(self):
try:
self.it.close()
except AttributeError:
pass
[docs]
class FilesInDirectoryProcessingSource(ProcessingSource):
"""Processing source for pipelines which iterates over files in a
directory.
Args:
count_total: Should the ``count_total`` attribute be populated by
iterating through the directory once to count all matching files.
Attributes:
client: Create the events using this client.
directory: The path to the directory of files to process.
document_name: Creates documents with this name and adds the file's
plain text.
extension_glob: A glob used to filter documents from the directory.
total: The total number of documents.
errors: The errors argument for :func:`open`.
Examples:
>>> with Pipeline(...) as pipeline:
>>> pipeline.run_multithread(
>>> FilesInDirectoryProcessingSource("/path/to/docs")
>>> )
"""
__slots__ = (
'client',
'directory',
'document_name',
'extension_glob',
'errors',
'total'
)
def __init__(self, client: EventsClient,
directory: Union[str, bytes, PathLike],
*, document_name: str = 'plaintext',
extension_glob: str = '*.txt',
count_total: bool = True,
errors: Optional[str] = None):
self.client = client
if not os.path.isdir(directory):
raise ValueError(f'Invalid input directory: {directory}')
self.directory = directory
self.document_name = document_name
self.extension_glob = extension_glob
self.errors = errors
if count_total:
self.total = sum(1 for _ in Path(directory).rglob(self.extension_glob))
def produce(self):
for path in Path(self.directory).rglob(self.extension_glob):
with path.open('r', errors=self.errors) as f:
txt = f.read()
relative = str(path.relative_to(self.directory))
with Event(event_id=relative, client=self.client, only_create_new=True) as e:
doc = e.create_document(self.document_name, txt)
yield doc