Source code for fkat.utils.pool

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
from concurrent.futures import ThreadPoolExecutor, Future
from typing import Any, TypeVar
import multiprocessing as mp
from multiprocessing.pool import AsyncResult
from collections.abc import Callable, Iterable, Mapping


T = TypeVar("T", covariant=False)
T_co = TypeVar("T_co", covariant=True)


[docs]class FutureResult(AsyncResult[T]): """An AsyncResult implementation for concurrent.future Future object""" def __init__(self, fut: Future[T]) -> None: self.fut = fut
[docs] def ready(self) -> bool: return self.fut.done()
[docs] def get(self, timeout: float | None = None) -> T: return self.fut.result(timeout)
[docs] def wait(self, timeout: float | None = None) -> None: self.fut.exception(timeout)
[docs] def successful(self) -> bool: return self.fut.exception() is None
[docs]class ThreadPool: """A multiprocessing Pool-like implementation that uses ThreadPoolExecutor""" def __init__(self, **kwargs: Any) -> None: self.pool = ThreadPoolExecutor(**kwargs)
[docs] def apply_async( self, func: Callable[..., T_co], args: Iterable[Any] | None = None, kwds: Mapping[str, Any] | None = None, ) -> FutureResult[T_co]: fut = self.pool.submit(func, *(args or ()), **(kwds or {})) return FutureResult(fut)
[docs] def close(self) -> None: self.pool.shutdown()
[docs] def join(self) -> None: if self.pool._shutdown: self.close() else: self.pool.submit(lambda: None).result()
[docs]class NoDaemonProcess(mp.Process): """A Process implementation that never runs in daemon mode""" @property def daemon(self) -> bool: return False @daemon.setter def daemon(self, value: bool) -> None: pass
[docs]class NoDaemonContext(type(mp.get_context())): # type: ignore[misc] """A multiprocessing Context that uses NoDaemonProcess""" Process = NoDaemonProcess
[docs]class NoDaemonPool(mp.pool.Pool): # type: ignore[unresolved-attribute] """A multiprocessing Pool that uses NoDaemonContext""" def __init__(self, *args: Any, **kwargs: Any) -> None: kwargs["context"] = NoDaemonContext() super().__init__(*args, **kwargs)