自学内容网 自学内容网

并行编程实战——TBB框架的应用之二Supra中节点的管理

一、Supra中的节点

SUPRA中的节点定义与TBB中的原生节点定义有所不同,在上文也提到了,它等同于在TBB原生节点上层的进一步抽象。可以这样理解,SUPRA中的节点是一个类似通信逻辑的节点,它包含TBB中的相关节点定义和连接处理方式。为了区分SUPRA中TBB节点的通信实例,采用了类似于网络通信的端口设计,很有新意。
在SUPRA节点中,定义了抽象节点、输入节点和输出节点以及功能节点。一般情况下,可以简单理解为数据的输入控制节点即为输入节点,输出节点即为输出节点。而抽象节点做为所有节点的基类。而功能节点则是不同于输入和输出节点的其它节点,它一般直接继承于抽象节点,这样会更容易控制。这种思想和TBB中的节点风格基本保持了一致。
但有所不同的是,在Supra的节点内其实是完全可以自定义任意的TBB节点的,比如在Supra的输入节点内就使用的广播节点。同时,这样的节点可以定义多个。

二、Supra节点的管理

这里先只看Supra中的节点创建,而在此节点中创建TBB节点的先不进行分析。
1、创建和删除
Supra的节点主要是通过配置文件创建,这类创建行为可叫做静态创建;另外,为了实现一些特定的功能可以动态的创建一些节点类。这种情况相对来说比较少见。
2、连接和断开
在Supra项目中,比较有特点是对节点的管理。它通过XML配置文件来实现对节点的创建(包括相关的参数以及名称、ID等)。然后利用节点的不同类型在程序中利用工厂创建 ;而对于TBB中对节点间的管理,它也进行了一层抽象,使用XML中配置的连接“connection”中的“from=>to”节进行控制,以其中的参数节点ID以及节点的连接端口port向程序提供节点的连接状态处理。也就是说,在程序启动时,一个基础的节点图就创建和准备流动了。看一下相关的配置文件:

<?xml version="1.0"?>
<supra_config>
<devices>
<inputs>
<input type="UltrasoundInterfaceCephasonicsCC" id="US-Cep">
<param name="txVoltage" type="double">
60
</param>
<param name="txApertureSizeY" type="uint32_t">
1
</param>
</input>
</inputs>
<outputs>
<output type="OpenIGTLinkOutputDevice" id="IGTL" />
<output type="MetaImageOutputDevice" id="MHD_Raw">
<param name="createSequences" type="bool">
1
</param>
<param name="filename" type="string">
rawData
</param>
</output>
</outputs>
<nodes>
<node type="BeamformingNode" id="BEAM">
<param name="windowParameter" type="double">
0.5
</param>
</node>
<node type="TemporalFilterNode" id="FILT">
</node>
<node type="LogCompressorNode" id="LOGC">
<param name="dynamicRange" type="double">
40
</param>
</node>
<node type="ScanConverterNode" id="SCAN">
</node>
</nodes>
</devices>
<connections>
<connection>
<from id="US-Cep" port="0" />
<to id="BEAM" port="0" />
</connection>
<connection>
<from id="BEAM" port="0" />
<to id="DEMO" port="0" />
</connection>
</connections>
</supra_config>

注意:上述配置进行了删减,可参看项目提供的原始配置文件
当然,为了灵活性Supra也提供了动态连接和删除节点的方法,请参看下面的代码

void SupraManager::connect(string fromID, size_t fromPort, string toID, size_t toPort)
{
if (nodeExists(fromID) && nodeExists(toID))
{
shared_ptr<AbstractNode> fromNode = m_nodes[fromID];
shared_ptr<AbstractNode> toNode = m_nodes[toID];
if (fromNode->getNumOutputs() > fromPort && toNode->getNumInputs() > toPort)
{
auto connTuple = std::make_tuple(fromID, fromPort, toID, toPort);
if (m_nodeConnections.count(connTuple) == 0)
{
m_nodeConnections[connTuple] = true;
tbb::flow::make_edge(
*(dynamic_cast<tbb::flow::sender<std::shared_ptr<RecordObject> >*>(fromNode->getOutput(fromPort))),
*(dynamic_cast<tbb::flow::receiver<std::shared_ptr<RecordObject> >*>(toNode->getInput(toPort))));
logging::log_log("SupraManager: Added connection from (", fromID, ", ", fromPort, ") to (", toID, ", ", toPort, ").");
}
else
{
logging::log_error("SupraManager: Could not add connection from (", fromID, ", ", fromPort, ") to (", toID, ", ", toPort, "). It already exists.");
}
}
else {
logging::log_error("SupraManager: Could not add connection from (", fromID, ", ", fromPort, ") to (", toID, ", ", toPort, "). One of the ports does not exist.");
}
}
else {
logging::log_error("SupraManager: Could not add connection from (", fromID, ", ", fromPort, ") to (", toID, ", ", toPort, "). One node does not exist.");
}
}

