Iguana LATEST
Implementation Guardian of Analysis Algorithms
Loading...
Searching...
No Matches
TestMultithreading.h
1// multithreaded test of an iguana algorithm
2
3#include <hipo4/reader.h>
4#include <iguana/algorithms/AlgorithmSequence.h>
5#include <random>
6
7inline int TestMultithreading(
8 std::string const command,
9 std::string const algo_name,
10 std::vector<std::string> const prerequisite_algos,
11 std::vector<std::string> const bank_names,
12 std::string const data_file,
13 int const num_events,
14 int const num_threads,
15 std::string const concurrency_model,
16 bool const vary_run,
17 std::string const log_level)
18{
19
20 iguana::Logger log("test");
21 log.SetLevel(log_level);
22
23 // check arguments
24 if(algo_name.empty() || bank_names.empty()) {
25 log.Error("need algorithm name and banks");
26 return 1;
27 }
28 if(data_file.empty()) {
29 log.Error("need a data file for command {:?}", command);
30 return 1;
31 }
32
33 // set the concurrency model
34 if(!concurrency_model.empty())
35 iguana::GlobalConcurrencyModel = concurrency_model;
36
37 // find the 'RUN::config' bank, if any
38 std::optional<hipo::banklist::size_type> run_config_bank_idx{};
39 if(vary_run) {
40 for(hipo::banklist::size_type idx = 0; idx < bank_names.size(); idx++) {
41 if(bank_names.at(idx) == "RUN::config") {
42 run_config_bank_idx = idx;
43 break;
44 }
45 }
46 }
47
48 // number of events per thread
49 int const default_frame_size = 50;
50 int num_events_per_thread = (int)std::round((double)num_events / num_threads);
51 int num_events_per_frame = num_events > 0 ? std::min(num_events_per_thread, default_frame_size) : default_frame_size;
52 int num_frames_per_thread = num_events > 0 ? (int)std::ceil((double)num_events_per_thread / num_events_per_frame) : 0;
53 int num_events_actual = num_events_per_frame * num_frames_per_thread * num_threads;
54 log.Info("num_events_per_thread = {}", num_events_per_thread);
55 log.Info("num_events_per_frame = {}", num_events_per_frame);
56 log.Info("num_frames_per_thread = {}", num_frames_per_thread);
57 if(num_events > 0) {
58 log.Info("=> will actually process num_events = {}", num_events_actual);
59 if(num_events != num_events_actual)
60 log.Warn("argument's num_events ({}) differs from the actual num_events that will be processed ({})",
61 num_events, num_events_actual);
62 }
63 else {
64 log.Info("=> will actually process num_events = ALL OF THEM");
65 }
66
67 // start the stream
68 hipo::readerstream stream;
69 stream.open(data_file.c_str());
70
71 // define the worker function
72 auto ftn = [&stream,
73 algo_name,
74 prerequisite_algos,
75 bank_names,
76 vary_run,
77 log_level,
78 num_events_per_thread,
79 num_events_per_frame,
80 run_config_bank_idx](int order) {
81 // fill a frame
82 std::vector<hipo::event> events;
83 for(int i = 0; i < num_events_per_frame; i++)
84 events.push_back(hipo::event());
85
86 // bank list
87 hipo::banklist banks;
88 for(auto const& bank_name : bank_names)
89 banks.push_back(hipo::bank(stream.dictionary().getSchema(bank_name.c_str()), 48));
90
91 // define the algorithm
93 for(auto const& prerequisite_algo : prerequisite_algos)
94 seq.Add(prerequisite_algo);
95 seq.Add(algo_name);
96 seq.SetName("TEST thread " + std::to_string(order));
97 seq.PrintSequence();
98 seq.SetLogLevel(algo_name, log_level);
99 seq.iguana::Algorithm::SetLogLevel(log_level);
100
101 // start the algorithm
102 seq.Start(banks);
103
104 // setup run number variation
105 std::vector<int> golden_runs = {
106 // FIXME: for now, just RG-A, since some algorithms can _only_ run on RG-A
107 4013, // rga_spring2018
108 5038, // rga_fall2018
109 // 5700, // rgk_fall2018
110 // 6302, // rgb_spring2019
111 6666, // rga_spring2019
112 // 11234, // rgb_fall2019
113 // 11567, // rgb_spring2020
114 // 12933, // rgf_summer2020
115 // 15833, // rgm_fall2021
116 // 16600, // rgc_spring2022
117 // 17407, // rgc_fall2022
118 // 17800, // rgc_spring2023
119 // 18779, // rgd_fall2023
120 // 19877, // rgk_2024
121 // 20522, // rge_2024
122 // 23050, // rgl_2025
123 };
124 std::mt19937 rng(std::random_device{}());
125 std::uniform_int_distribution<std::size_t> random_num(0, 9);
126 std::uniform_int_distribution<std::size_t> random_golden_run(0, golden_runs.size() - 1);
127 if(vary_run) {
128 seq.GetLog()->Warn("==================================================================================================");
129 seq.GetLog()->Warn("RUN NUMBER VARIATION IS ENABLED, DO NOT ATTEMPT TO CROSS CHECK RESULTS WITH SINGLE-THREADED TESTS!");
130 seq.GetLog()->Warn("==================================================================================================");
131 }
132
133 // loop over frames
134 int nProcessed = 0;
135 while(nProcessed < num_events_per_thread || num_events_per_thread == 0) {
136 stream.pull(events);
137
138 // loop over events in this frame
139 int nNonEmpty = 0;
140 for(auto& event : events) {
141 if(event.getSize() > 16) {
142 nNonEmpty++;
143 nProcessed++;
144
145 // read the banks
146 for(auto& bank : banks)
147 event.read(bank);
148
149 // occasionally vary the run number; so far, algorithms with data-dependent configuration
150 // parameters have dependence on run number, so this variation aims to improve thread
151 // sanitizer test coverage
152 if(vary_run && run_config_bank_idx.has_value()) {
153 if(random_num(rng) < 3) {
154 auto runnum = golden_runs[random_golden_run(rng)];
155 seq.GetLog()->Trace("RANDOMLY VARIED RUN NUMBER TO BE {}", runnum);
156 banks[run_config_bank_idx.value()].putInt("run", 0, runnum);
157 }
158 }
159
160 // run the iguana algorithm
161 seq.Run(banks);
162 }
163 }
164 if(nNonEmpty == 0)
165 break;
166 }
167
168 // stop the algorithm
169 seq.Stop();
170 if(vary_run) {
171 seq.GetLog()->Warn("=========================================================================================================");
172 seq.GetLog()->Warn("RANDOM RUN NUMBER VARIATION IS ENABLED, DO NOT ATTEMPT TO CROSS CHECK RESULTS WITH SINGLE-THREADED TESTS!");
173 seq.GetLog()->Warn("=========================================================================================================");
174 }
175
176 seq.GetLog()->Info("nProcessed = {}", nProcessed);
177 return nProcessed;
178 };
179
180 // run
181 stream.run(ftn, num_threads);
182 return 0;
183}
Algorithm: An algorithm that can run a sequence of algorithms
void Add(std::string const &algo_class_name, std::string const &algo_instance_name="")
void SetName(std::string_view name)
void PrintSequence(Logger::Level level=Logger::info) const
void SetLogLevel(std::string const &algo_instance_name, std::string const &lev)
Set an algorithm log level.
virtual bool Run(hipo::banklist &banks) const final
Run Function: Process an event's hipo::banklist
virtual void Stop() final
Stop Function: Finalize this algorithm after all events are processed.
virtual void Start(hipo::banklist &banks) final
Start Function: Initialize this algorithm before any events are processed, with the intent to process...
Simple logger service.
Definition Logger.h:18
std::unique_ptr< Logger > & GetLog()