47#include <fmt/format.h>
86 return fmt::format(
"{:.1f} GB",
file_size / (1024.0 * 1024 * 1024));
88 return fmt::format(
"{:.1f} MB",
file_size / (1024.0 * 1024));
90 return fmt::format(
"{:.1f} KB",
file_size / 1024.0);
116 start_time = std::chrono::steady_clock::now();
120 auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
122 return elapsed.count() / 1000.0;
148 if (!m_event || !m_dict) {
149 throw std::runtime_error(
"Invalid chain_event (no event data)");
162 throw std::runtime_error(
"Invalid chain_event (no event data)");
164 for (
auto& b : list) {
173 event*
raw() {
return m_event; }
174 const event*
raw()
const {
return m_event; }
177 explicit operator bool()
const {
return m_event !=
nullptr && m_dict !=
nullptr; }
217 return {
chain_event(&m_current_event, m_current_dict.get()),
218 m_current_file_idx, m_current_event_idx};
222 if (m_exhausted && other.m_exhausted)
return true;
223 if (m_exhausted != other.m_exhausted)
return false;
224 return m_current_file_idx == other.m_current_file_idx &&
225 m_current_event_idx == other.m_current_event_idx;
229 return !(*
this == other);
// Reader for the file currently being traversed. open_next_file() creates a
// fresh one per file and resets it when no further file can be opened.
234 std::unique_ptr<reader> m_current_reader;
// Dictionary read from the current file; its raw pointer is handed to the
// chain_event built in operator*.
235 std::unique_ptr<dictionary> m_current_dict;
// Storage that m_current_reader->next() fills; operator* wraps its address.
236 event m_current_event;
// Index into the chain's file list of the file being read.
237 int m_current_file_idx{0};
// Event counter within the current file: incremented by operator++ and set
// back to 0 when advancing to the next file.
238 long m_current_event_idx{0};
// End-of-iteration flag, initialised from the constructor's at_end argument;
// two exhausted iterators compare equal.
239 bool m_exhausted{
false};
// Reads the next event, moving on to following files when the current reader
// has no more events.
241 bool advance_to_next_event();
// Opens the next valid file at or after m_current_file_idx, skipping files
// marked invalid or that throw while opening.
242 bool open_next_file();
289 explicit chain(
int threads = 0,
bool progress =
true,
bool verbose =
false)
290 : m_num_threads(threads == 0 ? static_cast<int>(std::thread::hardware_concurrency()) : threads),
291 m_show_progress(progress),
293 m_thread_pool(m_num_threads,
"ChainWorker") {
295 fmt::print(
"[chain] Initialized with {} threads\n", m_num_threads);
308 int add(std::string_view filename) {
309 if (!std::filesystem::exists(filename)) {
310 fmt::print(stderr,
"[chain] Warning: File not found: {}\n", filename);
311 return static_cast<int>(m_files.size());
313 std::lock_guard lock(m_mutex);
314 m_files.emplace_back(filename,
static_cast<int>(m_files.size()));
315 m_metadata_loaded =
false;
317 fmt::print(
"[chain] Added file {}: {}\n", m_files.size(), filename);
319 return static_cast<int>(m_files.size());
327 int add(std::vector<std::string>& filenames) {
328 for (
const auto& f : filenames)
add(f);
329 return static_cast<int>(m_files.size());
339 std::filesystem::path pattern_path(pattern);
340 std::filesystem::path dir = pattern_path.parent_path();
341 if (dir.empty()) dir =
".";
343 std::string filename_pattern = pattern_path.filename().string();
344 std::string regex_pattern = std::regex_replace(filename_pattern,
345 std::regex(R
"(\*)"), ".*");
346 regex_pattern = std::regex_replace(regex_pattern,
347 std::regex(R
"(\?)"), ".");
350 std::regex file_regex(regex_pattern);
351 std::vector<std::string> matched_files;
353 for (
const auto& entry : std::filesystem::directory_iterator(dir)) {
354 if (entry.is_regular_file()) {
355 std::string fname = entry.path().filename().string();
356 if (std::regex_match(fname, file_regex)) {
357 matched_files.push_back(entry.path().string());
363 std::sort(matched_files.begin(), matched_files.end());
364 for (
const auto& f : matched_files) {
368 }
catch (
const std::exception& e) {
369 fmt::print(stderr,
"[chain] Error scanning directory: {}\n", e.what());
373 fmt::print(
"[chain] Pattern '{}' matched {} files\n", pattern, added);
382 std::lock_guard lock(m_mutex);
385 m_metadata_loaded =
false;
386 if (m_verbose) fmt::print(
"[chain] Cleared all files\n");
394 [[nodiscard]] std::size_t
size() const noexcept {
return m_files.size(); }
397 [[nodiscard]]
bool empty() const noexcept {
return m_files.empty(); }
401 return static_cast<int>(m_files.size());
406 if (index >= m_files.size()) {
407 throw std::out_of_range(
"chain: File index out of range");
409 return m_files[index];
414 return (*
this)[
static_cast<std::size_t
>(index)];
418 [[nodiscard]] std::vector<FileInfo>&
files() noexcept {
427 void set_tags(
const std::vector<long>& tags) { m_tags = tags; }
431 m_num_threads = (n == 0) ?
static_cast<int>(std::thread::hardware_concurrency()) : n;
449 void open(
bool validate_all =
true) {
450 if (m_files.empty()) {
451 throw std::runtime_error(
"chain: No files added to chain");
455 fmt::print(
"[chain] Validating {} files...\n", m_files.size());
461 for (
auto& file : m_files) {
462 if (!std::filesystem::exists(file.filename)) {
463 file.is_valid =
false;
464 file.error_message =
"File not found";
468 file.file_size = std::filesystem::file_size(file.filename);
474 temp_reader.
open(file.filename.c_str());
482 fmt::print(
" [{}] {} - OK ({} events)\n",
483 file.index, file.filename, file.total_events);
485 }
catch (
const std::exception& e) {
486 file.is_valid =
false;
487 file.error_message = e.what();
495 m_metadata_loaded = validate_all;
498 fmt::print(
"[chain] Valid files: {}/{}, Total events: {}\n",
502 if (valid_count == 0) {
503 throw std::runtime_error(
"chain: No valid files in chain");
509 if (m_files.empty()) {
510 fmt::print(
"[chain] No files in chain\n");
514 std::unique_ptr<ProgressTracker> progress;
515 if (m_show_progress) {
517 config.
label =
"Scanning files";
519 progress = std::make_unique<ProgressTracker>(m_files.size(), config);
524 int total_records = 0;
525 std::uintmax_t total_size = 0;
528 for (
auto& file : m_files) {
530 if (!std::filesystem::exists(file.filename)) {
532 fmt::print(
" [{}] {} - FILE NOT FOUND\n", file.index, file.filename);
536 file.file_size = std::filesystem::file_size(file.filename);
537 total_size += file.file_size;
541 temp_reader.
open(file.filename.c_str());
545 total_records += file.num_records;
551 fmt::print(
" [{}] {}\n Records: {}, Events: {}, Size: {}\n",
552 file.index, file.filename, file.num_records,
553 file.total_events, file.size_string());
556 }
catch (
const std::exception& e) {
558 fmt::print(
" [{}] {} - ERROR: {}\n", file.index, file.filename, e.what());
561 if (progress) progress->increment();
564 if (progress) progress->finish();
566 fmt::print(
"\nScan Summary\n");
567 fmt::print(
" Total files: {}\n", m_files.size());
568 fmt::print(
" Valid files: {}\n", valid_files);
569 fmt::print(
" Total records: {}\n", total_records);
574 fmt::print(
" Total size: {}\n", dummy.
size_string());
576 if (valid_files > 0) {
577 fmt::print(
" Avg events/file: {}\n",
total_events / valid_files);
583 fmt::print(
"Chain file list ({} files)\n", m_files.size());
584 for (
const auto& file : m_files) {
585 fmt::print(
" [{}] {}", file.index, file.filename);
586 if (file.metadata_loaded()) {
587 fmt::print(
" ({} events, {} records)", file.total_events, file.num_records);
589 if (!file.is_valid) {
590 fmt::print(
" [INVALID: {}]", file.error_message);
607 if (m_files.empty()) {
608 throw std::runtime_error(
"[chain] No files added, cannot create banklist");
611 temp_reader.
open(m_files[0].filename.c_str());
658 template<
typename ProcessFunc>
659 void process(ProcessFunc&& process_func,
double percentage = 100.0) {
660 load_metadata_if_needed();
661 long target =
static_cast<long>(
662 m_stats.
total_events * std::clamp(percentage, 0.0, 100.0) / 100.0);
663 process_impl(std::forward<ProcessFunc>(process_func), target,
664 fmt::format(
"Processing ({:.1f}%)", std::clamp(percentage, 0.0, 100.0)));
681 template<
typename ProcessFunc>
682 void process(ProcessFunc&& process_func,
long num_events) {
683 load_metadata_if_needed();
684 long target = std::min(num_events,
static_cast<long>(m_stats.
total_events.load()));
685 process_impl(std::forward<ProcessFunc>(process_func), target,
686 fmt::format(
"Processing ({} events)", target));
709 template<
typename ProcessFunc>
710 void process(
const banklist& banks, ProcessFunc&& process_func,
double percentage = 100.0) {
711 if (m_files.empty()) {
712 fmt::print(stderr,
"[chain] Warning: No files to process.\n");
716 percentage = std::clamp(percentage, 0.0, 100.0) / 100.0;
719 load_metadata_if_needed();
721 long target_events =
static_cast<long>(m_stats.
total_events * percentage);
729 std::vector<WorkItem> work_list;
730 long events_accumulated = 0;
732 for (
const auto& file : m_files) {
733 if (!file.is_valid || events_accumulated >= target_events)
continue;
734 long events_needed = target_events - events_accumulated;
735 long events_from_file = std::min(events_needed, file.total_events);
736 if (events_from_file > 0) {
737 work_list.push_back({file.index, file.num_records, events_from_file});
738 events_accumulated += events_from_file;
746 fmt::print(
"[chain] Processing {} events from {} files ({} threads, record-level parallelism, banklist)\n",
747 events_accumulated, work_list.size(), m_num_threads);
750 std::unique_ptr<ProgressTracker> progress;
753 config.
label = fmt::format(
"Processing ({:.1f}%)", percentage * 100.0);
756 progress = std::make_unique<ProgressTracker>(m_stats.
total_events, config);
761 for (
const auto& work : work_list) {
762 const auto& file = m_files[work.file_idx];
763 std::atomic<int> next_record{0};
764 std::atomic<long> events_processed_in_file{0};
765 std::vector<std::future<void>> futures;
767 for (
int t = 0; t < m_num_threads; t++) {
768 futures.push_back(m_thread_pool.
submit([&,
this]() {
772 file_reader.setTags(m_tags);
773 file_reader.open(file.filename.c_str());
777 banklist thread_banks = banks;
780 int rec_idx = next_record.fetch_add(1);
781 if (rec_idx >= work.num_records) break;
783 if (events_processed_in_file.load() >= work.max_events) break;
785 if (!file_reader.loadRecord(rec, rec_idx)) continue;
787 int events_in_record = rec.getEventCount();
789 for (int evt_in_rec = 0; evt_in_rec < events_in_record; evt_in_rec++) {
790 long current_count = events_processed_in_file.fetch_add(1);
791 if (current_count >= work.max_events) {
792 events_processed_in_file.fetch_sub(1);
796 rec.readHipoEvent(temp_event, evt_in_rec);
797 for (auto& b : thread_banks) {
800 process_func(thread_banks, file.index, current_count);
801 m_stats.events_processed++;
802 if (progress) progress->increment();
805 } catch (
const std::exception& e) {
807 fmt::print(stderr,
"[chain] Thread error: {}\n", e.what());
812 for (
auto& f : futures) f.get();
815 m_stats.end_time = std::chrono::steady_clock::now();
816 if (progress) progress->finish();
830 template<
typename ProcessFunc>
832 const std::vector<std::string>& required_banks,
833 double percentage = 100.0) {
834 load_metadata_if_needed();
835 long target =
static_cast<long>(
836 m_stats.total_events * std::clamp(percentage, 0.0, 100.0) / 100.0);
837 process_filtered_impl(std::forward<ProcessFunc>(process_func),
838 required_banks, target);
851 template<
typename ProcessFunc>
853 const std::vector<std::string>& required_banks,
855 load_metadata_if_needed();
856 process_filtered_impl(std::forward<ProcessFunc>(process_func),
857 required_banks, num_events);
865 template<
typename FileFunc>
867 for (
const auto& file : m_files) {
868 if (!file.is_valid)
continue;
872 file_reader.
open(file.filename.c_str());
873 func(file_reader, file);
874 }
catch (
const std::exception& e) {
875 fmt::print(stderr,
"[chain] Error processing {}: {}\n",
876 file.filename, e.what());
890 load_metadata_if_needed();
891 return m_stats.total_events;
899 fmt::print(
"\nChain Statistics\n");
900 fmt::print(
" Files: {}\n", m_files.size());
901 fmt::print(
" Total events: {}\n", m_stats.total_events.load());
902 fmt::print(
" Events processed: {}\n", m_stats.events_processed.load());
903 fmt::print(
" Processing time: {:.2f} seconds\n", m_stats.elapsed_seconds());
904 if (m_stats.elapsed_seconds() > 0) {
905 fmt::print(
" Throughput: {:.1f} events/sec\n", m_stats.throughput());
911 fmt::print(
"\nChain File Info ({} files)\n", m_files.size());
912 fmt::print(
"================\n");
913 for (
const auto& file : m_files) {
914 fmt::print(
"\nFile [{}]: {}\n", file.index, file.filename);
918 temp_reader.
open(file.filename.c_str());
920 }
catch (
const std::exception& e) {
921 fmt::print(
" Error: {}\n", e.what());
932 for (
const auto& file : m_files) {
933 if (!file.is_valid)
continue;
937 temp_reader.
open(file.filename.c_str());
939 std::map<std::string, std::string> config;
941 if (config.find(std::string(name)) != config.end()) {
950 [[nodiscard]] std::optional<std::string>
get_config(std::string_view name) {
951 for (
const auto& file : m_files) {
952 if (!file.is_valid)
continue;
956 temp_reader.
open(file.filename.c_str());
958 std::map<std::string, std::string> config;
960 auto it = config.find(std::string(name));
961 if (it != config.end()) {
// Files in the chain, in insertion order; add() stores each entry with its
// position at insertion time as the FileInfo index.
981 std::vector<FileInfo> m_files;
// Tags passed to reader::setTags before a file is opened for processing.
982 std::vector<long> m_tags;
// When true, scan() and the processing routines create a ProgressTracker.
984 bool m_show_progress;
// Set by load_metadata_if_needed(); cleared by add() and clear(); open()
// assigns it the value of its validate_all argument.
986 bool m_metadata_loaded{
false};
// Event totals, processed-event counter and timing for the latest run.
988 ChainStatistics m_stats;
// Locked by add() and clear() while they modify the file list.
989 mutable std::mutex m_mutex;
996 void load_metadata_if_needed() {
997 if (m_metadata_loaded)
return;
999 if (m_verbose) fmt::print(
"[chain] Loading file metadata...\n");
1002 for (
auto& file : m_files) {
1003 if (!file.is_valid)
continue;
1004 if (file.total_events < 0) {
1007 temp_reader.setTags(m_tags);
1008 temp_reader.open(file.filename.c_str());
1010 file.total_events = temp_reader.getEntries();
1011 file.num_records = temp_reader.getNRecords();
1012 }
catch (
const std::exception& e) {
1013 file.is_valid =
false;
1014 file.error_message = e.what();
1015 file.total_events = 0;
1018 total += file.total_events;
1020 m_stats.total_events = total;
1021 m_metadata_loaded =
true;
1025 template<
typename ProcessFunc>
1026 void process_impl(ProcessFunc&& process_func,
long target_events,
1027 const std::string& label) {
1028 if (m_files.empty()) {
1029 fmt::print(stderr,
"[chain] Warning: No files to process.\n");
1039 std::vector<WorkItem> work_list;
1040 long events_accumulated = 0;
1042 for (
const auto& file : m_files) {
1043 if (!file.is_valid || events_accumulated >= target_events)
continue;
1044 long events_needed = target_events - events_accumulated;
1045 long events_from_file = std::min(events_needed, file.total_events);
1046 if (events_from_file > 0) {
1047 work_list.push_back({file.index, file.num_records, events_from_file});
1048 events_accumulated += events_from_file;
1053 m_stats.total_events = events_accumulated;
1056 fmt::print(
"[chain] Processing {} events from {} files ({} threads, record-level parallelism)\n",
1057 events_accumulated, work_list.size(), m_num_threads);
1060 std::unique_ptr<ProgressTracker> progress;
1061 if (m_show_progress && m_stats.total_events > 0) {
1063 config.
label = label;
1066 progress = std::make_unique<ProgressTracker>(m_stats.total_events, config);
1070 for (
const auto& work : work_list) {
1071 const auto& file = m_files[work.file_idx];
1072 std::atomic<int> next_record{0};
1073 std::atomic<long> events_processed_in_file{0};
1074 std::vector<std::future<void>> futures;
1076 for (
int t = 0; t < m_num_threads; t++) {
1077 futures.push_back(m_thread_pool.
submit([&,
this]() {
1080 file_reader.setTags(m_tags);
1081 file_reader.open(file.filename.c_str());
1084 file_reader.readDictionary(dict);
1090 int rec_idx = next_record.fetch_add(1);
1091 if (rec_idx >= work.num_records) break;
1092 if (events_processed_in_file.load() >= work.max_events) break;
1094 if (!file_reader.loadRecord(rec, rec_idx)) continue;
1096 int events_in_record = rec.getEventCount();
1098 for (int evt_in_rec = 0; evt_in_rec < events_in_record; evt_in_rec++) {
1099 long current_count = events_processed_in_file.fetch_add(1);
1100 if (current_count >= work.max_events) {
1101 events_processed_in_file.fetch_sub(1);
1105 rec.readHipoEvent(temp_event, evt_in_rec);
1106 chain_event evt(&temp_event, &dict);
1107 process_func(evt, file.index, current_count);
1108 m_stats.events_processed++;
1109 if (progress) progress->increment();
1112 } catch (
const std::exception& e) {
1114 fmt::print(stderr,
"[chain] Thread error: {}\n", e.what());
1119 for (
auto& f : futures) f.get();
1122 m_stats.end_time = std::chrono::steady_clock::now();
1123 if (progress) progress->finish();
1124 if (m_verbose) print_statistics();
1128 template<
typename ProcessFunc>
1129 void process_filtered_impl(ProcessFunc&& process_func,
1130 const std::vector<std::string>& required_banks,
1131 long target_events) {
1132 if (m_files.empty()) {
1133 fmt::print(stderr,
"[chain] Warning: No files to process.\n");
1138 m_stats.total_events = target_events;
1141 fmt::print(
"[chain] Filtered processing: requiring banks [");
1142 for (
size_t i = 0; i < required_banks.size(); i++) {
1143 fmt::print(
"{}{}", required_banks[i],
1144 i < required_banks.size() - 1 ?
", " :
"");
1146 fmt::print(
"] (record-level parallelism)\n");
1149 std::unique_ptr<ProgressTracker> progress;
1150 if (m_show_progress && target_events > 0) {
1152 config.
label =
"Filtered processing";
1154 progress = std::make_unique<ProgressTracker>(target_events, config);
1158 std::atomic<long> events_found{0};
1160 for (
const auto& file : m_files) {
1161 if (!file.is_valid || events_found >= target_events)
continue;
1163 std::atomic<int> next_record{0};
1164 std::vector<std::future<void>> futures;
1166 for (
int t = 0; t < m_num_threads && events_found < target_events; t++) {
1167 futures.push_back(m_thread_pool.
submit([&,
this]() {
1170 file_reader.setTags(m_tags);
1171 file_reader.open(file.filename.c_str());
1174 file_reader.readDictionary(dict);
1176 std::vector<bank> banks;
1177 banks.reserve(required_banks.size());
1178 for (const auto& name : required_banks) {
1179 banks.emplace_back(dict.getSchema(name.c_str()));
1185 while (events_found < target_events) {
1186 int rec_idx = next_record.fetch_add(1);
1187 if (rec_idx >= file.num_records) break;
1189 if (!file_reader.loadRecord(rec, rec_idx)) continue;
1191 int events_in_record = rec.getEventCount();
1193 for (int evt_in_rec = 0; evt_in_rec < events_in_record; evt_in_rec++) {
1194 if (events_found >= target_events) break;
1196 rec.readHipoEvent(temp_event, evt_in_rec);
1198 bool has_all_banks = true;
1199 for (auto& b : banks) {
1201 if (b.getRows() == 0) {
1202 has_all_banks = false;
1207 if (has_all_banks) {
1208 long evt_count = events_found.fetch_add(1);
1209 if (evt_count >= target_events) break;
1211 chain_event evt(&temp_event, &dict);
1212 process_func(evt, file.index, evt_count);
1213 m_stats.events_processed++;
1214 if (progress) progress->increment();
1218 } catch (
const std::exception& e) {
1220 fmt::print(stderr,
"[chain] Thread error: {}\n", e.what());
1224 for (
auto& f : futures) f.get();
1227 m_stats.end_time = std::chrono::steady_clock::now();
1228 if (progress) progress->finish();
1230 fmt::print(
"[chain] Found {} matching events\n", events_found.load());
1243inline ChainIterator::ChainIterator(
chain* ch,
bool at_end)
1244 : m_chain(ch), m_exhausted(at_end) {
1245 if (!at_end && ch && !ch->m_files.empty()) {
1246 if (open_next_file()) {
1247 advance_to_next_event();
1256 m_current_event_idx++;
1257 if (!advance_to_next_event()) {
1264inline bool ChainIterator::advance_to_next_event() {
1265 while (m_current_reader) {
1266 if (m_current_reader->next(m_current_event)) {
1270 m_current_file_idx++;
1271 m_current_event_idx = 0;
1272 if (!open_next_file()) {
1279inline bool ChainIterator::open_next_file() {
1280 while (m_current_file_idx <
static_cast<int>(m_chain->m_files.size())) {
1281 const auto& file = m_chain->m_files[m_current_file_idx];
1282 if (!file.is_valid) {
1283 m_current_file_idx++;
1287 m_current_reader = std::make_unique<reader>();
1288 m_current_reader->setTags(m_chain->m_tags);
1289 m_current_reader->open(file.filename.c_str());
1291 m_current_dict = std::make_unique<dictionary>();
1292 m_current_reader->readDictionary(*m_current_dict);
1294 }
catch (
const std::exception& e) {
1295 fmt::print(stderr,
"[chain] Error opening {}: {}\n",
1296 file.filename, e.what());
1297 m_current_file_idx++;
1300 m_current_reader.reset();
1301 m_current_dict.reset();
A ThreadPool implementation.
Definition threadpool.hpp:22
auto submit(Callable &&callable, Args &&... args) -> std::future< std::invoke_result_t< std::decay_t< Callable >, std::decay_t< Args >... > >
Submits a task for asynchronous execution.
Definition threadpool.hpp:128
Iterator for traversing events across all files in a chain.
Definition chain.h:195
std::input_iterator_tag iterator_category
Definition chain.h:203
void operator++(int)
Definition chain.h:214
ChainIterator()
Definition chain.h:209
std::ptrdiff_t difference_type
Definition chain.h:205
EventData operator*()
Definition chain.h:216
bool operator==(const ChainIterator &other) const noexcept
Definition chain.h:221
bool operator!=(const ChainIterator &other) const noexcept
Definition chain.h:228
ChainIterator & operator++()
Definition chain.h:1254
Event wrapper that provides bank access via dictionary.
Definition chain.h:141
chain_event()
Definition chain.h:143
void read_banks(banklist &list)
Alias for readBanks (snake_case)
Definition chain.h:170
event * raw()
Access underlying event.
Definition chain.h:173
const event * raw() const
Definition chain.h:174
bank getBank(const std::string &name)
Get a bank by name, reading it from the event.
Definition chain.h:147
bank get_bank(const std::string &name)
Alias for getBank (snake_case)
Definition chain.h:157
chain_event(event *ev, dictionary *dict)
Definition chain.h:144
void readBanks(banklist &list)
Read event data into an existing banklist.
Definition chain.h:160
Chain multiple HIPO files for unified processing.
Definition chain.h:274
int add(std::vector< std::string > &filenames)
Add multiple files to the chain.
Definition chain.h:327
iterator end()
Get end iterator.
Definition chain.h:635
void clear()
Remove all files from the chain.
Definition chain.h:381
void process_filtered(ProcessFunc &&process_func, const std::vector< std::string > &required_banks, double percentage=100.0)
Process events with bank filtering (record-level parallelism)
Definition chain.h:831
int get_nb_files() const noexcept
Legacy alias for size()
Definition chain.h:400
const ChainStatistics & statistics() const noexcept
Get processing statistics.
Definition chain.h:886
std::vector< FileInfo > & files() noexcept
Get all file infos.
Definition chain.h:418
bool empty() const noexcept
Check if chain is empty.
Definition chain.h:397
banklist getBanks(const std::vector< std::string > &names)
Create a banklist from bank names using the first file's dictionary.
Definition chain.h:606
void list() const
Print list of files in chain.
Definition chain.h:582
int add(std::string_view filename)
Add a single file to the chain.
Definition chain.h:308
std::optional< std::string > get_config(std::string_view name)
Get configuration from first file that has it.
Definition chain.h:950
void show_all_info()
Show detailed info for all files.
Definition chain.h:910
bool any_has_config(std::string_view name)
Check if any file has a configuration key.
Definition chain.h:931
void set_progress(bool show)
Enable/disable progress display.
Definition chain.h:435
const FileInfo & file_info(int index) const
Legacy alias for operator[].
Definition chain.h:413
void set_verbose(bool verbose)
Enable/disable verbose output.
Definition chain.h:438
void for_each_file(FileFunc &&func)
Apply a function to each file (for file-level operations)
Definition chain.h:866
void print_statistics() const
Print processing statistics.
Definition chain.h:898
long total_events()
Get total event count (loads metadata if needed)
Definition chain.h:889
void process(ProcessFunc &&process_func, double percentage=100.0)
Process events in parallel across all files (record-level parallelism)
Definition chain.h:659
void set_tags(const std::vector< long > &tags)
Set event tags for filtering.
Definition chain.h:427
void process(const banklist &banks, ProcessFunc &&process_func, double percentage=100.0)
Process events in parallel using a banklist (record-level parallelism)
Definition chain.h:710
void process_filtered(ProcessFunc &&process_func, const std::vector< std::string > &required_banks, long num_events)
Process filtered events with an absolute event count.
Definition chain.h:852
void scan()
Scan and display detailed information about all files.
Definition chain.h:508
int add_pattern(std::string_view pattern)
Add files matching a glob pattern.
Definition chain.h:337
banklist get_banks(const std::vector< std::string > &names)
Alias for getBanks (snake_case)
Definition chain.h:616
void open(bool validate_all=true)
Validate and optionally load metadata for all files.
Definition chain.h:449
const FileInfo & operator[](std::size_t index) const
Get file info by index.
Definition chain.h:405
void set_threads(int n)
Set number of processing threads.
Definition chain.h:430
ThreadPool & threadpool() noexcept
Get the thread pool for advanced use.
Definition chain.h:974
iterator begin()
Get iterator to first event.
Definition chain.h:632
chain(int threads=0, bool progress=true, bool verbose=false)
Construct a chain with specified thread count.
Definition chain.h:289
friend class ChainIterator
Definition chain.h:1236
void process(ProcessFunc &&process_func, long num_events)
Process a specific number of events in parallel across all files.
Definition chain.h:682
std::size_t size() const noexcept
Number of files in the chain.
Definition chain.h:394
long total_events_count() const
Legacy alias.
Definition chain.h:895
Collection of schema definitions, typically read from a HIPO file header.
Definition dictionary.h:157
schema & getSchema(const char *name)
Definition dictionary.h:167
void read(hipo::bank &b)
Definition event.cpp:63
std::vector< hipo::bank > getBanks(std::vector< std::string > names)
Definition reader.cpp:279
void setTags(int tag)
Definition reader.h:248
int getEntries()
Definition reader.h:268
int getNRecords() const
Definition reader.h:264
void open(const char *filename)
Open file, if file stream is open, it is closed first.
Definition reader.cpp:78
void about()
Definition reader.cpp:68
void readUserConfig(std::map< std::string, std::string > &mapConfig)
Definition reader.cpp:289
std::recursive_mutex & get_stdout_mutex()
Returns the global recursive mutex used to synchronize stdout access.
Definition progresstracker.hpp:19
HIPO namespace is used for the classes that read/write files and records.
Definition bank.cpp:45
std::vector< bank > banklist
Definition bank.h:678
Definition progresstracker.hpp:30
std::string label
Definition progresstracker.hpp:36
bool show_rate
Definition progresstracker.hpp:33
bool show_eta
Definition progresstracker.hpp:32
chain_event event
Definition chain.h:198
int file_index
Definition chain.h:199
long event_index
Definition chain.h:200
Thread-safe statistics for chain processing.
Definition chain.h:105
std::chrono::steady_clock::time_point end_time
Definition chain.h:110
double elapsed_seconds() const noexcept
Definition chain.h:119
std::atomic< long > total_events
Definition chain.h:106
std::atomic< long > events_processed
Definition chain.h:107
std::chrono::steady_clock::time_point start_time
Definition chain.h:109
std::atomic< long > events_skipped
Definition chain.h:108
double throughput() const noexcept
Definition chain.h:125
void reset() noexcept
Definition chain.h:112
Metadata container for a file in the chain.
Definition chain.h:65
std::string filename
Definition chain.h:66
bool is_valid
Definition chain.h:71
int num_records
Definition chain.h:69
std::string size_string() const
Get human-readable file size string.
Definition chain.h:84
int index
Definition chain.h:67
std::uintmax_t file_size
Definition chain.h:70
long total_events
Definition chain.h:68
bool metadata_loaded() const noexcept
Check if metadata has been loaded.
Definition chain.h:79
std::string error_message
Definition chain.h:72
FileInfo(std::string_view name, int idx)
Definition chain.h:75