44 #include <string_view>
47 #include <fmt/format.h>
87 return fmt::format(
"{:.1f} GB",
file_size / (1024.0 * 1024 * 1024));
89 return fmt::format(
"{:.1f} MB",
file_size / (1024.0 * 1024));
91 return fmt::format(
"{:.1f} KB",
file_size / 1024.0);
117 start_time = std::chrono::steady_clock::now();
121 auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
123 return elapsed.count() / 1000.0;
149 if (!m_event || !m_dict) {
150 throw std::runtime_error(
"Invalid chain_event (no event data)");
161 event*
raw() {
return m_event; }
162 const event*
raw()
const {
return m_event; }
165 explicit operator bool()
const {
return m_event !=
nullptr && m_dict !=
nullptr; }
205 return {
chain_event(&m_current_event, m_current_dict.get()),
206 m_current_file_idx, m_current_event_idx};
210 if (m_exhausted && other.m_exhausted)
return true;
211 if (m_exhausted != other.m_exhausted)
return false;
212 return m_current_file_idx == other.m_current_file_idx &&
213 m_current_event_idx == other.m_current_event_idx;
217 return !(*
this == other);
222 std::unique_ptr<reader> m_current_reader;
223 std::unique_ptr<dictionary> m_current_dict;
224 event m_current_event;
225 int m_current_file_idx{0};
226 long m_current_event_idx{0};
227 bool m_exhausted{
false};
229 bool advance_to_next_event();
230 bool open_next_file();
277 explicit chain(
int threads = 0,
bool progress =
true,
bool verbose =
false)
278 : m_num_threads(threads == 0 ? static_cast<int>(std::thread::hardware_concurrency()) : threads),
279 m_show_progress(progress),
281 m_thread_pool(m_num_threads,
"ChainWorker") {
283 fmt::print(
"[chain] Initialized with {} threads\n", m_num_threads);
296 int add(std::string_view filename) {
297 if (!std::filesystem::exists(filename)) {
298 fmt::print(stderr,
"[chain] Warning: File not found: {}\n", filename);
299 return static_cast<int>(m_files.size());
301 std::lock_guard lock(m_mutex);
302 m_files.emplace_back(filename,
static_cast<int>(m_files.size()));
303 m_metadata_loaded =
false;
305 fmt::print(
"[chain] Added file {}: {}\n", m_files.size(), filename);
307 return static_cast<int>(m_files.size());
315 int add(std::vector<std::string>& filenames) {
316 for (
const auto& f : filenames)
add(f);
317 return static_cast<int>(m_files.size());
327 std::filesystem::path pattern_path(pattern);
328 std::filesystem::path dir = pattern_path.parent_path();
329 if (dir.empty()) dir =
".";
331 std::string filename_pattern = pattern_path.filename().string();
332 std::string regex_pattern = std::regex_replace(filename_pattern,
333 std::regex(R
"(\*)"), ".*");
334 regex_pattern = std::regex_replace(regex_pattern,
335 std::regex(R
"(\?)"), ".");
338 std::regex file_regex(regex_pattern);
339 std::vector<std::string> matched_files;
341 for (
const auto& entry : std::filesystem::directory_iterator(dir)) {
342 if (entry.is_regular_file()) {
343 std::string fname = entry.path().filename().string();
344 if (std::regex_match(fname, file_regex)) {
345 matched_files.push_back(entry.path().string());
351 std::sort(matched_files.begin(), matched_files.end());
352 for (
const auto& f : matched_files) {
356 }
catch (
const std::exception& e) {
357 fmt::print(stderr,
"[chain] Error scanning directory: {}\n", e.what());
361 fmt::print(
"[chain] Pattern '{}' matched {} files\n", pattern, added);
370 std::lock_guard lock(m_mutex);
373 m_metadata_loaded =
false;
374 if (m_verbose) fmt::print(
"[chain] Cleared all files\n");
382 [[nodiscard]] std::size_t
size() const noexcept {
return m_files.size(); }
385 [[nodiscard]]
bool empty() const noexcept {
return m_files.empty(); }
389 return static_cast<int>(m_files.size());
394 if (index >= m_files.size()) {
395 throw std::out_of_range(
"chain: File index out of range");
397 return m_files[index];
402 return (*
this)[
static_cast<std::size_t
>(index)];
406 [[nodiscard]] std::vector<FileInfo>&
files() noexcept {
415 void set_tags(
const std::vector<long>& tags) { m_tags = tags; }
419 m_num_threads = (n == 0) ?
static_cast<int>(std::thread::hardware_concurrency()) : n;
437 void open(
bool validate_all =
true) {
438 if (m_files.empty()) {
439 throw std::runtime_error(
"chain: No files added to chain");
443 fmt::print(
"[chain] Validating {} files...\n", m_files.size());
449 for (
auto& file : m_files) {
450 if (!std::filesystem::exists(file.filename)) {
451 file.is_valid =
false;
452 file.error_message =
"File not found";
456 file.file_size = std::filesystem::file_size(file.filename);
462 temp_reader.
open(file.filename.c_str());
470 fmt::print(
" [{}] {} - OK ({} events)\n",
471 file.index, file.filename, file.total_events);
473 }
catch (
const std::exception& e) {
474 file.is_valid =
false;
475 file.error_message = e.what();
483 m_metadata_loaded = validate_all;
486 fmt::print(
"[chain] Valid files: {}/{}, Total events: {}\n",
490 if (valid_count == 0) {
491 throw std::runtime_error(
"chain: No valid files in chain");
497 if (m_files.empty()) {
498 fmt::print(
"[chain] No files in chain\n");
502 std::unique_ptr<ProgressTracker> progress;
503 if (m_show_progress) {
505 config.
label =
"Scanning files";
507 progress = std::make_unique<ProgressTracker>(m_files.size(), config);
512 int total_records = 0;
513 std::uintmax_t total_size = 0;
516 for (
auto& file : m_files) {
518 if (!std::filesystem::exists(file.filename)) {
520 fmt::print(
" [{}] {} - FILE NOT FOUND\n", file.index, file.filename);
524 file.file_size = std::filesystem::file_size(file.filename);
525 total_size += file.file_size;
529 temp_reader.
open(file.filename.c_str());
533 total_records += file.num_records;
539 fmt::print(
" [{}] {}\n Records: {}, Events: {}, Size: {}\n",
540 file.index, file.filename, file.num_records,
541 file.total_events, file.size_string());
544 }
catch (
const std::exception& e) {
546 fmt::print(
" [{}] {} - ERROR: {}\n", file.index, file.filename, e.what());
549 if (progress) progress->increment();
552 if (progress) progress->finish();
554 fmt::print(
"\nScan Summary\n");
555 fmt::print(
" Total files: {}\n", m_files.size());
556 fmt::print(
" Valid files: {}\n", valid_files);
557 fmt::print(
" Total records: {}\n", total_records);
562 fmt::print(
" Total size: {}\n", dummy.
size_string());
564 if (valid_files > 0) {
565 fmt::print(
" Avg events/file: {}\n",
total_events / valid_files);
571 fmt::print(
"Chain file list ({} files)\n", m_files.size());
572 for (
const auto& file : m_files) {
573 fmt::print(
" [{}] {}", file.index, file.filename);
574 if (file.metadata_loaded()) {
575 fmt::print(
" ({} events, {} records)", file.total_events, file.num_records);
577 if (!file.is_valid) {
578 fmt::print(
" [INVALID: {}]", file.error_message);
624 template<
typename ProcessFunc>
625 void process(ProcessFunc&& process_func,
double percentage = 100.0) {
626 if (m_files.empty()) {
627 fmt::print(stderr,
"[chain] Warning: No files to process.\n");
631 percentage = std::clamp(percentage, 0.0, 100.0) / 100.0;
634 load_metadata_if_needed();
636 long target_events =
static_cast<long>(m_stats.
total_events * percentage);
644 std::vector<WorkItem> work_list;
645 long events_accumulated = 0;
647 for (
const auto& file : m_files) {
648 if (!file.is_valid || events_accumulated >= target_events)
continue;
649 long events_needed = target_events - events_accumulated;
650 long events_from_file = std::min(events_needed, file.total_events);
651 if (events_from_file > 0) {
652 work_list.push_back({file.index, file.num_records, events_from_file});
653 events_accumulated += events_from_file;
661 fmt::print(
"[chain] Processing {} events from {} files ({} threads, record-level parallelism)\n",
662 events_accumulated, work_list.size(), m_num_threads);
665 std::unique_ptr<ProgressTracker> progress;
668 config.
label = fmt::format(
"Processing ({:.1f}%)", percentage * 100.0);
671 progress = std::make_unique<ProgressTracker>(m_stats.
total_events, config);
676 for (
const auto& work : work_list) {
677 const auto& file = m_files[work.file_idx];
678 std::atomic<int> next_record{0};
679 std::atomic<long> events_processed_in_file{0};
680 std::vector<std::future<void>> futures;
682 for (
int t = 0; t < m_num_threads; t++) {
683 futures.push_back(m_thread_pool.
submit([&,
this]() {
687 file_reader.setTags(m_tags);
688 file_reader.open(file.filename.c_str());
691 file_reader.readDictionary(dict);
698 int rec_idx = next_record.fetch_add(1);
699 if (rec_idx >= work.num_records) break;
702 if (events_processed_in_file.load() >= work.max_events) break;
705 if (!file_reader.loadRecord(rec, rec_idx)) continue;
707 int events_in_record = rec.getEventCount();
710 for (int evt_in_rec = 0; evt_in_rec < events_in_record; evt_in_rec++) {
712 long current_count = events_processed_in_file.fetch_add(1);
713 if (current_count >= work.max_events) {
714 events_processed_in_file.fetch_sub(1);
718 rec.readHipoEvent(temp_event, evt_in_rec);
719 chain_event evt(&temp_event, &dict);
720 process_func(evt, file.index, current_count);
721 m_stats.events_processed++;
722 if (progress) progress->increment();
725 } catch (
const std::exception& e) {
727 fmt::print(stderr,
"[chain] Thread error: {}\n", e.what());
732 for (
auto& f : futures) f.get();
735 m_stats.end_time = std::chrono::steady_clock::now();
736 if (progress) progress->finish();
750 template<
typename ProcessFunc>
752 const std::vector<std::string>& required_banks,
753 double percentage = 100.0) {
754 if (m_files.empty()) {
755 fmt::print(stderr,
"[chain] Warning: No files to process.\n");
759 percentage = std::clamp(percentage, 0.0, 100.0) / 100.0;
760 load_metadata_if_needed();
762 long target_events =
static_cast<long>(m_stats.total_events * percentage);
764 m_stats.total_events = target_events;
767 fmt::print(
"[chain] Filtered processing: requiring banks [");
768 for (
size_t i = 0; i < required_banks.size(); i++) {
769 fmt::print(
"{}{}", required_banks[i],
770 i < required_banks.size() - 1 ?
", " :
"");
772 fmt::print(
"] (record-level parallelism)\n");
775 std::unique_ptr<ProgressTracker> progress;
776 if (m_show_progress && target_events > 0) {
778 config.
label =
"Filtered processing";
780 progress = std::make_unique<ProgressTracker>(target_events, config);
784 std::atomic<long> events_found{0};
786 for (
const auto& file : m_files) {
787 if (!file.is_valid || events_found >= target_events)
continue;
789 std::atomic<int> next_record{0};
790 std::vector<std::future<void>> futures;
792 for (
int t = 0; t < m_num_threads && events_found < target_events; t++) {
793 futures.push_back(m_thread_pool.submit([&,
this]() {
796 file_reader.setTags(m_tags);
797 file_reader.open(file.filename.c_str());
800 file_reader.readDictionary(dict);
803 std::vector<bank> banks;
804 banks.reserve(required_banks.size());
805 for (const auto& name : required_banks) {
806 banks.emplace_back(dict.getSchema(name.c_str()));
812 while (events_found < target_events) {
814 int rec_idx = next_record.fetch_add(1);
815 if (rec_idx >= file.num_records) break;
818 if (!file_reader.loadRecord(rec, rec_idx)) continue;
820 int events_in_record = rec.getEventCount();
823 for (int evt_in_rec = 0; evt_in_rec < events_in_record; evt_in_rec++) {
824 if (events_found >= target_events) break;
826 rec.readHipoEvent(temp_event, evt_in_rec);
829 bool has_all_banks = true;
830 for (auto& b : banks) {
832 if (b.getRows() == 0) {
833 has_all_banks = false;
839 long evt_count = events_found.fetch_add(1);
840 if (evt_count >= target_events) break;
842 chain_event evt(&temp_event, &dict);
843 process_func(evt, file.index, evt_count);
844 m_stats.events_processed++;
845 if (progress) progress->increment();
849 } catch (
const std::exception& e) {
851 fmt::print(stderr,
"[chain] Thread error: {}\n", e.what());
855 for (
auto& f : futures) f.get();
858 m_stats.end_time = std::chrono::steady_clock::now();
859 if (progress) progress->finish();
861 fmt::print(
"[chain] Found {} matching events\n", events_found.load());
871 template<
typename FileFunc>
873 for (
const auto& file : m_files) {
874 if (!file.is_valid)
continue;
878 file_reader.
open(file.filename.c_str());
879 func(file_reader, file);
880 }
catch (
const std::exception& e) {
881 fmt::print(stderr,
"[chain] Error processing {}: {}\n",
882 file.filename, e.what());
896 load_metadata_if_needed();
897 return m_stats.total_events;
905 fmt::print(
"\nChain Statistics\n");
906 fmt::print(
" Files: {}\n", m_files.size());
907 fmt::print(
" Total events: {}\n", m_stats.total_events.load());
908 fmt::print(
" Events processed: {}\n", m_stats.events_processed.load());
909 fmt::print(
" Processing time: {:.2f} seconds\n", m_stats.elapsed_seconds());
910 if (m_stats.elapsed_seconds() > 0) {
911 fmt::print(
" Throughput: {:.1f} events/sec\n", m_stats.throughput());
917 fmt::print(
"\nChain File Info ({} files)\n", m_files.size());
918 fmt::print(
"================\n");
919 for (
const auto& file : m_files) {
920 fmt::print(
"\nFile [{}]: {}\n", file.index, file.filename);
924 temp_reader.
open(file.filename.c_str());
926 }
catch (
const std::exception& e) {
927 fmt::print(
" Error: {}\n", e.what());
938 for (
const auto& file : m_files) {
939 if (!file.is_valid)
continue;
943 temp_reader.
open(file.filename.c_str());
945 std::map<std::string, std::string> config;
947 if (config.find(std::string(name)) != config.end()) {
956 [[nodiscard]] std::optional<std::string>
get_config(std::string_view name) {
957 for (
const auto& file : m_files) {
958 if (!file.is_valid)
continue;
962 temp_reader.
open(file.filename.c_str());
964 std::map<std::string, std::string> config;
966 auto it = config.find(std::string(name));
967 if (it != config.end()) {
987 std::vector<FileInfo> m_files;
988 std::vector<long> m_tags;
990 bool m_show_progress;
992 bool m_metadata_loaded{
false};
994 ChainStatistics m_stats;
995 mutable std::mutex m_mutex;
1002 void load_metadata_if_needed() {
1003 if (m_metadata_loaded)
return;
1005 if (m_verbose) fmt::print(
"[chain] Loading file metadata...\n");
1008 for (
auto& file : m_files) {
1009 if (!file.is_valid)
continue;
1010 if (file.total_events < 0) {
1013 temp_reader.setTags(m_tags);
1014 temp_reader.open(file.filename.c_str());
1016 file.total_events = temp_reader.getEntries();
1017 file.num_records = temp_reader.getNRecords();
1018 }
catch (
const std::exception& e) {
1019 file.is_valid =
false;
1020 file.error_message = e.what();
1021 file.total_events = 0;
1024 total += file.total_events;
1026 m_stats.total_events = total;
1027 m_metadata_loaded =
true;
1038 inline ChainIterator::ChainIterator(
chain* ch,
bool at_end)
1039 : m_chain(ch), m_exhausted(at_end) {
1040 if (!at_end && ch && !ch->m_files.empty()) {
1041 if (open_next_file()) {
1042 advance_to_next_event();
1051 m_current_event_idx++;
1052 if (!advance_to_next_event()) {
1059 inline bool ChainIterator::advance_to_next_event() {
1060 while (m_current_reader) {
1061 if (m_current_reader->next(m_current_event)) {
1065 m_current_file_idx++;
1066 m_current_event_idx = 0;
1067 if (!open_next_file()) {
1074 inline bool ChainIterator::open_next_file() {
1075 while (m_current_file_idx <
static_cast<int>(m_chain->m_files.size())) {
1076 const auto& file = m_chain->m_files[m_current_file_idx];
1077 if (!file.is_valid) {
1078 m_current_file_idx++;
1082 m_current_reader = std::make_unique<reader>();
1083 m_current_reader->setTags(m_chain->m_tags);
1084 m_current_reader->open(file.filename.c_str());
1086 m_current_dict = std::make_unique<dictionary>();
1087 m_current_reader->readDictionary(*m_current_dict);
1089 }
catch (
const std::exception& e) {
1090 fmt::print(stderr,
"[chain] Error opening {}: {}\n",
1091 file.filename, e.what());
1092 m_current_file_idx++;
1095 m_current_reader.reset();
1096 m_current_dict.reset();
Core HIPO data structures: structure, composite, and bank classes for tabular data access.
A ThreadPool implementation for managing asynchronous task execution.
Iterator for traversing events across all files in a chain.
std::input_iterator_tag iterator_category
std::ptrdiff_t difference_type
bool operator==(const ChainIterator &other) const noexcept
bool operator!=(const ChainIterator &other) const noexcept
ChainIterator & operator++()
Represents a HIPO bank, a tabular data structure with rows and typed columns.
Event wrapper that provides bank access via dictionary.
bank getBank(const std::string &name)
Get a bank by name, reading it from the event.
event * raw()
Access underlying event.
bank get_bank(const std::string &name)
Alias for getBank (snake_case)
const event * raw() const
chain_event(event *ev, dictionary *dict)
int add(std::vector< std::string > &filenames)
Add multiple files to the chain.
ThreadPool & threadpool() noexcept
Get the thread pool for advanced use.
iterator end()
Get end iterator.
void clear()
Remove all files from the chain.
void process_filtered(ProcessFunc &&process_func, const std::vector< std::string > &required_banks, double percentage=100.0)
Process events with bank filtering (record-level parallelism)
int get_nb_files() const noexcept
Legacy alias for size()
bool empty() const noexcept
Check if chain is empty.
const ChainStatistics & statistics() const noexcept
Get processing statistics.
std::optional< std::string > get_config(std::string_view name)
Get configuration from first file that has it.
void list() const
Print list of files in chain.
std::vector< FileInfo > & files() noexcept
Get all file infos.
int add(std::string_view filename)
Add a single file to the chain.
void show_all_info()
Show detailed info for all files.
bool any_has_config(std::string_view name)
Check if any file has a configuration key.
void set_progress(bool show)
Enable/disable progress display.
void set_verbose(bool verbose)
Enable/disable verbose output.
void for_each_file(FileFunc &&func)
Apply a function to each file (for file-level operations)
void print_statistics() const
Print processing statistics.
long total_events()
Get total event count (loads metadata if needed)
void process(ProcessFunc &&process_func, double percentage=100.0)
Process events in parallel across all files (record-level parallelism)
void set_tags(const std::vector< long > &tags)
Set event tags for filtering.
void scan()
Scan and display detailed information about all files.
int add_pattern(std::string_view pattern)
Add files matching a glob pattern.
void open(bool validate_all=true)
Validate and optionally load metadata for all files.
const FileInfo & file_info(int index) const
Legacy alias for operator[].
void set_threads(int n)
Set number of processing threads.
const FileInfo & operator[](std::size_t index) const
Get file info by index.
iterator begin()
Get iterator to first event.
chain(int threads=0, bool progress=true, bool verbose=false)
Construct a chain with specified thread count.
friend class ChainIterator
std::size_t size() const noexcept
Number of files in the chain.
long total_events_count() const
Legacy alias for total_events().
Collection of schema definitions, typically read from a HIPO file header.
schema & getSchema(const char *name)
Retrieve a schema by name.
Represents a HIPO event, a container for multiple structures/banks.
void read(hipo::bank &b)
Read a bank from this event (alias for getStructure).
Sequential reader for HIPO files.
void setTags(int tag)
Add an event tag to the read filter.
void open(const char *filename)
Open a HIPO file for reading.
void about()
Print file information to stdout.
void readUserConfig(std::map< std::string, std::string > &mapConfig)
Read user configuration key-value pairs from the file header.
Schema definitions and schema dictionary for HIPO banks.
HIPO event container and manipulation interface.
auto submit(Callable &&callable, Args &&... args) -> std::future< std::invoke_result_t< std::decay_t< Callable >, std::decay_t< Args >... >>
Submits a task for asynchronous execution.
std::recursive_mutex & get_stdout_mutex()
Returns the global recursive mutex used to synchronize stdout access. A recursive mutex is used to allow the same thread to re-acquire the lock from nested printing calls.
Thread-safe progress tracker with visual progress bar display.
Sequential and random-access reader for HIPO files with event filtering and dictionary support.
HIPO record reading and event extraction.
Configuration settings for progress bar display.
std::string label
Label text displayed before the progress bar (default: "Processing")
bool show_rate
Display processing rate in items per second (default: true)
bool show_eta
Display estimated time to completion (default: true)
Thread-safe statistics for chain processing.
std::chrono::steady_clock::time_point end_time
double elapsed_seconds() const noexcept
std::atomic< long > total_events
std::atomic< long > events_processed
std::chrono::steady_clock::time_point start_time
std::atomic< long > events_skipped
double throughput() const noexcept
Metadata container for a file in the chain.
std::string size_string() const
Get human-readable file size string.
bool metadata_loaded() const noexcept
Check if metadata has been loaded.
std::string error_message
FileInfo(std::string_view name, int idx)
Thread pool implementation for managing asynchronous task execution.