void SupraManager::disconnect(string fromID, size_t fromPort, string toID, size_t toPort)
{
if (nodeExists(fromID) && nodeExists(toID))
{
shared_ptr<AbstractNode> fromNode = m_nodes[fromID];
shared_ptr<AbstractNode> toNode = m_nodes[toID];
if (fromNode->getNumOutputs() > fromPort && toNode->getNumInputs() > toPort)
{
auto connTuple = std::make_tuple(fromID, fromPort, toID, toPort);
if (m_nodeConnections.count(connTuple) == 1 &&
m_nodeConnections[connTuple])
{
m_nodeConnections.erase(connTuple);
tbb::flow::remove_edge(
*(dynamic_cast<tbb::flow::sender<std::shared_ptr<RecordObject> >*>(fromNode->getOutput(fromPort))),
*(dynamic_cast<tbb::flow::receiver<std::shared_ptr<RecordObject> >*>(toNode->getInput(toPort))));
logging::log_log("SupraManager: Removed connection from (", fromID, ", ", fromPort, ") to (", toID, ", ", toPort, ").");
}
else
{
logging::log_error("SupraManager: Could not remove connection from (", fromID, ", ", fromPort, ") to (", toID, ", ", toPort, "). It does not exist.");
}
}
else {
logging::log_error("SupraManager: Could not remove connection from (", fromID, ", ", fromPort, ") to (", toID, ", ", toPort, "). One of the ports does not exist.");
}
}
else {
logging::log_error("SupraManager: Could not remove connection from (", fromID, ", ", fromPort, ") to (", toID, ", ", toPort, "). One node does not exist.");
}
}

3、应用
在节点通过配置文件进行创建并连接后,程序启动时,就可以按照设定的节点数据流动方向进行处理。这和TBB的节点处理从逻辑上讲是一致的。每一个节点内通过注册相关的节点处理函数,便可以自动引出数据流处理后再自动由TBB流向下一个节点。
需要注意的是,在Supra节点中,不再存在强制的哪类节点为一个输入或一个输出之类的要求。但正常的情况下尽可能还是保持与TBB节点的设计需求一致为好。

三、不同节点的自由数据流动

Supra中本身其节点并没有任何并行的功能,其实现最终仍然是调用TBB中的节点来实现。所以,此处就涉及到了,如何从Supar中的节点到TBB节点的数据映射和流动的处理。
1、动态创建不同类型的TBB节点

class AbstractNode
{
protected:
typedef tbb::flow::function_node<std::shared_ptr<RecordObject>, std::shared_ptr<RecordObject>, tbb::flow::rejecting> NodeTypeDiscarding;
typedef tbb::flow::function_node<std::shared_ptr<RecordObject>, std::shared_ptr<RecordObject>, tbb::flow::queueing> NodeTypeQueueing;
typedef tbb::flow::function_node<std::shared_ptr<RecordObject>, tbb::flow::continue_msg, tbb::flow::rejecting> NodeTypeOneSidedDiscarding;
typedef tbb::flow::function_node<std::shared_ptr<RecordObject>, tbb::flow::continue_msg, tbb::flow::queueing> NodeTypeOneSidedQueueing;

public:
/// Base constructor for all nodes
AbstractNode(const std::string & nodeID, bool queueing)
: m_nodeID(nodeID)
, m_queueing(queueing)
{
m_configurationDictionary.setValueRangeDictionary(&m_valueRangeDictionary);
}
};

在上面的抽象节点中,Supra定义了不同的TBB节点类型。在大的类型上,只使用了功能节点,但在功能节点中又处理了不同的消息和对消息的处理控制参数,从而实现一些特定的功能。比如使用tbb::flow::queueing,表示可以缓存消息。其它的细节参看前面的TBB相关说明即可。而在其它节点中又可以根据情况自定义TBB相关节点:

class ScanConverterNode : public AbstractNode {
private:
typedef tbb::flow::broadcast_node<std::shared_ptr<RecordObject> > MaskOutputNodeType;

public:
};

此处又定义了一个广播节点。
其应用方式也很简单,通过端口来动态获取相关节点信息:

