Module ambianic.pipeline.avsource.av_element

Audio Video sourcing to an Ambianic pipeline.

Expand source code
"""Audio Video sourcing to an Ambianic pipeline."""

import logging
import multiprocessing
import queue
import threading
import time
from io import BytesIO

import requests
from ambianic.pipeline import PipeElement
from ambianic.pipeline.avsource import gst_process
from ambianic.pipeline.avsource.picam import Picamera
from ambianic.util import stacktrace
from PIL import Image

log = logging.getLogger(__name__)


MIN_HEALING_INTERVAL = 5


class AVSourceElement(PipeElement):
    """
    Pipe element that handles a wide range of media input sources.

    Detects media input source, processes and passes on normalized raw
    image samples to the next pipe element.
    """

    def __init__(self, uri=None, type=None, live=False, **kwargs):
        """Create an av source element with given configuration.

        :Parameters:
        ----------
        source_conf : dict
            uri: string (examples
                uri: rtsp://somehost/ipcam/channel0
                uri: http://somehost/ipcam/sample.jpg
                )
            type: string (video, audio or image)
            live: boolean (True if the source is a live stream)
                When live is True AVSourceElement source element will
                keep trying to reconnect
                in case there is disruption of the source stream
                until explicit stop() is requested of the element.
        """
        super().__init__(**kwargs)

        assert uri

        element_conf = dict(kwargs)
        element_conf["uri"] = uri
        element_conf["type"] = type
        element_conf["live"] = live

        # pipeline source info
        self._source_conf = element_conf
        self._is_live = live
        self._gst_process = None
        self._gst_out_queue = None
        self._gst_process_stop_signal = None
        self._gst_process_eos_reached = None
        # protects access to gstreamer resources in rare cases
        # such as supervised healing requests
        self._healing_in_progress = threading.RLock()
        # ensure healing requests are reasonably spaced out
        self._latest_healing = time.monotonic()

    def _on_new_sample(self, sample=None):
        log.debug("Input stream received new gst sample.")
        assert sample
        sample_type = sample["type"]
        # only image type supported at this time
        assert sample_type == "image"
        # make sure the sample is in RGB format
        sample_format = sample["format"]
        assert sample_format == "RGB"
        width = sample["width"]
        height = sample["height"]
        sample_bytes = sample["bytes"]
        img = Image.frombytes(sample_format, (width, height), sample_bytes, "raw")
        # pass image sample to next pipe element, e.g. ai inference
        log.debug("Input stream sending sample to next element.")
        self.receive_next_sample(image=img)

    def _get_gst_service_starter(self):
        return gst_process.start_gst_service

    def _get_sample_queue(self):
        q = multiprocessing.Queue(3)
        return q

    def fetch_img(self, session=None, url=None) -> Image:
        assert url
        r = requests.get(url)
        r.raise_for_status()
        img = Image.open(BytesIO(r.content))
        return img

    def _on_fetch_img_exception(self, _exception=None):
        pass

    def _fetch_img_exception_recovery(self):
        log.debug("Pausing for a moment to let remote network issues settle")
        time.sleep(1)

    def _run_picamera_fetch(self):
        with Picamera() as picamera:
            while not self._stop_requested:
                if picamera.has_failure():
                    log.warning(picamera.error)
                    break
                image = picamera.acquire()
                if image is not None:
                    self.receive_next_sample(image=image)

    def _run_http_fetch(self, url=None, continuous=False):
        log.debug("Fetching source uri sample over http: %r", url)
        assert url
        while not self._stop_requested:
            img = None
            try:
                img = self.fetch_img(url=url)
                log.debug(
                    """
                    Image fetched: %r
                    From URL: %r
                    """,
                    img,
                    url,
                )
                log.debug("Sending sample to next element.")
                self.receive_next_sample(image=img)
            except Exception as e:
                self._on_fetch_img_exception(_exception=e)
                log.exception(
                    """
                    Failed to fetch image from pipeline source.
                    URL: %r
                    """,
                    url,
                )
                if continuous:
                    log.warning(
                        "Will keep trying to fetch image from continuous source."
                    )
                    self._fetch_img_exception_recovery()
            finally:
                if not continuous:
                    # this is not a live (continuous) media source
                    # exit the image fetch loop
                    log.debug("Completed one time http image fetch from URL: %r", url)
                    break

    def _run_gst_service(self):
        log.debug("Starting Gst service process...")
        self._gst_out_queue = self._get_sample_queue()
        self._gst_process_stop_signal = multiprocessing.Event()
        self._gst_process_eos_reached = multiprocessing.Event()
        gst_service = self._get_gst_service_starter()
        self._gst_process = multiprocessing.Process(
            target=gst_service,
            name="Gstreamer Service Process",
            daemon=True,
            kwargs={
                "source_conf": self._source_conf,
                "out_queue": self._gst_out_queue,
                "stop_signal": self._gst_process_stop_signal,
                "eos_reached": self._gst_process_eos_reached,
            },
        )
        self._gst_process.daemon = True
        self._gst_process.start()
        gst_proc = self._gst_process
        while not self._stop_requested and gst_proc.is_alive():
            # do not use process.join() to avoid deadlock due to shared queue
            try:
                next_sample = self._gst_out_queue.get(timeout=1)
                # print('next sample received from gst queue, _on_new_sample')
                self._on_new_sample(sample=next_sample)
            except queue.Empty:
                log.debug("no new sample available yet in gst out queue")
            except Exception as e:
                log.warning("AVElement loop caught an error: %s. ", str(e))
                log.warning(stacktrace())
                # print('Exception caught from _on_new_sample %r' % e)
        # print('end of _run_gst_service.')
        log.debug("exiting _run_gst_service")

    def _clear_gst_out_queue(self):
        log.debug("Clearing _gst_out_queue.")
        while not self._gst_out_queue.empty():
            try:
                self._gst_out_queue.get_nowait()
            except queue.Empty:
                log.debug("_gst_out_queue already empty.")
        log.debug("Cleared _gst_out_queue.")

    def _process_terminate(self, proc=None):
        proc.terminate()
        # give it a few seconds to terminate cleanly
        for i in range(10):
            self._clear_gst_out_queue()
            # do not use process.join() to avoid deadlock
            # due to shared queue. Use sleep instead.
            time.sleep(1)
            if not proc.is_alive():
                break

    def _process_good_kill(self, proc=None):
        # print('AVElement: Killing Gst process PID %r' % proc.pid)
        proc.kill()
        return True
        # time.sleep(3)
        # if proc.exitcode is None:
        #    # process is still alive
        #    log.warning('GST process kill was not clean. Process still alive.'
        #                'PID %r',
        #                proc.pid)
        #    # print('GST process kill was not clean. Process still alive. '
        #    #      'PID %r' %
        #    #      proc.pid)
        #    return False
        # else:
        #    log.warning('GST process killed. '
        #                'PID %r , exit code: %r',
        #                proc.pid,
        #                proc.exitcode)
        #    # print('GST process killed. '
        #    #      'PID %r , exit code: %r' %
        #    #      (proc.pid, proc.exitcode))
        #    return True

    def _stop_gst_service(self):
        log.debug("Stopping Gst service process.")
        gst_proc = self._gst_process
        stop_signal = self._gst_process_stop_signal
        if gst_proc and gst_proc.is_alive():
            # tell the OS we won't use this queue any more
            log.debug("GST process still alive. Shutting it down.")
            # log.debug('Closing out queue shared with GST proces.')
            # self._gst_out_queue.close()
            # send a polite request to the process to stop
            log.debug("Sending stop signal to GST process.")
            stop_signal.set()
            log.debug("Signalled gst process to stop")
            # give it a few seconds to stop cleanly
            for i in range(10):
                # make sure a non-empty queue doesn't block
                # the gst process from stopping
                self._clear_gst_out_queue()
                # do not use process.join() to avoid deadlock
                # due to shared queue.  Use sleep instead.
                time.sleep(1)
                if not gst_proc.is_alive():
                    break
            # process did not stop, we need to be a bit more assertive
            if gst_proc.is_alive():
                log.debug("Gst process did not stop. Terminating.")
                self._process_terminate(gst_proc)
                if gst_proc.is_alive():
                    # last resort, force kill the process
                    log.debug(
                        "Gst proess did not terminate." " Resorting to force kill."
                    )
                    clean_kill = self._process_good_kill(gst_proc)
                    log.debug("Gst process killed. Clean: %r.", clean_kill)
                else:
                    log.debug("Gst process stopped after terminate signal.")
            else:
                log.debug("Gst process stopped after stop signal.")

    def start(self):
        """Start processing input from the configured audio or video source."""
        super().start()
        log.info("Starting %s", self.__class__.__name__)
        self._stop_requested = False

        if self._source_conf["uri"] == "picamera":
            log.debug("Input source is picamera")
            self._run_picamera_fetch()
        elif (
            self._source_conf["uri"].startswith("http")
            and self._source_conf["type"] == "image"
        ):
            log.debug(
                """
                Input source is an http still image: %r
                Will use python requests lib for sampling.
                """,
                self._source_conf["uri"],
            )
            # use http client library to fetch still images
            self._run_http_fetch(url=self._source_conf["uri"], continuous=self._is_live)
        else:
            log.debug(
                """
                Input source is : %r
                Will use gstreamer for sampling.
                """,
                self._source_conf["uri"],
            )
            # use gstreamer for all other types of media sources
            while not self._stop_requested:
                self._run_gst_service()
                if self._gst_process_eos_reached and not self._is_live:
                    # gst process reached end of its input stream
                    # and this is not a live (continuous) stream loop
                    # exit the avsource element loop
                    log.debug(
                        "GST EOS reached for source uri: %r", self._source_conf["uri"]
                    )
                    break
            self._stop_gst_service()
        super().stop()
        log.info("Stopped %s", self.__class__.__name__)

    def heal(self):
        """Attempt to heal a damaged AV source processing service."""
        log.debug("Entering healing method... %s", self.__class__.__name__)
        log.debug("Healing waiting for lock.")
        self._healing_in_progress.acquire()
        try:
            logging.debug("Healing lock acquired.")
            now = time.monotonic()
            # Space out healing attempts.
            # No point in back to back healing runs when there are
            # blocking dependencies on external resources.
            log.warning(
                "latest healing ts: %r, now-MIN_HEALING_INTERVAL: %r",
                self._latest_healing,
                now - MIN_HEALING_INTERVAL,
            )
            if self._latest_healing < now - MIN_HEALING_INTERVAL:
                # cause gst loop to exit and repair
                self._latest_healing = now
                self._stop_gst_service()
                # lets give external resources a chance to recover
                # for example wifi connection is down temporarily
                time.sleep(1)
                log.debug("AVElement healing completed.")
            else:
                log.debug(
                    "Healing request ignored. "
                    "Too soon after previous healing request."
                )
        finally:
            logging.debug("Healing lock released.")
            self._healing_in_progress.release()
        log.debug("Exiting healing method. %s", self.__class__.__name__)

    def stop(self):
        """Stop the AV source processing loop."""
        log.info("Entering stop method ... %s", self.__class__.__name__)
        self._stop_requested = True
        super().stop()
        log.info("Exiting stop method. %s", self.__class__.__name__)

