Module ambianic.util

Service classes for OS interaction and multithreading.

Expand source code
"""Service classes for OS interaction and multithreading."""
import asyncio
import json
import logging
import time
import traceback
from abc import abstractmethod
from threading import Event, Thread

import numpy as np

log = logging.getLogger(__name__)


class ManagedService:
    """A service contract with lifecycle methods."""

    @abstractmethod
    def start(self, **kwargs):
        """Start and run until finished or asked to stop()."""

    @abstractmethod
    def stop(self):
        """Exit start() method as soon as possible.

        Delay to exit may result in forceful process termination.
        """

    @abstractmethod
    def healthcheck(self):
        """Report vital health information.

        :Returns:
        -------
        touple (time, string)
            (latest_heartbeat_timestamp, status_code)
            latest_heartbeat_timestamp is in the format of time.monotonic().
            status_code should have a semantic mapping to the service health.

        """
        return time.monotonic(), "OK"

    def heal(self):
        """Inspect and repair the state of the job.

        This method is normally invoked from a management service when
        a long running job doesn't seem healthy.
        Most likely because healthstate() returns bad reports:
        such as outdated heartbeat timestamp or a bad status code.
        The method should try to bring the job back to a healthy state
        as soon as possible to prevent forceful termination.
        """


class ThreadedJob(Thread, ManagedService):
    """A job that runs in its own python thread.

    Jobs managed by Threaded Job must implement all ManagedService methods.
    """

    # Reminder: even though multiple processes can work well for pipelines,
    # since they are mostly independent,
    # Google Coral does not allow access to it from different processes yet.
    # Consider moving access to coral in a separate process that can serve
    # an inference task queue from multiple pipelines.

    def __init__(self, job=None, **kwargs):
        """Inititalize with a ManagedService.

        :Parameters:
        ----------
        job : ManagedService
            The underlying service wrapped in this thread.
        """
        Thread.__init__(self, daemon=True)
        assert isinstance(job, ManagedService)
        self.job = job
        # The shutdown_flag is a threading.Event object that
        # indicates whether the thread should be terminated.
        # self.shutdown_flag = threading.Event()
        # ... Other thread setup code here ...
        self._stop_requested = Event()

    def run(self):
        log.info(
            "Thread #%s started with job: %s", self.ident, self.job.__class__.__name__
        )
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        self.job.start()
        log.info(
            "Thread #%s for job %s stopped", self.ident, self.job.__class__.__name__
        )

    def stop(self):
        log.debug(
            "Thread #%s for job %s is signalled to stop" "Passing request to job.",
            self.ident,
            self.job.__class__.__name__,
        )
        self._stop_requested.set()
        self.job.stop()

    def heal(self):
        log.debug(
            "Thread #%s for job %s is signalled to heal." "Passing request to job.",
            self.ident,
            self.job.__class__.__name__,
        )
        self.job.heal()
        log.debug(
            "Thread #%s for job %s completed heal request.",
            self.ident,
            self.job.__class__.__name__,
        )

    def healthcheck(self):
        log.debug(
            "Thread #%s for job %s healthcheck requested." "Passing request to job.",
            self.ident,
            self.job.__class__.__name__,
        )
        health_status = self.job.healthcheck()
        return health_status


class ServiceExit(Exception):
    """Main program exit signal.

    Custom exception which is used to trigger clean exit
    of all running threads and the main program.

    Method for controlled multi-threaded Python app exit
    suggested by George Notaras:
    https://www.g-loaded.eu/2016/11/24/how-to-terminate-running-python-threads-using-signals/
    """


def stacktrace():
    """Get stack trace as a multi line string.

    :Returns:
    -------
    string
        Milti-line string of current stack trace.

    """
    formatted_lines = traceback.format_exc().splitlines()
    strace = "Runtime stack trace:\n %s" + "\n".join(formatted_lines)
    return strace


class JsonEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()

        return super().default(obj)


def jsonify(val: any = None):
    return json.dumps(val, cls=JsonEncoder)

Functions

def jsonify(val:  = None)
Expand source code
def jsonify(val: any = None):
    return json.dumps(val, cls=JsonEncoder)
def stacktrace()

Get stack trace as a multi line string.

:Returns:

string Milti-line string of current stack trace.