virtual size_t getNumInputs() { return 1; }
virtual size_t getNumOutputs() { return 2; }

virtual tbb::flow::graph_node * getInput(size_t index) {
if (index == 0)
{
return m_node.get();
}
return nullptr;
};

virtual tbb::flow::graph_node * getOutput(size_t index) {
if (index == 0)
{
return m_node.get();
}
if (index == 1)
{
return m_maskOutputNode.get();
}
return nullptr;
};
private:
std::unique_ptr<MaskOutputNodeType> m_maskOutputNode;

2、Supra节点中对TBB节点的管理
Supra中通过配置文件利用工厂来处理其自身节点类型的创建,然后在创建过程中,动态处理节点内部的TBB节点,包括图等参数的管理:
从配置文件到工厂类的处理:

void SupraManager::readFromXml(const char * configXmlFilename, bool queueing)
{
tinyxml2::XMLDocument doc;
XMLError errLoad = doc.LoadFile(configXmlFilename);
logging::log_error_if(errLoad != tinyxml2::XML_SUCCESS,
"SupraManager::readFromXml: Could not open file '", configXmlFilename, "'. Error value was: ", errLoad);

//Read the list of objects from the XML file
XMLElement* configElement = doc.FirstChildElement("supra_config");
logging::log_error_if(!configElement,
"SupraManager::readFromXml: Error reading config file '", configXmlFilename, "'. It did not contain the root element <supra_config>");
if (configElement)
{
XMLElement* devicesElement = configElement->FirstChildElement("devices");
//Inputs
if (devicesElement)
{
XMLElement* inputsElement = devicesElement->FirstChildElement("inputs");

if (inputsElement)
{
readInputDevicesFromXml(inputsElement);
}
//Outputs
XMLElement* outputsElement = devicesElement->FirstChildElement("outputs");
if (outputsElement)
{
readOutputDevicesFromXml(outputsElement, queueing);
}
//Other nodes
XMLElement* nodesElement = devicesElement->FirstChildElement("nodes");
if (nodesElement)
{
readNodesFromXml(nodesElement, queueing);
}
}

//Read the edges from the XML file
XMLElement* connectionsElement = configElement->FirstChildElement("connections");
if (connectionsElement)
{
readConnectionsFromXml(connectionsElement);
}
}
}

void SupraManager::readInputDevicesFromXml(tinyxml2::XMLElement * inputsElement)
{
XMLElement* nextInput = inputsElement->FirstChildElement("input");
while (nextInput)
{
//find type and id of input element
string inputType = nextInput->Attribute("type");
string inputID = nextInput->Attribute("id");

size_t numPorts = 1;
if (nextInput->Attribute("ports"))
{
numPorts = std::stoi(nextInput->Attribute("ports"));
}

//create input element
auto in = InterfaceFactory::createInputDevice(m_graph, inputID, inputType, numPorts);

if (in)
{
//load config for this element
ConfigurationDictionary dict(nextInput);
in->changeConfig(dict);

//store input node
bool couldAdd = addNode(inputID, in, inputType);
logging::log_warn_if(!couldAdd, "SupraManager: Node '", inputID, "' already existed. Did not add it to collection.");
if (couldAdd)
{
m_inputDevices[inputID] = in;
}
}

//advance to next element
nextInput = nextInput->NextSiblingElement("input");
}
}

void SupraManager::readOutputDevicesFromXml(tinyxml2::XMLElement * outputsElement, bool queueing)
{
XMLElement* nextOutput = outputsElement->FirstChildElement("output");
while (nextOutput)
{
//find type and id of input element
string outputType = nextOutput->Attribute("type");
string outputID = nextOutput->Attribute("id");

//create input element
auto out = InterfaceFactory::createOutputDevice(m_graph, outputID, outputType, queueing);

if (out)
{
//load config for this element
ConfigurationDictionary dict(nextOutput);
out->changeConfig(dict);

//store output node
bool couldAdd = addNode(outputID, out, outputType);
logging::log_warn_if(!couldAdd, "SupraManager: Node '", outputID, "' already existed. Did not add it to collection.");
if (couldAdd)
{
m_outputDevices[outputID] = out;
}
}

//advance to next element
nextOutput = nextOutput->NextSiblingElement("output");
}
}

