Module ambianic.pipeline.save_event
Pipeline sample storage elements.
Expand source code
"""Pipeline sample storage elements."""
import datetime
import json
import logging
import pathlib
import uuid
from typing import Iterable
from ambianic.configuration import DEFAULT_DATA_DIR
from ambianic.notification import Notification, NotificationHandler
from ambianic.pipeline import PipeElement
from ambianic.util import jsonify
log = logging.getLogger(__name__)
class SaveDetectionEvents(PipeElement):
    """Saves AI detection events (inference samples) to an external storage location.

    Each saved sample consists of three sibling files sharing a timestamp
    prefix: the full camera image (jpg), a thumbnail (jpg) and a JSON
    metadata file with the inference results. Saves are throttled: samples
    with detections at most once per ``positive_interval`` seconds, idle
    (no-detection) samples at most once per ``idle_interval`` seconds.
    """

    def __init__(self, positive_interval=2, idle_interval=600, notify=None, **kwargs):
        """Create SaveDetectionEvents element with the provided arguments.

        The output directory is derived from the pipeline context data dir
        (or DEFAULT_DATA_DIR when no context is set), not from an argument.

        :Parameters:
        ----------
        positive_interval: 2 # how often (in seconds) to save samples
            with ANY results above the confidence threshold.
            Default is 2 seconds.
        idle_interval: 600 # how often (in seconds) to save samples
            with NO results above the confidence threshold.
            Default is 10 minutes (600 seconds).
        notify: dict # optional notification configuration; only its
            "providers" list is read here. When absent or empty, a single
            "default" provider is configured.
        """
        super().__init__(**kwargs)
        log.info("Loading pipe element %r ", self.__class__.__name__)
        # Root data dir comes from the pipeline context when available,
        # otherwise fall back to the package-wide default.
        if self.context:
            self._sys_data_dir = self.context.data_dir
        else:
            self._sys_data_dir = DEFAULT_DATA_DIR
        self._output_directory = pathlib.Path(self._sys_data_dir)
        # NOTE(review): pathlib.Path of any string is truthy (Path("") is
        # Path(".")), so this assertion cannot fire in practice.
        assert self._output_directory, (
            "Pipe element %s: requires argument output_directory:"
            % self.__class__.__name__
        )
        # mkdir succeeds even if directory exists.
        self._output_directory.mkdir(parents=True, exist_ok=True)
        # add unique time-based suffix to output dir to avoid collisions
        # between pipeline runs
        now = datetime.datetime.now()
        dir_prefix = "detections/"
        dir_time = now.strftime("%Y%m%d-%H%M%S.%f%z")
        self._rel_data_dir = dir_prefix + dir_time
        self._output_directory = self._output_directory / self._rel_data_dir
        self._output_directory.mkdir(parents=True, exist_ok=True)
        self._output_directory = self._output_directory.resolve()
        log.debug("output_directory: %r", self._output_directory)
        # by default save samples with detections every 2 seconds
        di = positive_interval
        self._positive_interval = datetime.timedelta(seconds=di)
        # set the clock to sufficiently outdated timestamp to ensure
        # that we won't miss saving the very first sample
        self._time_latest_saved_detection = (
            datetime.datetime.now() - datetime.timedelta(days=1)
        )
        # by default save samples without any detections every ten minutes
        ii = idle_interval
        self._idle_interval = datetime.timedelta(seconds=ii)
        self._time_latest_saved_idle = self._time_latest_saved_detection
        # setup notification handler
        self.notifier = None
        self.notification_config = notify or {}
        notification_providers = self.notification_config.get("providers")
        if notification_providers is None or len(notification_providers) == 0:
            # if no notification providers are explicitly configured
            # use a default provider
            self.notification_config["providers"] = ["default"]
        self.notifier = NotificationHandler()

    def _save_sample(
        self,
        inf_time=None,
        image=None,
        thumbnail=None,
        inference_result=None,
        inference_meta=None,
    ):
        """Persist one sample: full image, thumbnail and JSON metadata.

        Also emits a record to the pipeline event log and, when
        inference_result is non-empty, triggers a notification.

        :Parameters:
        ----------
        inf_time: datetime of the inference; used for the file name prefix
            and the "datetime" metadata field.
        image: full-size image object; must support .save(path)
            (presumably a PIL Image — TODO confirm).
        thumbnail: thumbnail image object; must support .save(path).
        inference_result: detection results list/dict, or None/empty
            for idle samples.
        inference_meta: optional metadata about the inference.

        :Returns:
        -------
        (image_path, json_path) tuple of the saved file paths.
        """
        # One shared timestamp prefix keeps the image/thumbnail/json files
        # of a sample grouped together when sorted by name.
        time_prefix = inf_time.strftime("%Y%m%d-%H%M%S.%f%z-{suffix}.{fext}")
        image_file = time_prefix.format(suffix="image", fext="jpg")
        image_path = self._output_directory / image_file
        thumbnail_file = time_prefix.format(suffix="thumbnail", fext="jpg")
        thumbnail_path = self._output_directory / thumbnail_file
        json_file = time_prefix.format(suffix="inference", fext="json")
        json_path = self._output_directory / json_file
        save_json = {
            "id": uuid.uuid4().hex,
            "datetime": inf_time.isoformat(),
            "image_file_name": image_file,
            "thumbnail_file_name": thumbnail_file,
            "json_file_name": json_file,
            # rel_dir is relative to system data dir
            # this will be important when resolving REST API data
            # file serving
            "rel_dir": self._rel_data_dir,
            "inference_result": inference_result,
            "inference_meta": inference_meta,
        }
        image.save(image_path)
        thumbnail.save(thumbnail_path)
        # save samples to local disk
        json_str = jsonify(save_json)
        with open(json_path, "w", encoding="utf-8") as f:
            f.write(json_str)
        # round-trip through the serialized form so the event log receives
        # plain JSON-compatible values
        json_encoded_obj = json.loads(json_str)
        log_message = "Detection Event"
        event_priority = logging.INFO
        self.event_log.log(event_priority, log_message, json_encoded_obj)
        log.debug("Saved sample (detection event): %r ", save_json)
        # format notification message in a way consistent with event log file formatting
        # used by PipelineEventFormatter
        event_data = {
            "message": log_message,
            "priority": logging.getLevelName(event_priority),
            "args": save_json,
        }
        # only send notification if there is a non-empty inference result
        if inference_result:
            self.notify(event_data)
        return image_path, json_path

    def process_sample(self, **sample) -> Iterable[dict]:
        """Process next detection sample.

        Saves the sample to disk when the relevant throttling interval
        (positive or idle) has elapsed, then always yields a pared-down
        sample dict for the next pipe element. Samples without an image
        are passed through as None.
        """
        image = sample.get("image", None)
        thumbnail = sample.get("thumbnail", None)
        inference_result = sample.get("inference_result", None)
        inference_meta = sample.get("inference_meta", None)
        log.debug(
            "Pipe element %s received new sample with keys %s.",
            self.__class__.__name__,
            str([*sample]),
        )
        if not image:
            # pass through empty samples to next element
            yield None
        else:
            try:
                log.debug("sample detections: %r", inference_result)
                now = datetime.datetime.now()
                if inference_result:
                    # non-empty result, there is a detection
                    # let's save it if its been longer than
                    # the user specified positive_interval
                    if (
                        now - self._time_latest_saved_detection
                        >= self._positive_interval
                    ):
                        self._save_sample(
                            inf_time=now,
                            image=image,
                            thumbnail=thumbnail,
                            inference_result=inference_result,
                            inference_meta=inference_meta,
                        )
                        self._time_latest_saved_detection = now
                else:
                    # empty result, there is no detection
                    # let's save a sample if its been longer than
                    # the user specified idle_interval
                    if now - self._time_latest_saved_idle >= self._idle_interval:
                        self._save_sample(
                            inf_time=now,
                            image=image,
                            thumbnail=thumbnail,
                            inference_result=inference_result,
                            inference_meta=inference_meta,
                        )
                        self._time_latest_saved_idle = now
            except Exception as e:
                # deliberately broad: a failed save must not break the
                # pipeline; log and keep the sample flowing downstream
                log.exception("Error %r while saving sample %r", e, sample)
            finally:
                # pass on the sample to the next pipe element if there is one
                processed_sample = {
                    "image": image,
                    "inference_result": inference_result,
                }
                log.debug("Passing sample on: %r ", processed_sample)
                yield processed_sample

    def notify(self, event_data: dict):
        """Send out a notification with an event payload.

        Skips silently when the event carries no inference result (idle
        history markers) or when no notifier is configured.
        """
        # don't bother sending notifications for idle events
        # used as history log markers
        if event_data["args"]["inference_result"] is None:
            return
        log.info(f"Preparing notification with event payload: {event_data}")
        if self.notifier is None:
            log.debug("No notifier specified. Skipping notification send.")
            return
        notification = Notification(
            envelope=event_data,
            providers=self.notification_config["providers"],
        )
        self.notifier.send(notification)