Expand source code
def stacktrace():
    """Get stack trace as a multi line string.

    :Returns:
    -------
    string
        Milti-line string of current stack trace.

    """
    formatted_lines = traceback.format_exc().splitlines()
    strace = "Runtime stack trace:\n %s" + "\n".join(formatted_lines)
    return strace

Classes

class JsonEncoder (*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)

Extensible JSON http://json.org encoder for Python data structures.

Supports the following objects and types by default:

+-------------------+---------------+ | Python | JSON | +===================+===============+ | dict | object | +-------------------+---------------+ | list, tuple | array | +-------------------+---------------+ | str | string | +-------------------+---------------+ | int, float | number | +-------------------+---------------+ | True | true | +-------------------+---------------+ | False | false | +-------------------+---------------+ | None | null | +-------------------+---------------+

To extend this to recognize other objects, subclass and implement a .default() method with another method that returns a serializable object for o if possible, otherwise it should call the superclass implementation (to raise TypeError).

Constructor for JSONEncoder, with sensible defaults.

If skipkeys is false, then it is a TypeError to attempt encoding of keys that are not str, int, float or None. If skipkeys is True, such items are simply skipped.

If ensure_ascii is true, the output is guaranteed to be str objects with all incoming non-ASCII characters escaped. If ensure_ascii is false, the output can contain non-ASCII characters.

If check_circular is true, then lists, dicts, and custom encoded objects will be checked for circular references during encoding to prevent an infinite recursion (which would cause an OverflowError). Otherwise, no such check takes place.

If allow_nan is true, then NaN, Infinity, and -Infinity will be encoded as such. This behavior is not JSON specification compliant, but is consistent with most JavaScript based encoders and decoders. Otherwise, it will be a ValueError to encode such floats.

If sort_keys is true, then the output of dictionaries will be sorted by key; this is useful for regression tests to ensure that JSON serializations can be compared on a day-to-day basis.

If indent is a non-negative integer, then JSON array elements and object members will be pretty-printed with that indent level. An indent level of 0 will only insert newlines. None is the most compact representation.

If specified, separators should be an (item_separator, key_separator) tuple. The default is (', ', ': ') if indent is None and (',', ': ') otherwise. To get the most compact JSON representation, you should specify (',', ':') to eliminate whitespace.

If specified, default is a function that gets called for objects that can't otherwise be serialized. It should return a JSON encodable version of the object or raise a TypeError.

Expand source code
class JsonEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()

        return super().default(obj)

Ancestors

  • json.encoder.JSONEncoder

Methods

def default(self, obj)

Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).

For example, to support arbitrary iterators, you could implement default like this::

def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return JSONEncoder.default(self, o)
Expand source code
def default(self, obj):
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, np.ndarray):
        return obj.tolist()

    return super().default(obj)
class ManagedService

A service contract with lifecycle methods.

Expand source code
class ManagedService:
    """A service contract with lifecycle methods."""

    @abstractmethod
    def start(self, **kwargs):
        """Start and run until finished or asked to stop()."""

    @abstractmethod
    def stop(self):
        """Exit start() method as soon as possible.

        Delay to exit may result in forceful process termination.
        """

    @abstractmethod
    def healthcheck(self):
        """Report vital health information.

        :Returns:
        -------
        touple (time, string)
            (latest_heartbeat_timestamp, status_code)
            latest_heartbeat_timestamp is in the format of time.monotonic().
            status_code should have a semantic mapping to the service health.

        """
        return time.monotonic(), "OK"

    def heal(self):
        """Inspect and repair the state of the job.

        This method is normally invoked from a management service when
        a long running job doesn't seem healthy.
        Most likely because healthstate() returns bad reports:
        such as outdated heartbeat timestamp or a bad status code.
        The method should try to bring the job back to a healthy state
        as soon as possible to prevent forceful termination.
        """

Subclasses

Methods

def heal(self)

Inspect and repair the state of the job.

This method is normally invoked from a management service when a long running job doesn't seem healthy. Most likely because healthstate() returns bad reports: such as outdated heartbeat timestamp or a bad status code. The method should try to bring the job back to a healthy state as soon as possible to prevent forceful termination.

Expand source code
def heal(self):
    """Inspect and repair the state of the job.

    This method is normally invoked from a management service when
    a long running job doesn't seem healthy.
    Most likely because healthstate() returns bad reports:
    such as outdated heartbeat timestamp or a bad status code.
    The method should try to bring the job back to a healthy state
    as soon as possible to prevent forceful termination.
    """
