3#include <hipo4/reader.h>
4#include <iguana/algorithms/AlgorithmSequence.h>
// Multithreaded test driver: opens `data_file` with a hipo::readerstream and
// runs a per-thread callable (`ftn`) on `num_threads` threads via stream.run.
// Each thread builds its own event frame, bank list and algorithm sequence.
// NOTE(review): this listing is incomplete; `log`, `num_events`, `vary_run`,
// `banks`, `seq`, `nProcessed` and `ftn` are used below but their declarations
// are not among the visible lines. Confirm against the full file.
7inline int TestMultithreading(
8 std::string
const command,
9 std::string
const algo_name,
10 std::vector<std::string>
const prerequisite_algos,
11 std::vector<std::string>
const bank_names,
12 std::string
const data_file,
14 int const num_threads,
15 std::string
const concurrency_model,
17 std::string
const log_level)
// Apply the requested log level to the test logger.
21 log.SetLevel(log_level);
// Argument checks: an algorithm name and at least one bank name are required.
24 if(algo_name.empty() || bank_names.empty()) {
25 log.Error(
"need algorithm name and banks");
// A data file is required as well; the error message echoes `command`.
28 if(data_file.empty()) {
29 log.Error(
"need a data file for command {:?}", command);
// Override the global concurrency model only when one was given.
34 if(!concurrency_model.empty())
35 iguana::GlobalConcurrencyModel = concurrency_model;
// Record the position of "RUN::config" within bank_names, if present; it is
// used further down to index into `banks` when the run number is varied.
38 std::optional<hipo::banklist::size_type> run_config_bank_idx{};
40 for(hipo::banklist::size_type idx = 0; idx < bank_names.size(); idx++) {
41 if(bank_names.at(idx) ==
"RUN::config") {
42 run_config_bank_idx = idx;
// Split the requested number of events across threads and frames. A frame
// holds at most default_frame_size events; the number of frames per thread is
// rounded up, so the total actually processed may differ from num_events.
// NOTE(review): when num_events > 0 but num_events_per_thread rounds to 0
// (e.g. num_events much smaller than num_threads), num_events_per_frame is 0
// and the ceil() below divides by zero. num_threads == 0 is likewise a zero
// divisor. Confirm whether the lines not shown here guard against this.
49 int const default_frame_size = 50;
50 int num_events_per_thread = (int)std::round((
double)num_events / num_threads);
51 int num_events_per_frame = num_events > 0 ? std::min(num_events_per_thread, default_frame_size) : default_frame_size;
52 int num_frames_per_thread = num_events > 0 ? (int)std::ceil((
double)num_events_per_thread / num_events_per_frame) : 0;
53 int num_events_actual = num_events_per_frame * num_frames_per_thread * num_threads;
54 log.Info(
"num_events_per_thread = {}", num_events_per_thread);
55 log.Info(
"num_events_per_frame = {}", num_events_per_frame);
56 log.Info(
"num_frames_per_thread = {}", num_frames_per_thread);
58 log.Info(
"=> will actually process num_events = {}", num_events_actual);
// Warn when rounding changed the number of events that will be processed.
59 if(num_events != num_events_actual)
60 log.Warn(
"argument's num_events ({}) differs from the actual num_events that will be processed ({})",
61 num_events, num_events_actual);
// Message for the case where every event is processed (presumably the
// num_events <= 0 branch; the enclosing condition is not visible here).
64 log.Info(
"=> will actually process num_events = ALL OF THEM");
// Open the input file as a stream.
68 hipo::readerstream stream;
69 stream.open(data_file.c_str());
// Tail of a lambda capture list; the lambda takes an integer `order`, which is
// used below to name the sequence. Presumably this is the `ftn` passed to
// stream.run at the end (its declaration is not visible).
78 num_events_per_thread,
80 run_config_bank_idx](
int order) {
// Frame buffer: one hipo::event per event in a frame.
82 std::vector<hipo::event> events;
83 for(
int i = 0; i < num_events_per_frame; i++)
84 events.push_back(hipo::event());
// Build one bank per requested name from the stream's dictionary schema.
// NOTE(review): 48 is passed as the second hipo::bank constructor argument;
// presumably an initial row allocation. Confirm against the hipo API.
88 for(
auto const& bank_name : bank_names)
89 banks.push_back(hipo::bank(stream.dictionary().getSchema(bank_name.c_str()), 48));
// Add the prerequisite algorithms to the sequence, in the order given.
93 for(
auto const& prerequisite_algo : prerequisite_algos)
94 seq.
Add(prerequisite_algo);
// Name the sequence after the `order` argument so log lines can be told apart.
96 seq.
SetName(
"TEST thread " + std::to_string(order));
// Explicitly qualified call to iguana::Algorithm::SetLogLevel with a single
// argument, as opposed to a SetLogLevel(instance_name, level) overload.
99 seq.iguana::Algorithm::SetLogLevel(log_level);
// Run numbers that may be substituted into RUN::config when varying the run
// (the list contents are not visible in this listing).
105 std::vector<int> golden_runs = {
// Random sources, seeded from std::random_device: random_num draws from
// [0, 9] and random_golden_run draws a valid index into golden_runs.
124 std::mt19937 rng(std::random_device{}());
125 std::uniform_int_distribution<std::size_t> random_num(0, 9);
126 std::uniform_int_distribution<std::size_t> random_golden_run(0, golden_runs.size() - 1);
// Banner warning that run-number variation is enabled.
128 seq.
GetLog()->Warn(
"==================================================================================================");
129 seq.
GetLog()->Warn(
"RUN NUMBER VARIATION IS ENABLED, DO NOT ATTEMPT TO CROSS CHECK RESULTS WITH SINGLE-THREADED TESTS!");
130 seq.
GetLog()->Warn(
"==================================================================================================");
// Main loop: continue until this thread has processed its share of events; a
// share of 0 keeps the condition true regardless of nProcessed.
135 while(nProcessed < num_events_per_thread || num_events_per_thread == 0) {
140 for(
auto& event : events) {
// Only events larger than 16 are handled.
// NOTE(review): presumably 16 is the size of an empty event; confirm.
141 if(event.getSize() > 16) {
146 for(
auto& bank : banks)
// When run variation is on and RUN::config is among the banks, overwrite the
// run number in row 0 with a random golden run whenever the draw from [0, 9]
// is below 3, i.e. for 3 of the 10 possible draws.
152 if(vary_run && run_config_bank_idx.has_value()) {
153 if(random_num(rng) < 3) {
154 auto runnum = golden_runs[random_golden_run(rng)];
155 seq.
GetLog()->Trace(
"RANDOMLY VARIED RUN NUMBER TO BE {}", runnum);
156 banks[run_config_bank_idx.value()].putInt(
"run", 0, runnum);
// Second banner with the same warning about run-number variation.
171 seq.
GetLog()->Warn(
"=========================================================================================================");
172 seq.
GetLog()->Warn(
"RANDOM RUN NUMBER VARIATION IS ENABLED, DO NOT ATTEMPT TO CROSS CHECK RESULTS WITH SINGLE-THREADED TESTS!");
173 seq.
GetLog()->Warn(
"=========================================================================================================");
// Report the number of events processed.
176 seq.
GetLog()->Info(
"nProcessed = {}", nProcessed);
// Run `ftn` on num_threads threads via the stream.
181 stream.run(ftn, num_threads);
Algorithm: An algorithm that can run a sequence of algorithms
void Add(std::string const &algo_class_name, std::string const &algo_instance_name="")
void SetName(std::string_view name)
void PrintSequence(Logger::Level level=Logger::info) const
void SetLogLevel(std::string const &algo_instance_name, std::string const &lev)
Set an algorithm log level.
virtual bool Run(hipo::banklist &banks) const final
Run Function: Process an event's hipo::banklist
virtual void Stop() final
Stop Function: Finalize this algorithm after all events are processed.
virtual void Start(hipo::banklist &banks) final
Start Function: Initialize this algorithm before any events are processed, with the intent to process...
std::unique_ptr< Logger > & GetLog()