HIPO4 C++ Library 4.4.1
Columnar I/O library for CLAS12 physics data
Loading...
Searching...
No Matches
chain.h
Go to the documentation of this file.
1//******************************************************************************
2//* ██╗ ██╗██╗██████╗ ██████╗ ██╗ ██╗ ██████╗ *
3//* ██║ ██║██║██╔══██╗██╔═══██╗ ██║ ██║ ██╔═████╗ *
4//* ███████║██║██████╔╝██║ ██║ ███████║ ██║██╔██║ *
5//* ██╔══██║██║██╔═══╝ ██║ ██║ ╚════██║ ████╔╝██║ *
6//* ██║ ██║██║██║ ╚██████╔╝ ██║██╗╚██████╔╝ *
7//* ╚═╝ ╚═╝╚═╝╚═╝ ╚═════╝ ╚═╝╚═╝ ╚═════╝ *
8//************************ Jefferson National Lab (2017) ***********************
34
35#pragma once
36
#include <algorithm>
#include <atomic>
#include <chrono>
#include <filesystem>
#include <future>
#include <map>
#include <memory>
#include <mutex>
#include <optional>
#include <regex>
#include <stdexcept>
#include <string>
#include <string_view>
#include <system_error>
#include <thread>
#include <vector>
46
47#include <fmt/format.h>
48#include "reader.h"
49#include "record.h"
50#include "dictionary.h"
51#include "bank.h"
52#include "event.h"
53#include "progresstracker.hpp"
54#include "threadpool.hpp"
55
56namespace hipo {
57
58//=============================================================================
59// FileInfo - Metadata for a single file in the chain
60//=============================================================================
61
65struct FileInfo {
66 std::string filename;
67 int index{0};
68 long total_events{-1}; // -1 = not yet loaded
69 int num_records{-1}; // -1 = not yet loaded
70 std::uintmax_t file_size{0};
71 bool is_valid{true};
72 std::string error_message;
73
74 FileInfo() = default;
75 FileInfo(std::string_view name, int idx)
76 : filename(name), index(idx) {}
77
79 [[nodiscard]] bool metadata_loaded() const noexcept {
80 return total_events >= 0;
81 }
82
84 [[nodiscard]] std::string size_string() const {
85 if (file_size > 1024ULL * 1024 * 1024)
86 return fmt::format("{:.1f} GB", file_size / (1024.0 * 1024 * 1024));
87 if (file_size > 1024 * 1024)
88 return fmt::format("{:.1f} MB", file_size / (1024.0 * 1024));
89 if (file_size > 1024)
90 return fmt::format("{:.1f} KB", file_size / 1024.0);
91 return fmt::format("{} B", file_size);
92 }
93};
94
95// Legacy alias
97
98//=============================================================================
99// ChainStatistics - Processing statistics
100//=============================================================================
101
// Events in scope. Set from file metadata (open(), load_metadata_if_needed())
// and overwritten with the target event count at the start of each processing run.
106 std::atomic<long> total_events{0};
// Incremented once per event handed to a user callback; atomic because the
// chain's worker tasks increment it concurrently.
107 std::atomic<long> events_processed{0};
// Not incremented anywhere in the code visible here.
108 std::atomic<long> events_skipped{0};
// Set to now() by reset() when a run starts.
109 std::chrono::steady_clock::time_point start_time;
// Set to now() by the chain when a processing run finishes.
110 std::chrono::steady_clock::time_point end_time;
111
112 void reset() noexcept {
113 total_events = 0;
115 events_skipped = 0;
116 start_time = std::chrono::steady_clock::now();
117 }
118
// Elapsed time in seconds, truncated to millisecond resolution by the
// duration_cast below.
// NOTE(review): the duration passed to duration_cast is not shown in this view;
// it is presumably end_time - start_time. Confirm.
119 [[nodiscard]] double elapsed_seconds() const noexcept {
120 auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
122 return elapsed.count() / 1000.0;
123 }
124
125 [[nodiscard]] double throughput() const noexcept {
126 double elapsed = elapsed_seconds();
127 return elapsed > 0 ? static_cast<double>(events_processed) / elapsed : 0.0;
128 }
129};
130
131// Legacy alias
133
134//=============================================================================
135// chain_event - Event wrapper with dictionary for bank access
136//=============================================================================
137
142public:
143 chain_event() : m_event(nullptr), m_dict(nullptr) {}
144 chain_event(event* ev, dictionary* dict) : m_event(ev), m_dict(dict) {}
145
147 bank getBank(const std::string& name) {
148 if (!m_event || !m_dict) {
149 throw std::runtime_error("Invalid chain_event (no event data)");
150 }
151 bank b(m_dict->getSchema(name.c_str()));
152 m_event->read(b);
153 return b;
154 }
155
157 bank get_bank(const std::string& name) { return getBank(name); }
158
160 void readBanks(banklist& list) {
161 if (!m_event) {
162 throw std::runtime_error("Invalid chain_event (no event data)");
163 }
164 for (auto& b : list) {
165 m_event->read(b);
166 }
167 }
168
170 void read_banks(banklist& list) { readBanks(list); }
171
173 event* raw() { return m_event; }
174 const event* raw() const { return m_event; }
175
177 explicit operator bool() const { return m_event != nullptr && m_dict != nullptr; }
178
179private:
// Non-owning: this wrapper never deletes them. chain::process_impl passes the
// addresses of its worker-local event and dictionary objects.
180 event* m_event;
181 dictionary* m_dict;
182};
183
184//=============================================================================
185// ChainIterator - Iterator for sequential access across all files
186//=============================================================================
187
188class chain; // Forward declaration
189
196public:
202
// Iterator traits. Traversal is single-pass: each step pulls the next event
// from the current file's reader, so earlier positions cannot be revisited.
203 using iterator_category = std::input_iterator_tag;
205 using difference_type = std::ptrdiff_t;
208
209 ChainIterator() : m_chain(nullptr), m_exhausted(true) {}
210
211 ChainIterator(chain* ch, bool at_end);
212
214 void operator++(int) { ++(*this); }
215
// Current position: a chain_event viewing this iterator's event buffer and
// dictionary, plus the file index and the per-file event index.
// The chain_event holds raw pointers into this iterator. It is valid only until
// the iterator advances (which overwrites m_current_event) or moves to another
// file (which replaces m_current_dict).
216 [[nodiscard]] EventData operator*() {
217 return {chain_event(&m_current_event, m_current_dict.get()),
218 m_current_file_idx, m_current_event_idx};
219 }
220
221 [[nodiscard]] bool operator==(const ChainIterator& other) const noexcept {
222 if (m_exhausted && other.m_exhausted) return true;
223 if (m_exhausted != other.m_exhausted) return false;
224 return m_current_file_idx == other.m_current_file_idx &&
225 m_current_event_idx == other.m_current_event_idx;
226 }
227
228 [[nodiscard]] bool operator!=(const ChainIterator& other) const noexcept {
229 return !(*this == other);
230 }
231
232private:
// The chain being iterated (not owned).
233 chain* m_chain;
// Reader and dictionary for the file currently being read. Replaced by
// open_next_file() and reset once no further file can be opened.
234 std::unique_ptr<reader> m_current_reader;
235 std::unique_ptr<dictionary> m_current_dict;
// Buffer that reader::next() fills with the current event.
236 event m_current_event;
// Index into chain::m_files of the file being read.
237 int m_current_file_idx{0};
// Event position within the current file; reset to 0 when changing files.
238 long m_current_event_idx{0};
// True once no more events are available; end() is constructed with it set.
239 bool m_exhausted{false};
240
// Read the next event, moving to later files as needed; false when none remain.
241 bool advance_to_next_event();
// Open the file at m_current_file_idx or the next valid one; false if none opens.
242 bool open_next_file();
243};
244
245//=============================================================================
246// chain - Main class for chaining multiple HIPO files
247//=============================================================================
248
274class chain {
275public:
278
279 //=========================================================================
280 // Construction
281 //=========================================================================
282
289 explicit chain(int threads = 0, bool progress = true, bool verbose = false)
290 : m_num_threads(threads == 0 ? static_cast<int>(std::thread::hardware_concurrency()) : threads),
291 m_show_progress(progress),
292 m_verbose(verbose),
293 m_thread_pool(m_num_threads, "ChainWorker") {
294 if (m_verbose) {
295 fmt::print("[chain] Initialized with {} threads\n", m_num_threads);
296 }
297 }
298
299 //=========================================================================
300 // File Management
301 //=========================================================================
302
308 int add(std::string_view filename) {
309 if (!std::filesystem::exists(filename)) {
310 fmt::print(stderr, "[chain] Warning: File not found: {}\n", filename);
311 return static_cast<int>(m_files.size());
312 }
313 std::lock_guard lock(m_mutex);
314 m_files.emplace_back(filename, static_cast<int>(m_files.size()));
315 m_metadata_loaded = false;
316 if (m_verbose) {
317 fmt::print("[chain] Added file {}: {}\n", m_files.size(), filename);
318 }
319 return static_cast<int>(m_files.size());
320 }
321
327 int add(std::vector<std::string>& filenames) {
328 for (const auto& f : filenames) add(f);
329 return static_cast<int>(m_files.size());
330 }
331
337 int add_pattern(std::string_view pattern) {
338 int added = 0;
339 std::filesystem::path pattern_path(pattern);
340 std::filesystem::path dir = pattern_path.parent_path();
341 if (dir.empty()) dir = ".";
342
343 std::string filename_pattern = pattern_path.filename().string();
344 std::string regex_pattern = std::regex_replace(filename_pattern,
345 std::regex(R"(\*)"), ".*");
346 regex_pattern = std::regex_replace(regex_pattern,
347 std::regex(R"(\?)"), ".");
348
349 try {
350 std::regex file_regex(regex_pattern);
351 std::vector<std::string> matched_files;
352
353 for (const auto& entry : std::filesystem::directory_iterator(dir)) {
354 if (entry.is_regular_file()) {
355 std::string fname = entry.path().filename().string();
356 if (std::regex_match(fname, file_regex)) {
357 matched_files.push_back(entry.path().string());
358 }
359 }
360 }
361
362 // Sort for deterministic ordering
363 std::sort(matched_files.begin(), matched_files.end());
364 for (const auto& f : matched_files) {
365 add(f);
366 added++;
367 }
368 } catch (const std::exception& e) {
369 fmt::print(stderr, "[chain] Error scanning directory: {}\n", e.what());
370 }
371
372 if (m_verbose) {
373 fmt::print("[chain] Pattern '{}' matched {} files\n", pattern, added);
374 }
375 return added;
376 }
377
381 void clear() {
382 std::lock_guard lock(m_mutex);
383 m_files.clear();
384 m_stats.reset();
385 m_metadata_loaded = false;
386 if (m_verbose) fmt::print("[chain] Cleared all files\n");
387 }
388
389 //=========================================================================
390 // File Information
391 //=========================================================================
392
394 [[nodiscard]] std::size_t size() const noexcept { return m_files.size(); }
395
397 [[nodiscard]] bool empty() const noexcept { return m_files.empty(); }
398
400 [[nodiscard]] int get_nb_files() const noexcept {
401 return static_cast<int>(m_files.size());
402 }
403
405 [[nodiscard]] const FileInfo& operator[](std::size_t index) const {
406 if (index >= m_files.size()) {
407 throw std::out_of_range("chain: File index out of range");
408 }
409 return m_files[index];
410 }
411
413 [[nodiscard]] const FileInfo& file_info(int index) const {
414 return (*this)[static_cast<std::size_t>(index)];
415 }
416
418 [[nodiscard]] std::vector<FileInfo>& files() noexcept {
419 return m_files;
420 }
421
422 //=========================================================================
423 // Configuration
424 //=========================================================================
425
427 void set_tags(const std::vector<long>& tags) { m_tags = tags; }
428
430 void set_threads(int n) {
431 m_num_threads = (n == 0) ? static_cast<int>(std::thread::hardware_concurrency()) : n;
432 }
433
435 void set_progress(bool show) { m_show_progress = show; }
436
438 void set_verbose(bool verbose) { m_verbose = verbose; }
439
440 //=========================================================================
441 // Validation and Scanning
442 //=========================================================================
443
449 void open(bool validate_all = true) {
450 if (m_files.empty()) {
451 throw std::runtime_error("chain: No files added to chain");
452 }
453
454 if (m_verbose) {
455 fmt::print("[chain] Validating {} files...\n", m_files.size());
456 }
457
458 int valid_count = 0;
459 long total_events = 0;
460
461 for (auto& file : m_files) {
462 if (!std::filesystem::exists(file.filename)) {
463 file.is_valid = false;
464 file.error_message = "File not found";
465 continue;
466 }
467
468 file.file_size = std::filesystem::file_size(file.filename);
469
470 if (validate_all) {
471 try {
472 reader temp_reader;
473 temp_reader.setTags(m_tags);
474 temp_reader.open(file.filename.c_str());
475
476 file.total_events = temp_reader.getEntries();
477 file.num_records = temp_reader.getNRecords();
478 total_events += file.total_events;
479 valid_count++;
480
481 if (m_verbose) {
482 fmt::print(" [{}] {} - OK ({} events)\n",
483 file.index, file.filename, file.total_events);
484 }
485 } catch (const std::exception& e) {
486 file.is_valid = false;
487 file.error_message = e.what();
488 }
489 } else {
490 valid_count++;
491 }
492 }
493
494 m_stats.total_events = total_events;
495 m_metadata_loaded = validate_all;
496
497 if (m_verbose) {
498 fmt::print("[chain] Valid files: {}/{}, Total events: {}\n",
499 valid_count, m_files.size(), total_events);
500 }
501
502 if (valid_count == 0) {
503 throw std::runtime_error("chain: No valid files in chain");
504 }
505 }
506
// Open every file in the chain, record its size, record count and event count
// in its FileInfo, print one line per file, then print a summary.
// Unlike open(), scan() neither marks failing files invalid nor updates
// m_stats / m_metadata_loaded.
508 void scan() {
509 if (m_files.empty()) {
510 fmt::print("[chain] No files in chain\n");
511 return;
512 }
513
// Optional progress bar that counts files (not events).
514 std::unique_ptr<ProgressTracker> progress;
515 if (m_show_progress) {
517 config.label = "Scanning files";
518 config.show_eta = false;
519 progress = std::make_unique<ProgressTracker>(m_files.size(), config);
520 progress->start();
521 }
522
523 long total_events = 0;
524 int total_records = 0;
525 std::uintmax_t total_size = 0;
526 int valid_files = 0;
527
528 for (auto& file : m_files) {
529 try {
530 if (!std::filesystem::exists(file.filename)) {
531 std::lock_guard stdout_lock{detail::get_stdout_mutex()};
532 fmt::print(" [{}] {} - FILE NOT FOUND\n", file.index, file.filename);
// NOTE(review): this `continue` skips progress->increment() below, so the
// progress bar never reaches 100% when any file is missing.
533 continue;
534 }
535
536 file.file_size = std::filesystem::file_size(file.filename);
537 total_size += file.file_size;
538
539 reader temp_reader;
540 temp_reader.setTags(m_tags);
541 temp_reader.open(file.filename.c_str());
542
543 file.num_records = temp_reader.getNRecords();
544 file.total_events = temp_reader.getEntries();
545 total_records += file.num_records;
546 total_events += file.total_events;
547 valid_files++;
548
// Serialize stdout with other writers using the shared stdout mutex.
549 {
550 std::lock_guard stdout_lock{detail::get_stdout_mutex()};
551 fmt::print(" [{}] {}\n Records: {}, Events: {}, Size: {}\n",
552 file.index, file.filename, file.num_records,
553 file.total_events, file.size_string());
554 }
555
556 } catch (const std::exception& e) {
557 std::lock_guard stdout_lock{detail::get_stdout_mutex()};
558 fmt::print(" [{}] {} - ERROR: {}\n", file.index, file.filename, e.what());
559 }
560
561 if (progress) progress->increment();
562 }
563
564 if (progress) progress->finish();
565
566 fmt::print("\nScan Summary\n");
567 fmt::print(" Total files: {}\n", m_files.size());
568 fmt::print(" Valid files: {}\n", valid_files);
569 fmt::print(" Total records: {}\n", total_records);
570 fmt::print(" Total events: {}\n", total_events);
571
// A throwaway FileInfo is used only to reuse size_string() formatting.
572 FileInfo dummy;
573 dummy.file_size = total_size;
574 fmt::print(" Total size: {}\n", dummy.size_string());
575
576 if (valid_files > 0) {
// Integer division: the average is truncated.
577 fmt::print(" Avg events/file: {}\n", total_events / valid_files);
578 }
579 }
580
582 void list() const {
583 fmt::print("Chain file list ({} files)\n", m_files.size());
584 for (const auto& file : m_files) {
585 fmt::print(" [{}] {}", file.index, file.filename);
586 if (file.metadata_loaded()) {
587 fmt::print(" ({} events, {} records)", file.total_events, file.num_records);
588 }
589 if (!file.is_valid) {
590 fmt::print(" [INVALID: {}]", file.error_message);
591 }
592 fmt::print("\n");
593 }
594 }
595
596 //=========================================================================
597 // Bank Creation
598 //=========================================================================
599
606 banklist getBanks(const std::vector<std::string>& names) {
607 if (m_files.empty()) {
608 throw std::runtime_error("[chain] No files added, cannot create banklist");
609 }
610 reader temp_reader;
611 temp_reader.open(m_files[0].filename.c_str());
612 return temp_reader.getBanks(names);
613 }
614
616 banklist get_banks(const std::vector<std::string>& names) { return getBanks(names); }
617
618 //=========================================================================
619 // Range-based Iteration
620 //=========================================================================
621
632 [[nodiscard]] iterator begin() { return ChainIterator(this, false); }
633
635 [[nodiscard]] iterator end() { return ChainIterator(this, true); }
636
637 //=========================================================================
638 // Parallel Processing
639 //=========================================================================
640
658 template<typename ProcessFunc>
659 void process(ProcessFunc&& process_func, double percentage = 100.0) {
660 load_metadata_if_needed();
661 long target = static_cast<long>(
662 m_stats.total_events * std::clamp(percentage, 0.0, 100.0) / 100.0);
663 process_impl(std::forward<ProcessFunc>(process_func), target,
664 fmt::format("Processing ({:.1f}%)", std::clamp(percentage, 0.0, 100.0)));
665 }
666
681 template<typename ProcessFunc>
682 void process(ProcessFunc&& process_func, long num_events) {
683 load_metadata_if_needed();
684 long target = std::min(num_events, static_cast<long>(m_stats.total_events.load()));
685 process_impl(std::forward<ProcessFunc>(process_func), target,
686 fmt::format("Processing ({} events)", target));
687 }
688
// Process a percentage of the chain's events, filling a copy of `banks` for
// each event and calling process_func(banks_copy, file_index, count).
// Files are handled one at a time. Within a file, up to m_num_threads worker
// tasks claim records through an atomic counter, so process_func runs
// concurrently and must be thread-safe.
// NOTE(review): `count` is the order in which events were claimed within the
// file, not the event's position in the file (records are processed out of order).
709 template<typename ProcessFunc>
710 void process(const banklist& banks, ProcessFunc&& process_func, double percentage = 100.0) {
711 if (m_files.empty()) {
712 fmt::print(stderr, "[chain] Warning: No files to process.\n");
713 return;
714 }
715
// `percentage` is reused from here on as a fraction in [0, 1].
716 percentage = std::clamp(percentage, 0.0, 100.0) / 100.0;
717
718 // Ensure metadata is loaded
719 load_metadata_if_needed();
720
721 long target_events = static_cast<long>(m_stats.total_events * percentage);
722
// Walk files in order, taking whole files until the target is met; the last
// file taken may be only partially processed (max_events).
723 // Build work list
724 struct WorkItem {
725 int file_idx;
726 int num_records;
727 long max_events;
728 };
729 std::vector<WorkItem> work_list;
730 long events_accumulated = 0;
731
732 for (const auto& file : m_files) {
733 if (!file.is_valid || events_accumulated >= target_events) continue;
734 long events_needed = target_events - events_accumulated;
735 long events_from_file = std::min(events_needed, file.total_events);
736 if (events_from_file > 0) {
737 work_list.push_back({file.index, file.num_records, events_from_file});
738 events_accumulated += events_from_file;
739 }
740 }
741
// From here on, m_stats.total_events holds this run's target, not the chain total.
742 m_stats.reset();
743 m_stats.total_events = events_accumulated;
744
745 if (m_verbose) {
746 fmt::print("[chain] Processing {} events from {} files ({} threads, record-level parallelism, banklist)\n",
747 events_accumulated, work_list.size(), m_num_threads);
748 }
749
750 std::unique_ptr<ProgressTracker> progress;
751 if (m_show_progress && m_stats.total_events > 0) {
753 config.label = fmt::format("Processing ({:.1f}%)", percentage * 100.0);
754 config.show_eta = true;
755 config.show_rate = true;
756 progress = std::make_unique<ProgressTracker>(m_stats.total_events, config);
757 progress->start();
758 }
759
760 // Process one file at a time with all threads using record-level parallelism
761 for (const auto& work : work_list) {
// file.index equals the file's position in m_files (assigned by add()).
762 const auto& file = m_files[work.file_idx];
763 std::atomic<int> next_record{0};
764 std::atomic<long> events_processed_in_file{0};
765 std::vector<std::future<void>> futures;
766
// The lambda captures these locals by reference; that is safe because
// every future is waited on (f.get()) before they go out of scope.
767 for (int t = 0; t < m_num_threads; t++) {
768 futures.push_back(m_thread_pool.submit([&, this]() {
769 try {
770 // Each thread has its own reader, record buffer, and banklist copy
771 reader file_reader;
772 file_reader.setTags(m_tags);
773 file_reader.open(file.filename.c_str());
774
775 record rec;
776 event temp_event;
777 banklist thread_banks = banks; // thread-local copy
778
779 while (true) {
780 int rec_idx = next_record.fetch_add(1);
781 if (rec_idx >= work.num_records) break;
782
783 if (events_processed_in_file.load() >= work.max_events) break;
784
785 if (!file_reader.loadRecord(rec, rec_idx)) continue;
786
787 int events_in_record = rec.getEventCount();
788
789 for (int evt_in_rec = 0; evt_in_rec < events_in_record; evt_in_rec++) {
// Reserve a slot; undo the reservation if the per-file cap was already reached.
790 long current_count = events_processed_in_file.fetch_add(1);
791 if (current_count >= work.max_events) {
792 events_processed_in_file.fetch_sub(1);
793 break;
794 }
795
796 rec.readHipoEvent(temp_event, evt_in_rec);
797 for (auto& b : thread_banks) {
798 temp_event.read(b);
799 }
800 process_func(thread_banks, file.index, current_count);
801 m_stats.events_processed++;
802 if (progress) progress->increment();
803 }
804 }
// Any std::exception (including one thrown by process_func) ends only this
// worker; the other workers keep going.
805 } catch (const std::exception& e) {
806 std::lock_guard stdout_lock{detail::get_stdout_mutex()};
807 fmt::print(stderr, "[chain] Thread error: {}\n", e.what());
808 }
809 }));
810 }
811
812 for (auto& f : futures) f.get();
813 }
814
815 m_stats.end_time = std::chrono::steady_clock::now();
816 if (progress) progress->finish();
817 if (m_verbose) print_statistics();
818 }
819
830 template<typename ProcessFunc>
831 void process_filtered(ProcessFunc&& process_func,
832 const std::vector<std::string>& required_banks,
833 double percentage = 100.0) {
834 load_metadata_if_needed();
835 long target = static_cast<long>(
836 m_stats.total_events * std::clamp(percentage, 0.0, 100.0) / 100.0);
837 process_filtered_impl(std::forward<ProcessFunc>(process_func),
838 required_banks, target);
839 }
840
851 template<typename ProcessFunc>
852 void process_filtered(ProcessFunc&& process_func,
853 const std::vector<std::string>& required_banks,
854 long num_events) {
855 load_metadata_if_needed();
856 process_filtered_impl(std::forward<ProcessFunc>(process_func),
857 required_banks, num_events);
858 }
859
865 template<typename FileFunc>
866 void for_each_file(FileFunc&& func) {
867 for (const auto& file : m_files) {
868 if (!file.is_valid) continue;
869 try {
870 reader file_reader;
871 file_reader.setTags(m_tags);
872 file_reader.open(file.filename.c_str());
873 func(file_reader, file);
874 } catch (const std::exception& e) {
875 fmt::print(stderr, "[chain] Error processing {}: {}\n",
876 file.filename, e.what());
877 }
878 }
879 }
880
881 //=========================================================================
882 // Statistics and Information
883 //=========================================================================
884
886 [[nodiscard]] const ChainStatistics& statistics() const noexcept { return m_stats; }
887
889 [[nodiscard]] long total_events() {
890 load_metadata_if_needed();
891 return m_stats.total_events;
892 }
893
895 [[nodiscard]] long total_events_count() const { return m_stats.total_events.load(); }
896
898 void print_statistics() const {
899 fmt::print("\nChain Statistics\n");
900 fmt::print(" Files: {}\n", m_files.size());
901 fmt::print(" Total events: {}\n", m_stats.total_events.load());
902 fmt::print(" Events processed: {}\n", m_stats.events_processed.load());
903 fmt::print(" Processing time: {:.2f} seconds\n", m_stats.elapsed_seconds());
904 if (m_stats.elapsed_seconds() > 0) {
905 fmt::print(" Throughput: {:.1f} events/sec\n", m_stats.throughput());
906 }
907 }
908
// Body of the per-file info printer: for every file (invalid ones included,
// unlike for_each_file), open a reader and let reader::about() print its
// details. Failures to open are printed and do not stop the loop.
911 fmt::print("\nChain File Info ({} files)\n", m_files.size());
912 fmt::print("================\n");
913 for (const auto& file : m_files) {
914 fmt::print("\nFile [{}]: {}\n", file.index, file.filename);
915 try {
916 reader temp_reader;
917 temp_reader.setTags(m_tags);
918 temp_reader.open(file.filename.c_str());
919 temp_reader.about();
920 } catch (const std::exception& e) {
921 fmt::print(" Error: {}\n", e.what());
922 }
923 }
924 }
925
926 //=========================================================================
927 // Configuration Access
928 //=========================================================================
929
931 [[nodiscard]] bool any_has_config(std::string_view name) {
932 for (const auto& file : m_files) {
933 if (!file.is_valid) continue;
934 try {
935 reader temp_reader;
936 temp_reader.setTags(m_tags);
937 temp_reader.open(file.filename.c_str());
938
939 std::map<std::string, std::string> config;
940 temp_reader.readUserConfig(config);
941 if (config.find(std::string(name)) != config.end()) {
942 return true;
943 }
944 } catch (...) {}
945 }
946 return false;
947 }
948
950 [[nodiscard]] std::optional<std::string> get_config(std::string_view name) {
951 for (const auto& file : m_files) {
952 if (!file.is_valid) continue;
953 try {
954 reader temp_reader;
955 temp_reader.setTags(m_tags);
956 temp_reader.open(file.filename.c_str());
957
958 std::map<std::string, std::string> config;
959 temp_reader.readUserConfig(config);
960 auto it = config.find(std::string(name));
961 if (it != config.end()) {
962 return it->second;
963 }
964 } catch (...) {}
965 }
966 return std::nullopt;
967 }
968
969 //=========================================================================
970 // Advanced Access
971 //=========================================================================
972
974 [[nodiscard]] ThreadPool& threadpool() noexcept { return m_thread_pool; }
975
976private:
977 //=========================================================================
978 // Private Members
979 //=========================================================================
980
// Files in insertion order; FileInfo::index equals the position in this vector.
981 std::vector<FileInfo> m_files;
// Tags passed to every reader via reader::setTags.
982 std::vector<long> m_tags;
// Worker tasks submitted per file by process*(). Must stay declared before
// m_thread_pool, which the constructor initializes from it (members are
// initialized in declaration order).
983 int m_num_threads;
984 bool m_show_progress;
985 bool m_verbose;
// Whether load_metadata_if_needed() considers the counts current; cleared by add()/clear().
986 bool m_metadata_loaded{false};
987 ThreadPool m_thread_pool;
988 ChainStatistics m_stats;
// Locked only in add() and clear(); other members read m_files without it.
989 mutable std::mutex m_mutex;
990
991 //=========================================================================
992 // Private Methods
993 //=========================================================================
994
996 void load_metadata_if_needed() {
997 if (m_metadata_loaded) return;
998
999 if (m_verbose) fmt::print("[chain] Loading file metadata...\n");
1000
1001 long total = 0;
1002 for (auto& file : m_files) {
1003 if (!file.is_valid) continue;
1004 if (file.total_events < 0) {
1005 try {
1006 reader temp_reader;
1007 temp_reader.setTags(m_tags);
1008 temp_reader.open(file.filename.c_str());
1009
1010 file.total_events = temp_reader.getEntries();
1011 file.num_records = temp_reader.getNRecords();
1012 } catch (const std::exception& e) {
1013 file.is_valid = false;
1014 file.error_message = e.what();
1015 file.total_events = 0;
1016 }
1017 }
1018 total += file.total_events;
1019 }
1020 m_stats.total_events = total;
1021 m_metadata_loaded = true;
1022 }
1023
// Shared engine for process(func, ...): process up to `target_events` events,
// calling process_func(chain_event&, file_index, count) for each one.
// Files are handled one at a time. Within a file, up to m_num_threads worker
// tasks claim records through an atomic counter, so process_func runs
// concurrently and must be thread-safe. Each worker opens its own reader and
// dictionary, and the chain_event passed to the callback points at
// worker-local objects: it must not be kept after the callback returns.
// NOTE(review): `count` is the claim order within the file, not the event's
// position in the file.
1025 template<typename ProcessFunc>
1026 void process_impl(ProcessFunc&& process_func, long target_events,
1027 const std::string& label) {
1028 if (m_files.empty()) {
1029 fmt::print(stderr, "[chain] Warning: No files to process.\n");
1030 return;
1031 }
1032
1033 // Build work list - tracking records per file
1034 struct WorkItem {
1035 int file_idx;
1036 int num_records;
1037 long max_events;
1038 };
1039 std::vector<WorkItem> work_list;
1040 long events_accumulated = 0;
1041
// Take files in order until the target is met; the last one may be partial.
1042 for (const auto& file : m_files) {
1043 if (!file.is_valid || events_accumulated >= target_events) continue;
1044 long events_needed = target_events - events_accumulated;
1045 long events_from_file = std::min(events_needed, file.total_events);
1046 if (events_from_file > 0) {
1047 work_list.push_back({file.index, file.num_records, events_from_file});
1048 events_accumulated += events_from_file;
1049 }
1050 }
1051
// From here on, m_stats.total_events holds this run's target, not the chain total.
1052 m_stats.reset();
1053 m_stats.total_events = events_accumulated;
1054
1055 if (m_verbose) {
1056 fmt::print("[chain] Processing {} events from {} files ({} threads, record-level parallelism)\n",
1057 events_accumulated, work_list.size(), m_num_threads);
1058 }
1059
1060 std::unique_ptr<ProgressTracker> progress;
1061 if (m_show_progress && m_stats.total_events > 0) {
1063 config.label = label;
1064 config.show_eta = true;
1065 config.show_rate = true;
1066 progress = std::make_unique<ProgressTracker>(m_stats.total_events, config);
1067 progress->start();
1068 }
1069
1070 for (const auto& work : work_list) {
1071 const auto& file = m_files[work.file_idx];
1072 std::atomic<int> next_record{0};
1073 std::atomic<long> events_processed_in_file{0};
1074 std::vector<std::future<void>> futures;
1075
// The lambda captures these locals by reference; that is safe because every
// future is waited on before the next file (and before they are destroyed).
1076 for (int t = 0; t < m_num_threads; t++) {
1077 futures.push_back(m_thread_pool.submit([&, this]() {
1078 try {
1079 reader file_reader;
1080 file_reader.setTags(m_tags);
1081 file_reader.open(file.filename.c_str());
1082
1083 dictionary dict;
1084 file_reader.readDictionary(dict);
1085
1086 record rec;
1087 event temp_event;
1088
1089 while (true) {
1090 int rec_idx = next_record.fetch_add(1);
1091 if (rec_idx >= work.num_records) break;
1092 if (events_processed_in_file.load() >= work.max_events) break;
1093
1094 if (!file_reader.loadRecord(rec, rec_idx)) continue;
1095
1096 int events_in_record = rec.getEventCount();
1097
1098 for (int evt_in_rec = 0; evt_in_rec < events_in_record; evt_in_rec++) {
// Reserve a slot; undo the reservation if the per-file cap was already reached.
1099 long current_count = events_processed_in_file.fetch_add(1);
1100 if (current_count >= work.max_events) {
1101 events_processed_in_file.fetch_sub(1);
1102 break;
1103 }
1104
1105 rec.readHipoEvent(temp_event, evt_in_rec);
1106 chain_event evt(&temp_event, &dict);
1107 process_func(evt, file.index, current_count);
1108 m_stats.events_processed++;
1109 if (progress) progress->increment();
1110 }
1111 }
// A std::exception (including one from process_func) ends only this worker.
1112 } catch (const std::exception& e) {
1113 std::lock_guard stdout_lock{detail::get_stdout_mutex()};
1114 fmt::print(stderr, "[chain] Thread error: {}\n", e.what());
1115 }
1116 }));
1117 }
1118
1119 for (auto& f : futures) f.get();
1120 }
1121
1122 m_stats.end_time = std::chrono::steady_clock::now();
1123 if (progress) progress->finish();
1124 if (m_verbose) print_statistics();
1125 }
1126
// Shared engine for process_filtered(): scan files in order and call
// process_func(chain_event&, file_index, match_count) for each event in which
// every bank in required_banks has at least one row. Stops once
// `target_events` matches have been handed out. match_count is a running
// match index across all files, not a per-file position.
// Workers share an atomic record counter per file, so process_func runs
// concurrently and must be thread-safe.
// NOTE(review): events_found is incremented before the cap check and never
// decremented, so the "Found N matching events" message can exceed the number
// of callbacks actually made. The progress bar is sized to target_events, so it
// stays incomplete when fewer events match.
1128 template<typename ProcessFunc>
1129 void process_filtered_impl(ProcessFunc&& process_func,
1130 const std::vector<std::string>& required_banks,
1131 long target_events) {
1132 if (m_files.empty()) {
1133 fmt::print(stderr, "[chain] Warning: No files to process.\n");
1134 return;
1135 }
1136
1137 m_stats.reset();
1138 m_stats.total_events = target_events;
1139
1140 if (m_verbose) {
1141 fmt::print("[chain] Filtered processing: requiring banks [");
1142 for (size_t i = 0; i < required_banks.size(); i++) {
1143 fmt::print("{}{}", required_banks[i],
1144 i < required_banks.size() - 1 ? ", " : "");
1145 }
1146 fmt::print("] (record-level parallelism)\n");
1147 }
1148
1149 std::unique_ptr<ProgressTracker> progress;
1150 if (m_show_progress && target_events > 0) {
1152 config.label = "Filtered processing";
1153 config.show_eta = true;
1154 progress = std::make_unique<ProgressTracker>(target_events, config);
1155 progress->start();
1156 }
1157
// Matches found so far, shared by all workers across all files.
1158 std::atomic<long> events_found{0};
1159
1160 for (const auto& file : m_files) {
1161 if (!file.is_valid || events_found >= target_events) continue;
1162
1163 std::atomic<int> next_record{0};
1164 std::vector<std::future<void>> futures;
1165
1166 for (int t = 0; t < m_num_threads && events_found < target_events; t++) {
1167 futures.push_back(m_thread_pool.submit([&, this]() {
1168 try {
1169 reader file_reader;
1170 file_reader.setTags(m_tags);
1171 file_reader.open(file.filename.c_str());
1172
1173 dictionary dict;
1174 file_reader.readDictionary(dict);
1175
// One bank object per required name, reused for every event this worker reads.
1176 std::vector<bank> banks;
1177 banks.reserve(required_banks.size());
1178 for (const auto& name : required_banks) {
1179 banks.emplace_back(dict.getSchema(name.c_str()));
1180 }
1181
1182 record rec;
1183 event temp_event;
1184
1185 while (events_found < target_events) {
1186 int rec_idx = next_record.fetch_add(1);
1187 if (rec_idx >= file.num_records) break;
1188
1189 if (!file_reader.loadRecord(rec, rec_idx)) continue;
1190
1191 int events_in_record = rec.getEventCount();
1192
1193 for (int evt_in_rec = 0; evt_in_rec < events_in_record; evt_in_rec++) {
1194 if (events_found >= target_events) break;
1195
1196 rec.readHipoEvent(temp_event, evt_in_rec);
1197
// An event matches only if every required bank has at least one row.
1198 bool has_all_banks = true;
1199 for (auto& b : banks) {
1200 temp_event.read(b);
1201 if (b.getRows() == 0) {
1202 has_all_banks = false;
1203 break;
1204 }
1205 }
1206
1207 if (has_all_banks) {
1208 long evt_count = events_found.fetch_add(1);
1209 if (evt_count >= target_events) break;
1210
1211 chain_event evt(&temp_event, &dict);
1212 process_func(evt, file.index, evt_count);
1213 m_stats.events_processed++;
1214 if (progress) progress->increment();
1215 }
1216 }
1217 }
// A std::exception (including one from process_func) ends only this worker.
1218 } catch (const std::exception& e) {
1219 std::lock_guard stdout_lock{detail::get_stdout_mutex()};
1220 fmt::print(stderr, "[chain] Thread error: {}\n", e.what());
1221 }
1222 }));
1223 }
1224 for (auto& f : futures) f.get();
1225 }
1226
1227 m_stats.end_time = std::chrono::steady_clock::now();
1228 if (progress) progress->finish();
1229 if (m_verbose) {
1230 fmt::print("[chain] Found {} matching events\n", events_found.load());
1231 print_statistics();
1232 }
1233 }
1234
1235 // Allow ChainIterator to access private members (it reads m_files and m_tags when opening files)
1236 friend class ChainIterator;
1237};
1238
1239//=============================================================================
1240// ChainIterator Implementation
1241//=============================================================================
1242
1243inline ChainIterator::ChainIterator(chain* ch, bool at_end)
1244 : m_chain(ch), m_exhausted(at_end) {
1245 if (!at_end && ch && !ch->m_files.empty()) {
1246 if (open_next_file()) {
1247 advance_to_next_event();
1248 } else {
1249 m_exhausted = true;
1250 }
1251 }
1252}
1253
// Prefix increment body: step to the next event in the chain, moving on to
// later files as needed. When no event remains, the iterator becomes exhausted
// and compares equal to end(). Incrementing an exhausted iterator does nothing.
1255 if (!m_exhausted) {
1256 m_current_event_idx++;
1257 if (!advance_to_next_event()) {
1258 m_exhausted = true;
1259 }
1260 }
1261 return *this;
1262}
1263
1264inline bool ChainIterator::advance_to_next_event() {
1265 while (m_current_reader) {
1266 if (m_current_reader->next(m_current_event)) {
1267 return true;
1268 }
1269 // Current file exhausted, try next
1270 m_current_file_idx++;
1271 m_current_event_idx = 0;
1272 if (!open_next_file()) {
1273 return false;
1274 }
1275 }
1276 return false;
1277}
1278
1279inline bool ChainIterator::open_next_file() {
1280 while (m_current_file_idx < static_cast<int>(m_chain->m_files.size())) {
1281 const auto& file = m_chain->m_files[m_current_file_idx];
1282 if (!file.is_valid) {
1283 m_current_file_idx++;
1284 continue;
1285 }
1286 try {
1287 m_current_reader = std::make_unique<reader>();
1288 m_current_reader->setTags(m_chain->m_tags);
1289 m_current_reader->open(file.filename.c_str());
1290
1291 m_current_dict = std::make_unique<dictionary>();
1292 m_current_reader->readDictionary(*m_current_dict);
1293 return true;
1294 } catch (const std::exception& e) {
1295 fmt::print(stderr, "[chain] Error opening {}: {}\n",
1296 file.filename, e.what());
1297 m_current_file_idx++;
1298 }
1299 }
1300 m_current_reader.reset();
1301 m_current_dict.reset();
1302 return false;
1303}
1304
1305} // namespace hipo
A ThreadPool implementation.
Definition threadpool.hpp:22
auto submit(Callable &&callable, Args &&... args) -> std::future< std::invoke_result_t< std::decay_t< Callable >, std::decay_t< Args >... > >
Submits a task for asynchronous execution.
Definition threadpool.hpp:128
Iterator for traversing events across all files in a chain.
Definition chain.h:195
std::input_iterator_tag iterator_category
Definition chain.h:203
void operator++(int)
Definition chain.h:214
ChainIterator()
Definition chain.h:209
std::ptrdiff_t difference_type
Definition chain.h:205
EventData operator*()
Definition chain.h:216
bool operator==(const ChainIterator &other) const noexcept
Definition chain.h:221
bool operator!=(const ChainIterator &other) const noexcept
Definition chain.h:228
ChainIterator & operator++()
Definition chain.h:1254
Definition bank.h:210
Event wrapper that provides bank access via dictionary.
Definition chain.h:141
chain_event()
Definition chain.h:143
void read_banks(banklist &list)
Alias for readBanks (snake_case)
Definition chain.h:170
event * raw()
Access underlying event.
Definition chain.h:173
const event * raw() const
Definition chain.h:174
bank getBank(const std::string &name)
Get a bank by name, reading it from the event.
Definition chain.h:147
bank get_bank(const std::string &name)
Alias for getBank (snake_case)
Definition chain.h:157
chain_event(event *ev, dictionary *dict)
Definition chain.h:144
void readBanks(banklist &list)
Read event data into an existing banklist.
Definition chain.h:160
Chain multiple HIPO files for unified processing.
Definition chain.h:274
int add(std::vector< std::string > &filenames)
Add multiple files to the chain.
Definition chain.h:327
iterator end()
Get end iterator.
Definition chain.h:635
void clear()
Remove all files from the chain.
Definition chain.h:381
void process_filtered(ProcessFunc &&process_func, const std::vector< std::string > &required_banks, double percentage=100.0)
Process events with bank filtering (record-level parallelism)
Definition chain.h:831
int get_nb_files() const noexcept
Legacy alias for size()
Definition chain.h:400
const ChainStatistics & statistics() const noexcept
Get processing statistics.
Definition chain.h:886
std::vector< FileInfo > & files() noexcept
Get all file infos.
Definition chain.h:418
bool empty() const noexcept
Check if chain is empty.
Definition chain.h:397
banklist getBanks(const std::vector< std::string > &names)
Create a banklist from bank names using the first file's dictionary.
Definition chain.h:606
void list() const
Print list of files in chain.
Definition chain.h:582
int add(std::string_view filename)
Add a single file to the chain.
Definition chain.h:308
std::optional< std::string > get_config(std::string_view name)
Get configuration from first file that has it.
Definition chain.h:950
void show_all_info()
Show detailed info for all files.
Definition chain.h:910
bool any_has_config(std::string_view name)
Check if any file has a configuration key.
Definition chain.h:931
void set_progress(bool show)
Enable/disable progress display.
Definition chain.h:435
const FileInfo & file_info(int index) const
Legacy alias for operator[].
Definition chain.h:413
void set_verbose(bool verbose)
Enable/disable verbose output.
Definition chain.h:438
void for_each_file(FileFunc &&func)
Apply a function to each file (for file-level operations)
Definition chain.h:866
void print_statistics() const
Print processing statistics.
Definition chain.h:898
long total_events()
Get total event count (loads metadata if needed)
Definition chain.h:889
void process(ProcessFunc &&process_func, double percentage=100.0)
Process events in parallel across all files (record-level parallelism)
Definition chain.h:659
void set_tags(const std::vector< long > &tags)
Set event tags for filtering.
Definition chain.h:427
void process(const banklist &banks, ProcessFunc &&process_func, double percentage=100.0)
Process events in parallel using a banklist (record-level parallelism)
Definition chain.h:710
void process_filtered(ProcessFunc &&process_func, const std::vector< std::string > &required_banks, long num_events)
Process filtered events with an absolute event count.
Definition chain.h:852
void scan()
Scan and display detailed information about all files.
Definition chain.h:508
int add_pattern(std::string_view pattern)
Add files matching a glob pattern.
Definition chain.h:337
banklist get_banks(const std::vector< std::string > &names)
Alias for getBanks (snake_case)
Definition chain.h:616
void open(bool validate_all=true)
Validate and optionally load metadata for all files.
Definition chain.h:449
const FileInfo & operator[](std::size_t index) const
Get file info by index.
Definition chain.h:405
void set_threads(int n)
Set number of processing threads.
Definition chain.h:430
ThreadPool & threadpool() noexcept
Get the thread pool for advanced use.
Definition chain.h:974
iterator begin()
Get iterator to first event.
Definition chain.h:632
chain(int threads=0, bool progress=true, bool verbose=false)
Construct a chain with specified thread count.
Definition chain.h:289
friend class ChainIterator
Definition chain.h:1236
void process(ProcessFunc &&process_func, long num_events)
Process a specific number of events in parallel across all files.
Definition chain.h:682
std::size_t size() const noexcept
Number of files in the chain.
Definition chain.h:394
long total_events_count() const
Legacy alias.
Definition chain.h:895
Collection of schema definitions, typically read from a HIPO file header.
Definition dictionary.h:157
schema & getSchema(const char *name)
Definition dictionary.h:167
Definition event.h:62
void read(hipo::bank &b)
Definition event.cpp:63
Definition reader.h:197
std::vector< hipo::bank > getBanks(std::vector< std::string > names)
Definition reader.cpp:279
void setTags(int tag)
Definition reader.h:248
int getEntries()
Definition reader.h:268
int getNRecords() const
Definition reader.h:264
void open(const char *filename)
Open a file; if a file stream is already open, it is closed first.
Definition reader.cpp:78
void about()
Definition reader.cpp:68
void readUserConfig(std::map< std::string, std::string > &mapConfig)
Definition reader.cpp:289
std::recursive_mutex & get_stdout_mutex()
Returns the global recursive mutex used to synchronize stdout access.
Definition progresstracker.hpp:19
HIPO namespace is used for the classes that read/write files and records.
Definition bank.cpp:45
std::vector< bank > banklist
Definition bank.h:678
Definition progresstracker.hpp:30
std::string label
Definition progresstracker.hpp:36
bool show_rate
Definition progresstracker.hpp:33
bool show_eta
Definition progresstracker.hpp:32
Definition chain.h:197
chain_event event
Definition chain.h:198
int file_index
Definition chain.h:199
long event_index
Definition chain.h:200
Thread-safe statistics for chain processing.
Definition chain.h:105
std::chrono::steady_clock::time_point end_time
Definition chain.h:110
double elapsed_seconds() const noexcept
Definition chain.h:119
std::atomic< long > total_events
Definition chain.h:106
std::atomic< long > events_processed
Definition chain.h:107
std::chrono::steady_clock::time_point start_time
Definition chain.h:109
std::atomic< long > events_skipped
Definition chain.h:108
double throughput() const noexcept
Definition chain.h:125
void reset() noexcept
Definition chain.h:112
Metadata container for a file in the chain.
Definition chain.h:65
std::string filename
Definition chain.h:66
bool is_valid
Definition chain.h:71
int num_records
Definition chain.h:69
std::string size_string() const
Get human-readable file size string.
Definition chain.h:84
int index
Definition chain.h:67
std::uintmax_t file_size
Definition chain.h:70
FileInfo()=default
long total_events
Definition chain.h:68
bool metadata_loaded() const noexcept
Check if metadata has been loaded.
Definition chain.h:79
std::string error_message
Definition chain.h:72
FileInfo(std::string_view name, int idx)
Definition chain.h:75