def healthcheck(self)

Report vital health information.

:Returns:

touple (time, string) (latest_heartbeat_timestamp, status_code) latest_heartbeat_timestamp is in the format of time.monotonic(). status_code should have a semantic mapping to the service health.

Expand source code
@abstractmethod
def healthcheck(self):
    """Report vital health information.

    :Returns:
    -------
    touple (time, string)
        (latest_heartbeat_timestamp, status_code)
        latest_heartbeat_timestamp is in the format of time.monotonic().
        status_code should have a semantic mapping to the service health.

    """
    return time.monotonic(), "OK"
def start(self, **kwargs)

Start and run until finished or asked to stop().

Expand source code
@abstractmethod
def start(self, **kwargs):
    """Start and run until finished or asked to stop()."""
def stop(self)

Exit start() method as soon as possible.

Delay to exit may result in forceful process termination.

Expand source code
@abstractmethod
def stop(self):
    """Exit start() method as soon as possible.

    Delay to exit may result in forceful process termination.
    """
class ServiceExit (*args, **kwargs)

Main program exit signal.

Custom exception which is used to trigger clean exit of all running threads and the main program.

Method for controlled multi-threaded Python app exit suggested by George Notaras: https://www.g-loaded.eu/2016/11/24/how-to-terminate-running-python-threads-using-signals/

Expand source code
class ServiceExit(Exception):
    """Main program exit signal.

    Custom exception which is used to trigger clean exit
    of all running threads and the main program.

    Method for controlled multi-threaded Python app exit
    suggested by George Notaras:
    https://www.g-loaded.eu/2016/11/24/how-to-terminate-running-python-threads-using-signals/
    """

Ancestors

  • builtins.Exception
  • builtins.BaseException
class ThreadedJob (job=None, **kwargs)

A job that runs in its own python thread.

Jobs managed by Threaded Job must implement all ManagedService methods.

Inititalize with a ManagedService.

:Parameters:

job : ManagedService The underlying service wrapped in this thread.

Expand source code
class ThreadedJob(Thread, ManagedService):
    """A job that runs in its own python thread.

    Jobs managed by Threaded Job must implement all ManagedService methods.
    """

    # Reminder: even though multiple processes can work well for pipelines,
    # since they are mostly independent,
    # Google Coral does not allow access to it from different processes yet.
    # Consider moving access to coral in a separate process that can serve
    # an inference task queue from multiple pipelines.

    def __init__(self, job=None, **kwargs):
        """Inititalize with a ManagedService.

        :Parameters:
        ----------
        job : ManagedService
            The underlying service wrapped in this thread.
        """
        Thread.__init__(self, daemon=True)
        assert isinstance(job, ManagedService)
        self.job = job
        # The shutdown_flag is a threading.Event object that
        # indicates whether the thread should be terminated.
        # self.shutdown_flag = threading.Event()
        # ... Other thread setup code here ...
        self._stop_requested = Event()

    def run(self):
        log.info(
            "Thread #%s started with job: %s", self.ident, self.job.__class__.__name__
        )
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        self.job.start()
        log.info(
            "Thread #%s for job %s stopped", self.ident, self.job.__class__.__name__
        )

    def stop(self):
        log.debug(
            "Thread #%s for job %s is signalled to stop" "Passing request to job.",
            self.ident,
            self.job.__class__.__name__,
        )
        self._stop_requested.set()
        self.job.stop()

    def heal(self):
        log.debug(
            "Thread #%s for job %s is signalled to heal." "Passing request to job.",
            self.ident,
            self.job.__class__.__name__,
        )
        self.job.heal()
        log.debug(
            "Thread #%s for job %s completed heal request.",
            self.ident,
            self.job.__class__.__name__,
        )

    def healthcheck(self):
        log.debug(
            "Thread #%s for job %s healthcheck requested." "Passing request to job.",
            self.ident,
            self.job.__class__.__name__,
        )
        health_status = self.job.healthcheck()
        return health_status

Ancestors

Methods

def run(self)

Method representing the thread's activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

Expand source code
def run(self):
    log.info(
        "Thread #%s started with job: %s", self.ident, self.job.__class__.__name__
    )
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    self.job.start()
    log.info(
        "Thread #%s for job %s stopped", self.ident, self.job.__class__.__name__
    )

Inherited members