HIPO  4.3.0
High Performance Output data format for experimental physics
chain.h
Go to the documentation of this file.
1 //******************************************************************************
2 //* ██╗ ██╗██╗██████╗ ██████╗ ██╗ ██╗ ██████╗ *
3 //* ██║ ██║██║██╔══██╗██╔═══██╗ ██║ ██║ ██╔═████╗ *
4 //* ███████║██║██████╔╝██║ ██║ ███████║ ██║██╔██║ *
5 //* ██╔══██║██║██╔═══╝ ██║ ██║ ╚════██║ ████╔╝██║ *
6 //* ██║ ██║██║██║ ╚██████╔╝ ██║██╗╚██████╔╝ *
7 //* ╚═╝ ╚═╝╚═╝╚═╝ ╚═════╝ ╚═╝╚═╝ ╚═════╝ *
8 //************************ Jefferson National Lab (2017) ***********************
34 
35 #pragma once
36 
37 #include <atomic>
38 #include <filesystem>
39 #include <memory>
40 #include <mutex>
41 #include <optional>
42 #include <regex>
43 #include <string>
44 #include <string_view>
45 #include <vector>
46 
47 #include <fmt/format.h>
48 #include "reader.h"
49 #include "record.h"
50 #include "dictionary.h"
51 #include "bank.h"
52 #include "event.h"
53 #include "progresstracker.hpp"
54 #include "threadpool.hpp"
55 
56 namespace hipo {
57 
58 //=============================================================================
59 // FileInfo - Metadata for a single file in the chain
60 //=============================================================================
61 
66 struct FileInfo {
67  std::string filename;
68  int index{0};
69  long total_events{-1}; // -1 = not yet loaded
70  int num_records{-1}; // -1 = not yet loaded
71  std::uintmax_t file_size{0};
72  bool is_valid{true};
73  std::string error_message;
74 
75  FileInfo() = default;
76  FileInfo(std::string_view name, int idx)
77  : filename(name), index(idx) {}
78 
80  [[nodiscard]] bool metadata_loaded() const noexcept {
81  return total_events >= 0;
82  }
83 
85  [[nodiscard]] std::string size_string() const {
86  if (file_size > 1024ULL * 1024 * 1024)
87  return fmt::format("{:.1f} GB", file_size / (1024.0 * 1024 * 1024));
88  if (file_size > 1024 * 1024)
89  return fmt::format("{:.1f} MB", file_size / (1024.0 * 1024));
90  if (file_size > 1024)
91  return fmt::format("{:.1f} KB", file_size / 1024.0);
92  return fmt::format("{} B", file_size);
93  }
94 };
95 
96 // Legacy alias
98 
99 //=============================================================================
100 // ChainStatistics - Processing statistics
101 //=============================================================================
102 
107  std::atomic<long> total_events{0};
108  std::atomic<long> events_processed{0};
109  std::atomic<long> events_skipped{0};
110  std::chrono::steady_clock::time_point start_time;
111  std::chrono::steady_clock::time_point end_time;
112 
113  void reset() noexcept {
114  total_events = 0;
115  events_processed = 0;
116  events_skipped = 0;
117  start_time = std::chrono::steady_clock::now();
118  }
119 
120  [[nodiscard]] double elapsed_seconds() const noexcept {
121  auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
122  end_time - start_time);
123  return elapsed.count() / 1000.0;
124  }
125 
126  [[nodiscard]] double throughput() const noexcept {
127  double elapsed = elapsed_seconds();
128  return elapsed > 0 ? static_cast<double>(events_processed) / elapsed : 0.0;
129  }
130 };
131 
132 // Legacy alias
134 
135 //=============================================================================
136 // chain_event - Event wrapper with dictionary for bank access
137 //=============================================================================
138 
142 class chain_event {
143 public:
144  chain_event() : m_event(nullptr), m_dict(nullptr) {}
145  chain_event(event* ev, dictionary* dict) : m_event(ev), m_dict(dict) {}
146 
148  bank getBank(const std::string& name) {
149  if (!m_event || !m_dict) {
150  throw std::runtime_error("Invalid chain_event (no event data)");
151  }
152  bank b(m_dict->getSchema(name.c_str()));
153  m_event->read(b);
154  return b;
155  }
156 
158  bank get_bank(const std::string& name) { return getBank(name); }
159 
161  event* raw() { return m_event; }
162  const event* raw() const { return m_event; }
163 
165  explicit operator bool() const { return m_event != nullptr && m_dict != nullptr; }
166 
167 private:
168  event* m_event;
169  dictionary* m_dict;
170 };
171 
172 //=============================================================================
173 // ChainIterator - Iterator for sequential access across all files
174 //=============================================================================
175 
176 class chain; // Forward declaration
177 
184 public:
185  struct EventData {
189  };
190 
191  using iterator_category = std::input_iterator_tag;
193  using difference_type = std::ptrdiff_t;
194  using pointer = EventData*;
196 
197  ChainIterator() : m_chain(nullptr), m_exhausted(true) {}
198 
199  ChainIterator(chain* ch, bool at_end);
200 
202  void operator++(int) { ++(*this); }
203 
204  [[nodiscard]] EventData operator*() {
205  return {chain_event(&m_current_event, m_current_dict.get()),
206  m_current_file_idx, m_current_event_idx};
207  }
208 
209  [[nodiscard]] bool operator==(const ChainIterator& other) const noexcept {
210  if (m_exhausted && other.m_exhausted) return true;
211  if (m_exhausted != other.m_exhausted) return false;
212  return m_current_file_idx == other.m_current_file_idx &&
213  m_current_event_idx == other.m_current_event_idx;
214  }
215 
216  [[nodiscard]] bool operator!=(const ChainIterator& other) const noexcept {
217  return !(*this == other);
218  }
219 
220 private:
221  chain* m_chain;
222  std::unique_ptr<reader> m_current_reader;
223  std::unique_ptr<dictionary> m_current_dict;
224  event m_current_event;
225  int m_current_file_idx{0};
226  long m_current_event_idx{0};
227  bool m_exhausted{false};
228 
229  bool advance_to_next_event();
230  bool open_next_file();
231 };
232 
233 //=============================================================================
234 // chain - Main class for chaining multiple HIPO files
235 //=============================================================================
236 
262 class chain {
263 public:
266 
267  //=========================================================================
268  // Construction
269  //=========================================================================
270 
277  explicit chain(int threads = 0, bool progress = true, bool verbose = false)
278  : m_num_threads(threads == 0 ? static_cast<int>(std::thread::hardware_concurrency()) : threads),
279  m_show_progress(progress),
280  m_verbose(verbose),
281  m_thread_pool(m_num_threads, "ChainWorker") {
282  if (m_verbose) {
283  fmt::print("[chain] Initialized with {} threads\n", m_num_threads);
284  }
285  }
286 
287  //=========================================================================
288  // File Management
289  //=========================================================================
290 
296  int add(std::string_view filename) {
297  if (!std::filesystem::exists(filename)) {
298  fmt::print(stderr, "[chain] Warning: File not found: {}\n", filename);
299  return static_cast<int>(m_files.size());
300  }
301  std::lock_guard lock(m_mutex);
302  m_files.emplace_back(filename, static_cast<int>(m_files.size()));
303  m_metadata_loaded = false;
304  if (m_verbose) {
305  fmt::print("[chain] Added file {}: {}\n", m_files.size(), filename);
306  }
307  return static_cast<int>(m_files.size());
308  }
309 
315  int add(std::vector<std::string>& filenames) {
316  for (const auto& f : filenames) add(f);
317  return static_cast<int>(m_files.size());
318  }
319 
325  int add_pattern(std::string_view pattern) {
326  int added = 0;
327  std::filesystem::path pattern_path(pattern);
328  std::filesystem::path dir = pattern_path.parent_path();
329  if (dir.empty()) dir = ".";
330 
331  std::string filename_pattern = pattern_path.filename().string();
332  std::string regex_pattern = std::regex_replace(filename_pattern,
333  std::regex(R"(\*)"), ".*");
334  regex_pattern = std::regex_replace(regex_pattern,
335  std::regex(R"(\?)"), ".");
336 
337  try {
338  std::regex file_regex(regex_pattern);
339  std::vector<std::string> matched_files;
340 
341  for (const auto& entry : std::filesystem::directory_iterator(dir)) {
342  if (entry.is_regular_file()) {
343  std::string fname = entry.path().filename().string();
344  if (std::regex_match(fname, file_regex)) {
345  matched_files.push_back(entry.path().string());
346  }
347  }
348  }
349 
350  // Sort for deterministic ordering
351  std::sort(matched_files.begin(), matched_files.end());
352  for (const auto& f : matched_files) {
353  add(f);
354  added++;
355  }
356  } catch (const std::exception& e) {
357  fmt::print(stderr, "[chain] Error scanning directory: {}\n", e.what());
358  }
359 
360  if (m_verbose) {
361  fmt::print("[chain] Pattern '{}' matched {} files\n", pattern, added);
362  }
363  return added;
364  }
365 
369  void clear() {
370  std::lock_guard lock(m_mutex);
371  m_files.clear();
372  m_stats.reset();
373  m_metadata_loaded = false;
374  if (m_verbose) fmt::print("[chain] Cleared all files\n");
375  }
376 
377  //=========================================================================
378  // File Information
379  //=========================================================================
380 
382  [[nodiscard]] std::size_t size() const noexcept { return m_files.size(); }
383 
385  [[nodiscard]] bool empty() const noexcept { return m_files.empty(); }
386 
388  [[nodiscard]] int get_nb_files() const noexcept {
389  return static_cast<int>(m_files.size());
390  }
391 
393  [[nodiscard]] const FileInfo& operator[](std::size_t index) const {
394  if (index >= m_files.size()) {
395  throw std::out_of_range("chain: File index out of range");
396  }
397  return m_files[index];
398  }
399 
401  [[nodiscard]] const FileInfo& file_info(int index) const {
402  return (*this)[static_cast<std::size_t>(index)];
403  }
404 
406  [[nodiscard]] std::vector<FileInfo>& files() noexcept {
407  return m_files;
408  }
409 
410  //=========================================================================
411  // Configuration
412  //=========================================================================
413 
415  void set_tags(const std::vector<long>& tags) { m_tags = tags; }
416 
418  void set_threads(int n) {
419  m_num_threads = (n == 0) ? static_cast<int>(std::thread::hardware_concurrency()) : n;
420  }
421 
423  void set_progress(bool show) { m_show_progress = show; }
424 
426  void set_verbose(bool verbose) { m_verbose = verbose; }
427 
428  //=========================================================================
429  // Validation and Scanning
430  //=========================================================================
431 
437  void open(bool validate_all = true) {
438  if (m_files.empty()) {
439  throw std::runtime_error("chain: No files added to chain");
440  }
441 
442  if (m_verbose) {
443  fmt::print("[chain] Validating {} files...\n", m_files.size());
444  }
445 
446  int valid_count = 0;
447  long total_events = 0;
448 
449  for (auto& file : m_files) {
450  if (!std::filesystem::exists(file.filename)) {
451  file.is_valid = false;
452  file.error_message = "File not found";
453  continue;
454  }
455 
456  file.file_size = std::filesystem::file_size(file.filename);
457 
458  if (validate_all) {
459  try {
460  reader temp_reader;
461  temp_reader.setTags(m_tags);
462  temp_reader.open(file.filename.c_str());
463 
464  file.total_events = temp_reader.getEntries();
465  file.num_records = temp_reader.getNRecords();
466  total_events += file.total_events;
467  valid_count++;
468 
469  if (m_verbose) {
470  fmt::print(" [{}] {} - OK ({} events)\n",
471  file.index, file.filename, file.total_events);
472  }
473  } catch (const std::exception& e) {
474  file.is_valid = false;
475  file.error_message = e.what();
476  }
477  } else {
478  valid_count++;
479  }
480  }
481 
482  m_stats.total_events = total_events;
483  m_metadata_loaded = validate_all;
484 
485  if (m_verbose) {
486  fmt::print("[chain] Valid files: {}/{}, Total events: {}\n",
487  valid_count, m_files.size(), total_events);
488  }
489 
490  if (valid_count == 0) {
491  throw std::runtime_error("chain: No valid files in chain");
492  }
493  }
494 
    // Scan every file in the chain: record its size, record count and event
    // count in the FileInfo entry, print one line per file, then print a
    // summary (file counts, totals, total size, average events per valid file).
    // Prints a notice and returns when the chain is empty.
496  void scan() {
497  if (m_files.empty()) {
498  fmt::print("[chain] No files in chain\n");
499  return;
500  }
501 
    // Optional progress display, one step per file.
    // NOTE(review): the declaration of `config` is not visible in this
    // listing (a source line is absent between the two below) — confirm its type.
502  std::unique_ptr<ProgressTracker> progress;
503  if (m_show_progress) {
505  config.label = "Scanning files";
506  config.show_eta = false;
507  progress = std::make_unique<ProgressTracker>(m_files.size(), config);
508  progress->start();
509  }
510 
511  long total_events = 0;
512  int total_records = 0;
513  std::uintmax_t total_size = 0;
514  int valid_files = 0;
515 
516  for (auto& file : m_files) {
517  try {
518  if (!std::filesystem::exists(file.filename)) {
519  std::lock_guard stdout_lock{detail::get_stdout_mutex()};
520  fmt::print(" [{}] {} - FILE NOT FOUND\n", file.index, file.filename);
    // NOTE(review): this `continue` skips the progress->increment() at the
    // bottom of the loop, so missing files do not advance the progress count.
521  continue;
522  }
523 
524  file.file_size = std::filesystem::file_size(file.filename);
525  total_size += file.file_size;
526 
    // A temporary reader is opened per file just to read its counts.
527  reader temp_reader;
528  temp_reader.setTags(m_tags);
529  temp_reader.open(file.filename.c_str());
530 
531  file.num_records = temp_reader.getNRecords();
532  file.total_events = temp_reader.getEntries();
533  total_records += file.num_records;
534  total_events += file.total_events;
535  valid_files++;
536 
537  {
538  std::lock_guard stdout_lock{detail::get_stdout_mutex()};
539  fmt::print(" [{}] {}\n Records: {}, Events: {}, Size: {}\n",
540  file.index, file.filename, file.num_records,
541  file.total_events, file.size_string());
542  }
543 
544  } catch (const std::exception& e) {
    // Errors are reported per file; the scan continues with the next file.
545  std::lock_guard stdout_lock{detail::get_stdout_mutex()};
546  fmt::print(" [{}] {} - ERROR: {}\n", file.index, file.filename, e.what());
547  }
548 
549  if (progress) progress->increment();
550  }
551 
552  if (progress) progress->finish();
553 
554  fmt::print("\nScan Summary\n");
555  fmt::print(" Total files: {}\n", m_files.size());
556  fmt::print(" Valid files: {}\n", valid_files);
557  fmt::print(" Total records: {}\n", total_records);
558  fmt::print(" Total events: {}\n", total_events);
559 
    // A throwaway FileInfo is used only to reuse size_string() formatting.
560  FileInfo dummy;
561  dummy.file_size = total_size;
562  fmt::print(" Total size: {}\n", dummy.size_string());
563 
    // Integer division; guarded so it never divides by zero.
564  if (valid_files > 0) {
565  fmt::print(" Avg events/file: {}\n", total_events / valid_files);
566  }
567  }
568 
570  void list() const {
571  fmt::print("Chain file list ({} files)\n", m_files.size());
572  for (const auto& file : m_files) {
573  fmt::print(" [{}] {}", file.index, file.filename);
574  if (file.metadata_loaded()) {
575  fmt::print(" ({} events, {} records)", file.total_events, file.num_records);
576  }
577  if (!file.is_valid) {
578  fmt::print(" [INVALID: {}]", file.error_message);
579  }
580  fmt::print("\n");
581  }
582  }
583 
584  //=========================================================================
585  // Range-based Iteration
586  //=========================================================================
587 
598  [[nodiscard]] iterator begin() { return ChainIterator(this, false); }
599 
601  [[nodiscard]] iterator end() { return ChainIterator(this, true); }
602 
603  //=========================================================================
604  // Parallel Processing
605  //=========================================================================
606 
    // Process events from the chain's files, one file at a time, with
    // m_num_threads tasks per file that pull record indices from a shared
    // atomic counter.
    //
    // process_func is invoked as process_func(chain_event&, file index,
    // running per-file event count). It is called from pool tasks, so
    // invocations for the same file can run concurrently.
    // percentage is clamped to [0, 100] and selects how many of the chain's
    // events (in file order) are targeted.
624  template<typename ProcessFunc>
625  void process(ProcessFunc&& process_func, double percentage = 100.0) {
626  if (m_files.empty()) {
627  fmt::print(stderr, "[chain] Warning: No files to process.\n");
628  return;
629  }
630 
    // From here on `percentage` is a fraction in [0, 1].
631  percentage = std::clamp(percentage, 0.0, 100.0) / 100.0;
632 
633  // Ensure metadata is loaded
634  load_metadata_if_needed();
635 
    // NOTE(review): the target is derived from m_stats.total_events, which
    // this function later overwrites with events_accumulated. If
    // load_metadata_if_needed() returns early on later calls, a second
    // process() call would scale the previous run's total rather than the
    // chain total — confirm.
636  long target_events = static_cast<long>(m_stats.total_events * percentage);
637 
638  // Build work list - now tracking records per file
639  struct WorkItem {
640  int file_idx;
641  int num_records;
642  long max_events; // Maximum events to process from this file
643  };
644  std::vector<WorkItem> work_list;
645  long events_accumulated = 0;
646 
    // Files are taken in chain order until the target is covered; the last
    // file included may be capped below its full event count.
647  for (const auto& file : m_files) {
648  if (!file.is_valid || events_accumulated >= target_events) continue;
649  long events_needed = target_events - events_accumulated;
650  long events_from_file = std::min(events_needed, file.total_events);
651  if (events_from_file > 0) {
652  work_list.push_back({file.index, file.num_records, events_from_file});
653  events_accumulated += events_from_file;
654  }
655  }
656 
657  m_stats.reset();
658  m_stats.total_events = events_accumulated;
659 
660  if (m_verbose) {
661  fmt::print("[chain] Processing {} events from {} files ({} threads, record-level parallelism)\n",
662  events_accumulated, work_list.size(), m_num_threads);
663  }
664 
    // NOTE(review): the declaration of `config` is not visible in this
    // listing (a source line is absent after the `if` below) — confirm its type.
665  std::unique_ptr<ProgressTracker> progress;
666  if (m_show_progress && m_stats.total_events > 0) {
668  config.label = fmt::format("Processing ({:.1f}%)", percentage * 100.0);
669  config.show_eta = true;
670  config.show_rate = true;
671  progress = std::make_unique<ProgressTracker>(m_stats.total_events, config);
672  progress->start();
673  }
674 
675  // Process one file at a time with all threads using record-level parallelism
676  for (const auto& work : work_list) {
    // file_idx (taken from FileInfo::index) is used as a position in m_files.
677  const auto& file = m_files[work.file_idx];
678  std::atomic<int> next_record{0};
679  std::atomic<long> events_processed_in_file{0};
680  std::vector<std::future<void>> futures;
681 
    // The tasks capture these per-file locals by reference; every future is
    // waited on (f.get() below) before this loop iteration ends.
682  for (int t = 0; t < m_num_threads; t++) {
683  futures.push_back(m_thread_pool.submit([&, this]() {
684  try {
685  // Each thread has its own reader and record buffer
686  reader file_reader;
687  file_reader.setTags(m_tags);
688  file_reader.open(file.filename.c_str());
689 
690  dictionary dict;
691  file_reader.readDictionary(dict);
692 
693  record rec;
694  event temp_event;
695 
696  while (true) {
697  // Grab next record atomically
698  int rec_idx = next_record.fetch_add(1);
699  if (rec_idx >= work.num_records) break;
700 
701  // Check if we've hit the event limit for this file
702  if (events_processed_in_file.load() >= work.max_events) break;
703 
704  // Load the entire record
705  if (!file_reader.loadRecord(rec, rec_idx)) continue;
706 
707  int events_in_record = rec.getEventCount();
708 
709  // Process all events in this record sequentially
710  for (int evt_in_rec = 0; evt_in_rec < events_in_record; evt_in_rec++) {
711  // Check event limit
    // Reserve a slot first; if that exceeds the cap, give the slot back and
    // stop working on this record.
712  long current_count = events_processed_in_file.fetch_add(1);
713  if (current_count >= work.max_events) {
714  events_processed_in_file.fetch_sub(1);
715  break;
716  }
717 
718  rec.readHipoEvent(temp_event, evt_in_rec);
719  chain_event evt(&temp_event, &dict);
720  process_func(evt, file.index, current_count);
721  m_stats.events_processed++;
722  if (progress) progress->increment();
723  }
724  }
725  } catch (const std::exception& e) {
    // An exception derived from std::exception ends this task only; it is
    // reported on stderr and not rethrown to the caller of process().
726  std::lock_guard stdout_lock{detail::get_stdout_mutex()};
727  fmt::print(stderr, "[chain] Thread error: {}\n", e.what());
728  }
729  }));
730  }
731 
732  for (auto& f : futures) f.get();
733  }
734 
735  m_stats.end_time = std::chrono::steady_clock::now();
736  if (progress) progress->finish();
737  if (m_verbose) print_statistics();
738  }
739 
    // Like process(), but process_func is only invoked for events in which
    // every bank named in required_banks has at least one row.
    //
    // process_func is invoked as process_func(chain_event&, file index,
    // running count of matching events across the chain). It is called from
    // pool tasks, so invocations can run concurrently.
    // percentage is clamped to [0, 100]; the resulting target counts
    // matching events, not events read.
750  template<typename ProcessFunc>
751  void process_filtered(ProcessFunc&& process_func,
752  const std::vector<std::string>& required_banks,
753  double percentage = 100.0) {
754  if (m_files.empty()) {
755  fmt::print(stderr, "[chain] Warning: No files to process.\n");
756  return;
757  }
758 
759  percentage = std::clamp(percentage, 0.0, 100.0) / 100.0;
760  load_metadata_if_needed();
761 
    // NOTE(review): m_stats.total_events is overwritten with target_events
    // below. If load_metadata_if_needed() returns early on later calls, a
    // later run would scale this reduced value rather than the chain total
    // — confirm.
762  long target_events = static_cast<long>(m_stats.total_events * percentage);
763  m_stats.reset();
764  m_stats.total_events = target_events;
765 
766  if (m_verbose) {
767  fmt::print("[chain] Filtered processing: requiring banks [");
    // `size() - 1` is only evaluated inside the loop, so it cannot wrap for
    // an empty list.
768  for (size_t i = 0; i < required_banks.size(); i++) {
769  fmt::print("{}{}", required_banks[i],
770  i < required_banks.size() - 1 ? ", " : "");
771  }
772  fmt::print("] (record-level parallelism)\n");
773  }
774 
    // NOTE(review): the declaration of `config` is not visible in this
    // listing (a source line is absent after the `if` below) — confirm its type.
775  std::unique_ptr<ProgressTracker> progress;
776  if (m_show_progress && target_events > 0) {
778  config.label = "Filtered processing";
779  config.show_eta = true;
780  progress = std::make_unique<ProgressTracker>(target_events, config);
781  progress->start();
782  }
783 
    // Shared across all files and tasks: number of matching events claimed so far.
784  std::atomic<long> events_found{0};
785 
786  for (const auto& file : m_files) {
787  if (!file.is_valid || events_found >= target_events) continue;
788 
789  std::atomic<int> next_record{0};
790  std::vector<std::future<void>> futures;
791 
    // The tasks capture these locals by reference; every future is waited on
    // (f.get() below) before this loop iteration ends.
792  for (int t = 0; t < m_num_threads && events_found < target_events; t++) {
793  futures.push_back(m_thread_pool.submit([&, this]() {
794  try {
795  reader file_reader;
796  file_reader.setTags(m_tags);
797  file_reader.open(file.filename.c_str());
798 
799  dictionary dict;
800  file_reader.readDictionary(dict);
801 
802  // Create bank objects for required banks
803  std::vector<bank> banks;
804  banks.reserve(required_banks.size());
805  for (const auto& name : required_banks) {
806  banks.emplace_back(dict.getSchema(name.c_str()));
807  }
808 
809  record rec;
810  event temp_event;
811 
812  while (events_found < target_events) {
813  // Grab next record atomically
814  int rec_idx = next_record.fetch_add(1);
815  if (rec_idx >= file.num_records) break;
816 
817  // Load the entire record
818  if (!file_reader.loadRecord(rec, rec_idx)) continue;
819 
820  int events_in_record = rec.getEventCount();
821 
822  // Process all events in this record
823  for (int evt_in_rec = 0; evt_in_rec < events_in_record; evt_in_rec++) {
824  if (events_found >= target_events) break;
825 
826  rec.readHipoEvent(temp_event, evt_in_rec);
827 
828  // Check if all required banks have data
829  bool has_all_banks = true;
830  for (auto& b : banks) {
831  temp_event.read(b);
832  if (b.getRows() == 0) {
833  has_all_banks = false;
834  break;
835  }
836  }
837 
838  if (has_all_banks) {
    // NOTE(review): the counter is incremented before the limit check and is
    // not decremented on the break, so events_found can end up above
    // target_events when several tasks match at once — confirm whether the
    // reported "Found" count is meant to be capped.
839  long evt_count = events_found.fetch_add(1);
840  if (evt_count >= target_events) break;
841 
842  chain_event evt(&temp_event, &dict);
843  process_func(evt, file.index, evt_count);
844  m_stats.events_processed++;
845  if (progress) progress->increment();
846  }
847  }
848  }
849  } catch (const std::exception& e) {
    // An exception derived from std::exception ends this task only; it is
    // reported on stderr and not rethrown to the caller.
850  std::lock_guard stdout_lock{detail::get_stdout_mutex()};
851  fmt::print(stderr, "[chain] Thread error: {}\n", e.what());
852  }
853  }));
854  }
855  for (auto& f : futures) f.get();
856  }
857 
858  m_stats.end_time = std::chrono::steady_clock::now();
859  if (progress) progress->finish();
860  if (m_verbose) {
861  fmt::print("[chain] Found {} matching events\n", events_found.load());
862  print_statistics();
863  }
864  }
865 
871  template<typename FileFunc>
872  void for_each_file(FileFunc&& func) {
873  for (const auto& file : m_files) {
874  if (!file.is_valid) continue;
875  try {
876  reader file_reader;
877  file_reader.setTags(m_tags);
878  file_reader.open(file.filename.c_str());
879  func(file_reader, file);
880  } catch (const std::exception& e) {
881  fmt::print(stderr, "[chain] Error processing {}: {}\n",
882  file.filename, e.what());
883  }
884  }
885  }
886 
887  //=========================================================================
888  // Statistics and Information
889  //=========================================================================
890 
892  [[nodiscard]] const ChainStatistics& statistics() const noexcept { return m_stats; }
893 
895  [[nodiscard]] long total_events() {
896  load_metadata_if_needed();
897  return m_stats.total_events;
898  }
899 
901  [[nodiscard]] long total_events_count() const { return m_stats.total_events.load(); }
902 
904  void print_statistics() const {
905  fmt::print("\nChain Statistics\n");
906  fmt::print(" Files: {}\n", m_files.size());
907  fmt::print(" Total events: {}\n", m_stats.total_events.load());
908  fmt::print(" Events processed: {}\n", m_stats.events_processed.load());
909  fmt::print(" Processing time: {:.2f} seconds\n", m_stats.elapsed_seconds());
910  if (m_stats.elapsed_seconds() > 0) {
911  fmt::print(" Throughput: {:.1f} events/sec\n", m_stats.throughput());
912  }
913  }
914 
916  void show_all_info() {
917  fmt::print("\nChain File Info ({} files)\n", m_files.size());
918  fmt::print("================\n");
919  for (const auto& file : m_files) {
920  fmt::print("\nFile [{}]: {}\n", file.index, file.filename);
921  try {
922  reader temp_reader;
923  temp_reader.setTags(m_tags);
924  temp_reader.open(file.filename.c_str());
925  temp_reader.about();
926  } catch (const std::exception& e) {
927  fmt::print(" Error: {}\n", e.what());
928  }
929  }
930  }
931 
932  //=========================================================================
933  // Configuration Access
934  //=========================================================================
935 
937  [[nodiscard]] bool any_has_config(std::string_view name) {
938  for (const auto& file : m_files) {
939  if (!file.is_valid) continue;
940  try {
941  reader temp_reader;
942  temp_reader.setTags(m_tags);
943  temp_reader.open(file.filename.c_str());
944 
945  std::map<std::string, std::string> config;
946  temp_reader.readUserConfig(config);
947  if (config.find(std::string(name)) != config.end()) {
948  return true;
949  }
950  } catch (...) {}
951  }
952  return false;
953  }
954 
956  [[nodiscard]] std::optional<std::string> get_config(std::string_view name) {
957  for (const auto& file : m_files) {
958  if (!file.is_valid) continue;
959  try {
960  reader temp_reader;
961  temp_reader.setTags(m_tags);
962  temp_reader.open(file.filename.c_str());
963 
964  std::map<std::string, std::string> config;
965  temp_reader.readUserConfig(config);
966  auto it = config.find(std::string(name));
967  if (it != config.end()) {
968  return it->second;
969  }
970  } catch (...) {}
971  }
972  return std::nullopt;
973  }
974 
975  //=========================================================================
976  // Advanced Access
977  //=========================================================================
978 
980  [[nodiscard]] ThreadPool& threadpool() noexcept { return m_thread_pool; }
981 
982 private:
983  //=========================================================================
984  // Private Members
985  //=========================================================================
986 
987  std::vector<FileInfo> m_files;
988  std::vector<long> m_tags;
989  int m_num_threads;
990  bool m_show_progress;
991  bool m_verbose;
992  bool m_metadata_loaded{false};
993  ThreadPool m_thread_pool;
994  ChainStatistics m_stats;
995  mutable std::mutex m_mutex;
996 
997  //=========================================================================
998  // Private Methods
999  //=========================================================================
1000 
1002  void load_metadata_if_needed() {
1003  if (m_metadata_loaded) return;
1004 
1005  if (m_verbose) fmt::print("[chain] Loading file metadata...\n");
1006 
1007  long total = 0;
1008  for (auto& file : m_files) {
1009  if (!file.is_valid) continue;
1010  if (file.total_events < 0) {
1011  try {
1012  reader temp_reader;
1013  temp_reader.setTags(m_tags);
1014  temp_reader.open(file.filename.c_str());
1015 
1016  file.total_events = temp_reader.getEntries();
1017  file.num_records = temp_reader.getNRecords();
1018  } catch (const std::exception& e) {
1019  file.is_valid = false;
1020  file.error_message = e.what();
1021  file.total_events = 0;
1022  }
1023  }
1024  total += file.total_events;
1025  }
1026  m_stats.total_events = total;
1027  m_metadata_loaded = true;
1028  }
1029 
1030  // Allow ChainIterator to access private members
1031  friend class ChainIterator;
1032 };
1033 
1034 //=============================================================================
1035 // ChainIterator Implementation
1036 //=============================================================================
1037 
1038 inline ChainIterator::ChainIterator(chain* ch, bool at_end)
1039  : m_chain(ch), m_exhausted(at_end) {
1040  if (!at_end && ch && !ch->m_files.empty()) {
1041  if (open_next_file()) {
1042  advance_to_next_event();
1043  } else {
1044  m_exhausted = true;
1045  }
1046  }
1047 }
1048 
    // Body of the pre-increment operator (its signature line is not visible
    // in this listing). On a live iterator: bump the event index, then try to
    // read the next event; if none can be read, mark the iterator exhausted.
    // An already exhausted iterator is left unchanged.
