Module ambianic.pipeline.pipeline_event

Pipeline event timeline read/write/search functions.

"""Pipeline event timeline read/write/search functions."""

import logging
import os
import pathlib
import uuid

import yaml
from concurrent_log_handler import ConcurrentRotatingFileHandler

log = logging.getLogger(__name__)
TIMELINE_EVENT_LOGGER_NAME = __name__ + "__timeline__event__logger__"
PIPELINE_CONTEXT_KEY = "pipeline_context"


class PipelineEvent:
    """Encapsulates information for a pipeline timeline event."""

    def __init__(self, message: str = None, **kwargs):
        """Create a new event instance.

        :Parameters:
        ----------
        message : str
            Human-readable display message for the event.
        **kwargs
            Additional keyword arguments describing the event.

        """
        self.message = message
        self.kwargs = kwargs
        self.args = {}
        self.args["message"] = self.message
        self.args["args"] = self.kwargs

    def __str__(self):
        """Format the event keyword arguments as a YAML string."""
        return yaml.dump(self.kwargs)


class PipelineContext:
    """Runtime dynamic context for a pipeline.

    Carries information
    such as pipeline name and pipe element stack
    up to and including the element firing the event.

    """

    def __init__(self, unique_pipeline_name: str = None):
        """Instantiate timeline context for a pipeline.

        :Parameters:
        ----------
        unique_pipeline_name : str
            The unique runtime name of a pipeline.

        """
        self._unique_pipeline_name = unique_pipeline_name
        self._element_stack = []
        self._data_dir = None

    @property
    def unique_pipeline_name(self):
        """Return pipeline unique name."""
        return self._unique_pipeline_name

    @property
    def data_dir(self):
        """Return the system-wide configured data dir."""
        return self._data_dir

    @data_dir.setter
    def data_dir(self, dd=None):
        """Set the system-wide configured data dir."""
        self._data_dir = dd

    def push_element_context(self, element_context: dict = None):
        """Push new element information to the context stack."""
        self._element_stack.append(element_context)

    def pop_element_context(self) -> dict:
        """Pop element information from the context stack."""
        return self._element_stack.pop()

    def toDict(self) -> dict:
        """Return the context as a plain dict for event serialization.

        Required by PipelineEventFormatter, which calls ctx.toDict()
        when writing timeline events.
        """
        return {
            "unique_pipeline_name": self._unique_pipeline_name,
            "element_stack": self._element_stack,
            # coerce to str so the value round-trips through yaml.safe_load
            "data_dir": str(self._data_dir) if self._data_dir else None,
        }


class PipelineEventFormatter(logging.Formatter):
    """Custom logging formatter for pipeline events."""

    def format(self, record: logging.LogRecord = None) -> str:
        """Populate event information and return as a YAML formatted string."""
        e = {}
        e["id"] = uuid.uuid4().hex
        e["message"] = record.getMessage()
        e["created"] = record.created
        e["priority"] = record.levelname
        e["args"] = record.args
        e["source_code"] = {}
        e["source_code"]["pathname"] = record.pathname
        e["source_code"]["funcName"] = record.funcName
        e["source_code"]["lineno"] = record.lineno
        # The LoggerAdapter returned by get_event_log() passes the pipeline
        # context via its extra dict, which logging attaches to the record
        # as an attribute. Fall back to record.args for callers that pass
        # the context inside a dict argument.
        ctx = getattr(record, PIPELINE_CONTEXT_KEY, None)
        if ctx is None and isinstance(record.args, dict):
            ctx = record.args.get(PIPELINE_CONTEXT_KEY)
        if ctx:
            e[PIPELINE_CONTEXT_KEY] = ctx.toDict()
        # use list enclosure [e] to maintain the log file
        # YAML compliant as new events are appended:
        # - event1:
        # - event2:
        # - ...
        a = [e]
        s = yaml.dump(a)
        return s


def configure_timeline(config: dict = None):
    """Initialize timeline event logger.

    Sets up pipeline event logger once to be reused by pipelines
    in the current runtime.
    Should be called before any pipeline starts.

    A good place to initialize it is around the time when the root logger
    is initialized.

    :Parameters:
    ------------

    config : dict
        A dictionary of configuration parameters.

    """
    if config is None:
        config = {}
    log_filename = config.get("event_log", None)
    if not log_filename:
        log_filename = "timeline-event-log.yaml"
    log_directory = os.path.dirname(log_filename)
    if log_directory:
        pathlib.Path(log_directory).mkdir(parents=True, exist_ok=True)
    log.debug(f"Timeline event log messages directed to {log_filename}")
    event_log = logging.getLogger(TIMELINE_EVENT_LOGGER_NAME)
    event_log.setLevel(logging.INFO)
    # Use rotating files as log message handler
    handler = ConcurrentRotatingFileHandler(
        log_filename,
        # each log file holds up to 100KB of event data
        maxBytes=100 * 1024,
        # up to 100 backup files are kept; older ones are erased
        backupCount=100,
    )
    fmt = PipelineEventFormatter()
    handler.setFormatter(fmt)
    # remove any other handlers that may be assigned previously
    # and could cause unexpected log collisions
    event_log.handlers = []
    # add custom event handler
    event_log.addHandler(handler)


def get_event_log(pipeline_context: PipelineContext = None) -> logging.Logger:
    """Get an instance of pipeline event logger.

    :Parameters:
    ----------
    pipeline_context : PipelineContext
        Pipeline context to attach to each logged event.

    :Returns:
    -------
    logging.Logger
        Implementation of logging.Logger that handles pipeline events.

    """
    pipeline_event_log = logging.getLogger(TIMELINE_EVENT_LOGGER_NAME)
    # wrap logger in an adapter that carries pipeline context
    # such as pipeline name and current pipe element.
    pipeline_event_log = logging.LoggerAdapter(
        pipeline_event_log, {PIPELINE_CONTEXT_KEY: pipeline_context}
    )
    return pipeline_event_log
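
Putting the pieces together, here is a minimal usage sketch of the write path. The pipeline name, log path, and event arguments are illustrative; events are logged with a single dict argument so the formatter can record it in the event's args field:

from ambianic.pipeline.pipeline_event import (
    PipelineContext,
    configure_timeline,
    get_event_log,
)

# configure the shared timeline logger once, before any pipeline starts
configure_timeline({"event_log": "data/timeline-event-log.yaml"})

# each pipeline gets a logger adapter that carries its context
context = PipelineContext(unique_pipeline_name="front_door_watch")
event_log = get_event_log(pipeline_context=context)

# the formatter serializes the message and args dict into a YAML record
event_log.info("object detected", {"label": "person", "confidence": 0.98})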

Functions

def configure_timeline(config: dict = None)

Initialize timeline event logger.

Sets up pipeline event logger once to be reused by pipelines in the current runtime. Should be called before any pipeline starts.

A good place to initialize it is around the time when the root logger is initialized.

:Parameters:

config : dict
    A dictionary of configuration parameters.

def configure_timeline(config: dict = None):
    """Initialize timeline event logger.

    Sets up pipeline event logger once to be reused by pipelines
    in the current runtime.
    Should be called before any pipeline starts.

    A good place to initialize it is around the time when the root logger
    is initialized.

    :Parameters:
    ------------

    config : dict
        A dictionary of configuration parameters.

    """
    if config is None:
        config = {}
    log_filename = config.get("event_log", None)
    if not log_filename:
        log_filename = "timeline-event-log.yaml"
    log_directory = os.path.dirname(log_filename)
    if log_directory:
        pathlib.Path(log_directory).mkdir(parents=True, exist_ok=True)
    log.debug(f"Timeline event log messages directed to {log_filename}")
    event_log = logging.getLogger(TIMELINE_EVENT_LOGGER_NAME)
    event_log.setLevel(logging.INFO)
    # Use rotating files as log message handler
    handler = ConcurrentRotatingFileHandler(
        log_filename,
        # each log file holds up to 100KB of event data
        maxBytes=100 * 1024,
        # up to 100 backup files are kept; older ones are erased
        backupCount=100,
    )
    fmt = PipelineEventFormatter()
    handler.setFormatter(fmt)
    # remove any other handlers that may be assigned previously
    # and could cause unexpected log collisions
    event_log.handlers = []
    # add custom event handler
    event_log.addHandler(handler)
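
For reference, a single appended record in the resulting event log might look like the following. All values are illustrative, and yaml.dump sorts keys alphabetically by default:

- args:
    confidence: 0.98
    label: person
  created: 1620000000.123456
  id: 3f2a9c0e8b4d4f7a9c1d2e3f4a5b6c7d
  message: object detected
  pipeline_context:
    data_dir: ./data
    element_stack: []
    unique_pipeline_name: front_door_watch
  priority: INFO
  source_code:
    funcName: process_sample
    lineno: 42
    pathname: /opt/ambianic/pipeline/avsource.py
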
def get_event_log(pipeline_context: PipelineContext = None) -> logging.Logger

Get an instance of pipeline event logger.

:Parameters:

pipeline_context : PipelineContext
    Pipeline context to attach to each logged event.

:Returns:

logging.Logger
    Implementation of logging.Logger that handles pipeline events.

def get_event_log(pipeline_context: PipelineContext = None) -> logging.Logger:
    """Get an instance of pipeline event logger.

    :Parameters:
    ----------
    pipeline_context : PipelineContext
        Pipeline context to attach to each logged event.

    :Returns:
    -------
    logging.Logger
        Implementation of logging.Logger that handles pipeline events.

    """
    pipeline_event_log = logging.getLogger(TIMELINE_EVENT_LOGGER_NAME)
    # wrap logger in an adapter that carries pipeline context
    # such as pipeline name and current pipe element.
    pipeline_event_log = logging.LoggerAdapter(
        pipeline_event_log, {PIPELINE_CONTEXT_KEY: pipeline_context}
    )
    return pipeline_event_log
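
Because every record is appended as a single-element YAML list, the whole log file parses back as one Python list, which is the basis for the read/search side of the module. A minimal read sketch, assuming only plain data types were logged and reusing the illustrative file name from above:

import yaml

with open("data/timeline-event-log.yaml") as f:
    events = yaml.safe_load(f) or []

# simple search: all person detections, oldest first
person_events = [e for e in events if "person" in str(e.get("args", ""))]
for e in person_events:
    print(e["created"], e["priority"], e["message"])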

Classes

class PipelineContext (unique_pipeline_name: str = None)

Runtime dynamic context for a pipeline.

Carries information such as pipeline name and pipe element stack up to and including the element firing the event.

Instantiate timeline context for a pipeline.

:Parameters:

unique_pipeline_name : str
    The unique runtime name of a pipeline.

class PipelineContext:
    """Runtime dynamic context for a pipeline.

    Carries information
    such as pipeline name and pipe element stack
    up to and including the element firing the event.

    """

    def __init__(self, unique_pipeline_name: str = None):
        """Instantiate timeline context for a pipeline.

        :Parameters:
        ----------
        unique_pipeline_name : str
            The unique runtime name of a pipeline.

        """
        self._unique_pipeline_name = unique_pipeline_name
        self._element_stack = []
        self._data_dir = None

    @property
    def unique_pipeline_name(self):
        """Return pipeline unique name."""
        return self._unique_pipeline_name

    @property
    def data_dir(self):
        """Return the system-wide configured data dir."""
        return self._data_dir

    @data_dir.setter
    def data_dir(self, dd=None):
        """Set the system-wide configured data dir."""
        self._data_dir = dd

    def push_element_context(self, element_context: dict = None):
        """Push new element information to the context stack."""
        self._element_stack.append(element_context)

    def pop_element_context(self) -> dict:
        """Pop element information from the context stack."""
        return self._element_stack.pop()

    def toDict(self) -> dict:
        """Return the context as a plain dict for event serialization.

        Required by PipelineEventFormatter, which calls ctx.toDict()
        when writing timeline events.
        """
        return {
            "unique_pipeline_name": self._unique_pipeline_name,
            "element_stack": self._element_stack,
            # coerce to str so the value round-trips through yaml.safe_load
            "data_dir": str(self._data_dir) if self._data_dir else None,
        }

Instance variables

var data_dir

Return the system-wide configured data dir.

@property
def data_dir(self):
    """Return the system-wide configured data dir."""
    return self._data_dir

var unique_pipeline_name

Return pipeline unique name.

@property
def unique_pipeline_name(self):
    """Return pipeline unique name."""
    return self._unique_pipeline_name

Methods

def pop_element_context(self) -> dict

Pop element information from the context stack.

def pop_element_context(self) -> dict:
    """Pop element information from the context stack."""
    return self._element_stack.pop()

def push_element_context(self, element_context: dict = None)

Push new element information to the context stack.

def push_element_context(self, element_context: dict = None):
    """Push new element information to the context stack."""
    self._element_stack.append(element_context)
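
A short sketch of how a pipeline might drive the context stack as data flows through its elements; the element descriptors are illustrative:

context = PipelineContext(unique_pipeline_name="front_door_watch")
context.data_dir = "./data"

# entering a pipe element: push its descriptor onto the stack
context.push_element_context({"class": "AVSourceElement", "name": "source"})
context.push_element_context({"class": "ObjectDetector", "name": "detect"})

# leaving the element: pop the descriptor back off
element = context.pop_element_context()
assert element["name"] == "detect"
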
class PipelineEvent (message: str = None, **kwargs)

Encapsulates information for a pipeline timeline event.

Create a new event instance.

:Parameters:

message : str
    Human-readable display message for the event.
**kwargs
    Additional keyword arguments describing the event.

class PipelineEvent:
    """Encapsulates information for a pipeline timeline event."""

    def __init__(self, message: str = None, **kwargs):
        """Create a new event instance.

        :Parameters:
        ----------
        message : str
            Human-readable display message for the event.
        **kwargs
            Additional keyword arguments describing the event.

        """
        self.message = message
        self.kwargs = kwargs
        self.args = {}
        self.args["message"] = self.message
        self.args["args"] = self.kwargs

    def __str__(self):
        """Format the event keyword arguments as a YAML string."""
        return yaml.dump(self.kwargs)
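
PipelineEvent pairs a display message with structured keyword arguments; note that printing an instance dumps only the keyword arguments as YAML. A small sketch with illustrative values:

event = PipelineEvent("object detected", label="person", confidence=0.98)
print(event.args["message"])  # object detected
print(event)
# confidence: 0.98
# label: person
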
class PipelineEventFormatter (fmt=None, datefmt=None, style='%')

Custom logging formatter for pipeline events.

Initialize the formatter with specified format strings.

Initialize the formatter either with the specified format string, or a default as described in the logging.Formatter documentation. Allow for specialized date formatting with the optional datefmt argument. If datefmt is omitted, you get an ISO8601-like (or RFC 3339-like) format.

Use a style parameter of '%', '{' or '$' to specify that you want to use one of %-formatting, str.format ({}) formatting or string.Template formatting in your format string.

Changed in version 3.2: added the style parameter.

class PipelineEventFormatter(logging.Formatter):
    """Custom logging formatter for pipeline events."""

    def format(self, record: logging.LogRecord = None) -> str:
        """Populate event information and return as a YAML formatted string."""
        e = {}
        e["id"] = uuid.uuid4().hex
        e["message"] = record.getMessage()
        e["created"] = record.created
        e["priority"] = record.levelname
        e["args"] = record.args
        e["source_code"] = {}
        e["source_code"]["pathname"] = record.pathname
        e["source_code"]["funcName"] = record.funcName
        e["source_code"]["lineno"] = record.lineno
        # The LoggerAdapter returned by get_event_log() passes the pipeline
        # context via its extra dict, which logging attaches to the record
        # as an attribute. Fall back to record.args for callers that pass
        # the context inside a dict argument.
        ctx = getattr(record, PIPELINE_CONTEXT_KEY, None)
        if ctx is None and isinstance(record.args, dict):
            ctx = record.args.get(PIPELINE_CONTEXT_KEY)
        if ctx:
            e[PIPELINE_CONTEXT_KEY] = ctx.toDict()
        # use list enclosure [e] to maintain the log file
        # YAML compliant as new events are appended:
        # - event1:
        # - event2:
        # - ...
        a = [e]
        s = yaml.dump(a)
        return s

Ancestors

  • logging.Formatter

Methods

def format(self, record: logging.LogRecord = None) -> str

Populate event information and return as a YAML formatted string.

def format(self, record: logging.LogRecord = None) -> str:
    """Populate event information and return as a YAML formatted string."""
    e = {}
    e["id"] = uuid.uuid4().hex
    e["message"] = record.getMessage()
    e["created"] = record.created
    e["priority"] = record.levelname
    e["args"] = record.args
    e["source_code"] = {}
    e["source_code"]["pathname"] = record.pathname
    e["source_code"]["funcName"] = record.funcName
    e["source_code"]["lineno"] = record.lineno
    # The LoggerAdapter returned by get_event_log() passes the pipeline
    # context via its extra dict, which logging attaches to the record
    # as an attribute. Fall back to record.args for callers that pass
    # the context inside a dict argument.
    ctx = getattr(record, PIPELINE_CONTEXT_KEY, None)
    if ctx is None and isinstance(record.args, dict):
        ctx = record.args.get(PIPELINE_CONTEXT_KEY)
    if ctx:
        e[PIPELINE_CONTEXT_KEY] = ctx.toDict()
    # use list enclosure [e] to maintain the log file
    # YAML compliant as new events are appended:
    # - event1:
    # - event2:
    # - ...
    a = [e]
    s = yaml.dump(a)
    return s
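
The single-element list enclosure in format() is what keeps the growing log file parseable: every appended chunk starts a new top-level "- " entry, so the concatenation of many dumps is itself one valid YAML sequence. A quick self-contained check:

import yaml

chunk1 = yaml.dump([{"id": "a1", "message": "first event"}])
chunk2 = yaml.dump([{"id": "b2", "message": "second event"}])

# appended file content parses back as a single list of events
events = yaml.safe_load(chunk1 + chunk2)
assert [e["id"] for e in events] == ["a1", "b2"]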