deepstream构建动态管道,在运行时添加和删除rtsp源
deepstream动态添加和删除rtsp源,并使用nvstreamdemux插件进行解复用,rtmpsink插件输出rtmp流
·
前言
- 本人最近刚接触deepstream,写这篇文章的主要目的是相互交流学习,如有不对的地方,欢迎批评指正。
- 代码主要基于 NVIDIA 官方示例 runtime_source_add_delete(https://github.com/NVIDIA-AI-IOT/deepstream_python_apps/tree/master/apps/runtime_source_add_delete)修改而来。主要区别是:我使用 nvstreamdemux 插件进行解复用,使每一路 rtsp 输入流都对应一路 rtmp 输出流。
一、nvstreamdemux插件
原示例代码使用 nvmultistreamtiler 插件把多个流合并成一个画面,也就是说,无论输入多少路 rtsp 流,最终都只有一路输出。想要每路 rtsp 输入各对应一路输出,可以使用 nvstreamdemux 插件进行解复用,详情可见官方文档:Gst-nvstreamdemux — DeepStream 6.1 Release documentation
二、整体管道图
整体的管道图如下:
如何生成管道图,可参考该博客:Gstreamer 管道可视化_papaofdoudou的博客-CSDN博客
三、代码
#!/usr/bin/env python3
################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2021 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
"""
Remove the tracker and output RTMP streams.
"""
import sys
sys.path.append('../')
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib, GstRtspServer
import sys
import random
from common.is_aarch_64 import is_aarch64
# Not referenced elsewhere in this listing.
MAX_DISPLAY_LEN = 64
# Detector class ids; the values line up with the indices of pgie_classes_str
# below. Not referenced elsewhere in this listing.
PGIE_CLASS_ID_VEHICLE = 0
PGIE_CLASS_ID_BICYCLE = 1
PGIE_CLASS_ID_PERSON = 2
PGIE_CLASS_ID_ROADSIGN = 3
# Width/height applied to nvstreammux in main().
MUXER_OUTPUT_WIDTH = 1920
MUXER_OUTPUT_HEIGHT = 1080
# Not referenced in this listing: main() sets "batched-push-timeout" to the
# literal 25000 instead.
MUXER_BATCH_TIMEOUT_USEC = 4000000
# Value passed as the "gpu_id" property of the muxer, inference, converter,
# OSD and encoder elements.
GPU_ID = 1
# Number of source slots: sizes every per-source list below, the nvstreammux
# batch-size and the number of nvstreamdemux src pads requested in main().
MAX_NUM_SOURCES = 4
# Not referenced elsewhere in this listing.
SINK_ELEMENT = "nveglglessink"
# Passed to nvinfer as "config-file-path" in main().
PGIE_CONFIG_FILE = "dstest_pgie_config.txt"
# Not referenced elsewhere in this listing.
CONFIG_GPU_ID = "gpu-id"
# Count of sources currently attached; incremented in add_sources() and
# decremented in stop_release_source().
g_num_sources = 0
# Per-slot source id (-1 until create_uridecode_bin() fills the slot).
g_source_id_list = [-1] * MAX_NUM_SOURCES
# Per-slot flag set by bus_call() when a "stream-eos" message arrives.
g_eos_list = [False] * MAX_NUM_SOURCES
# Per-slot flag: True while the slot holds an active source.
g_source_enabled = [False] * MAX_NUM_SOURCES
# Per-slot uridecodebin element.
g_source_bin_list = [None] * MAX_NUM_SOURCES
pgie_classes_str = ["Vehicle", "TwoWheeler", "Person", "RoadSign"]
# Per-slot input URI.
uri_list = [None] * MAX_NUM_SOURCES
# Shared pipeline objects; all of them are assigned in main().
loop = None
pipeline = None
streammux = None
pgie = None
demux = None
# nvstreamdemux src pads, requested in main() before the pipeline starts.
demux_srcpad_list = [None] * MAX_NUM_SOURCES
# Per-slot elements of the output branch, filled in by add_rtmpsink():
# nvvideoconvert -> nvdsosd -> nvvideoconvert -> capsfilter -> nvv4l2h264enc
# -> h264parse -> flvmux -> rtmpsink.
nvvideoconv_list = [None] * MAX_NUM_SOURCES
nvosd_list = [None] * MAX_NUM_SOURCES
nvvidconv_postosd_list = [None] * MAX_NUM_SOURCES
caps_list = [None] * MAX_NUM_SOURCES
encoder_list = [None] * MAX_NUM_SOURCES
parse_list = [None] * MAX_NUM_SOURCES
flvmux_list = [None] * MAX_NUM_SOURCES
rtmpsink_list = [None] * MAX_NUM_SOURCES
def decodebin_child_added(child_proxy, Object, name, user_data):
    """Handle the "child-added" signal of a (uri)decodebin.

    Nested decodebins get this same handler attached so that their own
    children are reported as well. An ``nvv4l2decoder`` child receives its
    platform-specific properties.

    Args:
        child_proxy: The bin that emitted the signal (unused).
        Object: The child element that was just added.
        name: Name of the child element.
        user_data: Opaque value forwarded to nested handlers.
    """
    print("Decodebin child added:", name, "\n")

    if "decodebin" in name:
        Object.connect("child-added", decodebin_child_added, user_data)

    if "nvv4l2decoder" not in name:
        return

    if not is_aarch64():
        # Non-aarch64 platforms only need the decoder pinned to the GPU.
        Object.set_property("gpu_id", GPU_ID)
        return

    for prop_name, prop_value in (
        ("enable-max-performance", True),
        ("drop-frame-interval", 0),
        ("num-extra-surfaces", 0),
    ):
        Object.set_property(prop_name, prop_value)
def cb_newpad(decodebin, pad, data):
    """Link a newly exposed decodebin video pad to its nvstreammux sink pad.

    Connected to the "pad-added" signal of each uridecodebin. Pads that do
    not carry video are ignored.

    Args:
        decodebin: The element that emitted the signal (unused).
        pad: The new source pad.
        data: Source id registered with the signal; selects the muxer pad
            named ``sink_<id>``.
    """
    global streammux
    print("In cb_newpad\n")

    caps = pad.get_current_caps()
    if caps is None:
        # No caps have been set on the pad yet; ask the pad what it can
        # produce instead of dereferencing None.
        caps = pad.query_caps(None)
    if caps is None or caps.get_size() == 0:
        sys.stderr.write("Unable to determine caps of new decodebin pad\n")
        return

    gstname = caps.get_structure(0).get_name()
    # Need to check if the pad created by the decodebin is for video and not
    # audio.
    print("gstname=", gstname)
    if "video" not in gstname:
        return

    source_id = data
    pad_name = "sink_%u" % source_id
    print(pad_name)
    # Get a sink pad from the streammux, link to decodebin
    sinkpad = streammux.get_request_pad(pad_name)
    if sinkpad is None:
        sys.stderr.write("Unable to get pad %s from streammux\n" % pad_name)
        return

    if pad.link(sinkpad) == Gst.PadLinkReturn.OK:
        print("Decodebin linked to pipeline")
    else:
        sys.stderr.write("Failed to link decodebin to pipeline\n")
def create_uridecode_bin(index, filename):
    """Create a uridecodebin for ``filename`` and register it in slot ``index``.

    The element decides by itself which demuxer and decoder to plug for the
    stream.

    Args:
        index: Source slot; also used as the source id handed to callbacks.
        filename: URI to read from.

    Returns:
        The new element, or None when it could not be created. In that case
        no global state is touched.
    """
    global g_source_id_list
    print("Creating uridecodebin for [%s]" % filename)

    bin_name = "source-bin-%02d" % index
    print(bin_name)

    source_bin = Gst.ElementFactory.make("uridecodebin", bin_name)
    if source_bin is None:
        sys.stderr.write(" Unable to create uri decode bin \n")
        # Callers test the return value, so stop here instead of
        # dereferencing None below.
        return None

    g_source_id_list[index] = index
    # We set the input uri to the source element
    source_bin.set_property("uri", filename)
    # "pad-added" fires once the decodebin has created a pad for raw data;
    # "child-added" lets us configure the decoder it plugs.
    source_bin.connect("pad-added", cb_newpad, g_source_id_list[index])
    source_bin.connect("child-added", decodebin_child_added, g_source_id_list[index])

    # Set status of the source to enabled
    g_source_enabled[index] = True
    return source_bin
def stop_release_source(source_id):
    """Stop the source in slot ``source_id`` and detach it from the pipeline.

    The source bin is set to NULL, its nvstreammux sink pad is flushed and
    released, and the bin is removed from the pipeline. The per-source output
    branch created by add_rtmpsink() is not touched here.

    Args:
        source_id: Slot index of the source to release.
    """
    global g_num_sources
    global g_source_bin_list
    global streammux
    global pipeline

    source_bin = g_source_bin_list[source_id]
    if source_bin is None:
        sys.stderr.write("No source bin registered for source %d\n" % source_id)
        return

    # Attempt to change status of source to be released
    state_return = source_bin.set_state(Gst.State.NULL)
    if state_return == Gst.StateChangeReturn.FAILURE:
        print("STATE CHANGE FAILURE\n")
        return
    if state_return == Gst.StateChangeReturn.ASYNC:
        # Block until the asynchronous state change has finished.
        source_bin.get_state(Gst.CLOCK_TIME_NONE)
        print("STATE CHANGE ASYNC\n")
    elif state_return == Gst.StateChangeReturn.SUCCESS:
        print("STATE CHANGE SUCCESS\n")
    else:
        # Any other result was ignored by the original code as well.
        return

    pad_name = "sink_%u" % source_id
    print(pad_name)
    # Retrieve sink pad to be released
    sinkpad = streammux.get_static_pad(pad_name)
    if sinkpad is not None:
        # Send flush stop event to the sink pad, then release from the streammux
        sinkpad.send_event(Gst.Event.new_flush_stop(False))
        streammux.release_request_pad(sinkpad)
    else:
        # The pad only exists once cb_newpad() has requested it, so a source
        # that never produced video has nothing to release.
        sys.stderr.write("streammux has no pad %s to release\n" % pad_name)

    # Remove the source bin from the pipeline
    pipeline.remove(source_bin)
    g_source_bin_list[source_id] = None
    # Clear the EOS marker so a source later added to this slot is not
    # deleted immediately by delete_sources().
    g_eos_list[source_id] = False
    g_num_sources -= 1
def delete_sources(data):
    """Timer callback: drop finished sources, then one random active source.

    Args:
        data: User data supplied when the timer was registered (unused).

    Returns:
        True to keep the timer running, False once no source is left (the
        main loop is asked to quit in that case).
    """
    global loop
    global g_num_sources
    global g_eos_list
    global g_source_enabled

    # First delete sources that have reached end of stream
    for source_id in range(MAX_NUM_SOURCES):
        if g_eos_list[source_id] and g_source_enabled[source_id]:
            g_source_enabled[source_id] = False
            stop_release_source(source_id)

    # Pick only among enabled slots. Retrying random slots until an enabled
    # one turns up never terminates when none is enabled.
    enabled_ids = [i for i, enabled in enumerate(g_source_enabled) if enabled]

    # Quit if no sources remaining
    if g_num_sources == 0 or not enabled_ids:
        loop.quit()
        print("All sources stopped quitting")
        return False

    # Randomly choose an enabled source to delete
    source_id = random.choice(enabled_ids)
    # Disable the source
    g_source_enabled[source_id] = False
    # Release the source
    print("Calling Stop %d " % source_id)
    stop_release_source(source_id)

    # Quit if no sources remaining
    if g_num_sources == 0:
        loop.quit()
        print("All sources stopped quitting")
        return False
    return True
def _build_rtmp_branch(source_id):
    """Create and configure the output-branch elements for one source.

    Returns:
        The elements in link order, or None if any of them could not be
        created (an error has been written to stderr in that case).
    """
    rtmpurl = "rtmp://ip地址/live/test_%u" % source_id
    # (progress message, factory, element name, properties, error message)
    specs = (
        ("Creating nvvidconv_%u \n " % source_id, "nvvideoconvert",
         "convertor_%u" % source_id, {"gpu_id": GPU_ID},
         " Unable to create nvvidconv \n"),
        ("Creating nvosd_%u \n " % source_id, "nvdsosd",
         "onscreendisplay_%u" % source_id, {"gpu_id": GPU_ID},
         " Unable to create nvosd \n"),
        ("Creating nvvidconv_postosd_%u \n" % source_id, "nvvideoconvert",
         "convertor_postosd_%u" % source_id, {"gpu_id": GPU_ID},
         " Unable to create nvvidconv_postosd \n"),
        ("Creating caps_%u \n" % source_id, "capsfilter",
         "filter_%u" % source_id,
         {"caps": Gst.Caps.from_string("video/x-raw(memory:NVMM), format=I420")},
         " Unable to create caps \n"),
        ("Creating H264 Encoder", "nvv4l2h264enc",
         "encoder_%u" % source_id, {"bitrate": 4000000, "gpu_id": GPU_ID},
         " Unable to create encoder"),
        ("Creating H264 parse", "h264parse",
         "h264parse_%u" % source_id, {},
         " Unable to create parse"),
        ("Creating flvmux", "flvmux",
         "flvmux_%u" % source_id, {},
         " Unable to create flvmux"),
        ("Creating rtmpsink_%u \n" % source_id, "rtmpsink",
         "rtmpsink_%u" % source_id, {"location": rtmpurl},
         " Unable to create rtmpsink"),
    )

    elements = []
    for banner, factory_name, element_name, properties, error_text in specs:
        print(banner)
        element = Gst.ElementFactory.make(factory_name, element_name)
        if element is None:
            # Check before configuring: set_property on None would raise
            # before the error message is ever written.
            sys.stderr.write(error_text)
            return None
        for key, value in properties.items():
            element.set_property(key, value)
        elements.append(element)
    print(f"rtmp url{rtmpurl}")
    return elements


def _discard_rtmp_branch(elements):
    """Stop partially wired branch elements and take them out of the pipeline."""
    for element in elements:
        element.set_state(Gst.State.NULL)
        pipeline.remove(element)


def add_rtmpsink(source_id):
    """Attach the RTMP output branch for ``source_id`` to its demux pad.

    The branch is nvvideoconvert -> nvdsosd -> nvvideoconvert -> capsfilter
    -> nvv4l2h264enc -> h264parse -> flvmux -> rtmpsink. The function may be
    called before the pipeline starts or while it is running. On any failure
    an error is written to stderr and the pipeline is left without the
    branch.

    Args:
        source_id: Slot index; selects the pre-requested nvstreamdemux src
            pad and is embedded in element names and the RTMP URL.
    """
    global pipeline
    global nvvideoconv_list
    global nvosd_list
    global nvvidconv_postosd_list
    global caps_list
    global encoder_list
    global parse_list
    global flvmux_list
    global rtmpsink_list
    global demux
    global demux_srcpad_list

    if nvvideoconv_list[source_id] is not None:
        # A branch for this slot is already wired to the demux pad. Building
        # a second one would collide on element names and on the linked pad.
        print("rtmp branch for source %u already exists" % source_id)
        return

    # The demux src pads are requested up front in main().
    srcpad = demux_srcpad_list[source_id]
    if srcpad is None:
        sys.stderr.write(" Unable to get source pad of demux \n")
        return

    elements = _build_rtmp_branch(source_id)
    if elements is None:
        return

    print("add sink element")
    for element in elements:
        pipeline.add(element)

    print("link sink element")
    for upstream, downstream in zip(elements, elements[1:]):
        if not upstream.link(downstream):
            sys.stderr.write(" Unable to link %s to %s \n"
                             % (upstream.get_name(), downstream.get_name()))
            _discard_rtmp_branch(elements)
            return

    sinkpad = elements[0].get_static_pad("sink")
    if sinkpad is None:
        sys.stderr.write(" Unable to get the sink pad of nvvideoconvert \n")
        _discard_rtmp_branch(elements)
        return

    # Elements added to a running pipeline stay in NULL until told
    # otherwise. Bring them to the pipeline's state, sink first, before any
    # data can reach them.
    for element in reversed(elements):
        if not element.sync_state_with_parent():
            sys.stderr.write(" Unable to sync state of %s \n" % element.get_name())
            _discard_rtmp_branch(elements)
            return

    if srcpad.link(sinkpad) != Gst.PadLinkReturn.OK:
        sys.stderr.write(" Unable to link demux srcpad to pipeline \n")
        _discard_rtmp_branch(elements)
        return
    print("demux srcpad linked to pipeline")

    # Record the branch only once it is fully wired.
    (nvvideoconv_list[source_id],
     nvosd_list[source_id],
     nvvidconv_postosd_list[source_id],
     caps_list[source_id],
     encoder_list[source_id],
     parse_list[source_id],
     flvmux_list[source_id],
     rtmpsink_list[source_id]) = elements
def add_sources(rtsp_uri):
    """Timer callback: attach ``rtsp_uri`` as a new source in a free slot.

    Args:
        rtsp_uri: URI of the stream to add.

    Returns:
        True to keep the timer running. False once every slot is in use; when
        the source count reaches MAX_NUM_SOURCES the periodic delete_sources
        timer is started first.
    """
    global g_num_sources
    global g_source_enabled
    global g_source_bin_list
    global uri_list

    # Randomly select an un-enabled source to add. Choosing from the free
    # slots directly avoids retrying forever when every slot is enabled.
    free_ids = [i for i, enabled in enumerate(g_source_enabled) if not enabled]
    if not free_ids:
        return False
    source_id = random.choice(free_ids)

    # Enable the source
    g_source_enabled[source_id] = True
    # add rtsp uri
    uri_list[source_id] = rtsp_uri
    print("Calling Start %d " % source_id)

    # Create a uridecode bin with the chosen source id
    source_bin = create_uridecode_bin(source_id, rtsp_uri)
    if source_bin is None:
        sys.stderr.write("Failed to create source bin. Exiting.")
        # sys.exit rather than the site-provided exit() helper.
        sys.exit(1)

    # Add source bin to our list and to pipeline
    g_source_bin_list[source_id] = source_bin
    pipeline.add(source_bin)
    add_rtmpsink(source_id)

    # Set state of source bin to playing
    state_return = source_bin.set_state(Gst.State.PLAYING)
    if state_return == Gst.StateChangeReturn.SUCCESS:
        print("STATE CHANGE SUCCESS\n")
    elif state_return == Gst.StateChangeReturn.FAILURE:
        print("STATE CHANGE FAILURE\n")
    elif state_return == Gst.StateChangeReturn.ASYNC:
        # Block until the asynchronous state change has finished.
        source_bin.get_state(Gst.CLOCK_TIME_NONE)
    elif state_return == Gst.StateChangeReturn.NO_PREROLL:
        print("STATE CHANGE NO PREROLL\n")

    g_num_sources += 1

    # If reached the maximum number of sources, delete sources every 60 seconds
    if g_num_sources == MAX_NUM_SOURCES:
        GLib.timeout_add_seconds(60, delete_sources, g_source_bin_list)
        return False
    return True
def bus_call(bus, message, loop):
    """Dispatch one message from the pipeline bus.

    EOS and ERROR stop the main loop, WARNING is only reported, and a
    "stream-eos" element message flags the matching slot in ``g_eos_list``
    for delete_sources() to pick up.

    Args:
        bus: The bus that delivered the message (unused).
        message: The message to handle.
        loop: Main loop to quit on EOS or error.

    Returns:
        Always True.
    """
    global g_eos_list
    msg_type = message.type

    if msg_type == Gst.MessageType.EOS:
        sys.stdout.write("End-of-stream\n")
        loop.quit()
        return True

    if msg_type == Gst.MessageType.WARNING:
        err, debug = message.parse_warning()
        sys.stderr.write("Warning: %s: %s\n" % (err, debug))
        return True

    if msg_type == Gst.MessageType.ERROR:
        err, debug = message.parse_error()
        sys.stderr.write("Error: %s: %s\n" % (err, debug))
        loop.quit()
        return True

    if msg_type == Gst.MessageType.ELEMENT:
        struct = message.get_structure()
        # Only the per-stream EOS notification is of interest here.
        if struct is None or not struct.has_name("stream-eos"):
            return True
        parsed, stream_id = struct.get_uint("stream-id")
        if parsed:
            # Mark the stream as finished; delete_sources removes it later.
            print("Got EOS from stream %d" % stream_id)
            g_eos_list[stream_id] = True

    return True
def main(args):
    """Build the pipeline, start it, and add or remove sources at runtime.

    Pipeline: uridecodebin(s) -> nvstreammux -> nvinfer -> nvstreamdemux ->
    one RTMP output branch per source.

    Args:
        args: ``[program_name, uris]`` where ``uris`` is a list of source
            URIs; a single URI string is accepted as well.

    Exits the process with status 1 when the arguments are invalid or the
    pipeline cannot be built.
    """
    global g_num_sources
    global g_source_bin_list
    global uri_list
    global loop
    global pipeline
    global streammux
    global pgie
    global demux
    global demux_srcpad_list

    # Check input arguments
    if len(args) != 2:
        sys.stderr.write("usage: %s <uri1> \n" % args[0])
        sys.exit(1)

    # A bare string would otherwise be iterated character by character.
    source_uris = [args[1]] if isinstance(args[1], str) else list(args[1])
    num_sources = len(source_uris)
    if num_sources > MAX_NUM_SOURCES:
        sys.stderr.write("At most %d sources are supported \n" % MAX_NUM_SOURCES)
        sys.exit(1)

    # Standard GStreamer initialization
    Gst.init(None)

    # Create Pipeline element that will form a connection of other elements
    print("Creating Pipeline \n ")
    pipeline = Gst.Pipeline()
    is_live = False
    if pipeline is None:
        sys.stderr.write(" Unable to create Pipeline \n")
        sys.exit(1)

    print("Creating streammux \n ")
    # Create nvstreammux instance to form batches from one or more sources.
    streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
    if streammux is None:
        sys.stderr.write(" Unable to create NvStreamMux \n")
        sys.exit(1)
    streammux.set_property("batched-push-timeout", 25000)
    streammux.set_property("batch-size", MAX_NUM_SOURCES)
    streammux.set_property("gpu_id", GPU_ID)
    streammux.set_property("live-source", 1)

    for i, uri_name in enumerate(source_uris):
        print("Creating source_bin ", i, " \n ")
        uri_list[i] = uri_name
        if uri_name.startswith("rtsp://"):
            is_live = True
        # Create the source bin and add it to the pipeline
        source_bin = create_uridecode_bin(i, uri_name)
        if source_bin is None:
            sys.stderr.write("Failed to create source bin. Exiting. \n")
            sys.exit(1)
        g_source_bin_list[i] = source_bin
        pipeline.add(source_bin)
    g_num_sources = num_sources

    print("Creating Pgie \n ")
    pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
    if pgie is None:
        sys.stderr.write(" Unable to create pgie \n")
        sys.exit(1)

    print("Creating Demux")
    demux = Gst.ElementFactory.make("nvstreamdemux", "Stream-demuxer")
    if demux is None:
        sys.stderr.write(" Unable to create NvStreamDeMux \n")
        sys.exit(1)

    # Request every demux src pad before the pipeline starts. The author
    # found that requesting them while the pipeline is running fails.
    print("Creating demux srcpad")
    for i in range(MAX_NUM_SOURCES):
        srcpad = demux.get_request_pad("src_%u" % i)
        if srcpad is None:
            sys.stderr.write(" Unable to get source pad of demux \n")
        demux_srcpad_list[i] = srcpad

    if is_live:
        print("Atleast one of the sources is live")
        streammux.set_property('live-source', 1)

    # Set streammux width and height
    streammux.set_property('width', MUXER_OUTPUT_WIDTH)
    streammux.set_property('height', MUXER_OUTPUT_HEIGHT)
    pgie.set_property('config-file-path', PGIE_CONFIG_FILE)

    # The inference batch must cover what the muxer can deliver, which is
    # MAX_NUM_SOURCES (its batch-size set above).
    pgie_batch_size = pgie.get_property("batch-size")
    if pgie_batch_size < MAX_NUM_SOURCES:
        print("WARNING: Overriding infer-config batch-size", pgie_batch_size,
              " with number of sources ", MAX_NUM_SOURCES, " \n")
        pgie.set_property("batch-size", MAX_NUM_SOURCES)

    # Set gpu IDs of the inference engines
    pgie.set_property("gpu_id", GPU_ID)

    print("Adding elements to Pipeline \n")
    pipeline.add(pgie)
    pipeline.add(streammux)
    pipeline.add(demux)

    print("Linking elements in the Pipeline \n")
    if not streammux.link(pgie) or not pgie.link(demux):
        sys.stderr.write(" Unable to link streammux -> pgie -> demux \n")
        sys.exit(1)

    print("add rtmpsink")
    for i in range(num_sources):
        add_rtmpsink(i)

    # create an event loop and feed gstreamer bus mesages to it
    loop = GLib.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", bus_call, loop)

    # List the sources
    print("Now playing...")
    for i, source in enumerate(source_uris):
        print(i, ": ", source)

    print("Starting pipeline \n")
    # start play back and listen to events
    if pipeline.set_state(Gst.State.PLAYING) == Gst.StateChangeReturn.FAILURE:
        sys.stderr.write(" Unable to set the pipeline to PLAYING \n")
        pipeline.set_state(Gst.State.NULL)
        sys.exit(1)

    GLib.timeout_add_seconds(15, add_sources, "rtsp://rtsp地址?channel=1&subtype=0")

    try:
        loop.run()
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to stop the application.
        print("Interrupted by user")
    except Exception as err:
        # Top-level boundary: report instead of silently swallowing, then
        # continue to the cleanup below.
        sys.stderr.write("Main loop aborted: %s\n" % err)
    finally:
        # cleanup
        print('create pipline')
        # Dump the pipeline graph as a dot file
        Gst.debug_bin_to_dot_file(pipeline, Gst.DebugGraphDetails.ALL, "pipline_my_src_add_del_rtmp2")
        print("Exiting app\n")
        pipeline.set_state(Gst.State.NULL)
if __name__ == '__main__':
    # Placeholder inputs: main() expects [program name, list of source URIs].
    rtsp_uris = [
        "rtsp://rtsp地址?channel=1&subtype=0",
        "rtsp://rtsp地址?channel=1&subtype=0",
    ]
    args = ['my_src_add_del.py', rtsp_uris]
    sys.exit(main(args))
问题
- 我在管道处于运行状态时无法创建 demux 的 srcpad,所以在管道运行前就按 MAX_NUM_SOURCES 把所有 srcpad 都提前创建好了。但为什么在 cb_newpad 回调函数中,却能动态创建 streammux 的 sinkpad?
# Get a sink pad from the streammux, link to decodebin
sinkpad = streammux.get_request_pad(pad_name)
- 管道的后半部分(如下图),应该也需要一个bin对其进行封装,并且在删除输入源时,也应该把这部分从管道中移除,就像uridecodebin那样。但应该用什么bin,具体怎么封装,还在学习中。。希望有大佬指点指点。

GitCode 天启AI是一款由 GitCode 团队打造的智能助手,基于先进的LLM(大语言模型)与多智能体 Agent 技术构建,致力于为用户提供高效、智能、多模态的创作与开发支持。它不仅支持自然语言对话,还具备处理文件、生成 PPT、撰写分析报告、开发 Web 应用等多项能力,真正做到“一句话,让 Al帮你完成复杂任务”。
更多推荐
所有评论(0)