
CNStream multi-stream concurrent Pipeline framework, assorted notes: overall Pipeline construction flow, data transmission between modules, and multi-stream concurrency

Contents

1 Round-up of my earlier CNStream posts

1.1 The EventBus in the Pipeline

1.2 The memory pool in the Pipeline

1.3 Video decoding flow in the Pipeline

1.4 Video encoding flow in the Pipeline

1.5 The reflection mechanism in the Pipeline

1.6 The singleton-pattern code in the Pipeline

1.7 How to port CNStream to NVIDIA Jetson Orin

2 Overall Pipeline construction flow

2.1 new Pipeline("TACLPipeline")

2.2 BuildPipelineByJSONFile

2.2.1 graph_config.ParseByJSONFile(config_file)

2.2.2 BuildPipeline(graph_config)

3 How data is passed between modules

4 How multiple streams are distinguished: stream_id

5 Where the stream_id in std::shared_ptr<CNFrameInfo> data comes from

References


1 Round-up of my earlier CNStream posts

1.1 The EventBus in the Pipeline

See my earlier post:

EventBus的C++实现、代码分析-CSDN博客

1.2 The memory pool in the Pipeline

See my earlier post:

C++内存池Memory Pool的高级实现、代码详解、CMake构建工程、应用实例-CSDN博客

1.3 Video decoding flow in the Pipeline

See my earlier post:

aclStream流处理多路并发Pipeline框架中 视频解码 代码调用流程整理、类的层次关系整理、回调函数赋值和调用流程整理_acl 多模型 多视频流处理-CSDN博客

1.4 Video encoding flow in the Pipeline

See my earlier post:

aclStream流处理多路并发Pipeline框架中VEncode Module代码调用流程整理、类的层次关系整理、回调函数赋值和调用流程整理-CSDN博客

1.5 The reflection mechanism in the Pipeline

See my earlier post:

CNStream代码中C++反射机制的使用-CSDN博客

1.6 The singleton-pattern code in the Pipeline

See my earlier post:

C++单例模式代码实现与分析-CSDN博客

1.7 How to port CNStream to NVIDIA Jetson Orin

See my earlier posts:

完整指南:CNStream流处理多路并发框架适配到NVIDIA Jetson Orin (一) 依赖库编译、第三方库编译安装_cnstream编译-CSDN博客

完整指南:CNStream流处理多路并发框架适配到NVIDIA Jetson Orin (二) 源码架构流程梳理、代码编写_cnstreamer-CSDN博客

完整指南:CNStream流处理多路并发框架适配到NVIDIA Jetson Orin (三) 代码编译、各种问题解决、代码修改-CSDN博客

完整指南:CNStream流处理多路并发框架适配到NVIDIA Jetson Orin (四) 运行、调试、各种问题解决_mpeg 并发处理-CSDN博客

2 Overall Pipeline construction flow

Let's start from the main function.


int main(int argc, char** argv) {
    gflags::ParseCommandLineFlags(&argc, &argv, false);

    FLAGS_stderrthreshold = google::INFO;
    FLAGS_colorlogtostderr = true;
    FLAGS_log_dir = "logs";
    FLAGS_max_log_size = 100; // maximum log size in MB; if set to 0, it defaults to 1
    FLAGS_stop_logging_if_full_disk = true;

    path_check(FLAGS_log_dir);

    google::InitGoogleLogging("taclstream");

    LOGI(APP) << "TACLSTREAM VERSION:" << cnstream::VersionString();

    int rt = general_init();
    if (rt != ERROR_OK) {
        LOGE(MAIN) << "general_init failed";
        exit(rt);
    }

    pause();

    LOGI(MAIN) << "TACLStream server exit after pause!";
    google::ShutdownGoogleLogging();

    return EXIT_SUCCESS;
}

Next, look at the general_init() function.


int general_init() {
    int ret = build_pipeline(FLAGS_config_fname);
    if (ret != 0) {
        LOGE(MAIN) << "build pipeline failed!";
        return ret;
    }
    LOGI(MAIN) << "build pipeline succeed.";

    std::string file_service_host;
    int file_service_port{};
    if (enable_file_service(file_service_host, file_service_port)) {
        LOGI(MAIN) << "file service start succeed. port is " << file_service_port;
    }else {
        LOGW(MAIN) << "not support file service.";
    }

    int http_server_port = file_service_port;

    // node info
    g_node_info.node_name = "tcnstream";
    g_node_info.node_ip = file_service_host;
    g_node_info.node_port = http_server_port;

    try{
        httplib_service_start(http_server_port);
        LOGI(MAIN) << "http server start succeed, server port is " << http_server_port << ".";
    }
    catch (GeneralException2& e) {
        httplib_service_close();
        LOGE(TCNSTREAM) << "init failed: " << e.err_code() << "," << e.err_str();
        return e.err_code();
    }

    // on startup, check whether cached task files exist locally
    restart_session_from_local();

    start_profiler();

    // start the keep-alive thread
    std::thread keepliveTh([]() {
      while (true) {
        std::this_thread::sleep_for(std::chrono::seconds(30));
        post_keepalive_msg();
      }
    });
    keepliveTh.detach();

    return ERROR_OK;
}

Then look at the build_pipeline(FLAGS_config_fname) function.

int build_pipeline(string const& config) {
    s_pipeline = std::shared_ptr<Pipeline>(new Pipeline("TACLPipeline"), [](Pipeline* pipeline) { pipeline->Stop(); });
    if (s_pipeline == nullptr) {
        return ERROR_ALLOC_MEMORY;
    }

    //build pipeline
    if (!s_pipeline->BuildPipelineByJSONFile(config)) {
        LOGE(SESSION_MGR) << "Build pipeline failed.";
        return EXIT_FAILURE;
    }

    //message observer
    static MsgObserver msg_observer(s_pipeline.get(), g_source_name);
    s_pipeline->SetStreamMsgObserver(reinterpret_cast<cnstream::StreamMsgObserver*>(&msg_observer));

    s_pipeline->GetEventBus()->AddBusWatch([&](const Event & event)->EventHandleFlag{
        if(event.module_name == "TuringPipeline/decode/source" && EventType::EVENT_EOS == event.type){
            LOGI(SESSION_MGR) << "recv bus event: " << static_cast<int8_t>(event.type) << "_" << event.module_name << "_" << event.stream_id << "_" << event.message;
            {
                std::string run_msg_str;
                std::unique_lock<std::mutex> locker(s_session_locker);
                auto it = s_task_sessions.find(event.stream_id);
                if (it != s_task_sessions.end()) {
                    LOGE(SESSION_MGR) << event.stream_id << "stream interruption!!!!!!!!!!";
                    // if the source is a file, stop it directly; otherwise enter the reconnect mechanism
                    if(it->second->get_video_type() == EFILE)
                    {
                        it->second->stop(g_source_name);
                        run_msg_str = it->second->post_end_mark();

                        LOGI(SESSION_MGR) << "run msg is: " << run_msg_str;

                        // publish msg
                        cnstream::ActiveMQ* activemq_run_msg = dynamic_cast<cnstream::ActiveMQ*>(s_pipeline->GetModule(g_activemq_name));
                        if (activemq_run_msg != nullptr && run_msg_str != "") {
                            activemq_run_msg->PublishEvent(run_msg_str);
                        }

                        s_task_sessions.erase(it->first);
                    }
                    else if(it->second->get_s_state_() != S_USER_CLOSE)
                    {
                        it->second->set_s_state_(S_EXCEPTION_CLOSE);
                    }
                }
            }
        }
        return EventHandleFlag::EVENT_HANDLE_NULL;  // not intercepted; other watchers may handle the event
    });

    //start pipeline
    if (!s_pipeline->Start()) {
        LOGE(SESSION_MGR) << "Pipeline start failed.";
        return EXIT_FAILURE;
    }
    return ERROR_OK;
}

Note that s_pipeline is created with a custom deleter that calls pipeline->Stop() when the last reference is released. Within build_pipeline, the three calls worth focusing on are:

  • new Pipeline("TACLPipeline")
  • BuildPipelineByJSONFile
  • s_pipeline->Start()

2.1 new Pipeline("TACLPipeline")

The constructor is as follows:

Pipeline::Pipeline(const std::string& name) : name_(name) {
  // stream message handle thread
  exit_msg_loop_ = false;
  smsg_thread_ = std::thread(&Pipeline::StreamMsgHandleFunc, this);

  event_bus_.reset(new (std::nothrow) EventBus());
  LOGF_IF(CORE, nullptr == event_bus_) << "Pipeline::Pipeline() failed to alloc EventBus";
  GetEventBus()->AddBusWatch(std::bind(&Pipeline::DefaultBusWatch, this, std::placeholders::_1));

  idxManager_.reset(new (std::nothrow) IdxManager());
  LOGF_IF(CORE, nullptr == idxManager_) << "Pipeline::Pipeline() failed to alloc IdxManager";

  graph_.reset(new (std::nothrow) CNGraph<NodeContext>());
  LOGF_IF(CORE, nullptr == graph_) << "Pipeline::Pipeline() failed to alloc CNGraph";
}

The first two members are EventBus-related; IdxManager manages the stream indices; and the CNGraph at the bottom is the pipeline graph. The template type passed in when the graph is created is NodeContext:


/**
 * @brief The node context used by pipeline.
 */
struct NodeContext {
  std::shared_ptr<Module> module;
  std::shared_ptr<Connector> connector;
  uint64_t parent_nodes_mask = 0;
  uint64_t route_mask = 0;  // for head nodes
  // for getting the node instance from a module, see Module::context_;
  std::weak_ptr<CNGraph<NodeContext>::CNNode> node;
};

The Connector inside NodeContext is what connects two adjacent modules:

class Connector : private NonCopyable {
 public:
  /**
   * @brief Connector constructor.
   * @param
   *   [conveyor_count]: the conveyor num of this connector.
   *   [conveyor_capacity]: the maximum buffer number of a conveyor.
   */
  explicit Connector(const size_t conveyor_count, size_t conveyor_capacity = 20);
  ~Connector();

  const size_t GetConveyorCount() const;
  size_t GetConveyorCapacity() const;
  bool IsConveyorFull(int conveyor_idx) const;
  bool IsConveyorEmpty(int conveyor_idx) const;
  size_t GetConveyorSize(int conveyor_idx) const;
  uint64_t GetFailTime(int conveyor_idx) const;

  CNFrameInfoPtr PopDataBufferFromConveyor(int conveyor_idx);
  bool PushDataBufferToConveyor(int conveyor_idx, CNFrameInfoPtr data);

  void Start();
  void Stop();
  bool IsStopped();
  void EmptyDataQueue();

 private:
  Conveyor* GetConveyorByIdx(int idx) const;
  Conveyor* GetConveyor(int conveyor_idx) const;

  std::vector<Conveyor*> conveyors_;
  size_t conveyor_capacity_ = 20;
  std::vector<uint64_t> fail_times_;
  std::atomic<bool> stop_{false};
};  // class Connector

Connector holds a conveyors_ member. It is a vector, so there can be several conveyors: with many streams, the link between two modules needs multiple conveyors, each carrying one stream's data (when there are more streams than conveyors, streams share them, as the sketch below shows). conveyor_capacity_ caps each conveyor's queue at 20 entries.
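
TransmitData (Section 3) picks the conveyor for a frame by taking its stream index modulo the conveyor count. A minimal standalone sketch of that mapping (the stream and conveyor counts here are illustrative):

#include <cstdint>
#include <cstdio>

int main() {
  const size_t conveyor_count = 2;  // e.g. the downstream module runs with parallelism 2
  for (uint32_t stream_index = 0; stream_index < 3; ++stream_index) {
    // same formula as in Pipeline::TransmitData: GetStreamIndex() % GetConveyorCount()
    size_t conveyor_idx = stream_index % conveyor_count;
    std::printf("stream %u -> conveyor %zu\n", stream_index, conveyor_idx);
  }
  return 0;
}

Streams 0 and 2 share conveyor 0 while stream 1 gets conveyor 1; within one conveyor, a stream's frames stay in FIFO order.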


/**
 * @brief Conveyor is used to transmit data between two modules.
 *
 * Conveyor belongs to Connector.
 * Each Connector could have several conveyors, depending on the parallelism of each module.
 *
 * Conveyor has one buffer queue for transmitting data from one module to another.
 * The upstream node module will push data to the buffer queue, and the downstream node will pop data from it.
 *
 * The capacity of the buffer queue can be set in the configuration json file (see README for more information
 * on the configuration json file). If there is no element in the buffer queue, the downstream node will block
 * waiting to pop. Conversely, if the queue is full, the upstream node will block waiting to push.
 */
class Conveyor : private NonCopyable {
 public:
  explicit Conveyor(size_t max_size);
  ~Conveyor() = default;
  bool PushDataBuffer(CNFrameInfoPtr data);
  CNFrameInfoPtr PopDataBuffer();
  std::vector<CNFrameInfoPtr> PopAllDataBuffer();
  uint32_t GetBufferSize();
  uint64_t GetFailTime();

#ifdef UNIT_TEST
 public:  // NOLINT
#else
 private:  // NOLINT
#endif

 private:
  std::queue<CNFrameInfoPtr> dataq_;
  size_t max_size_;
  uint64_t fail_time_ = 0;
  std::mutex data_mutex_;
  std::condition_variable notempty_cond_;
  const std::chrono::milliseconds rel_time_{20};
};  // class Conveyor

The Conveyor class is what actually carries the data between two modules.

That covers the Pipeline constructor; the graph itself comes up in the next two functions.

2.2 BuildPipelineByJSONFile

inline bool Pipeline::BuildPipelineByJSONFile(const std::string& config_file) {
  CNGraphConfig graph_config;
  if (!graph_config.ParseByJSONFile(config_file)) {
    LOGE(CORE) << "Parse graph config file failed.";
    return false;
  }
  return BuildPipeline(graph_config);
}

2.2.1 graph_config.ParseByJSONFile(config_file)

The first step is to parse the JSON file:

bool CNConfigBase::ParseByJSONFile(const std::string& jfile) {
  printf("jfile is %s\n", jfile.c_str());
  std::ifstream ifs(jfile);
  if (!ifs.is_open()) {
    LOGE(CORE) << "Config file open failed :" << jfile;
    return false;
  }
  std::string jstr((std::istreambuf_iterator<char>(ifs)), std::istreambuf_iterator<char>());
  ifs.close();
  config_root_dir = GetPathDir(jfile);
  if (!ParseByJSONStr(jstr)) {
    return false;
  }
  return true;
}

Then look at ParseByJSONStr:


bool CNGraphConfig::ParseByJSONStr(const std::string& json_str) {
  rapidjson::Document doc;
  if (doc.Parse<rapidjson::kParseCommentsFlag>(json_str.c_str()).HasParseError()) {
    LOGE(CORE) << "Parse graph configuration failed. Error code [" << std::to_string(doc.GetParseError()) << "]"
               << " Offset [" << std::to_string(doc.GetErrorOffset()) << "]. ";
    return false;
  }

  // traversing config items
  for (rapidjson::Document::ConstMemberIterator iter = doc.MemberBegin(); iter != doc.MemberEnd(); ++iter) {
    rapidjson::StringBuffer sbuf;
    rapidjson::Writer<rapidjson::StringBuffer> jwriter(sbuf);
    iter->value.Accept(jwriter);

    std::string item_name = iter->name.GetString();
    std::string item_value = sbuf.GetString();

    if (IsProfilerItem(item_name)) {
      // parse if profiler config
      if (!profiler_config.ParseByJSONStr(item_value)) {
        LOGE(CORE) << "Parse profiler config failed.";
        return false;
      }
    } else if (IsSubgraphItem(item_name)) {
      // parse if subgraph config
      CNSubgraphConfig subgraph_config;
      subgraph_config.name = item_name;
      if (!subgraph_config.ParseByJSONStr(item_value)) {
        LOGE(CORE) << "Parse subgraph config failed. Subgraph name : [" + item_name + "].";
        return false;
      }
      // correct the relative path of subgraph configuration file.
      subgraph_config.config_path = config_root_dir + subgraph_config.config_path;
      subgraph_configs.push_back(std::move(subgraph_config));
    } else {
      // parse module config and insert graph nodes
      CNModuleConfig mconf;
      mconf.config_root_dir = config_root_dir;
      mconf.name = item_name;
      if (!mconf.ParseByJSONStr(item_value)) {
        LOGE(CORE) << "Parse module config failed. Module name : [" << mconf.name << "]";
        return false;
      }

      module_configs.push_back(std::move(mconf));
    }
  }  // for json items
  return true;
}

Read this together with the config file:

{
  "profiler_config" : {
    "enable_profiling" : false,
    "enable_tracing" : false
  },

  "subgraph:app_config":{
    "config_path" : "configs/app_config.json"
  },

  "subgraph:decode" : {
    "config_path" : "configs/decode_config.json",
    "next_modules" : ["subgraph:object_detection"]
  },

  "subgraph:object_detection" : {
    "config_path" : "configs/yolov5_nv_pcb_df.json",
    "next_modules" : ["subgraph:object_tracking"]
  },

  "subgraph:object_tracking" : {
    "config_path" : "configs/object_track_sort.json",
    "next_modules" : ["subgraph:business"]
  },
  
  "subgraph:business" : {
    "config_path" : "configs/business.json",
    "next_modules" : ["subgraph:image_store","subgraph:sinker"]
  },
  
  "subgraph:image_store" : {
    "config_path" : "configs/image_store.json",
    "next_modules" : ["subgraph:activemq", "subgraph:mysql"]
  },
  
  "subgraph:activemq" : {
    "config_path" : "configs/sinker_configs/activemq.json"
  },

  "subgraph:mysql" : {
    "config_path" : "configs/sinker_configs/mysql.json"
  },
  
  "subgraph:sinker" : {
    "config_path" : "configs/sinker_configs/rtsp.json"
  },

  "subgraph:preview_decode" : {
    "config_path" : "configs/preview_decode_config.json",
    "next_modules" : ["subgraph:preview_image_store"]
  },
  
  "subgraph:preview_image_store" : {
    "config_path" : "configs/image_store.json"
  }
}
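
For orientation, the next_modules fields in this file describe the following DAG (derived directly from the config above):

decode -> object_detection -> object_tracking -> business
business -> image_store, business -> sinker
image_store -> activemq, image_store -> mysql
preview_decode -> preview_image_store
app_config: standalone (no next_modules)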

Stepping through with a debugger confirms that all of these subgraph items end up stored in subgraph_configs.

After parsing, BuildPipeline(graph_config) is called.

2.2.2 BuildPipeline(graph_config)


bool Pipeline::BuildPipeline(const CNGraphConfig& graph_config) {
  auto t = graph_config;
  t.name = GetName();
  if (!graph_->Init(t)) {
    LOGE(CORE) << "Init graph failed.";
    return false;
  }
  // create modules by config
  std::vector<std::shared_ptr<Module>> modules;  // used to init profiler
  if (!CreateModules(&modules)) {
    LOGE(CORE) << "Create modules failed.";
    return false;
  }

  // generate parent mask for all nodes and route mask for head nodes.
  GenerateModulesMask();

  // This call must happen after GenerateModulesMask is called.
  profiler_.reset(
      new PipelineProfiler(graph_->GetConfig().profiler_config, GetName(), modules, GetSortedModuleNames()));

  // create connectors for all nodes besides head nodes.
  // This call must happen after GenerateModulesMask is called,
  // so that we can determine which are the head nodes.
  return CreateConnectors();
}

Now look at the Init function; the graph_->Init(t) call above goes through an overload that stores the config into config_ before running the parameterless body shown here.


template <typename T>
bool CNGraph<T>::Init() {
  Clear();
  // subgraph analysis loop detect
  static thread_local std::set<std::string> subgraph_paths;
  if (nullptr == parent_graph_) {
    // root graph, init subgraph_paths
    subgraph_paths.clear();
  }

  dag_algorithm_.Reserve(config_.module_configs.size() + config_.subgraph_configs.size());
  // insert vertices
  for (const auto& module_config : config_.module_configs)
    if (!AddVertex(module_config)) return false;
  for (const auto& subgraph_config : config_.subgraph_configs) {
    if (!IsSubgraphItem(subgraph_config.name)) {
      // check the subgraph name prefix; when subgraph_config.name was not produced by
      // CNGraphConfig::ParseByJSONxxx, the user may have set a wrong name.
      LOGE(CORE) << "Subgraph's name must set with an prefix [" << std::string(kSubgraphConfigPrefix)
                 << "], wrong name : " << subgraph_config.name;
      return false;
    }
    if (parent_graph_) {
      // Current graph is a subgraph of other graphs
      auto real_path = __help_functions__::GetRealPath(subgraph_config.config_path);
      if (real_path.empty()) return false;
      if (!subgraph_paths.insert(real_path).second) {
        LOGE(CORE) << GetLogPrefix() << "A graph analysis loop was detected when parsing the subgraph named ["
                   << subgraph_config.name + "].";
        return false;
      }
    }
    if (!AddVertex(subgraph_config)) return false;
  }

  if (!InitEdges()) return false;

  FindHeadsAndTails();

  // check circle
  auto topo_result = dag_algorithm_.TopoSort();
  if (topo_result.second.size()) {
    LOGE(CORE) << GetLogPrefix() + "Ring detected.";
    return false;
  }
  return true;
}

In this function,

  dag_algorithm_.Reserve(config_.module_configs.size() + config_.subgraph_configs.size());

reserves space for the graph's vertices: the number of module items plus subgraph items in the config. For the config file above, that is 0 module items + 11 subgraph items = 11 vertices.

Execution then reaches if (!AddVertex(subgraph_config)) return false;


template <typename T>
bool CNGraph<T>::AddVertex(const CNSubgraphConfig& config) {
  if (!__help_functions__::IsNodeNameValid(config.name)) {
    LOGE(CORE) << GetLogPrefix() << "Subgraph[" << config.name << "] name invalid. "
               << "The name of modules or subgraphs can not contain slashes or risks";
    return false;
  }
  CNGraphConfig graph_config;
  if (!graph_config.ParseByJSONFile(config.config_path)) {
    LOGE(CORE) << GetLogPrefix() << "Parse subgraph config file failed. subgraph name: " << config.name;
    return false;
  }
  graph_config.name = __help_functions__::NameIgnoreSubgraphPrefix(config.name);
  auto subgraph = std::make_shared<CNGraph<T>>(graph_config);
  subgraph->parent_graph_ = this;
  if (!subgraph->Init()) {
    LOGE(CORE) << GetLogPrefix() + "Init subgraph[" + config.name + "] failed.";
    return false;
  }

  int vertex_id = dag_algorithm_.AddVertex();
  vertex_map_to_node_name_.push_back(config.name);

  if (!subgraph_node_map_.insert(std::make_pair(config.name, std::make_tuple(vertex_id, config, subgraph))).second) {
    LOGE(CORE) << GetLogPrefix() + "Subgraph[" + config.name + "] name duplicated.";
    return false;
  }
  return true;
}
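
To make the loop detection in Init concrete: subgraph_paths records the resolved config_path of every subgraph on the way down the recursion, so a self-referencing configuration like the following (hypothetical file contents; comments are legal here because the parser uses kParseCommentsFlag) would fail on the second visit to configs/decode_config.json with "A graph analysis loop was detected":

// configs/decode_config.json -- hypothetical contents that form a loop
{
  "subgraph:decode_again" : {
    "config_path" : "configs/decode_config.json"
  }
}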

3 How data is passed between modules

Section 2.1 already showed the relevant pieces: the Pipeline constructor creates the graph as CNGraph<NodeContext>, each NodeContext pairs a module with a Connector, and each Connector owns the Conveyors (bounded queues) that buffer data between two modules.


So where does the actual transmission happen? Each module's TransmitData ends up calling Pipeline::TransmitData:


void Pipeline::TransmitData(NodeContext* context, const std::shared_ptr<CNFrameInfo>& data) {
  if (data->IsInvalid()) {
    OnDataInvalid(context, data);
    return;
  }
  if (!context->parent_nodes_mask) {
    // root node
    // set mask to 1 for never touched modules, for the case with multiple source modules.
    data->SetModulesMask(all_modules_mask_ ^ context->route_mask);
  }
  if (data->IsEos()) {
    OnEos(context, data);
  } else {
    OnProcessEnd(context, data);
    if (IsStreamRemoved(data->stream_id)) return;
  }

  auto node = context->node.lock();
  auto module = context->module;
  const uint64_t cur_mask = data->MarkPassed(module.get());
  const bool passed_by_all_modules = PassedByAllModules(cur_mask);

  if (passed_by_all_modules) {
    OnPassThrough(data);
    return;
  }

  // transmit to next nodes
  for (auto next_node : node->GetNext()) {
    if (!PassedByAllParentNodes(&next_node->data, cur_mask)) continue;
    auto next_module = next_node->data.module;
    auto connector = next_node->data.connector;
    // push data to conveyor only after data passed by all parent nodes.
    if (IsProfilingEnabled() && !data->IsEos())
      next_module->GetProfiler()->RecordProcessStart(kINPUT_PROFILER_NAME,
                                                     std::make_pair(data->stream_id, data->timestamp));
    const int conveyor_idx = data->GetStreamIndex() % connector->GetConveyorCount();

    int retry_cnt = 1;
    while (!connector->IsStopped() && connector->PushDataBufferToConveyor(conveyor_idx, data) == false) {
      std::this_thread::sleep_for(std::chrono::milliseconds(5 * retry_cnt));
      retry_cnt = std::min(retry_cnt * 2, 10);
    }  // while try push
  }  // loop next nodes
}
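
The routing above is mask-driven: every module owns one bit (assigned through IdxManager::GetModuleIdx), data->MarkPassed ORs the module's bit into the frame's mask, and PassedByAllModules compares the mask against all_modules_mask_. A minimal standalone sketch of that bookkeeping (the module names and bit assignments are illustrative):

#include <cstdint>
#include <cstdio>

int main() {
  // three modules, one bit each (illustrative assignment)
  const uint64_t source_bit = 1ull << 0;
  const uint64_t detect_bit = 1ull << 1;
  const uint64_t sink_bit   = 1ull << 2;
  const uint64_t all_modules_mask = source_bit | detect_bit | sink_bit;

  uint64_t frame_mask = 0;   // no module has processed this frame yet
  frame_mask |= source_bit;  // MarkPassed(source)
  frame_mask |= detect_bit;  // MarkPassed(detect)
  std::printf("passed by all: %d\n", (frame_mask & all_modules_mask) == all_modules_mask);  // 0
  frame_mask |= sink_bit;    // MarkPassed(sink)
  std::printf("passed by all: %d\n", (frame_mask & all_modules_mask) == all_modules_mask);  // 1
  return 0;
}

PassedByAllParentNodes plays the same game per node: a frame is pushed to a conveyor only once the bits of all its parent nodes are set. Note also the push loop at the bottom of TransmitData: when a conveyor stays full, the sender retries with a capped backoff, sleeping 5 * retry_cnt ms while retry_cnt doubles up to 10, i.e. at most 50 ms between attempts.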

The push goes through Connector:

bool Connector::PushDataBufferToConveyor(int conveyor_idx, CNFrameInfoPtr data) {
  return GetConveyor(conveyor_idx)->PushDataBuffer(data);
}

and the Conveyor finally does the transfer, pushing the frame into its queue here:

bool Conveyor::PushDataBuffer(CNFrameInfoPtr data) {
  std::unique_lock<std::mutex> lk(data_mutex_);
  if (dataq_.size() < max_size_) {
    dataq_.push(data);
    notempty_cond_.notify_one();
    fail_time_ = 0;
    return true;
  }
  fail_time_ += 1;
  return false;
}
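
The matching pop side is not shown above. Judging from the members in the Conveyor declaration (dataq_, data_mutex_, notempty_cond_, rel_time_{20}), a plausible sketch, not the verbatim CNStream code, is a timed wait that gives up after 20 ms and returns nullptr, which is why TaskLoop below keeps polling until it gets a non-null frame:

// Plausible sketch of Conveyor::PopDataBuffer, inferred from the class declaration above.
CNFrameInfoPtr Conveyor::PopDataBuffer() {
  std::unique_lock<std::mutex> lk(data_mutex_);
  // wait at most rel_time_ (20 ms) for the queue to become non-empty
  if (notempty_cond_.wait_for(lk, rel_time_, [this] { return !dataq_.empty(); })) {
    CNFrameInfoPtr data = dataq_.front();
    dataq_.pop();
    return data;
  }
  return nullptr;  // timed out; the caller retries
}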

On the consumer side, each module's TaskLoop pops frames from its conveyor and processes them:

void Pipeline::TaskLoop(NodeContext* context, uint32_t conveyor_idx) {
  auto module = context->module;
  auto connector = context->connector;
  auto node_name = module->GetName();

  // process loop
  while (1) {
    std::shared_ptr<CNFrameInfo> data = nullptr;
    // pull data from conveyor
    while (!connector->IsStopped() && data == nullptr) data = connector->PopDataBufferFromConveyor(conveyor_idx);
    if (connector->IsStopped()) break;
    if (data == nullptr) {
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
      continue;
    }

    OnProcessStart(context, data);
    int ret = module->DoProcess(data);
    if (ret < 0) OnProcessFailed(context, data, ret);

    std::this_thread::yield();
  }  // while process loop
}
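
For completeness: the third key function from Section 2, s_pipeline->Start(), is what attaches these loops to threads. A hedged sketch of the idea (container and member names here are illustrative, not the verbatim CNStream code): for every node that has a connector, Start spawns one TaskLoop thread per conveyor, which is how multiple streams run concurrently while frames within one conveyor stay ordered.

// Hedged sketch: one TaskLoop thread per (node, conveyor) pair.
// nodes_with_connector and threads_ are illustrative names.
for (auto& node : nodes_with_connector) {
  NodeContext* context = &node->data;
  context->connector->Start();
  for (uint32_t idx = 0; idx < context->connector->GetConveyorCount(); ++idx) {
    threads_.emplace_back(&Pipeline::TaskLoop, this, context, idx);
  }
}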

4 How multiple streams are distinguished: stream_id

Streams are told apart by their stream_id. Concretely, the call chain from adding a source down to index allocation looks like this:

ret = AddSourceForFile(source, stream_id, filename, FLAGS_src_frame_rate);
  cnstream::CreateSource(source, stream_id, param);
    std::make_shared<FileHandler>(module, stream_id, param);
      FileHandler::FileHandler(DataSource *module, const std::string &stream_id, const FileSourceParam &param)
          : SourceHandler(module, stream_id) {
        impl_ = new (std::nothrow) FileHandlerImpl(module, param, this);
      }
        stream_index_ = module_->GetStreamIndex(stream_id_);
          if (container_) return container_->GetStreamIndex(stream_id);
            return idxManager_->GetStreamIndex(stream_id);

Then IdxManager::GetStreamIndex performs the allocation:

uint32_t IdxManager::GetStreamIndex(const std::string& stream_id) {
  std::lock_guard<std::mutex> guard(id_lock);
  auto search = stream_idx_map.find(stream_id);
  if (search != stream_idx_map.end()) {
    return search->second;
  }

  for (uint32_t i = 0; i < GetMaxStreamNumber(); i++) {
    if (!stream_bitset[i]) {
      stream_bitset.set(i);
      stream_idx_map[stream_id] = i;
      return i;
    }
  }
  return kInvalidStreamIdx;
}

Each Pipeline owns an IdxManager member (std::unique_ptr<IdxManager> idxManager_ = nullptr;) that manages these stream indices:

/**
 * @brief ModuleId&StreamIdx manager for pipeline. Allocates and deallocates id for Pipeline modules & streams.
 */
class IdxManager {
 public:
  IdxManager() = default;
  IdxManager(const IdxManager &) = delete;
  IdxManager &operator=(const IdxManager &) = delete;
  uint32_t GetStreamIndex(const std::string &stream_id);
  void ReturnStreamIndex(const std::string &stream_id);
  size_t GetModuleIdx();
  void ReturnModuleIdx(size_t id_);

 private:
  std::mutex id_lock;
  std::map<std::string, uint32_t> stream_idx_map;
  std::bitset<kMaxStreamNum> stream_bitset;
  uint64_t module_id_mask_ = 0;
};  // class IdxManager
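
A small usage illustration of the allocation semantics (the stream names are made up; ReturnStreamIndex is assumed to free the slot, as its body is not shown above): the first lookup of a new stream_id takes the lowest free slot, and repeat lookups return the cached index.

IdxManager idx_manager;
uint32_t a = idx_manager.GetStreamIndex("camera_0");  // -> 0, lowest free slot
uint32_t b = idx_manager.GetStreamIndex("camera_1");  // -> 1
uint32_t c = idx_manager.GetStreamIndex("camera_0");  // -> 0 again, hit in stream_idx_map
idx_manager.ReturnStreamIndex("camera_0");            // assumed: slot 0 becomes free for the next stream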

5 Where the stream_id in std::shared_ptr<CNFrameInfo> data comes from

Start with FileHandlerImpl::OnDecodeFrame, which builds a CNFrameInfo for every decoded frame:

void FileHandlerImpl::OnDecodeFrame(BufSurfWrapperPtr wrapper) {
    if (frame_count_++ % param_.interval != 0) {
        return;  // discard frames according to the configured interval
    }

    std::shared_ptr<CNFrameInfo> data = this->CreateFrameInfo();
    if (!data) {
        LOGW(SOURCE) << "[FileHandlerImpl] OnDecodeFrame(): failed to create FrameInfo.";
        return;
    }

    data->timestamp = wrapper->GetPts();
    if (!wrapper->GetBufSurface()) {
        data->flags = static_cast<size_t>(CNFrameFlag::CN_FRAME_FLAG_INVALID);
        this->SendFrameInfo(data);
        return;
    }

    int ret = SourceRender::Process(data, std::move(wrapper), frame_id_++, param_);
    if (ret < 0) {
        LOGE(SOURCE) << "[FileHandlerImpl] OnDecodeFrame(): [" << stream_id_ << "]: Render frame failed";
        return;
    }

    this->SendFrameInfo(data);
}

CreateFrameInfo wraps the handler call:

std::shared_ptr<CNFrameInfo> CreateFrameInfo(bool eos = false) {
    std::shared_ptr<CNFrameInfo> data;
    int retry_cnt = 1;
    while (1) {
      data = handler_->CreateFrameInfo(eos);
      if (data != nullptr) break;
      if (CreateInterrupt()) break;
      std::this_thread::sleep_for(std::chrono::microseconds(5 * retry_cnt));
      retry_cnt = std::min(retry_cnt * 2, 20);
    }
    if (!eos) {
      auto dataframe = std::make_shared<CNDataFrame>();
      if (!dataframe) {
        return nullptr;
      }
      auto inferobjs = std::make_shared<CNInferObjs>();
      if (!inferobjs) {
        return nullptr;
      }
      data->collection.Add(kCNDataFrameTag, dataframe);
      data->collection.Add(kCNInferObjsTag, inferobjs);
    }
    return data;
  }
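
That handler_->CreateFrameInfo(eos) call is where the stream_id enters. A plausible sketch (based on the SourceHandler interface shown earlier, not the verbatim CNStream code) simply forwards the stream_id_ stored when the source was added:

// Plausible sketch: the handler forwards its stored stream_id_.
std::shared_ptr<CNFrameInfo> SourceHandler::CreateFrameInfo(bool eos) {
  return CNFrameInfo::Create(stream_id_, eos);
}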

and ultimately:

std::shared_ptr<CNFrameInfo> CNFrameInfo::Create(const std::string &stream_id, bool eos,
                                                 std::shared_ptr<CNFrameInfo> payload) {
  if (stream_id == "") {
    LOGE(CORE) << "CNFrameInfo::Create() stream_id is empty string.";
    return nullptr;
  }
  std::shared_ptr<CNFrameInfo> ptr(new (std::nothrow) CNFrameInfo());
  if (!ptr) {
    LOGE(CORE) << "CNFrameInfo::Create() new CNFrameInfo failed.";
    return nullptr;
  }
  ptr->stream_id = stream_id;
  ptr->payload = payload;
  if (eos) {
    ptr->flags |= static_cast<size_t>(cnstream::CNFrameFlag::CN_FRAME_FLAG_EOS);
    if (!ptr->payload) {
      std::lock_guard<std::mutex> guard(s_eos_lock_);
      s_stream_eos_map_[stream_id] = false;
    }
    return ptr;
  }

  return ptr;
}
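
A tiny usage illustration (stream name made up; this assumes the declaration defaults eos to false and payload to nullptr, as the CreateFrameInfo(bool eos = false) wrapper above suggests):

auto frame = cnstream::CNFrameInfo::Create("camera_0");        // normal frame: stream_id set, flags == 0
auto eos   = cnstream::CNFrameInfo::Create("camera_0", true);  // EOS frame: CN_FRAME_FLAG_EOS is set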

This is how data ends up carrying its stream_id.

References:

手写EventBus(观察者模式、源码阅读、反射) - 简书

CNStream流处理多路并发Pipeline框架整体介绍-CSDN博客

寒武纪CNStream用户手册 — 寒武纪CNStream用户手册 6.2.0 文档

GitHub - Cambricon/CNStream: CNStream is a streaming framework for building Cambricon machine learning pipelines http://forum.cambricon.com https://gitee.com/SolutionSDK/CNStream

CNStream/docs/release_document/latest/Cambricon-CNStream-User-Guide-CN-vlatest.pdf at master · Cambricon/CNStream · GitHub

C++内存池Memory Pool的高级实现、代码详解、CMake构建工程、应用实例-CSDN博客

aclStream流处理多路并发Pipeline框架中VEncode Module代码调用流程整理、类的层次关系整理、回调函数赋值和调用流程整理-CSDN博客

CNStream代码中C++反射机制的使用-CSDN博客

aclStream流处理多路并发Pipeline框架中 视频解码 代码调用流程整理、类的层次关系整理、回调函数赋值和调用流程整理-CSDN博客

EventBus的C++实现、代码分析-CSDN博客


Original post: https://blog.csdn.net/u013171226/article/details/142824304