void SupraManager::readNodesFromXml(tinyxml2::XMLElement * nodesElement, bool queueing)
{
XMLElement* nextNode = nodesElement->FirstChildElement("node");
while (nextNode)
{
//find type and id of node element
string nodeType = nextNode->Attribute("type");
string nodeID = nextNode->Attribute("id");

//create node
auto node = InterfaceFactory::createNode(m_graph, nodeID, nodeType, queueing);

if (node)
{
//load parameters for this element
ConfigurationDictionary dict(nextNode);
node->changeConfig(dict);

//store node
bool couldAdd = addNode(nodeID, node, nodeType);
logging::log_warn_if(!couldAdd, "SupraManager: Node '", nodeID, "' already existed. Did not add it to collection.");
}

//advance to next element
nextNode = nextNode->NextSiblingElement("node");
}
}

void SupraManager::readConnectionsFromXml(tinyxml2::XMLElement * connectionsElement)
{
XMLElement* nextConnection = connectionsElement->FirstChildElement("connection");
while (nextConnection)
{
//from
XMLElement* fromElement = nextConnection->FirstChildElement("from");
string fromID = fromElement->Attribute("id");
int fromPort = 0;
if (fromElement->QueryIntAttribute("port", &fromPort) != XML_SUCCESS)
{
logging::log_error("SupraManager: Error parsing the port attribute of a connection from '", fromID, "'.");
}

//to
XMLElement* toElement = nextConnection->FirstChildElement("to");
string toID = toElement->Attribute("id");
int toPort = 0;
if (toElement->QueryIntAttribute("port", &toPort) != XML_SUCCESS)
{
logging::log_error("SupraManager: Error parsing the port attribute of a connection to '", toID, "'.");
}

//create the connection
connect(fromID, fromPort, toID, toPort);

//advance to next element
nextConnection = nextConnection->NextSiblingElement("connection");
}
}

内部TBB节点的创建:

shared_ptr<tbb::flow::graph> InterfaceFactory::createGraph() {
  new tbb::flow::graph();
  return make_shared<tbb::flow::graph>();
}

shared_ptr<AbstractInput> InterfaceFactory::createInputDevice(shared_ptr<tbb::flow::graph> pG,
                                                              const std::string &nodeID, std::string deviceType,
                                                              size_t numPorts) {
  shared_ptr<AbstractInput> retVal = shared_ptr<AbstractInput>(nullptr);

  if (numPorts > 1 && deviceType != "UltrasoundInterfaceCephasonicsCC") {
    logging::log_warn("InterfaceFactory: More than one port currently not "
                      "supported for input device " +
                      deviceType + ".");
  }
......
#ifdef HAVE_DEVICE_ULTRASONIX
  if (deviceType == "UltrasoundInterfaceUltrasonix") {
    retVal = make_shared<UltrasoundInterfaceUltrasonix>(*pG, nodeID);
  }
#endif // HAVE_DEVICE_ULTRASONIX
#ifdef HAVE_DEVICE_CEPHASONICS
  if (deviceType == "UltrasoundInterfaceCephasonics") {
    retVal = make_shared<UsIntCephasonicsBmode>(*pG, nodeID);
  }
  if (deviceType == "UltrasoundInterfaceCephasonicsBTCC") {
    retVal = make_shared<UsIntCephasonicsBtcc>(*pG, nodeID);
  }
#ifdef HAVE_CUDA
  if (deviceType == "UltrasoundInterfaceCephasonicsCC") {
    retVal = make_shared<UsIntCephasonicsCc>(*pG, nodeID, numPorts);
  }
#endif // HAVE_CUDA
......
  return retVal;
}

shared_ptr<AbstractOutput> InterfaceFactory::createOutputDevice(shared_ptr<tbb::flow::graph> pG,
                                                                const std::string &nodeID, std::string deviceType,
                                                                bool queueing) {
  shared_ptr<AbstractOutput> retVal = shared_ptr<AbstractOutput>(nullptr);
......
#ifdef HAVE_DEVICE_ROSIMAGE_OUTPUT
  if (deviceType == "RosImageOutputDevice") {
    retVal = make_shared<RosImageOutputDevice>(*pG, nodeID, queueing);
  }
#endif // HAVE_DEVICE_ROSIMAGE_OUTPUT
#ifdef HAVE_DEVICE_ROS_EDEN_OUTPUT
  if (deviceType == "EdenImageOutputDevice") {
    retVal = make_shared<EdenImageOutputDevice>(*pG, nodeID, queueing);
  }
#endif // HAVE_DEVICE_ROS_EDEN_OUTPUT
......
  return retVal;
}

