Module ambianic.util
Service classes for OS interaction and multithreading.
Expand source code
"""Service classes for OS interaction and multithreading."""
import asyncio
import json
import logging
import time
import traceback
from abc import abstractmethod
from threading import Event, Thread
import numpy as np
log = logging.getLogger(__name__)
class ManagedService:
    """Lifecycle contract for a long-running service.

    Implementations provide start(), stop() and healthcheck(), and may
    override heal() to recover from an unhealthy state.

    Note: this class does not derive from abc.ABC, so the
    @abstractmethod markers below document intent but are not enforced
    at instantiation time.
    """

    @abstractmethod
    def start(self, **kwargs):
        """Run the service until it finishes or stop() is requested."""

    @abstractmethod
    def stop(self):
        """Make start() return as soon as possible.

        A slow exit may result in forceful process termination.
        """

    @abstractmethod
    def healthcheck(self):
        """Report vital health information.

        :Returns:
        -------
        tuple (time, string)
            (latest_heartbeat_timestamp, status_code)
            latest_heartbeat_timestamp is a time.monotonic() reading.
            status_code should map semantically to the service health.

        """
        # Default report: heartbeat is "right now" and the status is OK.
        latest_heartbeat = time.monotonic()
        status_code = "OK"
        return latest_heartbeat, status_code

    def heal(self):
        """Inspect and repair the state of the job.

        Normally invoked by a management service when a long running job
        does not look healthy, most likely because healthcheck() reports
        an outdated heartbeat timestamp or a bad status code.

        Implementations should bring the job back to a healthy state as
        soon as possible to prevent forceful termination. The default
        implementation does nothing.
        """
class ThreadedJob(Thread, ManagedService):
    """A job that runs in its own python thread.

    Wraps a ManagedService and forwards the lifecycle calls
    (start via run(), stop, heal, healthcheck) to it.
    Jobs managed by ThreadedJob must implement all ManagedService methods.
    """

    # Reminder: even though multiple processes can work well for pipelines,
    # since they are mostly independent,
    # Google Coral does not allow access to it from different processes yet.
    # Consider moving access to coral in a separate process that can serve
    # an inference task queue from multiple pipelines.

    def __init__(self, job=None, **kwargs):
        """Initialize with a ManagedService.

        :Parameters:
        ----------
        job : ManagedService
            The underlying service wrapped in this thread.
        **kwargs
            Accepted for call-site compatibility; not used here.

        :Raises:
        ------
        AssertionError
            If job is not a ManagedService instance.

        """
        Thread.__init__(self, daemon=True)
        assert isinstance(job, ManagedService)
        self.job = job
        # Set by stop() to record that termination was requested.
        self._stop_requested = Event()

    def run(self):
        """Thread body: run the wrapped job until its start() returns."""
        log.info(
            "Thread #%s started with job: %s", self.ident, self.job.__class__.__name__
        )
        # Install a fresh asyncio event loop as this thread's current loop
        # before handing control to the job.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        self.job.start()
        log.info(
            "Thread #%s for job %s stopped", self.ident, self.job.__class__.__name__
        )

    def stop(self):
        """Record the stop request and forward it to the wrapped job."""
        log.debug(
            "Thread #%s for job %s is signalled to stop. Passing request to job.",
            self.ident,
            self.job.__class__.__name__,
        )
        self._stop_requested.set()
        self.job.stop()

    def heal(self):
        """Forward a heal request to the wrapped job."""
        log.debug(
            "Thread #%s for job %s is signalled to heal. Passing request to job.",
            self.ident,
            self.job.__class__.__name__,
        )
        self.job.heal()
        log.debug(
            "Thread #%s for job %s completed heal request.",
            self.ident,
            self.job.__class__.__name__,
        )

    def healthcheck(self):
        """Return the wrapped job's healthcheck() result unchanged."""
        log.debug(
            "Thread #%s for job %s healthcheck requested. Passing request to job.",
            self.ident,
            self.job.__class__.__name__,
        )
        health_status = self.job.healthcheck()
        return health_status
class ServiceExit(Exception):
    """Signal that the main program should exit.

    A custom exception used to trigger a clean exit of all running
    threads and of the main program.

    The approach to controlled exit of a multi-threaded Python app was
    suggested by George Notaras:
    https://www.g-loaded.eu/2016/11/24/how-to-terminate-running-python-threads-using-signals/
    """
def stacktrace():
    """Get stack trace as a multi line string.

    Formats the exception currently being handled, as reported by
    traceback.format_exc().

    :Returns:
    -------
    string
        Multi-line string of current stack trace, prefixed with a
        "Runtime stack trace:" header line.

    """
    formatted_lines = traceback.format_exc().splitlines()
    # The header used to carry a literal "%s" that was never substituted
    # because the trace was appended with "+"; build the text directly.
    strace = "Runtime stack trace:\n " + "\n".join(formatted_lines)
    return strace