Classes

class AVSourceElement (uri=None, type=None, live=False, **kwargs)

Pipe element that handles a wide range of media input sources.

Detects media input source, processes and passes on normalized raw image samples to the next pipe element.

Create an av source element with given configuration.

:Parameters:

source_conf : dict uri: string (examples uri: rtsp://somehost/ipcam/channel0 uri: http://somehost/ipcam/sample.jpg ) type: string (video, audio or image) live: boolean (True if the source is a live stream) When live is True AVSourceElement source element will keep trying to reconnect in case there is disruption of the source stream until explicit stop() is requested of the element.

Expand source code
class AVSourceElement(PipeElement):
    """
    Pipe element that handles a wide range of media input sources.

    Detects media input source, processes and passes on normalized raw
    image samples to the next pipe element.
    """

    def __init__(self, uri=None, type=None, live=False, **kwargs):
        """Create an av source element with given configuration.

        :Parameters:
        ----------
        source_conf : dict
            uri: string (examples
                uri: rtsp://somehost/ipcam/channel0
                uri: http://somehost/ipcam/sample.jpg
                )
            type: string (video, audio or image)
            live: boolean (True if the source is a live stream)
                When live is True AVSourceElement source element will
                keep trying to reconnect
                in case there is disruption of the source stream
                until explicit stop() is requested of the element.
        """
        super().__init__(**kwargs)

        assert uri

        element_conf = dict(kwargs)
        element_conf["uri"] = uri
        element_conf["type"] = type
        element_conf["live"] = live

        # pipeline source info
        self._source_conf = element_conf
        self._is_live = live
        self._gst_process = None
        self._gst_out_queue = None
        self._gst_process_stop_signal = None
        self._gst_process_eos_reached = None
        # protects access to gstreamer resources in rare cases
        # such as supervised healing requests
        self._healing_in_progress = threading.RLock()
        # ensure healing requests are reasonably spaced out
        self._latest_healing = time.monotonic()

    def _on_new_sample(self, sample=None):
        log.debug("Input stream received new gst sample.")
        assert sample
        sample_type = sample["type"]
        # only image type supported at this time
        assert sample_type == "image"
        # make sure the sample is in RGB format
        sample_format = sample["format"]
        assert sample_format == "RGB"
        width = sample["width"]
        height = sample["height"]
        sample_bytes = sample["bytes"]
        img = Image.frombytes(sample_format, (width, height), sample_bytes, "raw")
        # pass image sample to next pipe element, e.g. ai inference
        log.debug("Input stream sending sample to next element.")
        self.receive_next_sample(image=img)

    def _get_gst_service_starter(self):
        return gst_process.start_gst_service

    def _get_sample_queue(self):
        q = multiprocessing.Queue(3)
        return q

    def fetch_img(self, session=None, url=None) -> Image:
        assert url
        r = requests.get(url)
        r.raise_for_status()
        img = Image.open(BytesIO(r.content))
        return img

    def _on_fetch_img_exception(self, _exception=None):
        pass

    def _fetch_img_exception_recovery(self):
        log.debug("Pausing for a moment to let remote network issues settle")
        time.sleep(1)

    def _run_picamera_fetch(self):
        with Picamera() as picamera:
            while not self._stop_requested:
                if picamera.has_failure():
                    log.warning(picamera.error)
                    break
                image = picamera.acquire()
                if image is not None:
                    self.receive_next_sample(image=image)

    def _run_http_fetch(self, url=None, continuous=False):
        log.debug("Fetching source uri sample over http: %r", url)
        assert url
        while not self._stop_requested:
            img = None
            try:
                img = self.fetch_img(url=url)
                log.debug(
                    """
                    Image fetched: %r
                    From URL: %r
                    """,
                    img,
                    url,
                )
                log.debug("Sending sample to next element.")
                self.receive_next_sample(image=img)
            except Exception as e:
                self._on_fetch_img_exception(_exception=e)
                log.exception(
                    """
                    Failed to fetch image from pipeline source.
                    URL: %r
                    """,
                    url,
                )
                if continuous:
                    log.warning(
                        "Will keep trying to fetch image from continuous source."
                    )
                    self._fetch_img_exception_recovery()
            finally:
                if not continuous:
                    # this is not a live (continuous) media source
                    # exit the image fetch loop
                    log.debug("Completed one time http image fetch from URL: %r", url)
                    break

    def _run_gst_service(self):
        log.debug("Starting Gst service process...")
        self._gst_out_queue = self._get_sample_queue()
        self._gst_process_stop_signal = multiprocessing.Event()
        self._gst_process_eos_reached = multiprocessing.Event()
        gst_service = self._get_gst_service_starter()
        self._gst_process = multiprocessing.Process(
            target=gst_service,
            name="Gstreamer Service Process",
            daemon=True,
            kwargs={
                "source_conf": self._source_conf,
                "out_queue": self._gst_out_queue,
                "stop_signal": self._gst_process_stop_signal,
                "eos_reached": self._gst_process_eos_reached,
            },
        )
        self._gst_process.daemon = True
        self._gst_process.start()
        gst_proc = self._gst_process
        while not self._stop_requested and gst_proc.is_alive():
            # do not use process.join() to avoid deadlock due to shared queue
            try:
                next_sample = self._gst_out_queue.get(timeout=1)
                # print('next sample received from gst queue, _on_new_sample')
                self._on_new_sample(sample=next_sample)
            except queue.Empty:
                log.debug("no new sample available yet in gst out queue")
            except Exception as e:
                log.warning("AVElement loop caught an error: %s. ", str(e))
                log.warning(stacktrace())
                # print('Exception caught from _on_new_sample %r' % e)
        # print('end of _run_gst_service.')
        log.debug("exiting _run_gst_service")

    def _clear_gst_out_queue(self):
        log.debug("Clearing _gst_out_queue.")
        while not self._gst_out_queue.empty():
            try:
                self._gst_out_queue.get_nowait()
            except queue.Empty:
                log.debug("_gst_out_queue already empty.")
        log.debug("Cleared _gst_out_queue.")

    def _process_terminate(self, proc=None):
        proc.terminate()
        # give it a few seconds to terminate cleanly
        for i in range(10):
            self._clear_gst_out_queue()
            # do not use process.join() to avoid deadlock
            # due to shared queue. Use sleep instead.
            time.sleep(1)
            if not proc.is_alive():
                break

    def _process_good_kill(self, proc=None):
        # print('AVElement: Killing Gst process PID %r' % proc.pid)
        proc.kill()
        return True
        # time.sleep(3)
        # if proc.exitcode is None:
        #    # process is still alive
        #    log.warning('GST process kill was not clean. Process still alive.'
        #                'PID %r',
        #                proc.pid)
        #    # print('GST process kill was not clean. Process still alive. '
        #    #      'PID %r' %
        #    #      proc.pid)
        #    return False
        # else:
        #    log.warning('GST process killed. '
        #                'PID %r , exit code: %r',
        #                proc.pid,
        #                proc.exitcode)
        #    # print('GST process killed. '
        #    #      'PID %r , exit code: %r' %
        #    #      (proc.pid, proc.exitcode))
        #    return True

    def _stop_gst_service(self):
        log.debug("Stopping Gst service process.")
        gst_proc = self._gst_process
        stop_signal = self._gst_process_stop_signal
        if gst_proc and gst_proc.is_alive():
            # tell the OS we won't use this queue any more
            log.debug("GST process still alive. Shutting it down.")
            # log.debug('Closing out queue shared with GST proces.')
            # self._gst_out_queue.close()
            # send a polite request to the process to stop
            log.debug("Sending stop signal to GST process.")
            stop_signal.set()
            log.debug("Signalled gst process to stop")
            # give it a few seconds to stop cleanly
            for i in range(10):
                # make sure a non-empty queue doesn't block
                # the gst process from stopping
                self._clear_gst_out_queue()
                # do not use process.join() to avoid deadlock
                # due to shared queue.  Use sleep instead.
                time.sleep(1)
                if not gst_proc.is_alive():
                    break
            # process did not stop, we need to be a bit more assertive
            if gst_proc.is_alive():
                log.debug("Gst process did not stop. Terminating.")
                self._process_terminate(gst_proc)
                if gst_proc.is_alive():
                    # last resort, force kill the process
                    log.debug(
                        "Gst proess did not terminate." " Resorting to force kill."
                    )
                    clean_kill = self._process_good_kill(gst_proc)
                    log.debug("Gst process killed. Clean: %r.", clean_kill)
                else:
                    log.debug("Gst process stopped after terminate signal.")
            else:
                log.debug("Gst process stopped after stop signal.")

    def start(self):
        """Start processing input from the configured audio or video source."""
        super().start()
        log.info("Starting %s", self.__class__.__name__)
        self._stop_requested = False

        if self._source_conf["uri"] == "picamera":
            log.debug("Input source is picamera")
            self._run_picamera_fetch()
        elif (
            self._source_conf["uri"].startswith("http")
            and self._source_conf["type"] == "image"
        ):
            log.debug(
                """
                Input source is an http still image: %r
                Will use python requests lib for sampling.
                """,
                self._source_conf["uri"],
            )
            # use http client library to fetch still images
            self._run_http_fetch(url=self._source_conf["uri"], continuous=self._is_live)
        else:
            log.debug(
                """
                Input source is : %r
                Will use gstreamer for sampling.
                """,
                self._source_conf["uri"],
            )
            # use gstreamer for all other types of media sources
            while not self._stop_requested:
                self._run_gst_service()
                if self._gst_process_eos_reached and not self._is_live:
                    # gst process reached end of its input stream
                    # and this is not a live (continuous) stream loop
                    # exit the avsource element loop
                    log.debug(
                        "GST EOS reached for source uri: %r", self._source_conf["uri"]
                    )
                    break
            self._stop_gst_service()
        super().stop()
        log.info("Stopped %s", self.__class__.__name__)

    def heal(self):
        """Attempt to heal a damaged AV source processing service."""
        log.debug("Entering healing method... %s", self.__class__.__name__)
        log.debug("Healing waiting for lock.")
        self._healing_in_progress.acquire()
        try:
            logging.debug("Healing lock acquired.")
            now = time.monotonic()
            # Space out healing attempts.
            # No point in back to back healing runs when there are
            # blocking dependencies on external resources.
            log.warning(
                "latest healing ts: %r, now-MIN_HEALING_INTERVAL: %r",
                self._latest_healing,
                now - MIN_HEALING_INTERVAL,
            )
            if self._latest_healing < now - MIN_HEALING_INTERVAL:
                # cause gst loop to exit and repair
                self._latest_healing = now
                self._stop_gst_service()
                # lets give external resources a chance to recover
                # for example wifi connection is down temporarily
                time.sleep(1)
                log.debug("AVElement healing completed.")
            else:
                log.debug(
                    "Healing request ignored. "
                    "Too soon after previous healing request."
                )
        finally:
            logging.debug("Healing lock released.")
            self._healing_in_progress.release()
        log.debug("Exiting healing method. %s", self.__class__.__name__)

    def stop(self):
        """Stop the AV source processing loop."""
        log.info("Entering stop method ... %s", self.__class__.__name__)
        self._stop_requested = True
        super().stop()
        log.info("Exiting stop method. %s", self.__class__.__name__)

Ancestors

Methods

def fetch_img(self, session=None, url=None) ‑> 
Expand source code
def fetch_img(self, session=None, url=None) -> Image:
    assert url
    r = requests.get(url)
    r.raise_for_status()
    img = Image.open(BytesIO(r.content))
    return img
def heal(self)

Attempt to heal a damaged AV source processing service.

Expand source code
def heal(self):
    """Attempt to heal a damaged AV source processing service."""
    log.debug("Entering healing method... %s", self.__class__.__name__)
    log.debug("Healing waiting for lock.")
    self._healing_in_progress.acquire()
    try:
        logging.debug("Healing lock acquired.")
        now = time.monotonic()
        # Space out healing attempts.
        # No point in back to back healing runs when there are
        # blocking dependencies on external resources.
        log.warning(
            "latest healing ts: %r, now-MIN_HEALING_INTERVAL: %r",
            self._latest_healing,
            now - MIN_HEALING_INTERVAL,
        )
        if self._latest_healing < now - MIN_HEALING_INTERVAL:
            # cause gst loop to exit and repair
            self._latest_healing = now
            self._stop_gst_service()
            # lets give external resources a chance to recover
            # for example wifi connection is down temporarily
            time.sleep(1)
            log.debug("AVElement healing completed.")
        else:
            log.debug(
                "Healing request ignored. "
                "Too soon after previous healing request."
            )
    finally:
        logging.debug("Healing lock released.")
        self._healing_in_progress.release()
    log.debug("Exiting healing method. %s", self.__class__.__name__)
def start(self)

Start processing input from the configured audio or video source.

Expand source code
def start(self):
    """Start processing input from the configured audio or video source."""
    super().start()
    log.info("Starting %s", self.__class__.__name__)
    self._stop_requested = False

    if self._source_conf["uri"] == "picamera":
        log.debug("Input source is picamera")
        self._run_picamera_fetch()
    elif (
        self._source_conf["uri"].startswith("http")
        and self._source_conf["type"] == "image"
    ):
        log.debug(
            """
            Input source is an http still image: %r
            Will use python requests lib for sampling.
            """,
            self._source_conf["uri"],
        )
        # use http client library to fetch still images
        self._run_http_fetch(url=self._source_conf["uri"], continuous=self._is_live)
    else:
        log.debug(
            """
            Input source is : %r
            Will use gstreamer for sampling.
            """,
            self._source_conf["uri"],
        )
        # use gstreamer for all other types of media sources
        while not self._stop_requested:
            self._run_gst_service()
            if self._gst_process_eos_reached and not self._is_live:
                # gst process reached end of its input stream
                # and this is not a live (continuous) stream loop
                # exit the avsource element loop
                log.debug(
                    "GST EOS reached for source uri: %r", self._source_conf["uri"]
                )
                break
        self._stop_gst_service()
    super().stop()
    log.info("Stopped %s", self.__class__.__name__)
def stop(self)

Stop the AV source processing loop.

Expand source code
def stop(self):
    """Stop the AV source processing loop."""
    log.info("Entering stop method ... %s", self.__class__.__name__)
    self._stop_requested = True
    super().stop()
    log.info("Exiting stop method. %s", self.__class__.__name__)

Inherited members