shared_ptr<AbstractNode> InterfaceFactory::createNode(shared_ptr<tbb::flow::graph> pG, const std::string &nodeID,
                                                      std::string nodeType, bool queueing) {
  shared_ptr<AbstractNode> retVal = shared_ptr<AbstractNode>(nullptr);

  if (m_nodeCreators.count(nodeType) != 0) {
    retVal = m_nodeCreators[nodeType](*pG, nodeID, queueing);
  }

  logging::log_error_if(!((bool)retVal), "Error creating node. Requested type '", nodeType,
                        "' is unknown. Did you activate the corresponding "
                        "module in the build of the library?");
  logging::log_info_if((bool)retVal, "Created node '", nodeType, "' with ID '", nodeID, "'");
  return retVal;
}

std::vector<std::string> InterfaceFactory::getNodeTypes() {
  std::vector<std::string> nodeTypes(m_nodeCreators.size());
  std::transform(m_nodeCreators.begin(), m_nodeCreators.end(), nodeTypes.begin(),
                 [](std::pair<std::string, nodeCreationFunctionType> entry) { return entry.first; });
  return nodeTypes;
}

std::map<std::string, InterfaceFactory::nodeCreationFunctionType> InterfaceFactory::m_nodeCreators = {
    {"StreamSynchronizer", [](tbb::flow::graph &g, std::string nodeID,
                              bool queueing) { return make_shared<StreamSynchronizer>(g, nodeID, queueing); }},
    {"TemporalOffsetNode", [](tbb::flow::graph &g, std::string nodeID,
                              bool queueing) { return make_shared<TemporalOffsetNode>(g, nodeID, queueing); }},
    {"FrequencyLimiterNode", [](tbb::flow::graph &g, std::string nodeID,
                                bool queueing) { return make_shared<FrequencyLimiterNode>(g, nodeID, queueing); }},
    {"AutoQuitNode", [](tbb::flow::graph &g, std::string nodeID,
                        bool queueing) { return make_shared<AutoQuitNode>(g, nodeID, queueing); }},
    {"StreamSyncNode", [](tbb::flow::graph &g, std::string nodeID,
                          bool queueing) { return make_shared<StreamSyncNode>(g, nodeID, queueing); }},
    {"ImageProcessingCpuNode", [](tbb::flow::graph &g, std::string nodeID,
                                  bool queueing) { return make_shared<ImageProcessingCpuNode>(g, nodeID, queueing); }},
#ifdef HAVE_CUDA
    {"NoiseNode", [](tbb::flow::graph &g, std::string nodeID,
                     bool queueing) { return make_shared<NoiseNode>(g, nodeID, queueing); }},
    {"ImageProcessingCudaNode",
     [](tbb::flow::graph &g, std::string nodeID, bool queueing) {
       return make_shared<ImageProcessingCudaNode>(g, nodeID, queueing);
     }},
    {"ImageProcessingBufferCudaNode",
     [](tbb::flow::graph &g, std::string nodeID, bool queueing) {
       return make_shared<ImageProcessingBufferCudaNode>(g, nodeID, queueing);
     }},
    {"FilterSradCudaNode", [](tbb::flow::graph &g, std::string nodeID,
                              bool queueing) { return make_shared<FilterSradCudaNode>(g, nodeID, queueing); }},
    {"TimeGainCompensationNode",
     [](tbb::flow::graph &g, std::string nodeID, bool queueing) {
       return make_shared<TimeGainCompensationNode>(g, nodeID, queueing);
     }},
    {"DarkFilterThresholdingCudaNode",
     [](tbb::flow::graph &g, std::string nodeID,
        bool queueing) { return make_shared<DarkFilterThresholdingCudaNode>(g, nodeID, queueing); }},
    {"BilateralFilterCudaNode",
     [](tbb::flow::graph &g, std::string nodeID,
        bool queueing) { return make_shared<BilateralFilterCudaNode>(g, nodeID, queueing); }},
    {"MedianFilterCudaNode", [](tbb::flow::graph &g, std::string nodeID,
                                bool queueing) { return make_shared<MedianFilterCudaNode>(g, nodeID, queueing); }},
#endif

#ifdef HAVE_CUFFT
    {"HilbertEnvelopeNode", [](tbb::flow::graph &g, std::string nodeID,
                               bool queueing) { return make_shared<HilbertEnvelopeNode>(g, nodeID, queueing); }},
#endif
......
};

代码很容易理解,从XML的配置文件中读取相关创建和连接信息,再将每个节点的包含参数也读取出来。然后动态创建即可。
下面看一下Supra节点创建及内部TBB节点的动态创建:

