Skip to content

Commit 0ee51a0

Browse files
committed
DPL: add test for routing messages
1 parent 96e2f45 commit 0ee51a0

File tree

1 file changed

+74
-0
lines changed

1 file changed

+74
-0
lines changed

Framework/Core/test/test_ForwardInputs.cxx

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -616,6 +616,80 @@ TEST_CASE("ForwardInputsSplitPayload")
616616
CHECK(result[1].Size() == 3);
617617
}
618618

619+
TEST_CASE("ForwardInputsSplitPayloadNoMessageSet")
620+
{
621+
o2::header::DataHeader dh;
622+
dh.dataOrigin = "TST";
623+
dh.dataDescription = "A";
624+
dh.subSpecification = 0;
625+
dh.splitPayloadIndex = 2;
626+
dh.splitPayloadParts = 2;
627+
628+
o2::header::DataHeader dh2;
629+
dh2.dataOrigin = "TST";
630+
dh2.dataDescription = "B";
631+
dh2.subSpecification = 0;
632+
dh2.splitPayloadIndex = 0;
633+
dh2.splitPayloadParts = 1;
634+
635+
o2::framework::DataProcessingHeader dph{0, 1};
636+
637+
std::vector<fair::mq::Channel> channels{
638+
fair::mq::Channel("from_A_to_B"),
639+
fair::mq::Channel("from_A_to_C"),
640+
};
641+
642+
bool consume = true;
643+
bool copyByDefault = true;
644+
FairMQDeviceProxy proxy;
645+
std::vector<ForwardRoute> routes{
646+
ForwardRoute{
647+
.timeslice = 0,
648+
.maxTimeslices = 1,
649+
.matcher = {"binding", ConcreteDataMatcher{"TST", "B", 0}},
650+
.channel = "from_A_to_B",
651+
.policy = nullptr,
652+
},
653+
ForwardRoute{
654+
.timeslice = 0,
655+
.maxTimeslices = 1,
656+
.matcher = {"binding", ConcreteDataMatcher{"TST", "A", 0}},
657+
.channel = "from_A_to_C",
658+
.policy = nullptr,
659+
}};
660+
661+
auto findChannelByName = [&channels](std::string const& channelName) -> fair::mq::Channel& {
662+
for (auto& channel : channels) {
663+
if (channel.GetName() == channelName) {
664+
return channel;
665+
}
666+
}
667+
throw std::runtime_error("Channel not found");
668+
};
669+
670+
proxy.bind({}, {}, routes, findChannelByName, nullptr);
671+
672+
auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
673+
fair::mq::MessagePtr payload1(transport->CreateMessage());
674+
fair::mq::MessagePtr payload2(transport->CreateMessage());
675+
auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
676+
auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph});
677+
std::vector<std::unique_ptr<fair::mq::Message>> messages;
678+
messages.push_back(std::move(header));
679+
messages.push_back(std::move(payload1));
680+
messages.push_back(std::move(payload2));
681+
auto header2 = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh2, dph});
682+
messages.push_back(std::move(header2));
683+
messages.push_back(transport->CreateMessage());
684+
685+
std::vector<fair::mq::Parts> result(2);
686+
auto span = std::span(messages);
687+
o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, span, result, copyByDefault, consume);
688+
REQUIRE(result.size() == 2); // Two routes
689+
CHECK(result[0].Size() == 2); // No messages on this route
690+
CHECK(result[1].Size() == 3);
691+
}
692+
619693
TEST_CASE("ForwardInputEOSSingleRoute")
620694
{
621695
o2::framework::SourceInfoHeader sih{};

0 commit comments

Comments
 (0)