1050  if (!m_exhausted) {
1051  m_current_event_idx++;
1052  if (!advance_to_next_event()) {
1053  m_exhausted = true;
1054  }
1055  }
1056  return *this;
1057 }
1058 
1059 inline bool ChainIterator::advance_to_next_event() {
1060  while (m_current_reader) {
1061  if (m_current_reader->next(m_current_event)) {
1062  return true;
1063  }
1064  // Current file exhausted, try next
1065  m_current_file_idx++;
1066  m_current_event_idx = 0;
1067  if (!open_next_file()) {
1068  return false;
1069  }
1070  }
1071  return false;
1072 }
1073 
1074 inline bool ChainIterator::open_next_file() {
1075  while (m_current_file_idx < static_cast<int>(m_chain->m_files.size())) {
1076  const auto& file = m_chain->m_files[m_current_file_idx];
1077  if (!file.is_valid) {
1078  m_current_file_idx++;
1079  continue;
1080  }
1081  try {
1082  m_current_reader = std::make_unique<reader>();
1083  m_current_reader->setTags(m_chain->m_tags);
1084  m_current_reader->open(file.filename.c_str());
1085 
1086  m_current_dict = std::make_unique<dictionary>();
1087  m_current_reader->readDictionary(*m_current_dict);
1088  return true;
1089  } catch (const std::exception& e) {
1090  fmt::print(stderr, "[chain] Error opening {}: {}\n",
1091  file.filename, e.what());
1092  m_current_file_idx++;
1093  }
1094  }
1095  m_current_reader.reset();
1096  m_current_dict.reset();
1097  return false;
1098 }
1099 
1100 } // namespace hipo
Core HIPO data structures: structure, composite, and bank classes for tabular data access.
A ThreadPool implementation for managing asynchronous task execution.
Definition: threadpool.hpp:35
Iterator for traversing events across all files in a chain.
Definition: chain.h:183
std::input_iterator_tag iterator_category
Definition: chain.h:191
void operator++(int)
Definition: chain.h:202
std::ptrdiff_t difference_type
Definition: chain.h:193
EventData operator*()
Definition: chain.h:204
bool operator==(const ChainIterator &other) const noexcept
Definition: chain.h:209
bool operator!=(const ChainIterator &other) const noexcept
Definition: chain.h:216
ChainIterator & operator++()
Definition: chain.h:1049
Represents a HIPO bank, a tabular data structure with rows and typed columns.
Definition: bank.h:352
Event wrapper that provides bank access via dictionary.
Definition: chain.h:142
bank getBank(const std::string &name)
Get a bank by name, reading it from the event.
Definition: chain.h:148
event * raw()
Access underlying event.
Definition: chain.h:161
bank get_bank(const std::string &name)
Alias for getBank (snake_case)
Definition: chain.h:158
const event * raw() const
Definition: chain.h:162
chain_event(event *ev, dictionary *dict)
Definition: chain.h:145
int add(std::vector< std::string > &filenames)
Add multiple files to the chain.
Definition: chain.h:315
ThreadPool & threadpool() noexcept
Get the thread pool for advanced use.
Definition: chain.h:980
iterator end()
Get end iterator.
Definition: chain.h:601
void clear()
Remove all files from the chain.
Definition: chain.h:369
void process_filtered(ProcessFunc &&process_func, const std::vector< std::string > &required_banks, double percentage=100.0)
Process events with bank filtering (record-level parallelism)
Definition: chain.h:751
int get_nb_files() const noexcept
Legacy alias for size()
Definition: chain.h:388
bool empty() const noexcept
Check if chain is empty.
Definition: chain.h:385
const ChainStatistics & statistics() const noexcept
Get processing statistics.
Definition: chain.h:892
std::optional< std::string > get_config(std::string_view name)
Get configuration from first file that has it.
Definition: chain.h:956
void list() const
Print list of files in chain.
Definition: chain.h:570
std::vector< FileInfo > & files() noexcept
Get all file infos.
Definition: chain.h:406
int add(std::string_view filename)
Add a single file to the chain.
Definition: chain.h:296
void show_all_info()
Show detailed info for all files.
Definition: chain.h:916
bool any_has_config(std::string_view name)
Check if any file has a configuration key.
Definition: chain.h:937
void set_progress(bool show)
Enable/disable progress display.
Definition: chain.h:423
void set_verbose(bool verbose)
Enable/disable verbose output.
Definition: chain.h:426
void for_each_file(FileFunc &&func)
Apply a function to each file (for file-level operations)
Definition: chain.h:872
void print_statistics() const
Print processing statistics.
Definition: chain.h:904
long total_events()
Get total event count (loads metadata if needed)
Definition: chain.h:895
void process(ProcessFunc &&process_func, double percentage=100.0)
Process events in parallel across all files (record-level parallelism)
Definition: chain.h:625
void set_tags(const std::vector< long > &tags)
Set event tags for filtering.
Definition: chain.h:415
void scan()
Scan and display detailed information about all files.
Definition: chain.h:496
int add_pattern(std::string_view pattern)
Add files matching a glob pattern.
Definition: chain.h:325
void open(bool validate_all=true)
Validate and optionally load metadata for all files.
Definition: chain.h:437
const FileInfo & file_info(int index) const
Legacy alias for operator[].
Definition: chain.h:401
void set_threads(int n)
Set number of processing threads.
Definition: chain.h:418
const FileInfo & operator[](std::size_t index) const
Get file info by index.
Definition: chain.h:393
iterator begin()
Get iterator to first event.
Definition: chain.h:598
chain(int threads=0, bool progress=true, bool verbose=false)
Construct a chain with specified thread count.
Definition: chain.h:277
friend class ChainIterator
Definition: chain.h:1031
std::size_t size() const noexcept
Number of files in the chain.
Definition: chain.h:382
long total_events_count() const
Legacy alias.
Definition: chain.h:901
Collection of schema definitions, typically read from a HIPO file header.
Definition: dictionary.h:248
schema & getSchema(const char *name)
Retrieve a schema by name.
Definition: dictionary.h:271
Represents a HIPO event, a container for multiple structures/banks.
Definition: event.h:77
void read(hipo::bank &b)
Read a bank from this event (alias for getStructure).
Definition: event.cpp:67
Sequential reader for HIPO files.
Definition: reader.h:250
void setTags(int tag)
Add an event tag to the read filter.
Definition: reader.h:331
int getEntries()
Definition: reader.h:379
int getNRecords() const
Definition: reader.h:368
void open(const char *filename)
Open a HIPO file for reading.
Definition: reader.cpp:81
void about()
Print file information to stdout.
Definition: reader.cpp:72
void readUserConfig(std::map< std::string, std::string > &mapConfig)
Read user configuration key-value pairs from the file header.
Definition: reader.cpp:283
Schema definitions and schema dictionary for HIPO banks.
HIPO event container and manipulation interface.
auto submit(Callable &&callable, Args &&... args) -> std::future< std::invoke_result_t< std::decay_t< Callable >, std::decay_t< Args >... >>
Submits a task for asynchronous execution.
Definition: threadpool.hpp:162
std::recursive_mutex & get_stdout_mutex()
Returns the global recursive mutex used to synchronize stdout access. A recursive mutex is used to allow the same thread to lock it again while it already holds the lock.
Definition: bank.cpp:47
Thread-safe progress tracker with visual progress bar display.
Sequential and random-access reader for HIPO files with event filtering and dictionary support.
HIPO record reading and event extraction.
Configuration settings for progress bar display.
std::string label
Label text displayed before the progress bar (default: "Processing")
bool show_rate
Display processing rate in items per second (default: true)
bool show_eta
Display estimated time to completion (default: true)
Thread-safe statistics for chain processing.
Definition: chain.h:106
std::chrono::steady_clock::time_point end_time
Definition: chain.h:111
double elapsed_seconds() const noexcept
Definition: chain.h:120
std::atomic< long > total_events
Definition: chain.h:107
std::atomic< long > events_processed
Definition: chain.h:108
std::chrono::steady_clock::time_point start_time
Definition: chain.h:110
std::atomic< long > events_skipped
Definition: chain.h:109
double throughput() const noexcept
Definition: chain.h:126
void reset() noexcept
Definition: chain.h:113
Metadata container for a file in the chain.
Definition: chain.h:66
std::string filename
Definition: chain.h:67
bool is_valid
Definition: chain.h:72
int num_records
Definition: chain.h:70
std::string size_string() const
Get human-readable file size string.
Definition: chain.h:85
int index
Definition: chain.h:68
std::uintmax_t file_size
Definition: chain.h:71
FileInfo()=default
long total_events
Definition: chain.h:69
bool metadata_loaded() const noexcept
Check if metadata has been loaded.
Definition: chain.h:80
std::string error_message
Definition: chain.h:73
FileInfo(std::string_view name, int idx)
Definition: chain.h:76
Thread pool implementation for managing asynchronous task execution.