Module ambianic.pipeline.avsource.gst_process
Input source video processing via Gstreamer.
Expand source code
"""Input source video processing via Gstreamer."""
import logging
import os
import signal
import sys
import threading
import traceback
import gi
from ambianic.util import stacktrace
# to prevent flake8 import reordering before setting gi versions
if "gi" in sys.modules:
gi.require_version("Gst", "1.0")
gi.require_version("GstBase", "1.0")
from gi.repository import GLib, Gst # ,GObject, GLib
Gst.init(None)
# No need to call GObject.threads_init() since version 3.11
# GObject.threads_init()
log = logging.getLogger(__name__)
class GstService:
"""Streams audio/video samples from various network and local A/V sources.
Runs in a separate OS process. Reads from vadious sources and
formatts. Serves audio/video samples in a normalized format to its master
AVElement, which then passes on to the next element in the Ambianic
pipeline.
:Parameters:
----------
source_conf : URI
Source configuration. At this time URI schemes are supported such as
rtsp://host:ip/path_to_stream.
out_queue : multiprocessing.Queue
The queue where this service adds samples in a normalized format
for its master AVElement to receive and pass on to the next Ambianic
pipeline element.
"""
class ImageShape:
width = height = None
class PipelineSource:
def __init__(self, source_conf=None):
assert source_conf, "pipeline source configuration required."
assert source_conf["uri"], "pipeline source config missing uri element"
# rtsp://..., rtmp://..., http://..., file:///...
self.uri = source_conf["uri"]
# video, image, audio, auto
self.type = source_conf.get("type", "auto")
self.is_live = source_conf.get("live", False)
self.format = source_conf.get("format", None)
def __init__(
self, source_conf=None, out_queue=None, stop_signal=None, eos_reached=None
):
assert source_conf
assert out_queue
assert stop_signal
assert eos_reached
# pipeline source info
log.debug("Initializing GstService with source: %s ", source_conf)
self._out_queue = out_queue
self._stop_signal = stop_signal
self._eos_reached = eos_reached
self.source = self.PipelineSource(source_conf=source_conf)
# Reference to Gstreamer main loop structure
self.mainloop = None
# Gstreamer pipeline for a given input source
# (could be image, audio or video)
self.gst_pipeline = None
self.gst_video_source = None
self._gst_video_source_connect_id = None
# shape of the input stream image or video
self._source_shape = self.ImageShape()
self.gst_queue0 = None
self.gst_vconvert = None
self.gst_vconvert_connect_id = None
self.gst_queue1 = None
# gst_appsink handlies GStreamer callbacks
# for new media samples which it passes on to the next pipe element
self.gst_appsink = None
self._gst_appsink_connect_id = None
# indicates whether stop was requested via the API
self._stop_requested = False
self.gst_bus = None
def on_autoplug_continue(self, src_bin, src_pad, src_caps):
# print('on_autoplug_continue called for uridecodebin')
# print('src_bin: {}'.format(str(src_bin)))
# print('src_pad: {}'.format(str(src_pad)))
# print('src_caps: {}'.format(str(src_caps)))
struct = src_caps.get_structure(0)
# print("src caps struct: {}".format(struct))
self._source_shape.width = struct["width"]
self._source_shape.height = struct["height"]
if self._source_shape.width:
log.info(
"Input source width: %d, height: %d",
self._source_shape.width,
self._source_shape.height,
)
return True
def _on_bus_message_eos(self, message):
# print('GstService._handle_eos_reached')
# if its a live source uri, we will keep trying to reconnect
# otherwise end source input processing
if not self.source.is_live:
log.debug("End of stream. Exiting gstreamer loop " "for this video stream.")
self._eos_reached.set()
self._gst_cleanup()
def _on_bus_message_warning(self, message):
err, debug = message.parse_warning()
log.warning("Warning: %s: %s", err, debug)
def _on_bus_message_error(self, message):
err, debug = message.parse_error()
log.warning("Error: %s: %s", err, debug)
self._gst_cleanup()
def _on_bus_message(self, bus, message, loop):
t = message.type
# print('GST: On bus message: type: %r, details: %r'
# % (message.type.get_name(message.type), message))
if t == Gst.MessageType.EOS:
self._on_bus_message_eos(message)
elif t == Gst.MessageType.WARNING:
self._on_bus_message_warning(message)
elif t == Gst.MessageType.ERROR:
self._on_bus_message_error(message)
else:
# pass
log.debug(
"GST: Ignoring bus message: type: %r, details: %r",
message.type.get_name(message.type),
message,
)
return True
def _on_new_sample_out_queue_full(self, sink):
log.debug("Out queue full, skipping sample.")
# free appsink buffer so its not blocked waiting on app pull
sink.emit("pull-sample")
return Gst.FlowReturn.OK
def _on_new_sample(self, sink):
log.debug("Input stream received new image sample.")
if self._out_queue.full():
return self._on_new_sample_out_queue_full(sink)
sample = sink.emit("pull-sample")
buf = sample.get_buffer()
caps = sample.get_caps()
struct = caps.get_structure(0)
# print("gst_appsink caps struct: {}".format(struct))
app_width = struct["width"]
app_height = struct["height"]
# print("gst_appsink(inference image) width: {}, height: {}".
# format(app_width, app_height))
result, mapinfo = buf.map(Gst.MapFlags.READ)
if result:
sample = {
"type": "image",
"format": "RGB",
"width": app_width,
"height": app_height,
"bytes": mapinfo.data,
}
log.info("GstService adding sample to out_queue.")
self._out_queue.put(sample)
buf.unmap(mapinfo)
return Gst.FlowReturn.OK
def _get_pipeline_args(self):
log.debug("Preparing Gstreamer pipeline args")
videosrc = self.source.uri
videofmt = self.source.format
if videofmt == "h264":
SRC_CAPS = "video/x-h264,framerate=30/1"
elif videofmt == "jpeg":
SRC_CAPS = "image/jpeg,framerate=30/1"
else:
SRC_CAPS = "video/x-raw,framerate=30/1"
PIPELINE_SRC = "uridecodebin uri=%s use-buffering=true" % videosrc
if videosrc.startswith("/dev/video") or videosrc.startswith(
"file:///dev/video"
):
PIPELINE_SRC = f"v4l2src device={videosrc} ! {SRC_CAPS}"
PIPELINE = """
{pipeline_src}
! {leaky_q0} ! videoconvert name=vconvert ! {sink_caps}
! {leaky_q1} ! {sink_element}
"""
# Ask gstreamer to format the images in a way that are close
# to the TF model tensor.
# Note: Having gstreamer resize doesn't appear to make
# a big performance difference.
# Need to look closer at hardware acceleration options where available.
# ,width={width},pixel-aspect-ratio=1/1'
SINK_CAPS = "video/x-raw,format=RGB"
LEAKY_Q_ = "queue2 "
LEAKY_Q0 = LEAKY_Q_ + " name=queue0"
LEAKY_Q1 = LEAKY_Q_ + " name=queue1"
SINK_ELEMENT = """
appsink name=appsink sync=false
emit-signals=true max-buffers=1 drop=true
"""
pipeline_args = PIPELINE.format(
leaky_q0=LEAKY_Q0,
leaky_q1=LEAKY_Q1,
sink_caps=SINK_CAPS,
sink_element=SINK_ELEMENT,
pipeline_src=PIPELINE_SRC,
)
log.debug("Gstreamer pipeline args: %s", pipeline_args)
print("pipeline_args ", pipeline_args)
return pipeline_args
def _set_gst_debug_level(self):
if log.getEffectiveLevel() <= logging.INFO:
# set Gst debug log level
Gst.debug_set_active(True)
Gst.debug_set_default_threshold(3)
def _build_gst_pipeline(self):
log.debug("Building new gstreamer pipeline")
pipeline_args = self._get_pipeline_args()
log.debug("Initializing gstreamer pipeline")
self.gst_pipeline = Gst.parse_launch(pipeline_args)
# self.gst_video_source = self.gst_pipeline.get_by_name('source')
# self.gst_video_source.props.uri = self.source.uri
# self.gst_video_source_connect_id = self.gst_video_source.connect(
# 'autoplug-continue', self.on_autoplug_continue)
# assert self.gst_video_source_connect_id
self.gst_queue0 = self.gst_pipeline.get_by_name("queue0")
self.gst_vconvert = self.gst_pipeline.get_by_name("vconvert")
self.gst_queue1 = self.gst_pipeline.get_by_name("queue1")
self.gst_appsink = self.gst_pipeline.get_by_name("appsink")
log.debug("appsink: %s", str(self.gst_appsink))
log.debug("appsink will emit signals: %s", self.gst_appsink.props.emit_signals)
# register to receive new image sample events from gst
self._gst_appsink_connect_id = self.gst_appsink.connect(
"new-sample", self._on_new_sample
)
self.mainloop = GLib.MainLoop()
self._set_gst_debug_level()
# Set up a pipeline bus watch to catch errors.
self.gst_bus = self.gst_pipeline.get_bus()
self.gst_bus.add_signal_watch()
self.gst_bus.connect("message", self._on_bus_message, self.mainloop)
def _gst_mainloop_run(self):
log.debug("Entering main gstreamer loop")
self.mainloop.run()
log.debug("Exited main gstreamer loop")
def _gst_pipeline_play(self):
return self.gst_pipeline.set_state(Gst.State.PLAYING)
def _gst_loop(self):
# build new gst pipeline
self._build_gst_pipeline()
# Run pipeline.
# Start playing media from source
ret = self._gst_pipeline_play()
# Check if the source is a live stream
# Ref: A network-resilient example
# https://gstreamer.freedesktop.org/documentation/tutorials/basic/streaming.html?gi-language=c
# https://lazka.github.io/pgi-docs/Gst-1.0/classes/Pipeline.html
# https://lazka.github.io/pgi-docs/Gst-1.0/enums.html#Gst.StateChangeReturn.NO_PREROLL
if ret == Gst.StateChangeReturn.FAILURE:
raise RuntimeError(
"Unable to set pipeline to playing state ", self.source.uri
)
elif ret == Gst.StateChangeReturn.NO_PREROLL:
self.source.is_live = True
log.info("Live streaming source detected: %r", self.source.uri)
else:
log.debug("Gst pipeline set_state PLAYING result: %r", ret)
self._gst_mainloop_run()
def _gst_cleanup(self):
log.debug("GST cleaning up resources...")
try:
if (
self.mainloop
and self.mainloop.is_running()
and self.gst_pipeline
and self.gst_pipeline.get_state(timeout=1)[1] != Gst.State.NULL
):
log.debug("gst pipeline still active. Terminating...")
self.gst_pipeline.set_state(Gst.State.PAUSED)
log.debug("self.gst_pipeline.set_state(Gst.State.PAUSED)")
self.gst_pipeline.set_state(Gst.State.READY)
log.debug("self.gst_pipeline.set_state(Gst.State.READY)")
# stop pipeline elements in reverse order (from last to first)
log.debug("gst_bus.remove_signal_watch()")
if self.gst_bus:
self.gst_bus.remove_signal_watch()
self.gst_bus = None
log.debug("gst_appsink.set_state(Gst.State.NULL)")
if self.gst_appsink:
self.gst_appsink.set_state(Gst.State.NULL)
# self.gst_appsink.disconnect(self._gst_appsink_connect_id)
self.gst_appsink = None
log.debug("gst_queue1.set_state(Gst.State.NULL)")
if self.gst_queue1:
self.gst_queue1.set_state(Gst.State.NULL)
# self.gst_queue.disconnect()
self.gst_queue1 = None
log.debug("gst_vconvert.set_state(Gst.State.NULL)")
if self.gst_vconvert:
self.gst_vconvert.set_state(Gst.State.NULL)
# self.gst_vconvert.disconnect(self.gst_vconvert_connect_id)
self.gst_vconvert = None
log.debug("gst_queue0.set_state(Gst.State.NULL)")
if self.gst_queue0:
self.gst_queue0.set_state(Gst.State.NULL)
# self.gst_queue.disconnect()
self.gst_queue0 = None
log.debug("gst_video_source.set_state(Gst.State.NULL)")
if self.gst_video_source:
self.gst_video_source.set_state(Gst.State.NULL)
# self.gst_video_source.disconnect(self._gst_video_source_connect_id)
self.gst_video_source = None
log.debug("gst_pipeline.set_state(Gst.State.NULL)")
self.gst_pipeline.set_state(Gst.State.NULL)
self.gst_pipeline = None
log.debug("while GLib.MainContext.default().iteration(False)")
# while GLib.MainContext.default().iteration(False):
# pass
else:
log.debug("self.gst_pipeline: %r", self.gst_pipeline)
if self.mainloop:
log.debug("gst mainloop.quit()")
self.mainloop.quit()
self.mainloop = None
else:
log.debug("mainloop: None")
except Exception as e:
log.warning("Error while cleaning up gstreamer resources: %s", str(e))
formatted_lines = traceback.format_exc().splitlines()
log.warning("Exception stack trace: %s", "\n".join(formatted_lines))
log.debug("GST clean up exiting.")
def _service_terminate(self, signum, frame):
log.info("GST service caught system terminate signal %d", signum)
if not self._stop_signal.is_set():
self._stop_signal.set()
def _stop_handler(self):
self._stop_signal.wait()
log.info("GST service received stop signal")
self._gst_cleanup()
def _register_stop_handler(self):
stop_watch_thread = threading.Thread(
name="GST stop watch thread", daemon=True, target=self._stop_handler
)
stop_watch_thread.start()
def _register_sys_signal_handler(self):
# Register the signal handlers
signal.signal(signal.SIGTERM, self._service_terminate)
signal.signal(signal.SIGINT, self._service_terminate)
def run(self):
"""Run the gstreamer pipeline service."""
log.info("Starting %s", self.__class__.__name__)
self._register_sys_signal_handler()
self._register_stop_handler()
try:
self._gst_loop()
except Exception as e:
log.warning("GST loop exited with error: %s. ", str(e))
log.warning(stacktrace())
finally:
log.debug("Gst service cleaning up before exit...")
self._gst_cleanup()
# self._out_queue.close()
log.debug("Gst service cleaned up and ready to exit.")
log.info("Stopped %s", self.__class__.__name__)
def start_gst_service(
source_conf=None, out_queue=None, stop_signal=None, eos_reached=None
):
svc = GstService(
source_conf=source_conf,
out_queue=out_queue,
stop_signal=stop_signal,
eos_reached=eos_reached,
)
# set priority level below parent process
# in order to preserve UX responsiveness
os.nice(10)
svc.run()
log.info("Exiting GST process")
Functions
def start_gst_service(source_conf=None, out_queue=None, stop_signal=None, eos_reached=None)
-
Expand source code
def start_gst_service( source_conf=None, out_queue=None, stop_signal=None, eos_reached=None ): svc = GstService( source_conf=source_conf, out_queue=out_queue, stop_signal=stop_signal, eos_reached=eos_reached, ) # set priority level below parent process # in order to preserve UX responsiveness os.nice(10) svc.run() log.info("Exiting GST process")
Classes
class GstService (source_conf=None, out_queue=None, stop_signal=None, eos_reached=None)
-
Streams audio/video samples from various network and local A/V sources.
Runs in a separate OS process. Reads from vadious sources and formatts. Serves audio/video samples in a normalized format to its master AVElement, which then passes on to the next element in the Ambianic pipeline.
:Parameters:
source_conf : URI Source configuration. At this time URI schemes are supported such as rtsp://host:ip/path_to_stream.
out_queue : multiprocessing.Queue The queue where this service adds samples in a normalized format for its master AVElement to receive and pass on to the next Ambianic pipeline element.
Expand source code
class GstService: """Streams audio/video samples from various network and local A/V sources. Runs in a separate OS process. Reads from vadious sources and formatts. Serves audio/video samples in a normalized format to its master AVElement, which then passes on to the next element in the Ambianic pipeline. :Parameters: ---------- source_conf : URI Source configuration. At this time URI schemes are supported such as rtsp://host:ip/path_to_stream. out_queue : multiprocessing.Queue The queue where this service adds samples in a normalized format for its master AVElement to receive and pass on to the next Ambianic pipeline element. """ class ImageShape: width = height = None class PipelineSource: def __init__(self, source_conf=None): assert source_conf, "pipeline source configuration required." assert source_conf["uri"], "pipeline source config missing uri element" # rtsp://..., rtmp://..., http://..., file:///... self.uri = source_conf["uri"] # video, image, audio, auto self.type = source_conf.get("type", "auto") self.is_live = source_conf.get("live", False) self.format = source_conf.get("format", None) def __init__( self, source_conf=None, out_queue=None, stop_signal=None, eos_reached=None ): assert source_conf assert out_queue assert stop_signal assert eos_reached # pipeline source info log.debug("Initializing GstService with source: %s ", source_conf) self._out_queue = out_queue self._stop_signal = stop_signal self._eos_reached = eos_reached self.source = self.PipelineSource(source_conf=source_conf) # Reference to Gstreamer main loop structure self.mainloop = None # Gstreamer pipeline for a given input source # (could be image, audio or video) self.gst_pipeline = None self.gst_video_source = None self._gst_video_source_connect_id = None # shape of the input stream image or video self._source_shape = self.ImageShape() self.gst_queue0 = None self.gst_vconvert = None self.gst_vconvert_connect_id = None self.gst_queue1 = None # gst_appsink handlies GStreamer callbacks # for new media samples which it passes on to the next pipe element self.gst_appsink = None self._gst_appsink_connect_id = None # indicates whether stop was requested via the API self._stop_requested = False self.gst_bus = None def on_autoplug_continue(self, src_bin, src_pad, src_caps): # print('on_autoplug_continue called for uridecodebin') # print('src_bin: {}'.format(str(src_bin))) # print('src_pad: {}'.format(str(src_pad))) # print('src_caps: {}'.format(str(src_caps))) struct = src_caps.get_structure(0) # print("src caps struct: {}".format(struct)) self._source_shape.width = struct["width"] self._source_shape.height = struct["height"] if self._source_shape.width: log.info( "Input source width: %d, height: %d", self._source_shape.width, self._source_shape.height, ) return True def _on_bus_message_eos(self, message): # print('GstService._handle_eos_reached') # if its a live source uri, we will keep trying to reconnect # otherwise end source input processing if not self.source.is_live: log.debug("End of stream. Exiting gstreamer loop " "for this video stream.") self._eos_reached.set() self._gst_cleanup() def _on_bus_message_warning(self, message): err, debug = message.parse_warning() log.warning("Warning: %s: %s", err, debug) def _on_bus_message_error(self, message): err, debug = message.parse_error() log.warning("Error: %s: %s", err, debug) self._gst_cleanup() def _on_bus_message(self, bus, message, loop): t = message.type # print('GST: On bus message: type: %r, details: %r' # % (message.type.get_name(message.type), message)) if t == Gst.MessageType.EOS: self._on_bus_message_eos(message) elif t == Gst.MessageType.WARNING: self._on_bus_message_warning(message) elif t == Gst.MessageType.ERROR: self._on_bus_message_error(message) else: # pass log.debug( "GST: Ignoring bus message: type: %r, details: %r", message.type.get_name(message.type), message, ) return True def _on_new_sample_out_queue_full(self, sink): log.debug("Out queue full, skipping sample.") # free appsink buffer so its not blocked waiting on app pull sink.emit("pull-sample") return Gst.FlowReturn.OK def _on_new_sample(self, sink): log.debug("Input stream received new image sample.") if self._out_queue.full(): return self._on_new_sample_out_queue_full(sink) sample = sink.emit("pull-sample") buf = sample.get_buffer() caps = sample.get_caps() struct = caps.get_structure(0) # print("gst_appsink caps struct: {}".format(struct)) app_width = struct["width"] app_height = struct["height"] # print("gst_appsink(inference image) width: {}, height: {}". # format(app_width, app_height)) result, mapinfo = buf.map(Gst.MapFlags.READ) if result: sample = { "type": "image", "format": "RGB", "width": app_width, "height": app_height, "bytes": mapinfo.data, } log.info("GstService adding sample to out_queue.") self._out_queue.put(sample) buf.unmap(mapinfo) return Gst.FlowReturn.OK def _get_pipeline_args(self): log.debug("Preparing Gstreamer pipeline args") videosrc = self.source.uri videofmt = self.source.format if videofmt == "h264": SRC_CAPS = "video/x-h264,framerate=30/1" elif videofmt == "jpeg": SRC_CAPS = "image/jpeg,framerate=30/1" else: SRC_CAPS = "video/x-raw,framerate=30/1" PIPELINE_SRC = "uridecodebin uri=%s use-buffering=true" % videosrc if videosrc.startswith("/dev/video") or videosrc.startswith( "file:///dev/video" ): PIPELINE_SRC = f"v4l2src device={videosrc} ! {SRC_CAPS}" PIPELINE = """ {pipeline_src} ! {leaky_q0} ! videoconvert name=vconvert ! {sink_caps} ! {leaky_q1} ! {sink_element} """ # Ask gstreamer to format the images in a way that are close # to the TF model tensor. # Note: Having gstreamer resize doesn't appear to make # a big performance difference. # Need to look closer at hardware acceleration options where available. # ,width={width},pixel-aspect-ratio=1/1' SINK_CAPS = "video/x-raw,format=RGB" LEAKY_Q_ = "queue2 " LEAKY_Q0 = LEAKY_Q_ + " name=queue0" LEAKY_Q1 = LEAKY_Q_ + " name=queue1" SINK_ELEMENT = """ appsink name=appsink sync=false emit-signals=true max-buffers=1 drop=true """ pipeline_args = PIPELINE.format( leaky_q0=LEAKY_Q0, leaky_q1=LEAKY_Q1, sink_caps=SINK_CAPS, sink_element=SINK_ELEMENT, pipeline_src=PIPELINE_SRC, ) log.debug("Gstreamer pipeline args: %s", pipeline_args) print("pipeline_args ", pipeline_args) return pipeline_args def _set_gst_debug_level(self): if log.getEffectiveLevel() <= logging.INFO: # set Gst debug log level Gst.debug_set_active(True) Gst.debug_set_default_threshold(3) def _build_gst_pipeline(self): log.debug("Building new gstreamer pipeline") pipeline_args = self._get_pipeline_args() log.debug("Initializing gstreamer pipeline") self.gst_pipeline = Gst.parse_launch(pipeline_args) # self.gst_video_source = self.gst_pipeline.get_by_name('source') # self.gst_video_source.props.uri = self.source.uri # self.gst_video_source_connect_id = self.gst_video_source.connect( # 'autoplug-continue', self.on_autoplug_continue) # assert self.gst_video_source_connect_id self.gst_queue0 = self.gst_pipeline.get_by_name("queue0") self.gst_vconvert = self.gst_pipeline.get_by_name("vconvert") self.gst_queue1 = self.gst_pipeline.get_by_name("queue1") self.gst_appsink = self.gst_pipeline.get_by_name("appsink") log.debug("appsink: %s", str(self.gst_appsink)) log.debug("appsink will emit signals: %s", self.gst_appsink.props.emit_signals) # register to receive new image sample events from gst self._gst_appsink_connect_id = self.gst_appsink.connect( "new-sample", self._on_new_sample ) self.mainloop = GLib.MainLoop() self._set_gst_debug_level() # Set up a pipeline bus watch to catch errors. self.gst_bus = self.gst_pipeline.get_bus() self.gst_bus.add_signal_watch() self.gst_bus.connect("message", self._on_bus_message, self.mainloop) def _gst_mainloop_run(self): log.debug("Entering main gstreamer loop") self.mainloop.run() log.debug("Exited main gstreamer loop") def _gst_pipeline_play(self): return self.gst_pipeline.set_state(Gst.State.PLAYING) def _gst_loop(self): # build new gst pipeline self._build_gst_pipeline() # Run pipeline. # Start playing media from source ret = self._gst_pipeline_play() # Check if the source is a live stream # Ref: A network-resilient example # https://gstreamer.freedesktop.org/documentation/tutorials/basic/streaming.html?gi-language=c # https://lazka.github.io/pgi-docs/Gst-1.0/classes/Pipeline.html # https://lazka.github.io/pgi-docs/Gst-1.0/enums.html#Gst.StateChangeReturn.NO_PREROLL if ret == Gst.StateChangeReturn.FAILURE: raise RuntimeError( "Unable to set pipeline to playing state ", self.source.uri ) elif ret == Gst.StateChangeReturn.NO_PREROLL: self.source.is_live = True log.info("Live streaming source detected: %r", self.source.uri) else: log.debug("Gst pipeline set_state PLAYING result: %r", ret) self._gst_mainloop_run() def _gst_cleanup(self): log.debug("GST cleaning up resources...") try: if ( self.mainloop and self.mainloop.is_running() and self.gst_pipeline and self.gst_pipeline.get_state(timeout=1)[1] != Gst.State.NULL ): log.debug("gst pipeline still active. Terminating...") self.gst_pipeline.set_state(Gst.State.PAUSED) log.debug("self.gst_pipeline.set_state(Gst.State.PAUSED)") self.gst_pipeline.set_state(Gst.State.READY) log.debug("self.gst_pipeline.set_state(Gst.State.READY)") # stop pipeline elements in reverse order (from last to first) log.debug("gst_bus.remove_signal_watch()") if self.gst_bus: self.gst_bus.remove_signal_watch() self.gst_bus = None log.debug("gst_appsink.set_state(Gst.State.NULL)") if self.gst_appsink: self.gst_appsink.set_state(Gst.State.NULL) # self.gst_appsink.disconnect(self._gst_appsink_connect_id) self.gst_appsink = None log.debug("gst_queue1.set_state(Gst.State.NULL)") if self.gst_queue1: self.gst_queue1.set_state(Gst.State.NULL) # self.gst_queue.disconnect() self.gst_queue1 = None log.debug("gst_vconvert.set_state(Gst.State.NULL)") if self.gst_vconvert: self.gst_vconvert.set_state(Gst.State.NULL) # self.gst_vconvert.disconnect(self.gst_vconvert_connect_id) self.gst_vconvert = None log.debug("gst_queue0.set_state(Gst.State.NULL)") if self.gst_queue0: self.gst_queue0.set_state(Gst.State.NULL) # self.gst_queue.disconnect() self.gst_queue0 = None log.debug("gst_video_source.set_state(Gst.State.NULL)") if self.gst_video_source: self.gst_video_source.set_state(Gst.State.NULL) # self.gst_video_source.disconnect(self._gst_video_source_connect_id) self.gst_video_source = None log.debug("gst_pipeline.set_state(Gst.State.NULL)") self.gst_pipeline.set_state(Gst.State.NULL) self.gst_pipeline = None log.debug("while GLib.MainContext.default().iteration(False)") # while GLib.MainContext.default().iteration(False): # pass else: log.debug("self.gst_pipeline: %r", self.gst_pipeline) if self.mainloop: log.debug("gst mainloop.quit()") self.mainloop.quit() self.mainloop = None else: log.debug("mainloop: None") except Exception as e: log.warning("Error while cleaning up gstreamer resources: %s", str(e)) formatted_lines = traceback.format_exc().splitlines() log.warning("Exception stack trace: %s", "\n".join(formatted_lines)) log.debug("GST clean up exiting.") def _service_terminate(self, signum, frame): log.info("GST service caught system terminate signal %d", signum) if not self._stop_signal.is_set(): self._stop_signal.set() def _stop_handler(self): self._stop_signal.wait() log.info("GST service received stop signal") self._gst_cleanup() def _register_stop_handler(self): stop_watch_thread = threading.Thread( name="GST stop watch thread", daemon=True, target=self._stop_handler ) stop_watch_thread.start() def _register_sys_signal_handler(self): # Register the signal handlers signal.signal(signal.SIGTERM, self._service_terminate) signal.signal(signal.SIGINT, self._service_terminate) def run(self): """Run the gstreamer pipeline service.""" log.info("Starting %s", self.__class__.__name__) self._register_sys_signal_handler() self._register_stop_handler() try: self._gst_loop() except Exception as e: log.warning("GST loop exited with error: %s. ", str(e)) log.warning(stacktrace()) finally: log.debug("Gst service cleaning up before exit...") self._gst_cleanup() # self._out_queue.close() log.debug("Gst service cleaned up and ready to exit.") log.info("Stopped %s", self.__class__.__name__)
Class variables
var ImageShape
var PipelineSource
Methods
def on_autoplug_continue(self, src_bin, src_pad, src_caps)
-
Expand source code
def on_autoplug_continue(self, src_bin, src_pad, src_caps): # print('on_autoplug_continue called for uridecodebin') # print('src_bin: {}'.format(str(src_bin))) # print('src_pad: {}'.format(str(src_pad))) # print('src_caps: {}'.format(str(src_caps))) struct = src_caps.get_structure(0) # print("src caps struct: {}".format(struct)) self._source_shape.width = struct["width"] self._source_shape.height = struct["height"] if self._source_shape.width: log.info( "Input source width: %d, height: %d", self._source_shape.width, self._source_shape.height, ) return True
def run(self)
-
Run the gstreamer pipeline service.
Expand source code
def run(self): """Run the gstreamer pipeline service.""" log.info("Starting %s", self.__class__.__name__) self._register_sys_signal_handler() self._register_stop_handler() try: self._gst_loop() except Exception as e: log.warning("GST loop exited with error: %s. ", str(e)) log.warning(stacktrace()) finally: log.debug("Gst service cleaning up before exit...") self._gst_cleanup() # self._out_queue.close() log.debug("Gst service cleaned up and ready to exit.") log.info("Stopped %s", self.__class__.__name__)