Parallel Processing
The `hipo::chain` class processes multiple HIPO files in parallel, distributing work across threads at the record level.
Basic Parallel Processing
```cpp
hipo::chain ch(4, true, true); // 4 threads, progress bar, verbose

// Add files one at a time...
ch.add("file1.hipo");
ch.add("file2.hipo");
// ...or with a glob pattern
ch.add_pattern("data/*.hipo");

// Process all events in parallel
ch.process([](auto& event, int file_idx, long event_idx) {
    auto particles = event.getBank("REC::Particle");
    int nrows = particles.getRows();
    // thread-safe processing...
});
```
Record-Level Parallelism
Each thread claims an entire record and processes all of its events sequentially before taking another. Working in whole records reduces random file access and improves cache efficiency.
```text
Thread 0: [Record 0] -> events 0-999
Thread 1: [Record 1] -> events 1000-1999
Thread 2: [Record 2] -> events 2000-2999
Thread 3: [Record 3] -> events 3000-3999
...
```
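The record-level scheme above can be sketched in standalone C++ as a shared atomic cursor that each worker advances to claim the next whole record. This is a minimal illustration under stated assumptions, not the `hipo::chain` implementation: the `Record` type and `process_records` function are hypothetical, and a record is modelled simply as a list of event IDs.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <functional>
#include <thread>
#include <vector>

// Hypothetical stand-in for a HIPO record: just a list of event IDs.
struct Record {
    std::vector<long> events;
};

// Hypothetical sketch: each worker repeatedly claims the next unprocessed
// record through an atomic cursor, then handles all of its events in order.
void process_records(const std::vector<Record>& records,
                     unsigned n_threads,
                     const std::function<void(long event_id)>& callback) {
    assert(n_threads > 0);
    std::atomic<std::size_t> next_record{0};

    auto worker = [&]() {
        while (true) {
            // fetch_add hands out each record index exactly once.
            std::size_t r = next_record.fetch_add(1);
            if (r >= records.size()) break;  // no records left
            for (long event_id : records[r].events) {
                callback(event_id);  // sequential within one record
            }
        }
    };

    std::vector<std::thread> pool;
    for (unsigned t = 0; t < n_threads; ++t) pool.emplace_back(worker);
    for (auto& th : pool) th.join();
}
```

In this sketch records are claimed dynamically, so which thread handles which record can vary from run to run; the callback must still be thread-safe because several threads invoke it concurrently.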
Filtered Processing
Use `process_filtered()` to run the callback only on events that contain every required bank:
```cpp
ch.process_filtered([](auto& event, int file_idx, long event_idx) {
    auto parts = event.getBank("REC::Particle");
    auto calos = event.getBank("REC::Calorimeter");
    // both banks are guaranteed to have rows > 0
}, {"REC::Particle", "REC::Calorimeter"}); // required banks
```
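The filtering rule described above (invoke the callback only when every required bank is present with at least one row) amounts to a simple predicate. The sketch below models an event as a hypothetical map from bank name to row count; `EventBanks` and `passes_filter` are illustrative names, not part of the HIPO API.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical event model: bank name -> number of rows in that bank.
using EventBanks = std::map<std::string, int>;

// Returns true only if every required bank exists and has rows > 0.
bool passes_filter(const EventBanks& event,
                   const std::vector<std::string>& required) {
    for (const auto& name : required) {
        auto it = event.find(name);
        if (it == event.end() || it->second <= 0) {
            return false;  // bank missing or empty: skip this event
        }
    }
    return true;
}
```

With this predicate, an empty `required` list accepts every event, and a single empty bank is enough to reject an event.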
Percentage Processing
To process only a fraction of the events, pass a percentage as the second argument to `process()`:
```cpp
// Process 10% of events
ch.process([](auto& event, int file_idx, long event_idx) {
    // ...
}, 10.0);
```
Sequential Iteration
For single-threaded processing, iterate over the chain with a range-based for loop:
```cpp
for (auto& [event, file_idx, event_idx] : ch) {
    auto particles = event.getBank("REC::Particle");
    // single-threaded processing
}
```
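The loop above unpacks each element into `event`, `file_idx`, and `event_idx` with a C++17 structured binding. As a minimal sketch of that language mechanism only (the `Entry` and `MiniChain` types are hypothetical and say nothing about how `hipo::chain` is implemented), any range whose iterator dereferences to an lvalue aggregate with three members supports the same syntax:

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical element type: an event plus its file and event indices.
struct Entry {
    std::string event;  // stand-in for a real event object
    int file_idx;
    long event_idx;
};

// Hypothetical range: begin()/end() are all a range-based for loop needs.
struct MiniChain {
    std::vector<Entry> entries;
    auto begin() { return entries.begin(); }
    auto end() { return entries.end(); }
};

// Counts the events that came from one file, using the same binding pattern.
long count_in_file(MiniChain& ch, int wanted_file) {
    long n = 0;
    for (auto& [event, file_idx, event_idx] : ch) {
        if (file_idx == wanted_file) ++n;
    }
    return n;
}
```

Because the binding is declared with `auto&`, the names refer directly to the members of each element rather than to copies.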
Thread Safety
Important: The callback function passed to `process()` must be thread-safe. Each thread has its own reader, dictionary, record, and event objects, but any shared state (histograms, counters, output files) must be synchronized.
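Safe Patterns
Mutex-protected shared histogram:
```cpp
#include <mutex>
#include <vector>
#include "TH1F.h"

std::mutex histMutex;
TH1F* histogram = new TH1F("h", "title", 100, 0, 10);

ch.process([&](auto& event, int file_idx, long event_idx) {
    auto parts = event.getBank("REC::Particle");
    // Read the bank without holding the lock...
    std::vector<float> pxValues;
    for (int row = 0; row < parts.getRows(); row++) {
        pxValues.push_back(parts.getFloat("px", row));
    }
    // ...then lock once per event to fill the shared histogram
    std::lock_guard<std::mutex> lock(histMutex);
    for (float px : pxValues) histogram->Fill(px);
});
```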
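An alternative to sharing one locked histogram is genuine thread-local accumulation: each worker fills its own private bin counts and merges them into the shared result once, under a single lock, when it finishes. The sketch below shows the pattern with plain `std::thread` workers and a hypothetical fixed-bin histogram (`LocalHist`, `fill_parallel`) instead of ROOT or `hipo::chain`, because it does not assume any per-thread hook in the chain API.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical fixed-range histogram: nbins equal bins over [lo, hi).
struct LocalHist {
    double lo, hi;
    std::vector<long> bins;
    LocalHist(std::size_t nbins, double lo_, double hi_)
        : lo(lo_), hi(hi_), bins(nbins, 0) {}
    void fill(double x) {
        if (x < lo || x >= hi) return;  // drop underflow/overflow
        std::size_t b = static_cast<std::size_t>((x - lo) / (hi - lo) * bins.size());
        if (b < bins.size()) ++bins[b];  // guard against rounding at the top edge
    }
};

// Each thread fills a private histogram, then merges once under the lock.
LocalHist fill_parallel(const std::vector<double>& values, unsigned n_threads,
                        std::size_t nbins, double lo, double hi) {
    assert(n_threads > 0);
    LocalHist total(nbins, lo, hi);
    std::mutex merge_mutex;
    std::vector<std::thread> pool;
    for (unsigned t = 0; t < n_threads; ++t) {
        pool.emplace_back([&, t] {
            LocalHist local(nbins, lo, hi);  // no locking while filling
            for (std::size_t i = t; i < values.size(); i += n_threads) {
                local.fill(values[i]);
            }
            std::lock_guard<std::mutex> lock(merge_mutex);  // one merge per thread
            for (std::size_t b = 0; b < nbins; ++b) total.bins[b] += local.bins[b];
        });
    }
    for (auto& th : pool) th.join();
    return total;
}
```

The lock is taken once per thread rather than once per fill or per event, so contention stays low no matter how many values are processed.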
Atomic counters:
```cpp
#include <atomic>
#include <cstdio>

std::atomic<long> electronCount{0};

ch.process([&](auto& event, int file_idx, long event_idx) {
    auto parts = event.getBank("REC::Particle");
    for (int row = 0; row < parts.getRows(); row++) {
        if (parts.getInt("pid", row) == 11) { // PDG code 11: electron
            electronCount++;
        }
    }
});

printf("Total electrons: %ld\n", electronCount.load());
```
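The atomic-counter pattern can be made cheaper by tallying into a local variable for each event and publishing the result with a single `fetch_add`. Because the total is only read after every thread has been joined, `std::memory_order_relaxed` is sufficient. The sketch below uses plain `std::thread` workers and a hypothetical event model (each event is a list of PID codes) rather than the HIPO API; `count_electrons` is an illustrative name.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical event model: each event is the list of its particles' PIDs.
using Event = std::vector<int>;

long count_electrons(const std::vector<Event>& events, unsigned n_threads) {
    assert(n_threads > 0);
    std::atomic<long> electron_count{0};
    std::vector<std::thread> pool;
    for (unsigned t = 0; t < n_threads; ++t) {
        pool.emplace_back([&, t] {
            for (std::size_t i = t; i < events.size(); i += n_threads) {
                long local = 0;  // per-event tally, no atomics in the inner loop
                for (int pid : events[i]) {
                    if (pid == 11) ++local;  // PDG code 11: electron
                }
                if (local > 0) {
                    // Relaxed ordering suffices: the total is read only after join().
                    electron_count.fetch_add(local, std::memory_order_relaxed);
                }
            }
        });
    }
    for (auto& th : pool) th.join();
    return electron_count.load();
}
```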
Scanning Files
Collect and display metadata for all files in the chain:
```cpp
ch.scan(); // displays a progress bar, per-file info, and a summary
ch.list(); // prints the file list
```
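Statistics
After processing, `statistics()` reports the number of events processed, the elapsed time, and the throughput:
```cpp
ch.process(myFunc); // myFunc: any callback taking (event, file_idx, event_idx)

auto& stats = ch.statistics();
printf("Processed: %ld events in %.2f seconds\n",
       stats.events_processed.load(),
       stats.elapsed_seconds());
printf("Throughput: %.1f events/sec\n", stats.throughput());
```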
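The throughput figure is events processed divided by wall-clock time. The sketch below shows a minimal statistics object in that spirit. It assumes an atomic event counter and a `std::chrono::steady_clock` start time. The `RunStats` name and its members are hypothetical and need not match the type actually returned by `statistics()`.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>

// Hypothetical run statistics: an atomic counter plus a wall-clock start time.
struct RunStats {
    std::atomic<long> events_processed{0};
    std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now();

    // Seconds elapsed since this object was created.
    double elapsed_seconds() const {
        std::chrono::duration<double> dt = std::chrono::steady_clock::now() - start;
        return dt.count();
    }

    // Events per second; returns 0 if no measurable time has elapsed.
    double throughput() const {
        double secs = elapsed_seconds();
        return secs > 0.0 ? static_cast<double>(events_processed.load()) / secs : 0.0;
    }
};
```

A monotonic clock is used so that system clock adjustments during a long run cannot produce negative or inflated elapsed times.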
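File-Level Operations
`for_each_file()` calls the given function once per file, passing a `hipo::reader` for that file and its `hipo::FileInfo`:
```cpp
ch.for_each_file([](hipo::reader& r, const hipo::FileInfo& info) {
    printf("File %s: %ld events, %s\n",
           info.filename.c_str(),
           info.total_events,
           info.size_string().c_str());
});
```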