前言

  1. 本人最近刚接触deepstream,写这篇文章的主要目的是相互交流学习,如有不对的地方,欢迎批评指正。
  2. 代码主要基于NVIDIA官方示例runtime_source_add_delete进行修改(https://github.com/NVIDIA-AI-IOT/deepstream_python_apps/tree/master/apps/runtime_source_add_delete)。主要的不同之处在于:我使用nvstreamdemux插件进行了解复用,使得每一路rtsp输入流都对应一路rtmp输出流。

一、nvstreamdemux插件

源代码中使用了nvmultistreamtiler插件将多路流合并成一个画面,也就是说,多路rtsp流输入最终只会有一路输出。如果想让每路rtsp流输入各自对应一路输出,可以使用nvstreamdemux插件进行解复用,详情可见官方文档:Gst-nvstreamdemux — DeepStream 6.1 Release documentation

二、整体管道图

整体的管道图如下: 

 如何生成管道图,可参考该博客:Gstreamer 管道可视化_papaofdoudou的博客-CSDN博客

三、代码

#!/usr/bin/env python3

################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2021 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

"""
    Remove the tracker and output one RTMP stream per source.
"""

import sys

sys.path.append('../')
import gi

gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib, GstRtspServer
import sys
import random
from common.is_aarch_64 import is_aarch64


# ---------------------------------------------------------------------------
# Static configuration
# ---------------------------------------------------------------------------
MAX_DISPLAY_LEN = 64

# Class ids; the order matches ``pgie_classes_str`` defined further down.
PGIE_CLASS_ID_VEHICLE = 0
PGIE_CLASS_ID_BICYCLE = 1
PGIE_CLASS_ID_PERSON = 2
PGIE_CLASS_ID_ROADSIGN = 3

# Resolution applied to the nvstreammux "width"/"height" properties.
MUXER_OUTPUT_WIDTH, MUXER_OUTPUT_HEIGHT = 1920, 1080
MUXER_BATCH_TIMEOUT_USEC = 4000000

# Value written to the "gpu_id" property of the GPU-backed elements.
GPU_ID = 1
# Number of per-source slots; every list below has exactly this many entries.
MAX_NUM_SOURCES = 4
SINK_ELEMENT = "nveglglessink"
PGIE_CONFIG_FILE = "dstest_pgie_config.txt"
CONFIG_GPU_ID = "gpu-id"

# ---------------------------------------------------------------------------
# Mutable per-source bookkeeping, indexed by source id
# ---------------------------------------------------------------------------
g_num_sources = 0
g_source_id_list = [-1 for _ in range(MAX_NUM_SOURCES)]
g_eos_list = [False for _ in range(MAX_NUM_SOURCES)]
g_source_enabled = [False for _ in range(MAX_NUM_SOURCES)]
g_source_bin_list = [None for _ in range(MAX_NUM_SOURCES)]

pgie_classes_str = ["Vehicle", "TwoWheeler", "Person", "RoadSign"]

uri_list = [None for _ in range(MAX_NUM_SOURCES)]

# ---------------------------------------------------------------------------
# Shared pipeline objects, assigned in main()
# ---------------------------------------------------------------------------
loop = None
pipeline = None
streammux = None
pgie = None
demux = None

# One slot per source for the demux src pad and for every element of the
# per-source output branch built by add_rtmpsink().
demux_srcpad_list = [None for _ in range(MAX_NUM_SOURCES)]
nvvideoconv_list = [None for _ in range(MAX_NUM_SOURCES)]
nvosd_list = [None for _ in range(MAX_NUM_SOURCES)]
nvvidconv_postosd_list = [None for _ in range(MAX_NUM_SOURCES)]
caps_list = [None for _ in range(MAX_NUM_SOURCES)]
encoder_list = [None for _ in range(MAX_NUM_SOURCES)]
parse_list = [None for _ in range(MAX_NUM_SOURCES)]
flvmux_list = [None for _ in range(MAX_NUM_SOURCES)]
rtmpsink_list = [None for _ in range(MAX_NUM_SOURCES)]


def decodebin_child_added(child_proxy, Object, name, user_data):
    """Handle the "child-added" signal of a (uri)decodebin.

    Nested decodebins get this same handler connected so that their own
    children are seen too. When the new child is an ``nvv4l2decoder`` its
    properties are tuned: performance-related properties on aarch64,
    ``gpu_id`` otherwise.

    Args:
        child_proxy: The object that emitted the signal (unused).
        Object: The child element that was just added.
        name: Name of the child element.
        user_data: Opaque value forwarded to nested handler connections.
    """
    print("Decodebin child added:", name, "\n")

    # Recurse into nested decodebins.
    if "decodebin" in name:
        Object.connect("child-added", decodebin_child_added, user_data)

    if "nvv4l2decoder" not in name:
        return

    if not is_aarch64():
        Object.set_property("gpu_id", GPU_ID)
        return

    aarch64_settings = (
        ("enable-max-performance", True),
        ("drop-frame-interval", 0),
        ("num-extra-surfaces", 0),
    )
    for prop_name, prop_value in aarch64_settings:
        Object.set_property(prop_name, prop_value)


def cb_newpad(decodebin, pad, data):
    """Link a newly exposed decodebin video pad to the stream muxer.

    Connected to the "pad-added" signal of each uridecodebin. Pads whose caps
    are not video are ignored. For video pads a ``sink_<id>`` request pad is
    obtained from the global ``streammux`` and the decodebin pad is linked to
    it.

    Args:
        decodebin: The element that exposed the pad (unused).
        pad: The new source pad.
        data: Source id used to derive the streammux sink pad name.
    """
    print("In cb_newpad\n")

    # The caps may not be negotiated yet when the pad appears; fall back to
    # querying the caps the pad could produce.
    caps = pad.get_current_caps()
    if caps is None:
        caps = pad.query_caps(None)
    if caps is None or caps.get_size() == 0:
        sys.stderr.write("Unable to determine caps of new decodebin pad\n")
        return

    gststruct = caps.get_structure(0)
    gstname = gststruct.get_name()

    # Need to check if the pad created by the decodebin is for video and not
    # audio.
    print("gstname=", gstname)
    if "video" not in gstname:
        return

    source_id = data
    pad_name = "sink_%u" % source_id
    print(pad_name)

    # Get a sink pad from the streammux, link to decodebin
    sinkpad = streammux.get_request_pad(pad_name)
    if sinkpad is None:
        sys.stderr.write("Unable to get streammux pad %s\n" % pad_name)
        return

    if pad.link(sinkpad) == Gst.PadLinkReturn.OK:
        print("Decodebin linked to pipeline")
    else:
        sys.stderr.write("Failed to link decodebin to pipeline\n")


def create_uridecode_bin(index, filename):
    """Create a ``uridecodebin`` for one input stream.

    Records ``index`` in ``g_source_id_list``, creates the element named
    ``source-bin-<index>``, points it at ``filename``, wires the pad-added and
    child-added callbacks and marks the slot as enabled.

    Args:
        index: Source slot id; also used as the callbacks' user data.
        filename: URI handed to the element's "uri" property.

    Returns:
        The configured element, or ``None`` when the element could not be
        created (callers already test the result for falsiness).
    """
    print("Creating uridecodebin for [%s]" % filename)

    g_source_id_list[index] = index
    bin_name = "source-bin-%02d" % index
    print(bin_name)

    # uridecodebin figures out the container format and codec of the stream
    # and plugs the appropriate demux and decode elements itself.
    source_bin = Gst.ElementFactory.make("uridecodebin", bin_name)
    if not source_bin:
        sys.stderr.write(" Unable to create uri decode bin \n")
        # Bail out instead of dereferencing None below; the slot is left
        # disabled so it is not treated as an active source.
        return None

    # We set the input uri to the source element
    source_bin.set_property("uri", filename)

    # "pad-added" fires once a new pad for raw data has been created;
    # "child-added" lets us tune the decoder that gets plugged internally.
    source_bin.connect("pad-added", cb_newpad, g_source_id_list[index])
    source_bin.connect("child-added", decodebin_child_added, g_source_id_list[index])

    # Set status of the source to enabled
    g_source_enabled[index] = True

    return source_bin


def _detach_source(source_id, status_message):
    """Release the streammux pad of ``source_id`` and drop its source bin.

    Prints ``status_message`` between releasing the pad and removing the bin,
    then decrements ``g_num_sources``.
    """
    global g_num_sources

    pad_name = "sink_%u" % source_id
    print(pad_name)

    # The pad only exists if cb_newpad requested it, i.e. if the source ever
    # exposed a video pad; skip the release otherwise instead of crashing.
    sinkpad = streammux.get_static_pad(pad_name)
    if sinkpad is not None:
        # Send flush stop event to the sink pad, then release from the streammux
        sinkpad.send_event(Gst.Event.new_flush_stop(False))
        streammux.release_request_pad(sinkpad)
    else:
        sys.stderr.write("No streammux pad %s to release\n" % pad_name)

    print(status_message)
    # Remove the source bin from the pipeline
    pipeline.remove(g_source_bin_list[source_id])
    g_num_sources -= 1


def stop_release_source(source_id):
    """Stop one source bin and detach it from the pipeline.

    The bin is set to NULL. On SUCCESS, or on ASYNC after waiting for the
    state change to finish, its streammux sink pad is released, the bin is
    removed from the pipeline and ``g_num_sources`` is decremented. On FAILURE
    only a message is printed and nothing is removed.

    Args:
        source_id: Slot index into ``g_source_bin_list``.
    """
    source_bin = g_source_bin_list[source_id]

    # Attempt to change status of source to be released
    state_return = source_bin.set_state(Gst.State.NULL)

    if state_return == Gst.StateChangeReturn.SUCCESS:
        print("STATE CHANGE SUCCESS\n")
        _detach_source(source_id, "STATE CHANGE SUCCESS\n")

    elif state_return == Gst.StateChangeReturn.FAILURE:
        print("STATE CHANGE FAILURE\n")

    elif state_return == Gst.StateChangeReturn.ASYNC:
        # Block until the asynchronous state change has completed.
        source_bin.get_state(Gst.CLOCK_TIME_NONE)
        _detach_source(source_id, "STATE CHANGE ASYNC\n")


def delete_sources(data):
    """GLib timeout callback that removes one source per invocation.

    Sources flagged in ``g_eos_list`` are released first. Then one randomly
    chosen enabled source is released. The main loop is quit as soon as no
    sources remain.

    Args:
        data: User data supplied when the timeout was registered (unused).

    Returns:
        ``True`` to keep the timeout running, ``False`` to cancel it.
    """
    # First delete sources that have reached end of stream
    for source_id in range(MAX_NUM_SOURCES):
        if g_eos_list[source_id] and g_source_enabled[source_id]:
            g_source_enabled[source_id] = False
            stop_release_source(source_id)

    # Choose among the sources that are actually enabled. The previous
    # retry-until-enabled loop never terminated when the counter and the
    # enabled flags disagreed (e.g. after a failed state change), which froze
    # the main loop this callback runs in.
    enabled_ids = [
        source_id for source_id, enabled in enumerate(g_source_enabled) if enabled
    ]

    # Quit if no sources remaining
    if g_num_sources == 0 or not enabled_ids:
        loop.quit()
        print("All sources stopped quitting")
        return False

    # Randomly choose an enabled source to delete
    source_id = random.choice(enabled_ids)
    # Disable the source
    g_source_enabled[source_id] = False
    # Release the source
    print("Calling Stop %d " % source_id)
    stop_release_source(source_id)

    # Quit if no sources remaining
    if g_num_sources == 0:
        loop.quit()
        print("All sources stopped quitting")
        return False

    return True


def _make_sink_element(factory_name, element_name, error_message):
    """Create a GStreamer element or fail loudly.

    ``error_message`` is written to stderr and used as the RuntimeError text
    when the factory cannot produce the element.
    """
    element = Gst.ElementFactory.make(factory_name, element_name)
    if not element:
        sys.stderr.write(error_message)
        raise RuntimeError(error_message.strip())
    return element


def add_rtmpsink(source_id):
    """Build and attach the RTMP output branch for one demuxed stream.

    The branch is nvvideoconvert -> nvdsosd -> nvvideoconvert -> capsfilter
    -> nvv4l2h264enc -> h264parse -> flvmux -> rtmpsink. It is added to the
    global pipeline, linked to the pre-requested demux src pad of
    ``source_id`` and brought to the pipeline's current state, so the function
    works both before the pipeline starts and while it is running.

    Args:
        source_id: Slot index; used in element names and in the RTMP URL.

    Raises:
        RuntimeError: If an element cannot be created or a required pad is
            missing.
    """
    # The branch is never removed when a source is deleted, so a slot that is
    # reused already has one. Creating it again would collide with the
    # existing element names and with the already linked demux pad.
    if rtmpsink_list[source_id] is not None:
        print("rtmp branch for source %u already exists, reusing it" % source_id)
        return

    # NOTE: the demux src pad is NOT requested here. Requesting it while the
    # pipeline is running did not work in testing, so main() requests all of
    # them up front and stores them in demux_srcpad_list.
    srcpad = demux_srcpad_list[source_id]
    if not srcpad:
        sys.stderr.write(" Unable to get source pad of demux \n")
        raise RuntimeError("Unable to get source pad of demux")

    print("Creating nvvidconv_%u \n " % source_id)
    nvvidconv = _make_sink_element(
        "nvvideoconvert", "convertor_%u" % source_id, " Unable to create nvvidconv \n"
    )
    nvvidconv.set_property("gpu_id", GPU_ID)

    print("Creating nvosd_%u \n " % source_id)
    nvosd = _make_sink_element(
        "nvdsosd", "onscreendisplay_%u" % source_id, " Unable to create nvosd \n"
    )
    nvosd.set_property("gpu_id", GPU_ID)

    print("Creating nvvidconv_postosd_%u \n" % source_id)
    nvvidconv_postosd = _make_sink_element(
        "nvvideoconvert",
        "convertor_postosd_%u" % source_id,
        " Unable to create nvvidconv_postosd \n",
    )
    nvvidconv_postosd.set_property("gpu_id", GPU_ID)

    # Create a caps filter
    print("Creating caps_%u \n" % source_id)
    caps = _make_sink_element(
        "capsfilter", "filter_%u" % source_id, " Unable to create caps \n"
    )
    caps.set_property(
        "caps", Gst.Caps.from_string("video/x-raw(memory:NVMM), format=I420")
    )

    # Make the encoder
    print("Creating H264 Encoder")
    encoder = _make_sink_element(
        "nvv4l2h264enc", "encoder_%u" % source_id, " Unable to create encoder"
    )
    encoder.set_property("bitrate", 4000000)
    encoder.set_property("gpu_id", GPU_ID)

    print("Creating H264 parse")
    parse = _make_sink_element(
        "h264parse", "h264parse_%u" % source_id, " Unable to create parse"
    )

    print("Creating flvmux")
    flvmux = _make_sink_element(
        "flvmux", "flvmux_%u" % source_id, " Unable to create flvmux"
    )

    # Make the rtmp sink
    print("Creating rtmpsink_%u \n" % source_id)
    rtmpsink = _make_sink_element(
        "rtmpsink", "rtmpsink_%u" % source_id, " Unable to create rtmpsink"
    )
    rtmpurl = "rtmp://ip地址/live/test_%u" % source_id
    rtmpsink.set_property("location", rtmpurl)
    print(f"rtmp url{rtmpurl}")

    # Everything was created; only now record the branch in the global slots.
    nvvideoconv_list[source_id] = nvvidconv
    nvosd_list[source_id] = nvosd
    nvvidconv_postosd_list[source_id] = nvvidconv_postosd
    caps_list[source_id] = caps
    encoder_list[source_id] = encoder
    parse_list[source_id] = parse
    flvmux_list[source_id] = flvmux
    rtmpsink_list[source_id] = rtmpsink

    # Elements in upstream-to-downstream order.
    branch = [
        nvvidconv,
        nvosd,
        nvvidconv_postosd,
        caps,
        encoder,
        parse,
        flvmux,
        rtmpsink,
    ]

    print("add sink element")
    for element in branch:
        pipeline.add(element)

    print("link sink element")
    sinkpad = nvvidconv.get_static_pad("sink")
    if not sinkpad:
        sys.stderr.write(" Unable to get the sink pad of nvvidconv \n")
        raise RuntimeError("Unable to get the sink pad of nvvidconv")

    if srcpad.link(sinkpad) == Gst.PadLinkReturn.OK:
        print("demux srcpad linked to pipeline")
    else:
        sys.stderr.write(" Unable to link demux srcpad to nvvidconv \n")

    for upstream, downstream in zip(branch, branch[1:]):
        if not upstream.link(downstream):
            sys.stderr.write(
                " Unable to link %s to %s \n"
                % (upstream.get_name(), downstream.get_name())
            )

    # Elements added to an already running pipeline stay in NULL until told
    # otherwise; sync them sink-first so downstream is ready before upstream
    # starts pushing. Before the pipeline is started this is a no-op.
    for element in reversed(branch):
        element.sync_state_with_parent()


def add_sources(rtsp_uri):
    """GLib timeout callback that adds one new source to the running pipeline.

    A free slot is chosen at random, a uridecodebin is created for
    ``rtsp_uri`` and added to the pipeline, the matching RTMP output branch is
    attached and the source bin is set to PLAYING. Once ``MAX_NUM_SOURCES``
    sources are active a 60 second ``delete_sources`` timer is started.

    Args:
        rtsp_uri: URI of the stream to add.

    Returns:
        ``True`` to keep the timeout running (more slots to fill), ``False``
        to cancel it.
    """
    global g_num_sources

    # Pick among the slots that are actually free. The previous
    # retry-until-free loop never terminated when every slot was enabled.
    free_slots = [
        slot for slot, enabled in enumerate(g_source_enabled) if not enabled
    ]
    if not free_slots:
        sys.stderr.write("No free source slot available\n")
        return False
    source_id = random.choice(free_slots)

    # Enable the source
    g_source_enabled[source_id] = True
    # add rtsp uri
    uri_list[source_id] = rtsp_uri

    print("Calling Start %d " % source_id)

    # Create a uridecode bin with the chosen source id
    source_bin = create_uridecode_bin(source_id, rtsp_uri)

    if not source_bin:
        sys.stderr.write("Failed to create source bin. Exiting.")
        sys.exit(1)

    # Add source bin to our list and to pipeline
    g_source_bin_list[source_id] = source_bin
    pipeline.add(source_bin)

    add_rtmpsink(source_id)

    # Set state of source bin to playing
    state_return = source_bin.set_state(Gst.State.PLAYING)

    if state_return == Gst.StateChangeReturn.SUCCESS:
        print("STATE CHANGE SUCCESS\n")

    elif state_return == Gst.StateChangeReturn.FAILURE:
        print("STATE CHANGE FAILURE\n")

    elif state_return == Gst.StateChangeReturn.ASYNC:
        # Block until the asynchronous state change has completed.
        source_bin.get_state(Gst.CLOCK_TIME_NONE)

    elif state_return == Gst.StateChangeReturn.NO_PREROLL:
        print("STATE CHANGE NO PREROLL\n")

    g_num_sources += 1

    # If reached the maximum number of sources, delete sources every 60 seconds
    if g_num_sources == MAX_NUM_SOURCES:
        GLib.timeout_add_seconds(60, delete_sources, g_source_bin_list)
        return False

    return True


def bus_call(bus, message, loop):
    """Dispatch messages posted on the pipeline bus.

    EOS and ERROR messages stop ``loop``; WARNING messages are logged to
    stderr. An ELEMENT message whose structure is named "stream-eos" marks
    the reported stream id in ``g_eos_list`` so that ``delete_sources`` can
    release that source later.

    Args:
        bus: The bus the message arrived on (unused).
        message: The bus message.
        loop: Main loop to quit on EOS or error.

    Returns:
        Always ``True``.
    """
    msg_type = message.type

    if msg_type == Gst.MessageType.EOS:
        sys.stdout.write("End-of-stream\n")
        loop.quit()
        return True

    if msg_type == Gst.MessageType.WARNING:
        warning, detail = message.parse_warning()
        sys.stderr.write("Warning: %s: %s\n" % (warning, detail))
        return True

    if msg_type == Gst.MessageType.ERROR:
        error, detail = message.parse_error()
        sys.stderr.write("Error: %s: %s\n" % (error, detail))
        loop.quit()
        return True

    if msg_type != Gst.MessageType.ELEMENT:
        return True

    # Only the per-stream EOS notification is of interest here.
    structure = message.get_structure()
    if structure is None or not structure.has_name("stream-eos"):
        return True

    found, stream_id = structure.get_uint("stream-id")
    if found:
        # Remember the EOS; the source itself is released in delete_sources.
        print("Got EOS from stream %d" % stream_id)
        g_eos_list[stream_id] = True
    return True


def main(args):
    """Build the pipeline, run it with runtime source add/delete, clean up.

    Pipeline layout: one uridecodebin per input -> nvstreammux -> nvinfer
    -> nvstreamdemux -> one RTMP output branch per input (see
    ``add_rtmpsink``). After start-up a timer adds further sources every
    15 seconds through ``add_sources``.

    Args:
        args: ``[program_name, uris]`` where ``uris`` is a list of input URIs
            (a single URI string is accepted as well).

    Returns:
        ``None``. The process exits with status 1 on invalid arguments or
        when a pipeline element cannot be created or linked.
    """
    global g_num_sources
    global loop
    global pipeline
    global streammux
    global pgie
    global demux

    # Check input arguments
    if len(args) != 2:
        sys.stderr.write("usage: %s <uri1> \n" % args[0])
        sys.exit(1)

    args_list = args[1]
    if isinstance(args_list, str):
        # A bare string would otherwise be iterated character by character.
        args_list = [args_list]
    num_sources = len(args_list)
    if num_sources > MAX_NUM_SOURCES:
        # All per-source lists have exactly MAX_NUM_SOURCES slots.
        sys.stderr.write(
            "At most %d sources are supported, got %d \n"
            % (MAX_NUM_SOURCES, num_sources)
        )
        sys.exit(1)

    # Standard GStreamer initialization
    Gst.init(None)

    # Create Pipeline element that will form a connection of other elements
    print("Creating Pipeline \n ")
    pipeline = Gst.Pipeline()
    is_live = False

    if not pipeline:
        sys.stderr.write(" Unable to create Pipeline \n")
        sys.exit(1)
    print("Creating streammux \n ")

    # Create nvstreammux instance to form batches from one or more sources.
    streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
    if not streammux:
        sys.stderr.write(" Unable to create NvStreamMux \n")
        sys.exit(1)

    streammux.set_property("batched-push-timeout", 25000)
    streammux.set_property("batch-size", MAX_NUM_SOURCES)
    streammux.set_property("gpu_id", GPU_ID)
    # Always treated as live: the sources added at runtime are RTSP streams.
    streammux.set_property("live-source", 1)

    for i, uri_name in enumerate(args_list):
        print("Creating source_bin ", i, " \n ")
        uri_list[i] = uri_name
        if uri_name.startswith("rtsp://"):
            is_live = True
        # Create the source bin and add it to the pipeline
        source_bin = create_uridecode_bin(i, uri_name)
        if not source_bin:
            sys.stderr.write("Failed to create source bin. Exiting. \n")
            sys.exit(1)
        g_source_bin_list[i] = source_bin
        pipeline.add(source_bin)

    g_num_sources = num_sources

    print("Creating Pgie \n ")
    pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
    if not pgie:
        sys.stderr.write(" Unable to create pgie \n")
        sys.exit(1)

    print("Creating Demux")
    demux = Gst.ElementFactory.make("nvstreamdemux", "Stream-demuxer")
    if not demux:
        sys.stderr.write(" Unable to create NvStreamDeMux \n")
        sys.exit(1)

    # Request every demux src pad before the pipeline starts; requesting them
    # while the pipeline is running did not work in testing.
    print("Creating demux srcpad")
    for i in range(MAX_NUM_SOURCES):
        srcpad = demux.get_request_pad("src_%u" % i)
        if not srcpad:
            sys.stderr.write(" Unable to get source pad of demux \n")
        demux_srcpad_list[i] = srcpad

    if is_live:
        # "live-source" has already been enabled unconditionally above.
        print("Atleast one of the sources is live")

    # Set streammux width and height
    streammux.set_property('width', MUXER_OUTPUT_WIDTH)
    streammux.set_property('height', MUXER_OUTPUT_HEIGHT)
    pgie.set_property('config-file-path', PGIE_CONFIG_FILE)

    # Keep the inference batch size equal to the streammux batch size
    # (previously hard-coded to 30, contradicting the warning below).
    pgie_batch_size = pgie.get_property("batch-size")
    if pgie_batch_size < MAX_NUM_SOURCES:
        print("WARNING: Overriding infer-config batch-size", pgie_batch_size,
              " with number of sources ", MAX_NUM_SOURCES, " \n")
    pgie.set_property("batch-size", MAX_NUM_SOURCES)

    # Set gpu IDs of the inference engines
    pgie.set_property("gpu_id", GPU_ID)

    print("Adding elements to Pipeline \n")
    pipeline.add(pgie)
    pipeline.add(streammux)
    pipeline.add(demux)

    print("Linking elements in the Pipeline \n")
    if not streammux.link(pgie):
        sys.stderr.write(" Unable to link streammux to pgie \n")
        sys.exit(1)
    if not pgie.link(demux):
        sys.stderr.write(" Unable to link pgie to demux \n")
        sys.exit(1)

    print("add rtmpsink")
    for i in range(num_sources):
        add_rtmpsink(i)

    # create an event loop and feed gstreamer bus messages to it
    loop = GLib.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", bus_call, loop)

    # List the sources
    print("Now playing...")
    for i, source in enumerate(args_list):
        print(i, ": ", source)

    print("Starting pipeline \n")
    # start playback and listen to events
    pipeline.set_state(Gst.State.PLAYING)

    GLib.timeout_add_seconds(15, add_sources, "rtsp://rtsp地址?channel=1&subtype=0")

    try:
        loop.run()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop; fall through to cleanup.
        pass
    finally:
        # cleanup
        print('create pipline')
        # Dump the pipeline graph (GStreamer only writes the file when
        # GST_DEBUG_DUMP_DOT_DIR is set).
        Gst.debug_bin_to_dot_file(pipeline, Gst.DebugGraphDetails.ALL, "pipline_my_src_add_del_rtmp2")
        print("Exiting app\n")
        pipeline.set_state(Gst.State.NULL)


if __name__ == '__main__':
    # The input URIs are hard-coded rather than read from sys.argv; main()
    # receives them as [program_name, list_of_uris].
    initial_uris = [
        "rtsp://rtsp地址?channel=1&subtype=0",
        "rtsp://rtsp地址?channel=1&subtype=0",
    ]
    args = ['my_src_add_del.py', initial_uris]
    sys.exit(main(args))

问题

  1. 我在管道处于运行状态时无法创建demux的srcpad,所以只能在管道运行前,根据MAX_NUM_SOURCES把所有的srcpad都提前创建好。但为什么在cb_newpad回调函数中,就能动态地创建streammux的sinkpad?
            # Get a sink pad from the streammux, link to decodebin
            sinkpad = streammux.get_request_pad(pad_name)
  2. 管道的后半部分(如下图)应该也需要用一个bin进行封装,并且在删除输入源时,也应该把这部分从管道中移除,就像uridecodebin那样。但应该用什么bin、具体怎么封装,我还在学习中……希望有大佬指点指点。

 

Logo

GitCode 天启AI是一款由 GitCode 团队打造的智能助手,基于先进的LLM(大语言模型)与多智能体 Agent 技术构建,致力于为用户提供高效、智能、多模态的创作与开发支持。它不仅支持自然语言对话,还具备处理文件、生成 PPT、撰写分析报告、开发 Web 应用等多项能力,真正做到“一句话,让 AI 帮你完成复杂任务”。

更多推荐