4#include <iguana/algorithms/AlgorithmSequence.h>
5#ifdef USE_HIPO_MULTITHREADED_CHAIN
7#include <hipo4/chain.h>
8#include <hipo4/reader.h>
9#include <unordered_set>
// Signals a failed algorithm cross check: always throws std::runtime_error.
// noexcept(false) spells out that callers should expect this function to throw.
// NOTE(review): since it never returns normally, it could also be marked [[noreturn]].
13inline void FailXCheckAlgorithm() noexcept(false)
15 throw std::runtime_error(
"Algorithm cross check test (xcheckalgo) failed");
// Diagnostic dump of a bank list.
// - Logs the number of banks.
// - Prints, for each bank: its schema name, its checksum, and its filtered
//   row count (getRowList().size()) next to its total row count (getRows()).
// - Logs a separator line at the end.
// While printing, STDOUT_FILENO is redirected to stderr with dup2 and restored
// afterward. Presumably this keeps the printf output on stderr alongside the
// log.Error messages -- confirm.
18inline void InvestigateBanks(hipo::banklist& banks,
iguana::Logger& log)
20#ifdef USE_HIPO_MULTITHREADED_CHAIN
21 log.
Error(
"Num banks: {}", banks.size());
// keep a duplicate of the current stdout so it can be restored after printing
23 int saved_stdout = dup(STDOUT_FILENO);
24 dup2(STDERR_FILENO, STDOUT_FILENO);
26 for(
auto& bank : banks) {
27 printf(
"Investigate bank '%s':\n", bank.getSchema().getName().c_str());
28 printf(
" - checksum 0x%zx\n", bank.checksum());
29 printf(
" - num rows filtered (total): %zu (%d)\n", bank.getRowList().size(), bank.getRows());
// restore the original stdout
// NOTE(review): confirm stdout is fflush()ed before this restore, and that
// saved_stdout is close()d afterward. Otherwise buffered printf output may be
// written to the restored stream, and the duplicated descriptor leaks.
33 dup2(saved_stdout, STDOUT_FILENO);
35 log.
Error(
"-----------------------------------------------------------------");
// Cross-checks an algorithm by running it under two readers.
// The data file is processed once with a single-threaded hipo::reader and once
// with a multi-threaded hipo::chain. For every event (keyed by the RUN::config
// "event" value), a checksum of each requested bank is recorded. Both record
// lists are then sorted by event number and compared index by index; mismatches
// are reported through `log`.
//
// Parameters:
//   algo_name          - algorithm under test; checked to be non-empty
//   prerequisite_algos - algorithms added to the algorithm sequence (seq.Add)
//   bank_names         - banks to read and checksum; checked to be non-empty.
//                        "RUN::config" is always read and placed first.
//   data_file          - HIPO input file; checked to be non-empty
//   num_threads        - thread count passed to hipo::chain
//   log_level          - log level applied to the algorithm and to the sequence
//   investigate_evnum  - if set, the banks of this event are dumped with
//                        InvestigateBanks from both readers
//
// Throws std::runtime_error (via FailXCheckAlgorithm) when the readers disagree
// on any of the following, or when too many duplicate or failed events are found:
//   - the bank list
//   - the number of processed events
//   - the duplicate-event list
//   - per-bank checksums
// Also throws when HIPO lacks multithreaded-chain support.
41inline int XCheckAlgorithm(
42 std::string
const algo_name,
43 std::vector<std::string>
const prerequisite_algos,
44 std::vector<std::string>
const bank_names,
45 std::string
const data_file,
47 int const num_threads,
49 std::string
const log_level,
50 std::optional<int> investigate_evnum = std::nullopt)
53#ifdef USE_HIPO_MULTITHREADED_CHAIN
56 iguana::GlobalConcurrencyModel =
"memoize";
63 if(algo_name.empty() || bank_names.empty()) {
64 log.
Error(
"need algorithm name and banks");
67 if(data_file.empty()) {
68 log.
Error(
"need a data file for command 'xcheckalgo");
// RUN::config is always read and always placed first, so the event-number bank
// sits at a fixed index for both readers
73 hipo::banklist::size_type
const run_config_bank_idx = 0;
74 std::vector<std::string> bank_names_ordered;
75 bank_names_ordered.push_back(
"RUN::config");
76 for(
auto const& name : bank_names) {
77 if(name !=
"RUN::config")
78 bank_names_ordered.push_back(name);
// one record per event: (RUN::config event number, one checksum per bank in reader order)
97 using checksum_list_t = std::vector<std::size_t>;
98 using xcheck_elem_t = std::pair<evnum_t, checksum_list_t>;
99 using xcheck_t = std::vector<xcheck_elem_t>;
106 xcheck_t st_checksums;
107 std::size_t st_num_banks;
108 std::vector<std::string> st_bank_names;
111 log.
Info(
"Run single-threaded reader...");
114 hipo::reader r(data_file.c_str(), {data_tag});
115 auto banks = r.getBanks(bank_names_ordered);
116 num_events = r.getEntries();
120 for(
auto const& prerequisite_algo : prerequisite_algos)
121 seq.Add(prerequisite_algo);
123 seq.SetLogLevel(algo_name, log_level);
124 seq.iguana::Algorithm::SetLogLevel(log_level);
130 st_num_banks = banks.size();
131 for(
auto& bank : banks)
132 st_bank_names.push_back(bank.getSchema().getName());
135 while(r.next(banks)) {
137 auto& run_config_bank = banks[run_config_bank_idx];
138 auto const evnum = run_config_bank.getInt(
"event", 0);
142 if(random_run.has_value())
143 run_config_bank.putInt(
"run", 0, random_run.value());
148 checksum_list_t checksums;
149 for(
auto const& bank : banks)
150 checksums.push_back(bank.checksum());
151 st_checksums.push_back({evnum, checksums});
153 if(investigate_evnum && investigate_evnum.value() == evnum) {
154 log.
Error(
"Example single-threaded event {} that failed cross check:", evnum);
155 InvestigateBanks(banks, log);
169 xcheck_t mt_checksums;
170 std::size_t mt_num_banks;
171 std::vector<std::string> mt_bank_names;
174 log.
Info(
"Run multi-threaded reader...");
178 hipo::chain ch(num_threads,
false,
false);
179 ch.set_tags({data_tag});
181 auto banks = ch.getBanks(bank_names_ordered);
185 for(
auto const& prerequisite_algo : prerequisite_algos)
186 seq.Add(prerequisite_algo);
188 seq.SetLogLevel(algo_name, log_level);
189 seq.iguana::Algorithm::SetLogLevel(log_level);
195 mt_num_banks = banks.size();
196 for(
auto& bank : banks)
197 mt_bank_names.push_back(bank.getSchema().getName());
200 auto processor = [&](
auto& banks,
int file_idx,
long event_idx) {
202 auto& run_config_bank = banks[run_config_bank_idx];
203 auto const evnum = run_config_bank.getInt(
"event", 0);
207 if(random_run.has_value())
208 run_config_bank.putInt(
"run", 0, random_run.value());
213 checksum_list_t checksums;
214 for(
auto const& bank : banks)
215 checksums.push_back(bank.checksum());
// mt_checksums is shared by every invocation of `processor`, which may run on
// several chain threads at once; guard the push_back
216 std::lock_guard<std::mutex>
const lock(mtx);
217 mt_checksums.push_back({evnum, checksums});
219 if(investigate_evnum && investigate_evnum.value() == evnum) {
220 log.
Error(
"Example multi-threaded event {} that failed cross check:", evnum);
221 InvestigateBanks(banks, log);
226 ch.process(banks, processor, 100);
238 if(investigate_evnum)
241 log.
Info(
"Cross checking...");
244 if(st_num_banks != mt_num_banks) {
245 log.
Error(
"readers processed different numbers of banks");
246 FailXCheckAlgorithm();
248 auto const num_banks = mt_num_banks;
251 for(
decltype(mt_bank_names)::size_type b = 0; b < mt_bank_names.size(); b++) {
252 if(mt_bank_names.at(b) != st_bank_names.at(b)) {
253 log.
Error(
"readers processed different bank list");
254 FailXCheckAlgorithm();
257 auto bank_names_full = mt_bank_names;
260 if(st_checksums.empty() || mt_checksums.empty()) {
261 log.
Error(
"no events were processed");
262 FailXCheckAlgorithm();
266 if(st_checksums.size() != mt_checksums.size()) {
267 log.
Error(
"number of processed events differ");
268 log.
Error(
" st_checksums.size() = {}", st_checksums.size());
269 log.
Error(
" mt_checksums.size() = {}", mt_checksums.size());
270 FailXCheckAlgorithm();
// event order is not guaranteed to match between the two readers, so sort both
// record lists by event number before comparing them index by index
274 auto sort_ftn = [](xcheck_elem_t
const& a, xcheck_elem_t
const& b) {
275 if(a.second.empty() || b.second.empty())
276 throw std::runtime_error(
"empty checksum vector");
277 return a.first < b.first;
279 std::sort(st_checksums.begin(), st_checksums.end(), sort_ftn);
280 std::sort(mt_checksums.begin(), mt_checksums.end(), sort_ftn);
283 auto dupe_check = [&log](xcheck_t
const& xcheck, std::string id) {
284 std::unordered_set<evnum_t> seen;
285 std::vector<evnum_t> dupes;
286 for(
auto const& it : xcheck) {
287 auto evnum = it.first;
288 if(!seen.insert(evnum).second)
289 dupes.push_back(evnum);
291 if(dupes.size() > 0) {
292 std::sort(dupes.begin(), dupes.end());
293 log.
Warn(
"duplicate events found in {}:",
id);
294 log.
Warn(
" {{{}}}", fmt::join(dupes,
", "));
// each reader's duplicates must come from its OWN record list; deriving both
// from st_checksums made the comparison below vacuous
298 auto const st_dupes = dupe_check(st_checksums,
"single-threaded reading");
299 auto const mt_dupes = dupe_check(mt_checksums,
"multi-threaded reading");
300 if(st_dupes != mt_dupes) {
301 log.
Error(
"list of duplicate events differs between readers");
302 FailXCheckAlgorithm();
304 auto const& dupes = st_dupes;
305 if(static_cast<double>(dupes.size()) / num_events > 1e-4) {
306 log.
Error(
"there are too many duplicate events in these data");
307 FailXCheckAlgorithm();
311 unsigned int num_failed = 0;
312 for(xcheck_t::size_type i = 0; i < mt_checksums.size(); i++) {
313 bool fail_event =
false;
314 auto const& st_ev = st_checksums.at(i);
315 auto const& mt_ev = mt_checksums.at(i);
316 auto const& st_evnum = st_ev.first;
317 auto const& mt_evnum = mt_ev.first;
318 auto const& st_checksums = st_ev.second;
319 auto const& mt_checksums = mt_ev.second;
321 if(st_evnum != mt_evnum) {
322 log.
Error(
"event number differs");
323 log.
Error(
" single-threaded evnum: {}", st_evnum);
324 log.
Error(
" multi-threaded evnum: {}", mt_evnum);
327 auto const& evnum = st_evnum;
329 if(st_checksums.size() != num_banks) {
330 log.
Error(
"found event from single-threaded reader with unexpected number of banks");
331 FailXCheckAlgorithm();
333 if(mt_checksums.size() != num_banks) {
334 log.
Error(
"found event from multi-threaded reader with unexpected number of banks");
335 FailXCheckAlgorithm();
// std::sort is not stable, so records sharing a duplicated event number may be
// paired differently in the two lists; such events are not compared
338 if(std::find(dupes.begin(), dupes.end(), evnum) != dupes.end()) {
339 log.
Warn(
"not crosschecking duplicate event {}", evnum);
343 for(std::size_t b = 0; b < num_banks; b++) {
344 if(st_checksums.at(b) != mt_checksums.at(b)) {
345 log.
Error(
"xcheck fail: bank {:?} differs", bank_names_full.at(b));
346 log.
Error(
" event number: {}", evnum);
347 log.
Error(
" single-threaded checksum: 0x{:x}", st_checksums.at(b));
348 log.
Error(
" multi-threaded checksum: 0x{:x}", mt_checksums.at(b));
353 if(fail_event && num_failed == 0) {
354 log.
Error(
"=========== showing all banks for this failure ===========");
355 auto r = XCheckAlgorithm(
367 log.
Error(
"=========== proceeding with cross check of remaining events ===========");
372 if(num_failed > 100) {
373 log.
Error(
"too many events failed, aborting cross check now");
374 FailXCheckAlgorithm();
380 log.
Error(
"{} events failed cross check", num_failed);
381 FailXCheckAlgorithm();
385 log.
Info(
"Cross check success");
389 throw std::runtime_error(
"called `xcheckalgo` but HIPO version does not support it");
std::optional< int > RunNumMod(int const &evnum)
Algorithm: An algorithm that can run a sequence of algorithms
void Info(std::string_view message, const VALUES... vals) const
Printout a log message at the info level.
void Error(std::string_view message, const VALUES... vals) const
Printout a log message at the error level.
void SetLevel(std::string_view lev)
void Warn(std::string_view message, const VALUES... vals) const
Printout a log message at the warn level.