Classes
class SaveDetectionEvents (positive_interval=2, idle_interval=600, notify=None, **kwargs)
-
Saves AI detection events (inference samples) to an external storage location.
Create SaveDetectionEvents element with the provided arguments. :Parameters:
output_directory: *object_detect_dir positive_interval: 2 # how often (in seconds) to save samples with ANY results above the confidence threshold. Default is 2 seconds. idle_interval: 600 # how often (in seconds) to save samples with NO results above the confidence threshold. Default is 10 minutes (600 seconds).
Expand source code
class SaveDetectionEvents(PipeElement): """Saves AI detection events(inference samples) to an external storage location.""" def __init__(self, positive_interval=2, idle_interval=600, notify=None, **kwargs): """Create SaveDetectionEvents element with the provided arguments. :Parameters: ---------- output_directory: *object_detect_dir positive_interval: 2 # how often (in seconds) to save samples with ANY results above the confidence threshold. Default is 2 seconds. idle_interval: 600 # how often (in seconds) to save samples with NO results above the confidence threshold. Default it 10 minutes (600 seconds.) """ super().__init__(**kwargs) log.info("Loading pipe element %r ", self.__class__.__name__) if self.context: self._sys_data_dir = self.context.data_dir else: self._sys_data_dir = DEFAULT_DATA_DIR self._output_directory = pathlib.Path(self._sys_data_dir) assert self._output_directory, ( "Pipe element %s: requires argument output_directory:" % self.__class__.__name__ ) # mkdir succeeds even if directory exists. 
self._output_directory.mkdir(parents=True, exist_ok=True) # add unique suffix to output dir to avvoid collisions now = datetime.datetime.now() dir_prefix = "detections/" dir_time = now.strftime("%Y%m%d-%H%M%S.%f%z") self._rel_data_dir = dir_prefix + dir_time self._output_directory = self._output_directory / self._rel_data_dir self._output_directory.mkdir(parents=True, exist_ok=True) self._output_directory = self._output_directory.resolve() log.debug("output_directory: %r", self._output_directory) # os.makedirs(self._output_directory, exist_ok=True) # by default save samples with detections every 2 seconds di = positive_interval self._positive_interval = datetime.timedelta(seconds=di) # set the clock to sufficiently outdated timestamp to ensure # that we won't miss saving the very first sample self._time_latest_saved_detection = ( datetime.datetime.now() - datetime.timedelta(days=1) ) # by default save samples without any detections every ten minutes ii = idle_interval self._idle_interval = datetime.timedelta(seconds=ii) self._time_latest_saved_idle = self._time_latest_saved_detection # setup notification handler self.notifier = None self.notification_config = notify or {} notification_providers = self.notification_config.get("providers") if notification_providers is None or len(notification_providers) == 0: # if no notification providers are explicitly configured # use a default provider self.notification_config["providers"] = ["default"] self.notifier = NotificationHandler() def _save_sample( self, inf_time=None, image=None, thumbnail=None, inference_result=None, inference_meta=None, ): time_prefix = inf_time.strftime("%Y%m%d-%H%M%S.%f%z-{suffix}.{fext}") image_file = time_prefix.format(suffix="image", fext="jpg") image_path = self._output_directory / image_file thumbnail_file = time_prefix.format(suffix="thumbnail", fext="jpg") thumbnail_path = self._output_directory / thumbnail_file json_file = time_prefix.format(suffix="inference", fext="json") json_path = 
self._output_directory / json_file save_json = { "id": uuid.uuid4().hex, "datetime": inf_time.isoformat(), "image_file_name": image_file, "thumbnail_file_name": thumbnail_file, "json_file_name": json_file, # rel_dir is relative to system data dir # this will be important when resloving REST API data # file serving "rel_dir": self._rel_data_dir, "inference_result": inference_result, "inference_meta": inference_meta, } image.save(image_path) thumbnail.save(thumbnail_path) # save samples to local disk json_str = jsonify(save_json) with open(json_path, "w", encoding="utf-8") as f: f.write(json_str) # e = PipelineEvent('Detected Objects', type='ObjectDetection') json_encoded_obj = json.loads(json_str) log_message = "Detection Event" event_priority = logging.INFO self.event_log.log(event_priority, log_message, json_encoded_obj) log.debug("Saved sample (detection event): %r ", save_json) # format notification message in a way consistent with event log file formatting # used by PipelineEventFormatter event_data = { "message": log_message, "priority": logging.getLevelName(event_priority), "args": save_json, } # only send notification if there is a non-empty inference result if inference_result: self.notify(event_data) return image_path, json_path def process_sample(self, **sample) -> Iterable[dict]: """Process next detection sample.""" image = sample.get("image", None) thumbnail = sample.get("thumbnail", None) inference_result = sample.get("inference_result", None) inference_meta = sample.get("inference_meta", None) log.debug( "Pipe element %s received new sample with keys %s.", self.__class__.__name__, str([*sample]), ) if not image: # pass through empty samples to next element yield None else: try: log.debug("sample detections: %r", inference_result) now = datetime.datetime.now() if inference_result: # non-empty result, there is a detection # let's save it if its been longer than # the user specified positive_interval if ( now - self._time_latest_saved_detection >= 
self._positive_interval ): self._save_sample( inf_time=now, image=image, thumbnail=thumbnail, inference_result=inference_result, inference_meta=inference_meta, ) self._time_latest_saved_detection = now else: # empty result, there is no detection # let's save a sample if its been longer than # the user specified idle_interval if now - self._time_latest_saved_idle >= self._idle_interval: self._save_sample( inf_time=now, image=image, thumbnail=thumbnail, inference_result=inference_result, inference_meta=inference_meta, ) self._time_latest_saved_idle = now except Exception as e: log.exception("Error %r while saving sample %r", e, sample) finally: # pass on the sample to the next pipe element if there is one processed_sample = { "image": image, "inference_result": inference_result, } log.debug("Passing sample on: %r ", processed_sample) yield processed_sample def notify(self, event_data: dict): """Send out a notification with an event payload""" # don't bother sending notifications for idle events # used as history log markers if event_data["args"]["inference_result"] is None: return log.info(f"Preparing notification with event payload: {event_data}") if self.notifier is None: log.debug("No notifier specified. Skipping notification send.") return notification = Notification( envelope=event_data, providers=self.notification_config["providers"], ) self.notifier.send(notification)
Ancestors
Methods
def notify(self, event_data: dict)
-
Send out a notification with an event payload
Expand source code
def notify(self, event_data: dict): """Send out a notification with an event payload""" # don't bother sending notifications for idle events # used as history log markers if event_data["args"]["inference_result"] is None: return log.info(f"Preparing notification with event payload: {event_data}") if self.notifier is None: log.debug("No notifier specified. Skipping notification send.") return notification = Notification( envelope=event_data, providers=self.notification_config["providers"], ) self.notifier.send(notification)
def process_sample(self, **sample) ‑> Iterable[dict]
-
Process next detection sample.
Expand source code
def process_sample(self, **sample) -> Iterable[dict]: """Process next detection sample.""" image = sample.get("image", None) thumbnail = sample.get("thumbnail", None) inference_result = sample.get("inference_result", None) inference_meta = sample.get("inference_meta", None) log.debug( "Pipe element %s received new sample with keys %s.", self.__class__.__name__, str([*sample]), ) if not image: # pass through empty samples to next element yield None else: try: log.debug("sample detections: %r", inference_result) now = datetime.datetime.now() if inference_result: # non-empty result, there is a detection # let's save it if its been longer than # the user specified positive_interval if ( now - self._time_latest_saved_detection >= self._positive_interval ): self._save_sample( inf_time=now, image=image, thumbnail=thumbnail, inference_result=inference_result, inference_meta=inference_meta, ) self._time_latest_saved_detection = now else: # empty result, there is no detection # let's save a sample if its been longer than # the user specified idle_interval if now - self._time_latest_saved_idle >= self._idle_interval: self._save_sample( inf_time=now, image=image, thumbnail=thumbnail, inference_result=inference_result, inference_meta=inference_meta, ) self._time_latest_saved_idle = now except Exception as e: log.exception("Error %r while saving sample %r", e, sample) finally: # pass on the sample to the next pipe element if there is one processed_sample = { "image": image, "inference_result": inference_result, } log.debug("Passing sample on: %r ", processed_sample) yield processed_sample
Inherited members