ScanConverterNode::ScanConverterNode(tbb::flow::graph & graph, const std::string & nodeID, bool queueing)
: AbstractNode(nodeID, queueing)
, m_maskSent(false)
, m_parameterChangeRequiresInternalUpdate(false)
{
if (queueing)
{
m_node = unique_ptr<NodeTypeQueueing>(
new NodeTypeQueueing(graph, 1, [this](shared_ptr<RecordObject> inObj) -> shared_ptr<RecordObject> { return checkTypeAndConvert(inObj); }));
}
else
{
m_node = unique_ptr<NodeTypeDiscarding>(
new NodeTypeDiscarding(graph, 1, [this](shared_ptr<RecordObject> inObj) -> shared_ptr<RecordObject> { return checkTypeAndConvert(inObj); }));
}
m_maskOutputNode = unique_ptr<MaskOutputNodeType>(new MaskOutputNodeType(graph));
m_converter = unique_ptr<ScanConverter>(new ScanConverter());
m_callFrequency.setName("ScanConv");

m_valueRangeDictionary.set<double>("imageResolution", 0.01, 5, 0.1, "Forced image resolution");
m_valueRangeDictionary.set<bool>("imageResolutionForced", { true, false }, false, "Force image resolution");
m_valueRangeDictionary.set<DataType>("outputType", { TypeFloat, TypeInt16, TypeUint8}, TypeFloat, "Output type");

configurationChanged();
}

这下子明白了吧。然后是创建节点间的连接:

