Module ambianic.webapp.server.timeline_dao

REST API for timeline events fired by pipelines.

Source code
"""REST API for timeline events fired by pipelines."""
import logging
import os
from datetime import datetime
from pathlib import Path

import yaml

log = logging.getLogger()


def _remove_timeline(file_path):
    try:
        os.remove(file_path)
    except Exception:
        log.exception("Error removing %s", file_path)


def get_timeline(before_datetime=None, page=1, data_dir=None):
    """Get stored pipeline timeline events.

    :Parameters:
    ----------
    before_datetime : date time in ISO 8601 compatible format,
        YYYY-MM-DDTHH:MM:SS. For example '2002-12-25 00:00:00-06:39'.
        It uses Python's standard function datetime.fromisoformat().
        If not provided, the function will start with the most recent available
        sample.
    page : positive integer
        Paginates samples in batches of 5. Defaults to page=1.
    data_dir : string
        Path to the directory where timeline event log files
        (timeline-event-log.yaml*) are stored.

    :Returns:
    -------
    list: json
        Returns a list of previously saved pipeline events.

    """

    if data_dir is None or not os.path.exists(data_dir):
        log.warning("data_dir is not valid: %s", data_dir)
        return []

    parsed_datetime = None
    assert isinstance(page, int)
    assert page > 0
    page_size = 5
    if before_datetime:
        try:
            parsed_datetime = datetime.fromisoformat(before_datetime)
            log.debug("Fetching samples saved before %s", parsed_datetime)
        except ValueError as e:
            log.warning(
                "Unable to parse before_datetime parameter: %s. Error: %s",
                before_datetime,
                str(e),
            )
    page_start_position = (page - 1) * page_size
    page_end_position = page_start_position + page_size

    if not parsed_datetime:
        log.debug("Fetching most recent saved samples")
    log.debug(
        "Fetching samples page %d. Page size %d. " "Sample index range [%d:%d]. ",
        page,
        page_size,
        page_start_position,
        page_end_position,
    )

    files = list(Path(data_dir).glob("./timeline-event-log.yaml*"))
    files = sorted(files, reverse=False)

    page_count = 1
    events_queue = []

    # load the event history, older first
    for file_path in files:
        with file_path.open() as pf:

            try:
                timeline_events = yaml.safe_load(pf)
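                # append the partial-page remainder carried over
                # from the previously processed file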
                timeline_events += events_queue
            except (
                yaml.reader.ReaderError,
                yaml.scanner.ScannerError,
                yaml.composer.ComposerError,
                yaml.constructor.ConstructorError,
            ):
                log.exception("Detected unreadable timeline, removing %s", file_path)
                _remove_timeline(file_path)
                continue

            events_queue = []
            events_len = len(timeline_events)
            if events_len < page_end_position:
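                # this file cannot cover the requested page window:
                # carry the leftover partial page into the next file
                # and shift the window by the whole pages consumed here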

                pages_mod = events_len % page_size
                if pages_mod > 0:
                    events_queue = timeline_events[0:pages_mod]
                    page_start_position += pages_mod
                    page_end_position += pages_mod

                page_start_position -= events_len
                page_end_position -= events_len
                page_count += 1

            else:

                if page_start_position >= events_len:
                    return []

                # events are appended to the file as they arrive
                # we need to read in reverse order to get the latest one first

                return timeline_events[
                    -1 * page_start_position - 1 : -1 * page_end_position - 1 : -1
                ]

    page_count += 1

    if page_count < page:
        return []

    # return the remaining queue if there are no more files to process
    return events_queue
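
The reverse slice in get_timeline is what turns the append-order event log into newest-first pages. A minimal sketch of that slice on a hypothetical event list (the event values below are illustrative only):

# hypothetical event log: 12 events, oldest first, as appended to the YAML file
timeline_events = [f"event-{i}" for i in range(1, 13)]

page_size = 5
for page in (1, 2):
    start = (page - 1) * page_size   # page_start_position
    end = start + page_size          # page_end_position
    # same slice used in get_timeline: walk backwards from the newest event
    print(page, timeline_events[-1 * start - 1 : -1 * end - 1 : -1])

# page 1 -> ['event-12', 'event-11', 'event-10', 'event-9', 'event-8']
# page 2 -> ['event-7', 'event-6', 'event-5', 'event-4', 'event-3']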

Functions

def get_timeline(before_datetime=None, page=1, data_dir=None)

Get stored pipeline timeline events.

:Parameters:

before_datetime : date time in ISO 8601 compatible format, YYYY-MM-DDTHH:MM:SS.
    For example '2002-12-25 00:00:00-06:39'. It uses Python's standard function
    datetime.fromisoformat(). If not provided, the function will start with the
    most recent available sample.
page : positive integer
    Paginates samples in batches of 5. Defaults to page=1.
data_dir : string
    Path to the directory where timeline event log files
    (timeline-event-log.yaml*) are stored.

:Returns:

list: json
    Returns a list of previously saved pipeline events.

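A minimal usage sketch, assuming timeline event logs have already been written by a pipeline; the data_dir path below is hypothetical and only illustrates the call shape:

from ambianic.webapp.server.timeline_dao import get_timeline

# hypothetical directory that holds timeline-event-log.yaml* files
data_dir = "/var/lib/ambianic/data"

# most recent 5 events (page 1), newest first
latest = get_timeline(data_dir=data_dir)

# the next 5 older events
older = get_timeline(page=2, data_dir=data_dir)

# before_datetime must be ISO 8601 compatible (parsed with datetime.fromisoformat)
recent = get_timeline(before_datetime="2002-12-25 00:00:00-06:39", data_dir=data_dir)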