Files
VideoPipe/nodes/vp_sync_node.cpp
2026-06-03 12:43:14 +08:00

189 lines
8.6 KiB
C++

#include "vp_sync_node.h"
#include <iostream>
namespace vp_nodes {
vp_sync_node::vp_sync_node(std::string node_name, vp_sync_mode mode, int timeout):
vp_node(node_name),
mode(mode),
timeout(timeout) {
assert(timeout > 0);
this->initialized();
}
vp_sync_node::~vp_sync_node() {
deinitialized();
}
std::shared_ptr<vp_objects::vp_meta> vp_sync_node::handle_frame_meta(std::shared_ptr<vp_objects::vp_frame_meta> meta) {
if (all_indexs_last_syned.count(meta->channel_index) == 0) {
all_indexs_last_syned[meta->channel_index] = -1; // initialize by -1
}
auto& index_last_synced = all_indexs_last_syned[meta->channel_index];
auto& meta_waiting_for_sync = all_meta_waiting_for_sync[meta->channel_index];
// discard because it has been already synced or timeout before
if (meta->frame_index <= index_last_synced) {
// warn log for discard
VP_WARN(vp_utils::string_format("[%s][channel%d] discard for frame_index:[%d], because it has been synced or timeout before...", node_name.c_str(), meta->channel_index, meta->frame_index));
return nullptr;
}
bool need_wait = true;
bool start_of_queue = true;
for (auto i = meta_waiting_for_sync.begin(); i != meta_waiting_for_sync.end();) {
auto meta_type = (*i)->meta_type;
// it is control meta AND at the start of queue, push to downstream directly
if (meta_type == vp_objects::vp_meta_type::CONTROL) {
if (start_of_queue) {
auto waiting_control_meta = dynamic_pointer_cast<vp_objects::vp_control_meta>((*i));
pendding_meta(waiting_control_meta);
i = meta_waiting_for_sync.erase(i);
VP_DEBUG(vp_utils::string_format("[%s][channel%d] sync control meta for control_uid:[%s], now cache size of sync is:[%d]", node_name.c_str(), meta->channel_index, waiting_control_meta->control_uid.c_str(), meta_waiting_for_sync.size()));
}
else {
// skip
i++;
}
}
else {
// it is frame meta
auto waiting_meta = dynamic_pointer_cast<vp_objects::vp_frame_meta>((*i));
auto frame_index = waiting_meta->frame_index;
auto frame_period = 1000.0 / meta->fps;
auto delta = (meta->frame_index - frame_index) * frame_period;
// timeout logic
if (delta >= timeout) {
pendding_meta(waiting_meta);
index_last_synced = frame_index;
i = meta_waiting_for_sync.erase(i);
// warn log for timeout
VP_WARN(vp_utils::string_format("[%s][channel%d] timeout for frame_index:[%d], push to downstream directly...", node_name.c_str(), meta->channel_index, waiting_meta->frame_index));
}
else {
// sync logic
if (frame_index == meta->frame_index) {
sync(waiting_meta, meta);
pendding_meta(waiting_meta);
index_last_synced = frame_index;
i = meta_waiting_for_sync.erase(i);
// do not need wait for sync
need_wait = false;
VP_DEBUG(vp_utils::string_format("[%s][channel%d] sync frame meta for frame_index:[%d], now cache size of sync is:[%d]", node_name.c_str(), meta->channel_index, waiting_meta->frame_index, meta_waiting_for_sync.size()));
break;
}
// skip
i++;
start_of_queue = false;
}
}
}
if (need_wait) {
meta_waiting_for_sync.push_back(meta);
}
return nullptr; // important
}
std::shared_ptr<vp_objects::vp_meta> vp_sync_node::handle_control_meta(std::shared_ptr<vp_objects::vp_control_meta> meta) {
if (all_control_uids_last_synced.count(meta->channel_index) == 0) {
all_control_uids_last_synced[meta->channel_index] = ""; // initialize by empty
}
auto& control_uid_last_synced = all_control_uids_last_synced[meta->channel_index];
auto& meta_waiting_for_sync = all_meta_waiting_for_sync[meta->channel_index];
if (control_uid_last_synced != meta->control_uid) {
meta_waiting_for_sync.push_back(meta);
control_uid_last_synced = meta->control_uid;
}
return nullptr; // important
}
void vp_sync_node::sync(std::shared_ptr<vp_objects::vp_frame_meta> des, std::shared_ptr<vp_objects::vp_frame_meta> src) {
// point to the same vp_frame_meta object
if (des == src) {
return;
}
/*
* normal data to sync:
* 1. osd frame
* 2. mask
*/
if (des->osd_frame.empty() && !src->osd_frame.empty()) {
des->osd_frame = src->osd_frame;
}
if (des->mask.empty() && !src->mask.empty()) {
des->mask = src->mask;
}
/*
* infer data to sync:
* 1. targets
* 2. face targets
* 3. text targets
* 4. pose targets
*/
if (mode == vp_sync_mode::MERGE) {
// merge target lists between 2 vp_frame_meta objects
// insert pointers directly
des->targets.insert(des->targets.end(), src->targets.begin(), src->targets.end());
des->face_targets.insert(des->face_targets.end(), src->face_targets.begin(), src->face_targets.end());
des->text_targets.insert(des->text_targets.end(), src->text_targets.begin(), src->text_targets.end());
des->pose_targets.insert(des->pose_targets.end(), src->pose_targets.begin(), src->pose_targets.end());
}
else {
// update properties of targets
// first make sure the size of targets are equal
assert(des->targets.size() == src->targets.size());
assert(des->face_targets.size() == src->face_targets.size());
assert(des->text_targets.size() == src->text_targets.size());
assert(des->pose_targets.size() == src->pose_targets.size());
for (int i = 0; i < src->targets.size(); i++) {
auto& des_target = des->targets[i];
auto& src_target = src->targets[i];
if (des_target->track_id == -1 && src_target->track_id != -1) {
des_target->track_id = src_target->track_id;
}
if (des_target->tracks.size() == 0 && src_target->tracks.size() != 0) {
des_target->tracks = src_target->tracks;
}
if (des_target->mask.empty() && !src_target->mask.empty()) {
des_target->mask = src_target->mask;
}
if (des_target->embeddings.size() == 0 && src_target->embeddings.size() != 0) {
des_target->embeddings = src_target->embeddings;
}
des_target->secondary_class_ids.insert(des_target->secondary_class_ids.end(), src_target->secondary_class_ids.begin(), src_target->secondary_class_ids.end());
des_target->secondary_labels.insert(des_target->secondary_labels.end(), src_target->secondary_labels.begin(), src_target->secondary_labels.end());
des_target->secondary_scores.insert(des_target->secondary_scores.end(), src_target->secondary_scores.begin(), src_target->secondary_scores.end());
des_target->sub_targets.insert(des_target->sub_targets.end(), src_target->sub_targets.begin(), src_target->sub_targets.end());
}
for (int i = 0; i < src->face_targets.size(); i++) {
auto& des_face_target = des->face_targets[i];
auto& src_face_target = src->face_targets[i];
if (des_face_target->track_id == -1 && src_face_target->track_id != -1) {
des_face_target->track_id = src_face_target->track_id;
}
if (des_face_target->tracks.size() == 0 && src_face_target->tracks.size() != 0) {
des_face_target->tracks = src_face_target->tracks;
}
if (des_face_target->embeddings.size() == 0 && src_face_target->embeddings.size() != 0) {
des_face_target->embeddings = src_face_target->embeddings;
}
}
}
}
}