#include "vp_node.h" namespace vp_nodes { vp_node::vp_node(std::string node_name): node_name(node_name) { } vp_node::~vp_node() { } // there is only one thread poping data from the in_queue, we don't use lock here when poping. // there is only one thread pushing data to the out_queue, we don't use lock here when pushing. void vp_node::handle_run() { // cache for batch handling if need std::vector> frame_meta_batch_cache; while (alive) { // wait for producer, make sure in_queue is not empty. this->in_queue_semaphore.wait(); VP_DEBUG(vp_utils::string_format("[%s] before handling meta, in_queue.size()==>%d", node_name.c_str(), in_queue.size())); auto in_meta = this->in_queue.front(); // dead flag if (in_meta == nullptr) { continue; } // handling hooker activated if need invoke_meta_handling_hooker(node_name, in_queue.size(), in_meta); std::shared_ptr out_meta; auto batch_complete = false; // call handlers if (in_meta->meta_type == vp_objects::vp_meta_type::CONTROL) { auto meta_2_handle = std::dynamic_pointer_cast(in_meta); out_meta = this->handle_control_meta(meta_2_handle); } else if (in_meta->meta_type == vp_objects::vp_meta_type::FRAME) { auto meta_2_handle = std::dynamic_pointer_cast(in_meta); // one by one if (frame_meta_handle_batch == 1) { out_meta = this->handle_frame_meta(meta_2_handle); } else { // batch by batch frame_meta_batch_cache.push_back(meta_2_handle); if (frame_meta_batch_cache.size() >= frame_meta_handle_batch) { // cache complete this->handle_frame_meta(frame_meta_batch_cache); batch_complete = true; } else { // cache not complete, do nothing VP_DEBUG(vp_utils::string_format("[%s] handle meta with batch, frame_meta_batch_cache.size()==>%d", node_name.c_str(), frame_meta_batch_cache.size())); } } } else { throw "invalid meta type!"; } this->in_queue.pop(); VP_DEBUG(vp_utils::string_format("[%s] after handling meta, in_queue.size()==>%d", node_name.c_str(), in_queue.size())); // one by one mode // return nullptr means do not push it to next nodes(such as in des nodes). if (out_meta != nullptr && node_type() != vp_node_type::DES) { VP_DEBUG(vp_utils::string_format("[%s] before handling meta, out_queue.size()==>%d", node_name.c_str(), out_queue.size())); this->out_queue.push(out_meta); // handled hooker activated if need invoke_meta_handled_hooker(node_name, out_queue.size(), out_meta); // notify consumer of out_queue this->out_queue_semaphore.signal(); VP_DEBUG(vp_utils::string_format("[%s] after handling meta, out_queue.size()==>%d", node_name.c_str(), out_queue.size())); } // batch by batch mode if (batch_complete && node_type() != vp_node_type::DES) { // push to out_queue one by one for (auto& i: frame_meta_batch_cache) { VP_DEBUG(vp_utils::string_format("[%s] before handling meta, out_queue.size()==>%d", node_name.c_str(), out_queue.size())); this->out_queue.push(i); // handled hooker activated if need invoke_meta_handled_hooker(node_name, out_queue.size(), i); // notify consumer of out_queue this->out_queue_semaphore.signal(); VP_DEBUG(vp_utils::string_format("[%s] after handling meta, out_queue.size()==>%d", node_name.c_str(), out_queue.size())); } // clean cache for the next batch frame_meta_batch_cache.clear(); } } // send dead flag for dispatch_thread this->out_queue.push(nullptr); this->out_queue_semaphore.signal(); } // there is only one thread poping from the out_queue, we don't use lock here when poping. void vp_node::dispatch_run() { while (alive) { // wait for producer, make sure out_queue is not empty. this->out_queue_semaphore.wait(); VP_DEBUG(vp_utils::string_format("[%s] before dispatching meta, out_queue.size()==>%d", node_name.c_str(), out_queue.size())); auto out_meta = this->out_queue.front(); // dead flag if (out_meta == nullptr) { continue; } // leaving hooker activated if need invoke_meta_leaving_hooker(node_name, out_queue.size(), out_meta); // do something.. this->push_meta(out_meta); this->out_queue.pop(); VP_DEBUG(vp_utils::string_format("[%s] after dispatching meta, out_queue.size()==>%d", node_name.c_str(), out_queue.size())); } } std::shared_ptr vp_node::handle_frame_meta(std::shared_ptr meta) { return meta; } std::shared_ptr vp_node::handle_control_meta(std::shared_ptr meta) { return meta; } void vp_node::handle_frame_meta(const std::vector>& meta_with_batch) { } void vp_node::meta_flow(std::shared_ptr meta) { if (meta == nullptr) { return; } std::lock_guard guard(this->in_queue_lock); VP_DEBUG(vp_utils::string_format("[%s] before meta flow, in_queue.size()==>%d", node_name.c_str(), in_queue.size())); this->in_queue.push(meta); // arriving hooker activated if need invoke_meta_arriving_hooker(node_name, in_queue.size(), meta); // notify consumer of in_queue this->in_queue_semaphore.signal(); VP_DEBUG(vp_utils::string_format("[%s] after meta flow, in_queue.size()==>%d", node_name.c_str(), in_queue.size())); } void vp_node::detach() { for(auto i : this->pre_nodes) { i->remove_subscriber(shared_from_this()); } this->pre_nodes.clear(); } void vp_node::detach_from(std::vector pre_node_names) { for (auto i = this->pre_nodes.begin(); i != this->pre_nodes.end();) { if (std::find(pre_node_names.begin(), pre_node_names.end(), (*i)->node_name) != pre_node_names.end()) { (*i)->remove_subscriber(shared_from_this()); i = this->pre_nodes.erase(i); } else { i++; } } } void vp_node::detach_recursively() { detach(); auto nodes = next_nodes(); for (auto& n: nodes) { n->detach_recursively(); } } void vp_node::attach_to(std::vector> pre_nodes) { // can not attach src node to any previous nodes if (this->node_type() == vp_node_type::SRC) { throw vp_excepts::vp_invalid_calling_error("SRC nodes must not have any previous nodes!"); } // can not attach any nodes to des node for(auto i : pre_nodes) { if (i->node_type() == vp_node_type::DES) { throw vp_excepts::vp_invalid_calling_error("DES nodes must not have any next nodes!"); } i->add_subscriber(shared_from_this()); this->pre_nodes.push_back(i); } } void vp_node::initialized() { // start threads since all resources have been initialized this->handle_thread = std::thread(&vp_node::handle_run, this); this->dispatch_thread = std::thread(&vp_node::dispatch_run, this); } void vp_node::deinitialized() { // send dead flag alive = false; { std::lock_guard guard(this->in_queue_lock); this->in_queue.push(nullptr); this->in_queue_semaphore.signal(); } // wait for threads exits in vp_node if (handle_thread.joinable()) { handle_thread.join(); } if (dispatch_thread.joinable()) { dispatch_thread.join(); } } vp_node_type vp_node::node_type() { // return vp_node_type::MID by default // need override in child class return vp_node_type::MID; } std::vector> vp_node::next_nodes() { std::vector> next_nodes; std::lock_guard guard(this->subscribers_lock); for(auto & i: this->subscribers) { next_nodes.push_back(std::dynamic_pointer_cast(i)); } return next_nodes; } std::string vp_node::to_string() { // return node_name by default return node_name; } void vp_node::pendding_meta(std::shared_ptr meta) { this->out_queue.push(meta); // handled hooker activated if need invoke_meta_handled_hooker(node_name, out_queue.size(), meta); // notify consumer of out_queue this->out_queue_semaphore.signal(); } }