class JsonEncoder(json.JSONEncoder):
    """JSON encoder that also understands numpy scalars and arrays."""

    def default(self, obj):
        """Convert numpy values into JSON-serializable Python objects.

        :Parameters:
        ----------
        obj : object
            A value the base encoder could not serialize on its own.

        :Returns:
        -------
        bool, int, float or list
            The plain Python equivalent of a numpy boolean, integer,
            floating point scalar or ndarray.

        :Raises:
        ------
        TypeError
            Raised by the base class for any other unsupported type.

        """
        # numpy booleans are neither Python bools nor np.integer, so
        # without this branch they would fall through to TypeError.
        if isinstance(obj, np.bool_):
            return bool(obj)
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        return super().default(obj)
def jsonify(val: any = None):
    """Serialize val to a JSON string using the module's JsonEncoder.

    :Parameters:
    ----------
    val : any
        The value to serialize; defaults to None.

    :Returns:
    -------
    string
        The JSON text produced by json.dumps() with cls=JsonEncoder.

    """
    json_text = json.dumps(val, cls=JsonEncoder)
    return json_text
Functions
def jsonify(val: any = None)
-
Expand source code
def jsonify(val: any = None): return json.dumps(val, cls=JsonEncoder)
def stacktrace()
-
Get stack trace as a multi line string.
:Returns:
string Multi-line string of current stack trace.
Expand source code
def stacktrace():
    """Get stack trace as a multi line string.

    Formats the exception currently being handled, as reported by
    traceback.format_exc().

    :Returns:
    -------
    string
        Multi-line string of current stack trace, prefixed with a
        "Runtime stack trace:" header line.

    """
    formatted_lines = traceback.format_exc().splitlines()
    # The header used to carry a literal "%s" that was never substituted
    # because the trace was appended with "+"; build the text directly.
    strace = "Runtime stack trace:\n " + "\n".join(formatted_lines)
    return strace
