Iguana LATEST
Implementation Guardian of Analysis Algorithms
Loading...
Searching...
No Matches
XCheckAlgorithm.h
1#pragma once
2// cross check an algorithm by running it in different ways (single-threaded and multi-threaded readers)
3
4#include <iguana/algorithms/AlgorithmSequence.h>
5#ifdef USE_HIPO_MULTITHREADED_CHAIN
6#include "Fuzzing.h"
7#include <hipo4/chain.h>
8#include <hipo4/reader.h>
9#include <unordered_set>
10#endif
11#include <unistd.h>
12
/// @brief Abort the algorithm cross check by throwing
/// @throws std::runtime_error always; this function never returns normally
[[noreturn]] inline void FailXCheckAlgorithm()
{
  throw std::runtime_error("Algorithm cross check test (xcheckalgo) failed");
}
17
18inline void InvestigateBanks(hipo::banklist& banks, iguana::Logger& log)
19{
20#ifdef USE_HIPO_MULTITHREADED_CHAIN
21 log.Error("Num banks: {}", banks.size());
22 // redirect stdout fd to stderr fd, since `hipo::bank::show` uses `printf`
23 int saved_stdout = dup(STDOUT_FILENO);
24 dup2(STDERR_FILENO, STDOUT_FILENO);
25 // show banks
26 for(auto& bank : banks) {
27 printf("Investigate bank '%s':\n", bank.getSchema().getName().c_str());
28 printf(" - checksum 0x%zx\n", bank.checksum());
29 printf(" - num rows filtered (total): %zu (%d)\n", bank.getRowList().size(), bank.getRows());
30 bank.show();
31 }
32 // restore stderr redirection
33 dup2(saved_stdout, STDOUT_FILENO);
34 close(saved_stdout);
35 log.Error("-----------------------------------------------------------------");
36#endif
37}
38
39// NOTE: the cross check must process ALL events, otherwise the multi-threaded reader may
40// sample a DIFFERENT set of events than the single-threaded reader
/// @brief Cross check an algorithm by running it once with a single-threaded reader (`hipo::reader`)
/// and once with a multi-threaded reader (`hipo::chain`), then comparing the per-event checksums
/// of every bank between the two runs.
/// @param algo_name name of the algorithm under test
/// @param prerequisite_algos algorithms added to the sequence before `algo_name`
/// @param bank_names banks to read; `RUN::config` is always read, and is placed first
/// @param data_file HIPO file to read
/// @param data_tag HIPO tag given to both readers
/// @param num_threads number of threads for the multi-threaded reader
/// @param vary_run if true, use `Fuzzing::RunNumMod` to modify the `RUN::config` run number
/// @param log_level log level for this test and for the algorithm sequence
/// @param investigate_evnum if set, dump the banks of this event number from both readers and
///        return 0 without cross checking (used by the recursive call on the first failure)
/// @returns 0 on success or when investigating, 1 for missing arguments, or the nonzero
///          result of the recursive investigation run
/// @throws std::runtime_error if the cross check fails, or if the HIPO version does not support it
inline int XCheckAlgorithm(
    std::string const algo_name,
    std::vector<std::string> const prerequisite_algos,
    std::vector<std::string> const bank_names,
    std::string const data_file,
    int data_tag,
    int const num_threads,
    bool const vary_run,
    std::string const log_level,
    std::optional<int> investigate_evnum = std::nullopt)
{

#ifdef USE_HIPO_MULTITHREADED_CHAIN

  // set concurrency model; see `Fuzzing` constructor for why this is necessary
  iguana::GlobalConcurrencyModel = "memoize";

  // set logger for this test
  iguana::Logger log("test");
  log.SetLevel(log_level);

  // check arguments
  if(algo_name.empty() || bank_names.empty()) {
    log.Error("need algorithm name and banks");
    return 1;
  }
  if(data_file.empty()) {
    log.Error("need a data file for command 'xcheckalgo'");
    return 1;
  }

  // put `RUN::config` bank up front; its event number is used to match events between readers
  hipo::banklist::size_type const run_config_bank_idx = 0;
  std::vector<std::string> bank_names_ordered;
  bank_names_ordered.push_back("RUN::config");
  for(auto const& name : bank_names) {
    if(name != "RUN::config")
      bank_names_ordered.push_back(name);
  }

  // setup run number variation
  Fuzzing fuzz;

  // guards `mt_checksums` and serializes investigation output in the multi-threaded processor
  std::mutex mtx;

  // total number of events, as reported by the single-threaded reader
  int num_events = 0;

  // checksum data structure `xcheck_t`:
  // {
  //   { event1 number, { bank1 checksum, bank2 checksum, ... },
  //   { event2 number, { bank1 checksum, bank2 checksum, ... },
  //   ...,
  // }
  using evnum_t         = int;
  using checksum_list_t = std::vector<std::size_t>;
  using xcheck_elem_t   = std::pair<evnum_t, checksum_list_t>;
  using xcheck_t        = std::vector<xcheck_elem_t>;

  // ----------------------------------------------------------------------------------
  // run single threaded ("st")
  // ----------------------------------------------------------------------------------

  // xcheck data structures
  xcheck_t st_checksums;
  std::size_t st_num_banks = 0;
  std::vector<std::string> st_bank_names;

  // test scope
  log.Info("Run single-threaded reader...");
  {
    // open the HIPO file
    hipo::reader r(data_file.c_str(), {data_tag});
    auto banks = r.getBanks(bank_names_ordered);
    num_events = r.getEntries();

    // define the algorithm
    iguana::AlgorithmSequence seq;
    for(auto const& prerequisite_algo : prerequisite_algos)
      seq.Add(prerequisite_algo);
    seq.Add(algo_name);
    seq.SetLogLevel(algo_name, log_level);
    seq.iguana::Algorithm::SetLogLevel(log_level);

    // start the algorithm
    seq.Start(banks);

    // get bank info
    st_num_banks = banks.size();
    for(auto& bank : banks)
      st_bank_names.push_back(bank.getSchema().getName());

    // event loop
    while(r.next(banks)) { // NOTE: must cross check ALL events
      // get event number
      auto& run_config_bank = banks[run_config_bank_idx];
      auto const evnum      = run_config_bank.getInt("event", 0);
      // run number variation
      if(vary_run) {
        auto random_run = fuzz.RunNumMod(evnum);
        if(random_run.has_value())
          run_config_bank.putInt("run", 0, random_run.value());
      }
      // run the algorithm
      seq.Run(banks);
      // bank checksums
      checksum_list_t checksums;
      checksums.reserve(banks.size());
      for(auto const& bank : banks)
        checksums.push_back(bank.checksum());
      st_checksums.emplace_back(evnum, std::move(checksums));
      // investigate an event
      if(investigate_evnum && investigate_evnum.value() == evnum) {
        log.Error("Example single-threaded event {} that failed cross check:", evnum);
        InvestigateBanks(banks, log);
      }
    }

    // stop the algorithm
    seq.Stop();
  }

  // ----------------------------------------------------------------------------------
  // run multi threaded ("mt")
  // ----------------------------------------------------------------------------------

  // xcheck data structures
  xcheck_t mt_checksums;
  std::size_t mt_num_banks = 0;
  std::vector<std::string> mt_bank_names;

  // test scope
  log.Info("Run multi-threaded reader...");
  {

    // start the HIPO chain
    hipo::chain ch(num_threads, false, false);
    ch.set_tags({data_tag});
    ch.add(data_file);
    auto banks = ch.getBanks(bank_names_ordered);

    // define the algorithm
    iguana::AlgorithmSequence seq;
    for(auto const& prerequisite_algo : prerequisite_algos)
      seq.Add(prerequisite_algo);
    seq.Add(algo_name);
    seq.SetLogLevel(algo_name, log_level);
    seq.iguana::Algorithm::SetLogLevel(log_level);

    // start the algorithm
    seq.Start(banks);

    // get bank info
    mt_num_banks = banks.size();
    for(auto& bank : banks)
      mt_bank_names.push_back(bank.getSchema().getName());

    // define the event processor; the chain may call this concurrently from several threads
    auto processor = [&](auto& ev_banks, int /*file_idx*/, long /*event_idx*/) {
      // get event number
      auto& run_config_bank = ev_banks[run_config_bank_idx];
      auto const evnum      = run_config_bank.getInt("event", 0);
      // run number variation
      if(vary_run) {
        auto random_run = fuzz.RunNumMod(evnum);
        if(random_run.has_value())
          run_config_bank.putInt("run", 0, random_run.value());
      }
      // run the iguana algorithm
      seq.Run(ev_banks);
      // bank checksums, computed before taking the lock
      checksum_list_t checksums;
      checksums.reserve(ev_banks.size());
      for(auto const& bank : ev_banks)
        checksums.push_back(bank.checksum());
      std::lock_guard<std::mutex> const lock(mtx);
      mt_checksums.emplace_back(evnum, std::move(checksums));
      // investigate an event
      if(investigate_evnum && investigate_evnum.value() == evnum) {
        log.Error("Example multi-threaded event {} that failed cross check:", evnum);
        InvestigateBanks(ev_banks, log);
      }
    };

    // process events; NOTE: must cross check ALL events
    // (the `100` is presumably the percentage of events to process -- confirm against `hipo::chain`)
    ch.process(banks, processor, 100);

    // stop the algorithm
    seq.Stop();
  }

  // ----------------------------------------------------------------------------------
  // cross check
  // ----------------------------------------------------------------------------------

  // stop now, if investigating an event; return 0 since we are not cross checking
  // in this case
  if(investigate_evnum)
    return 0;
  // otherwise proceed with the cross check
  log.Info("Cross checking...");

  // xcheck the number of banks processed
  if(st_num_banks != mt_num_banks) {
    log.Error("readers processed different numbers of banks");
    FailXCheckAlgorithm();
  }
  auto const num_banks = mt_num_banks;

  // xcheck the bank names
  if(st_bank_names != mt_bank_names) {
    log.Error("readers processed different bank list");
    FailXCheckAlgorithm();
  }
  auto const& bank_names_full = mt_bank_names;

  // checksum lists should have some events
  if(st_checksums.empty() || mt_checksums.empty()) {
    log.Error("no events were processed");
    FailXCheckAlgorithm();
  }

  // checksum lists must have the same size, otherwise they are definitely not equal
  if(st_checksums.size() != mt_checksums.size()) {
    log.Error("number of processed events differ");
    log.Error("  st_checksums.size() = {}", st_checksums.size());
    log.Error("  mt_checksums.size() = {}", mt_checksums.size());
    FailXCheckAlgorithm();
  }

  // sort checksum lists by event number, so that index `i` refers to the same event in both
  auto sort_ftn = [](xcheck_elem_t const& a, xcheck_elem_t const& b) {
    if(a.second.empty() || b.second.empty())
      throw std::runtime_error("empty checksum vector");
    return a.first < b.first;
  };
  std::sort(st_checksums.begin(), st_checksums.end(), sort_ftn);
  std::sort(mt_checksums.begin(), mt_checksums.end(), sort_ftn);

  // check for any duplicate event numbers; they are very rare, and may come from EVIO files...
  // returns the sorted list of repeated event numbers
  auto dupe_check = [&log](xcheck_t const& xcheck, std::string id) {
    std::unordered_set<evnum_t> seen;
    std::vector<evnum_t> dupes;
    for(auto const& it : xcheck) {
      auto evnum = it.first;
      if(!seen.insert(evnum).second)
        dupes.push_back(evnum);
    }
    if(!dupes.empty()) {
      std::sort(dupes.begin(), dupes.end());
      log.Warn("duplicate events found in {}:", id);
      log.Warn("  {{{}}}", fmt::join(dupes, ", "));
    }
    return dupes;
  };
  auto const st_dupes = dupe_check(st_checksums, "single-threaded reading");
  auto const mt_dupes = dupe_check(mt_checksums, "multi-threaded reading");
  if(st_dupes != mt_dupes) {
    log.Error("list of duplicate events differs between readers");
    FailXCheckAlgorithm();
  }
  auto const& dupes = st_dupes; // sorted, so it can be searched with `std::binary_search`
  if(static_cast<double>(dupes.size()) / num_events > 1e-4) {
    log.Error("there are too many duplicate events in these data");
    FailXCheckAlgorithm();
  }

  // cross check checksum lists
  unsigned int num_failed = 0;
  for(xcheck_t::size_type i = 0; i < mt_checksums.size(); i++) {
    bool fail_event           = false;
    auto const& st_ev         = st_checksums.at(i);
    auto const& mt_ev         = mt_checksums.at(i);
    auto const& st_evnum      = st_ev.first;
    auto const& mt_evnum      = mt_ev.first;
    auto const& st_bank_sums  = st_ev.second;
    auto const& mt_bank_sums  = mt_ev.second;
    // check event number
    if(st_evnum != mt_evnum) {
      log.Error("event number differs");
      log.Error("  single-threaded evnum: {}", st_evnum);
      log.Error("  multi-threaded evnum:  {}", mt_evnum);
      fail_event = true;
    }
    auto const& evnum = st_evnum;
    // check expected number of banks
    if(st_bank_sums.size() != num_banks) {
      log.Error("found event from single-threaded reader with unexpected number of banks");
      FailXCheckAlgorithm();
    }
    if(mt_bank_sums.size() != num_banks) {
      log.Error("found event from multi-threaded reader with unexpected number of banks");
      FailXCheckAlgorithm();
    }
    // skip duplicate events
    if(std::binary_search(dupes.begin(), dupes.end(), evnum)) {
      log.Warn("not crosschecking duplicate event {}", evnum);
      continue;
    }
    // loop over banks' checksums, cross checking each
    for(std::size_t b = 0; b < num_banks; b++) {
      if(st_bank_sums.at(b) != mt_bank_sums.at(b)) {
        log.Error("xcheck fail: bank {:?} differs", bank_names_full.at(b));
        log.Error("  event number:             {}", evnum);
        log.Error("  single-threaded checksum: 0x{:x}", st_bank_sums.at(b));
        log.Error("  multi-threaded checksum:  0x{:x}", mt_bank_sums.at(b));
        fail_event = true;
      }
    }
    // investigate the first failed event: re-run `XCheckAlgorithm` and show banks for this event
    if(fail_event && num_failed == 0) {
      log.Error("=========== showing all banks for this failure ===========");
      auto r = XCheckAlgorithm(
          algo_name,
          prerequisite_algos,
          bank_names,
          data_file,
          data_tag,
          num_threads,
          vary_run,
          log_level,
          st_evnum);
      if(r != 0)
        return r; // return nonzero if the investigation failed
      log.Error("=========== proceeding with cross check of remaining events ===========");
    }
    // fail early if too many events fail
    if(fail_event) {
      num_failed++;
      if(num_failed > 100) {
        log.Error("too many events failed, aborting cross check now");
        FailXCheckAlgorithm();
      }
    }
  }
  // fail if any events failed
  if(num_failed > 0) {
    log.Error("{} events failed cross check", num_failed);
    FailXCheckAlgorithm();
  }

  // ----------------------------------------------------------------------------------
  log.Info("Cross check success");
  return 0;

#else
  throw std::runtime_error("called `xcheckalgo` but HIPO version does not support it");
#endif
}
fuzzing tests
Definition Fuzzing.h:8
std::optional< int > RunNumMod(int const &evnum)
Definition Fuzzing.h:32
Algorithm: An algorithm that can run a sequence of algorithms
Simple logger service.
Definition Logger.h:19
void Info(std::string_view message, const VALUES... vals) const
Printout a log message at the info level.
Definition Logger.h:94
void Error(std::string_view message, const VALUES... vals) const
Printout a log message at the error level.
Definition Logger.h:100
void SetLevel(std::string_view lev)
void Warn(std::string_view message, const VALUES... vals) const
Printout a log message at the warn level.
Definition Logger.h:97