void SupraManager::connect(string fromID, size_t fromPort, string toID, size_t toPort)
{
if (nodeExists(fromID) && nodeExists(toID))
{
shared_ptr<AbstractNode> fromNode = m_nodes[fromID];
shared_ptr<AbstractNode> toNode = m_nodes[toID];
if (fromNode->getNumOutputs() > fromPort && toNode->getNumInputs() > toPort)
{
auto connTuple = std::make_tuple(fromID, fromPort, toID, toPort);
if (m_nodeConnections.count(connTuple) == 0)
{
m_nodeConnections[connTuple] = true;
tbb::flow::make_edge(
*(dynamic_cast<tbb::flow::sender<std::shared_ptr<RecordObject> >*>(fromNode->getOutput(fromPort))),
*(dynamic_cast<tbb::flow::receiver<std::shared_ptr<RecordObject> >*>(toNode->getInput(toPort))));
logging::log_log("SupraManager: Added connection from (", fromID, ", ", fromPort, ") to (", toID, ", ", toPort, ").");
}
......
}

void SupraManager::disconnect(string fromID, size_t fromPort, string toID, size_t toPort)
{
if (nodeExists(fromID) && nodeExists(toID))
{
shared_ptr<AbstractNode> fromNode = m_nodes[fromID];
shared_ptr<AbstractNode> toNode = m_nodes[toID];
if (fromNode->getNumOutputs() > fromPort && toNode->getNumInputs() > toPort)
{
auto connTuple = std::make_tuple(fromID, fromPort, toID, toPort);
if (m_nodeConnections.count(connTuple) == 1 &&
m_nodeConnections[connTuple])
{
m_nodeConnections.erase(connTuple);
tbb::flow::remove_edge(
*(dynamic_cast<tbb::flow::sender<std::shared_ptr<RecordObject> >*>(fromNode->getOutput(fromPort))),
*(dynamic_cast<tbb::flow::receiver<std::shared_ptr<RecordObject> >*>(toNode->getInput(toPort))));
logging::log_log("SupraManager: Removed connection from (", fromID, ", ", fromPort, ") to (", toID, ", ", toPort, ").");
}
......
}

重点看make_edge等相关即可。就明白了Supra节点到TBB节点的映射。

3、两类节点的共同应用
节点创建并连接成功后,就可以进入数据的按图的流向进行工作了
输入节点数据(Supra节点):

void UltrasoundInterfaceRawDataMock::initializeDevice()
{
if (getTimerFrequency() != m_frequency)
{
setUpTimer(m_frequency);
}

try
{
m_protoRawData = RxBeamformerParameters::readMetaDataForMock(m_mockMetadataFilename);

m_numel = m_protoRawData->getNumReceivedChannels()*m_protoRawData->getNumSamples()*m_protoRawData->getNumScanlines();

// initialize m_mockDataStreams and m_sequenceLengths by getting the file sizes of all datafiles
m_mockDataStramReadBuffers.resize(m_mockDataFilenames.size());
m_mockDataStreams.resize(m_mockDataFilenames.size());
m_sequenceLengths.resize(m_mockDataFilenames.size());
for (size_t k = 0; k < m_mockDataFilenames.size(); k++)
{
// In order to maximize reading performance, the ifstream needs a large read buffer
m_mockDataStramReadBuffers[k].resize(128 * 1024, '\0');
m_mockDataStreams[k] = std::shared_ptr<std::ifstream>(new std::ifstream);
m_mockDataStreams[k]->open(m_mockDataFilenames[k], std::ifstream::ate | std::ifstream::binary);
if (!m_mockDataStreams[k]->good())
{
logging::log_error("UltrasoundInterfaceRawDataMock: Error opening mock file ", m_mockDataFilenames[k]);
throw std::runtime_error("UltrasoundInterfaceRawDataMock: Error opening mock file ");
}
m_mockDataStreams[k]->rdbuf()->pubsetbuf(m_mockDataStramReadBuffers[k].data(), m_mockDataStramReadBuffers[k].size());
size_t filesizeBytes = m_mockDataStreams[k]->tellg();
m_mockDataStreams[k]->seekg(0);

m_sequenceLengths[k] = filesizeBytes / (m_numel * sizeof(int16_t));
}

readNextFrame();
m_ready = true;
}
catch (std::exception e)
{
logging::log_error("UltrasoundInterfaceRawDataMock: Caught exception preparing mock meta or mock file");
m_ready = false;
}
}
bool UltrasoundInterfaceRawDataMock::timerCallback() {
if (!m_frozen)
{
double timestamp = getCurrentTime();

m_callFrequency.measure();
shared_ptr<USRawData> pRawData = std::make_shared<USRawData>(
m_protoRawData->getNumScanlines(),
m_protoRawData->getNumElements(),
m_protoRawData->getElementLayout(),
m_protoRawData->getNumReceivedChannels(),
m_protoRawData->getNumSamples(),
m_protoRawData->getSamplingFrequency(),
m_pMockData,
m_protoRawData->getRxBeamformerParameters(),
m_protoRawData->getImageProperties(),
getCurrentTime(),
getCurrentTime());
addData<0>(pRawData);

if (!m_singleImage)
{
if (m_lastFrame)
{
setRunning(false);
}
else
{
readNextFrame();
}
}
m_callFrequency.measureEnd();
}
return getRunning();
}
void UltrasoundInterfaceRawDataMock::readNextFrame()
{
auto mockDataHost = make_shared<Container<int16_t> >(LocationHost, ContainerFactory::getNextStream(), m_numel);

m_mockDataStreams[m_sequenceIndex]->read(reinterpret_cast<char*>(mockDataHost->get()), m_numel * sizeof(int16_t));
m_pMockData = make_shared<Container<int16_t> >(LocationGpu, *mockDataHost);
// advance to the next image and sequence where required
m_frameIndex = (m_frameIndex + 1) % m_sequenceLengths[m_sequenceIndex];
if (m_frameIndex == 0)
{
m_mockDataStreams[m_sequenceIndex]->seekg(0);
m_sequenceIndex = (m_sequenceIndex + 1) % m_sequenceLengths.size();
if (m_sequenceIndex == 0 && m_streamSequenceOnce)
{
m_lastFrame = true;
}
}
}

上面的代码看起来挺复杂,其实非常简单。首先启动一个输入的数据控制定时器,输入节点在将模拟的图像数据读取后,通过广播节点发送出去:

//上面在定时器中调用的抽象节点中的addData:
bool addData(std::shared_ptr<RecordObject> data)
{
return m_pOutputNodes[index]->try_put(data);
}
std::vector<std::unique_ptr<tbb::flow::broadcast_node<std::shared_ptr<RecordObject> > > > m_pOutputNodes;

它又定义在其子类输入节点中:

AbstractInput(tbb::flow::graph& graph, const std::string & nodeID, size_t numPorts)
: AbstractNode(nodeID, false), m_numOutputs(numPorts)
{
m_pOutputNodes.resize(m_numOutputs);
for (size_t i = 0; i < m_numOutputs; i++)
{
m_pOutputNodes[i] = std::unique_ptr<tbb::flow::broadcast_node<std::shared_ptr<RecordObject> > >(
new tbb::flow::broadcast_node<std::shared_ptr<RecordObject> >(graph));
}
}

功能节点处理数据:

LogCompressorNode::LogCompressorNode(tbb::flow::graph & graph, const std::string & nodeID, bool queueing)
: AbstractNode(nodeID, queueing)
, m_dynamicRange(80)
, m_gain(1)
, m_inputMax(1024)
, m_editedImageProperties(nullptr)
{
if (queueing)
{
m_node = unique_ptr<NodeTypeQueueing>(
new NodeTypeQueueing(graph, 1, [this](shared_ptr<RecordObject> inObj) -> shared_ptr<RecordObject> { return checkTypeAndCompress(inObj); }));
}
else
{
m_node = unique_ptr<NodeTypeDiscarding>(
new NodeTypeDiscarding(graph, 1, [this](shared_ptr<RecordObject> inObj) -> shared_ptr<RecordObject> { return checkTypeAndCompress(inObj); }));
}

......
}
shared_ptr<RecordObject> LogCompressorNode::checkTypeAndCompress(shared_ptr<RecordObject> inObj)
{
shared_ptr<USImage> pImage = nullptr;
if (inObj && inObj->getType() == TypeUSImage)
{
shared_ptr<USImage> pInImage = dynamic_pointer_cast<USImage>(inObj);
if (pInImage)
{
unique_lock<mutex> l(m_mutex);
m_callFrequency.measure();

std::shared_ptr<ContainerBase> pImageCompressedData;
switch (pInImage->getDataType())
{
case TypeUint8:
pImageCompressedData = compressTemplated<uint8_t>(pInImage->getData<uint8_t>(), pInImage->getSize());
break;
case TypeInt16:
pImageCompressedData = compressTemplated<int16_t>(pInImage->getData<int16_t>(), pInImage->getSize());
break;
case TypeFloat:
pImageCompressedData = compressTemplated<float>(pInImage->getData<float>(), pInImage->getSize());
break;
default:
logging::log_error("LogCompressorNode: Input image type not supported");
break;
}
m_callFrequency.measureEnd();

if (pInImage->getImageProperties() != m_lastSeenImageProperties)
{
updateImageProperties(pInImage->getImageProperties());
}

pImage = make_shared<USImage>(
pInImage->getSize(),
pImageCompressedData,
m_editedImageProperties,
pInImage->getReceiveTimestamp(),
pInImage->getSyncTimestamp());
}
else {
logging::log_error("LogCompressorNode: could not cast object to USImage type, is it in suppored ElementType?");
}
}
return pImage;
}

输出节点数据:

//创建IGTL线程和连接
void OpenIGTLinkOutputDevice::waitAsyncForConnection()
{
if (m_pConnectionThread && m_pConnectionThread->joinable())
{
m_pConnectionThread->join();
}

m_pConnectionThread = unique_ptr<thread>(
new thread([this]() {
log_info("IGTL: waiting for connection");
m_clientConnection = m_server->WaitForConnection();
m_isConnected = true;
log_info("IGTL: got connection!");
}));
}
//从上一级节点接收数据
void OpenIGTLinkOutputDevice::writeData(std::shared_ptr<RecordObject> data)
{
if (m_isReady && getRunning() && m_isConnected)
{
m_callFrequency.measure();
sendMessage(data);
m_callFrequency.measureEnd();
}
}
       //不同条件处理方法不同
void OpenIGTLinkOutputDevice::sendMessage(shared_ptr<const RecordObject> data)
{
switch (data->getType())
{
case TypeSyncRecordObject:
sendSyncRecordMessage(data);
break;
case TypeTrackerDataSet:
sendTrackingMessage(data);
break;
case TypeUSImage:
sendImageMessage(data);
break;
case TypeRecordUnknown:
default:
break;
}
}
//通过IGTL发送出去
template <typename T>
void OpenIGTLinkOutputDevice::sendImageMessageTemplated(shared_ptr<const USImage> imageData)
{
......

size_t numElements = imageSize.x * imageSize.y * imageSize.z;
memcpy(pImageMsg->GetScalarPointer(), imageContainer->get(), numElements * sizeof(T));

pImageMsg->Pack();

int sendResult = m_clientConnection->Send(pImageMsg->GetPackPointer(), pImageMsg->GetPackSize());
if (sendResult == 0) //when it could not be sent
{
m_isConnected = false;
log_info("IGTL: Lost connection. Waiting for next connection.");
waitAsyncForConnection();
}
}
}

这样,一个完整的并行框架就建立起来了。

四、总结

其实在研读Supra代码的过程中,最大的受益并不是TBB技术的学习,而是在设计思想上的又一次提高。对如何实事求是,因地制宜的进行技术设计有了一个更清晰的认知和验证。所以说多看代码,多分析源码,受益匪浅!


原文地址:https://blog.csdn.net/fpcc/article/details/142884061

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!