Classes
class JsonEncoder (*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)
-
Extensible JSON http://json.org encoder for Python data structures.
Supports the following objects and types by default:
+-------------------+---------------+
| Python            | JSON          |
+===================+===============+
| dict              | object        |
+-------------------+---------------+
| list, tuple       | array         |
+-------------------+---------------+
| str               | string        |
+-------------------+---------------+
| int, float        | number        |
+-------------------+---------------+
| True              | true          |
+-------------------+---------------+
| False             | false         |
+-------------------+---------------+
| None              | null          |
+-------------------+---------------+
To extend this to recognize other objects, subclass and implement a
.default()
method with another method that returns a serializable object for o
if possible, otherwise it should call the superclass implementation (to raise TypeError
). Constructor for JSONEncoder, with sensible defaults.
If skipkeys is false, then it is a TypeError to attempt encoding of keys that are not str, int, float or None. If skipkeys is True, such items are simply skipped.
If ensure_ascii is true, the output is guaranteed to be str objects with all incoming non-ASCII characters escaped. If ensure_ascii is false, the output can contain non-ASCII characters.
If check_circular is true, then lists, dicts, and custom encoded objects will be checked for circular references during encoding to prevent an infinite recursion (which would cause an OverflowError). Otherwise, no such check takes place.
If allow_nan is true, then NaN, Infinity, and -Infinity will be encoded as such. This behavior is not JSON specification compliant, but is consistent with most JavaScript based encoders and decoders. Otherwise, it will be a ValueError to encode such floats.
If sort_keys is true, then the output of dictionaries will be sorted by key; this is useful for regression tests to ensure that JSON serializations can be compared on a day-to-day basis.
If indent is a non-negative integer, then JSON array elements and object members will be pretty-printed with that indent level. An indent level of 0 will only insert newlines. None is the most compact representation.
If specified, separators should be an (item_separator, key_separator) tuple. The default is (', ', ': ') if indent is
None
and (',', ': ') otherwise. To get the most compact JSON representation, you should specify (',', ':') to eliminate whitespace. If specified, default is a function that gets called for objects that can't otherwise be serialized. It should return a JSON encodable version of the object or raise a
TypeError.
Expand source code
class JsonEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, np.integer): return int(obj) if isinstance(obj, np.floating): return float(obj) if isinstance(obj, np.ndarray): return obj.tolist() return super().default(obj)
Ancestors
- json.encoder.JSONEncoder
Methods
def default(self, obj)
-
Implement this method in a subclass such that it returns a serializable object for
o
, or calls the base implementation (to raise a TypeError
). For example, to support arbitrary iterators, you could implement default like this::
def default(self, o): try: iterable = iter(o) except TypeError: pass else: return list(iterable) # Let the base class default method raise the TypeError return JSONEncoder.default(self, o)
Expand source code
def default(self, obj): if isinstance(obj, np.integer): return int(obj) if isinstance(obj, np.floating): return float(obj) if isinstance(obj, np.ndarray): return obj.tolist() return super().default(obj)
class ManagedService
-
A service contract with lifecycle methods.
Expand source code
class ManagedService: """A service contract with lifecycle methods.""" @abstractmethod def start(self, **kwargs): """Start and run until finished or asked to stop().""" @abstractmethod def stop(self): """Exit start() method as soon as possible. Delay to exit may result in forceful process termination. """ @abstractmethod def healthcheck(self): """Report vital health information. :Returns: ------- touple (time, string) (latest_heartbeat_timestamp, status_code) latest_heartbeat_timestamp is in the format of time.monotonic(). status_code should have a semantic mapping to the service health. """ return time.monotonic(), "OK" def heal(self): """Inspect and repair the state of the job. This method is normally invoked from a management service when a long running job doesn't seem healthy. Most likely because healthstate() returns bad reports: such as outdated heartbeat timestamp or a bad status code. The method should try to bring the job back to a healthy state as soon as possible to prevent forceful termination. """
Subclasses
Methods
def heal(self)
-
Inspect and repair the state of the job.
This method is normally invoked from a management service when a long running job doesn't seem healthy. Most likely because healthcheck() returns bad reports: such as outdated heartbeat timestamp or a bad status code. The method should try to bring the job back to a healthy state as soon as possible to prevent forceful termination.
Expand source code
def heal(self): """Inspect and repair the state of the job. This method is normally invoked from a management service when a long running job doesn't seem healthy. Most likely because healthstate() returns bad reports: such as outdated heartbeat timestamp or a bad status code. The method should try to bring the job back to a healthy state as soon as possible to prevent forceful termination. """
def healthcheck(self)
-
Report vital health information.
:Returns:
tuple (time, string) (latest_heartbeat_timestamp, status_code) latest_heartbeat_timestamp is in the format of time.monotonic(). status_code should have a semantic mapping to the service health.
Expand source code
@abstractmethod def healthcheck(self): """Report vital health information. :Returns: ------- touple (time, string) (latest_heartbeat_timestamp, status_code) latest_heartbeat_timestamp is in the format of time.monotonic(). status_code should have a semantic mapping to the service health. """ return time.monotonic(), "OK"
def start(self, **kwargs)
-
Start and run until finished or asked to stop().
Expand source code
@abstractmethod def start(self, **kwargs): """Start and run until finished or asked to stop()."""
def stop(self)
-
Exit start() method as soon as possible.
Delay to exit may result in forceful process termination.
Expand source code
@abstractmethod def stop(self): """Exit start() method as soon as possible. Delay to exit may result in forceful process termination. """
class ServiceExit (*args, **kwargs)
-
Main program exit signal.
Custom exception which is used to trigger clean exit of all running threads and the main program.
Method for controlled multi-threaded Python app exit suggested by George Notaras: https://www.g-loaded.eu/2016/11/24/how-to-terminate-running-python-threads-using-signals/
Expand source code
class ServiceExit(Exception): """Main program exit signal. Custom exception which is used to trigger clean exit of all running threads and the main program. Method for controlled multi-threaded Python app exit suggested by George Notaras: https://www.g-loaded.eu/2016/11/24/how-to-terminate-running-python-threads-using-signals/ """
Ancestors
- builtins.Exception
- builtins.BaseException
class ThreadedJob (job=None, **kwargs)
-
A job that runs in its own python thread.
Jobs managed by Threaded Job must implement all ManagedService methods.
Initialize with a ManagedService.
:Parameters:
job : ManagedService The underlying service wrapped in this thread.
Expand source code
class ThreadedJob(Thread, ManagedService): """A job that runs in its own python thread. Jobs managed by Threaded Job must implement all ManagedService methods. """ # Reminder: even though multiple processes can work well for pipelines, # since they are mostly independent, # Google Coral does not allow access to it from different processes yet. # Consider moving access to coral in a separate process that can serve # an inference task queue from multiple pipelines. def __init__(self, job=None, **kwargs): """Inititalize with a ManagedService. :Parameters: ---------- job : ManagedService The underlying service wrapped in this thread. """ Thread.__init__(self, daemon=True) assert isinstance(job, ManagedService) self.job = job # The shutdown_flag is a threading.Event object that # indicates whether the thread should be terminated. # self.shutdown_flag = threading.Event() # ... Other thread setup code here ... self._stop_requested = Event() def run(self): log.info( "Thread #%s started with job: %s", self.ident, self.job.__class__.__name__ ) loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) self.job.start() log.info( "Thread #%s for job %s stopped", self.ident, self.job.__class__.__name__ ) def stop(self): log.debug( "Thread #%s for job %s is signalled to stop" "Passing request to job.", self.ident, self.job.__class__.__name__, ) self._stop_requested.set() self.job.stop() def heal(self): log.debug( "Thread #%s for job %s is signalled to heal." "Passing request to job.", self.ident, self.job.__class__.__name__, ) self.job.heal() log.debug( "Thread #%s for job %s completed heal request.", self.ident, self.job.__class__.__name__, ) def healthcheck(self): log.debug( "Thread #%s for job %s healthcheck requested." "Passing request to job.", self.ident, self.job.__class__.__name__, ) health_status = self.job.healthcheck() return health_status
Ancestors
- threading.Thread
- ManagedService
Methods
def run(self)
-
Method representing the thread's activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
Expand source code
def run(self): log.info( "Thread #%s started with job: %s", self.ident, self.job.__class__.__name__ ) loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) self.job.start() log.info( "Thread #%s for job %s stopped", self.ident, self.job.__class__.__name__ )
Inherited members