
Parallel Processing

The hipo::chain class processes multiple HIPO files in parallel, distributing work across threads at the record level.

Basic Parallel Processing

hipo::chain ch(4, true, true);  // 4 threads, progress bar, verbose

// Add files
ch.add("file1.hipo");
ch.add("file2.hipo");
// or use glob patterns
ch.add_pattern("data/*.hipo");

// Process in parallel
ch.process([](auto& event, int file_idx, long event_idx) {
    auto particles = event.getBank("REC::Particle");
    int nrows = particles.getRows();
    // thread-safe processing...
});

Record-Level Parallelism

Each thread claims an entire record and processes all of its events sequentially. Reading whole records minimizes random file access and improves cache efficiency:

Thread 0: [Record 0] -> events 0-999
Thread 1: [Record 1] -> events 1000-1999
Thread 2: [Record 2] -> events 2000-2999
Thread 3: [Record 3] -> events 3000-3999
...
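The following standard-C++ sketch illustrates record-level work distribution. `Record`, `process_records`, and the shared atomic cursor used for scheduling are hypothetical. They are not the hipo::chain implementation, which may assign records to threads differently. Each worker claims the next unprocessed record and walks its events in order, so events within a record are never split across threads.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <functional>
#include <thread>
#include <vector>

// Hypothetical stand-in for a HIPO record: a contiguous block of events.
struct Record {
    long first_event;  // global index of the first event in this record
    long n_events;     // number of events stored in this record
};

// Workers claim whole records from a shared atomic cursor and process
// every event of a claimed record before claiming another one.
void process_records(const std::vector<Record>& records, unsigned n_threads,
                     const std::function<void(long)>& on_event) {
    assert(n_threads > 0);
    std::atomic<std::size_t> next{0};
    std::vector<std::thread> workers;
    for (unsigned t = 0; t < n_threads; ++t) {
        workers.emplace_back([&] {
            for (std::size_t r = next.fetch_add(1); r < records.size();
                 r = next.fetch_add(1)) {
                const Record& rec = records[r];
                for (long i = 0; i < rec.n_events; ++i) {
                    on_event(rec.first_event + i);  // sequential within a record
                }
            }
        });
    }
    for (auto& w : workers) w.join();
}
```

In this sketch the callback passed as `on_event` runs concurrently on several threads. It must therefore be thread-safe, just like the callback given to `process()`.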

Filtered Processing

Process only events that contain every listed bank:

ch.process_filtered([](auto& event, int file_idx, long event_idx) {
    auto parts = event.getBank("REC::Particle");
    auto calos = event.getBank("REC::Calorimeter");
    // both banks guaranteed to have rows > 0
}, {"REC::Particle", "REC::Calorimeter"});
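The filter condition can be written as a small predicate. This sketch models an event as a hypothetical map from bank name to row count, so `BankRows` and `passes_filter` are illustrative names and not part of the hipo API. An event passes only if every required bank is present with at least one row, which matches the guarantee stated in the comment above.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical event model: bank name -> number of rows in this event.
using BankRows = std::map<std::string, int>;

// True only if every required bank is present and has at least one row.
bool passes_filter(const BankRows& event, const std::vector<std::string>& required) {
    for (const auto& name : required) {
        auto it = event.find(name);
        if (it == event.end() || it->second <= 0) return false;
    }
    return true;
}
```

An empty list of required banks accepts every event in this sketch.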

Percentage Processing

Process only a fraction of the events by passing a percentage as the second argument:

// Process 10% of events
ch.process([](auto& event, int file_idx, long event_idx) {
    // ...
}, 10.0);
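The text does not say which events make up the requested fraction. It could be a leading fraction of each file, a leading fraction of the whole chain, or a sample. This sketch assumes the simplest reading, a leading fraction with the event count rounded to the nearest integer. `events_for_percentage` is a hypothetical helper and the rounding rule is also an assumption.

```cpp
#include <cassert>
#include <cmath>

// Hypothetical helper: how many events to process when only `percent` of
// `total_events` is requested. Assumes a leading-fraction interpretation,
// clamps the percentage to [0, 100], and rounds to the nearest event.
long events_for_percentage(long total_events, double percent) {
    assert(total_events >= 0);
    if (percent <= 0.0) return 0;
    if (percent >= 100.0) return total_events;
    return static_cast<long>(std::llround(total_events * percent / 100.0));
}
```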

Sequential Iteration

For non-parallel processing, use range-based iteration:

for (auto& [event, file_idx, event_idx] : ch) {
    auto particles = event.getBank("REC::Particle");
    // single-threaded processing
}
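The loop above unpacks each element with a structured binding, `auto& [event, file_idx, event_idx]`. For a non-const `auto&` to bind, the iterator's `operator*` must return an lvalue reference, for example to state stored inside the iterator, rather than a temporary. The hypothetical `MiniChain` below demonstrates that requirement with standard C++ only. It is not the hipo::chain implementation. Here `event_idx` is the index within its own file, which is an assumption.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

struct Event { std::string payload; };  // hypothetical stand-in for hipo::event

// Three public members, so `auto& [event, file_idx, event_idx]` can unpack it.
struct Entry {
    Event event;
    int file_idx = 0;
    long event_idx = 0;  // assumption: index within the event's own file
};

class MiniChain {  // hypothetical; not the hipo::chain implementation
public:
    explicit MiniChain(std::vector<std::vector<Event>> files) : files_(std::move(files)) {}

    class iterator {
    public:
        iterator(const MiniChain* chain, std::size_t file) : chain_(chain), file_(file) { settle(); }

        // Returns a reference to state owned by the iterator, so a
        // non-const `auto&` binding in the range-for loop is valid.
        Entry& operator*() { return current_; }
        iterator& operator++() { ++pos_; settle(); return *this; }
        bool operator!=(const iterator& other) const {
            return file_ != other.file_ || pos_ != other.pos_;
        }

    private:
        // Skip exhausted or empty files, then load the current event.
        void settle() {
            while (file_ < chain_->files_.size() && pos_ >= chain_->files_[file_].size()) {
                ++file_;
                pos_ = 0;
            }
            if (file_ < chain_->files_.size()) {
                current_ = {chain_->files_[file_][pos_], static_cast<int>(file_),
                            static_cast<long>(pos_)};
            }
        }
        const MiniChain* chain_;
        std::size_t file_;
        std::size_t pos_ = 0;
        Entry current_;
    };

    iterator begin() const { return iterator(this, 0); }
    iterator end() const { return iterator(this, files_.size()); }

private:
    std::vector<std::vector<Event>> files_;
};
```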

Thread Safety

Important

The callback function passed to process() must be thread-safe. Each thread has its own reader, dictionary, record, and event objects, but any shared state (histograms, counters, output files) must be synchronized.

Safe Patterns

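Mutex-protected shared histogram (collect values per event, then lock once):

#include <mutex>
#include <vector>
#include "TH1.h"  // ROOT; declares TH1F

std::mutex histMutex;
TH1F* histogram = new TH1F("h", "title", 100, 0, 10);

ch.process([&](auto& event, int file_idx, long event_idx) {
    auto parts = event.getBank("REC::Particle");
    // Read the bank without holding the lock
    std::vector<float> pxValues;
    pxValues.reserve(parts.getRows());
    for (int row = 0; row < parts.getRows(); row++) {
        pxValues.push_back(parts.getFloat("px", row));
    }
    // Lock once per event, not once per row, to fill the shared histogram
    std::lock_guard<std::mutex> lock(histMutex);
    for (float px : pxValues) histogram->Fill(px);
});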
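Another way to fill a shared histogram is true per-thread accumulation. Each thread fills a private histogram without locking, and the private copies are merged once at the end. The sketch below shows the idea with a hypothetical `Hist` type and plain `std::thread` workers. With the callback API, a `thread_local` object or a per-thread slot would have to play the role of `local`, and how that maps onto hipo::chain is an assumption. In ROOT code, `ROOT::TThreadedObject` provides a ready-made version of this pattern.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Minimal fixed-range histogram over [lo, hi); out-of-range values are dropped.
struct Hist {
    double lo, hi;
    std::vector<long> bins;
    Hist(std::size_t n, double lo_, double hi_) : lo(lo_), hi(hi_), bins(n, 0L) {}
    void fill(double x) {
        if (x < lo || x >= hi) return;
        auto i = static_cast<std::size_t>((x - lo) / (hi - lo) * bins.size());
        if (i < bins.size()) ++bins[i];  // guard against rounding at the upper edge
    }
    void add(const Hist& other) {
        for (std::size_t i = 0; i < bins.size(); ++i) bins[i] += other.bins[i];
    }
};

// Each worker fills its own Hist with no locking; the shared result is
// locked only once per thread, when the private copy is merged in.
Hist fill_in_parallel(const std::vector<double>& values, unsigned n_threads) {
    assert(n_threads > 0);
    Hist total(100, 0.0, 10.0);
    std::mutex merge_mutex;
    std::vector<std::thread> workers;
    for (unsigned t = 0; t < n_threads; ++t) {
        workers.emplace_back([&, t] {
            Hist local(100, 0.0, 10.0);
            for (std::size_t i = t; i < values.size(); i += n_threads) local.fill(values[i]);
            std::lock_guard<std::mutex> lock(merge_mutex);
            total.add(local);
        });
    }
    for (auto& w : workers) w.join();
    return total;
}
```

The number of locks taken falls from one per fill to one per thread. The cost is one extra histogram per thread.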

Atomic counters:

#include <atomic>
#include <cstdio>

std::atomic<long> electronCount{0};

ch.process([&](auto& event, int file_idx, long event_idx) {
    auto parts = event.getBank("REC::Particle");
    for (int row = 0; row < parts.getRows(); row++) {
        if (parts.getInt("pid", row) == 11) {  // PDG code 11: electron
            electronCount++;  // atomic increment, no lock needed
        }
    }
});

printf("Total electrons: %ld\n", electronCount.load());
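A counter that is only read after all workers have finished needs no ordering stronger than `std::memory_order_relaxed`, because joining a thread makes all of its writes visible to the joining thread. The self-contained sketch below uses plain `std::thread` workers in place of the chain. `count_matches` is a hypothetical helper.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Counts entries equal to `target` using several threads. Relaxed
// fetch_add is enough: the total is read only after every join().
long count_matches(const std::vector<int>& pids, int target, unsigned n_threads) {
    assert(n_threads > 0);
    std::atomic<long> count{0};
    std::vector<std::thread> workers;
    for (unsigned t = 0; t < n_threads; ++t) {
        workers.emplace_back([&, t] {
            for (std::size_t i = t; i < pids.size(); i += n_threads) {
                if (pids[i] == target) count.fetch_add(1, std::memory_order_relaxed);
            }
        });
    }
    for (auto& w : workers) w.join();  // join() makes all increments visible here
    return count.load();
}
```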

Scanning Files

Get metadata about all files in the chain:

ch.scan();   // displays progress bar, file info, summary
ch.list();   // print file list

Statistics

After a run, query the chain's processing statistics:

ch.process(myFunc);

auto& stats = ch.statistics();
printf("Processed: %ld events in %.2f seconds\n",
       stats.events_processed.load(),
       stats.elapsed_seconds());
printf("Throughput: %.1f events/sec\n", stats.throughput());
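To show how these figures relate, here is a minimal sketch of a statistics object that reuses the member names from the example above. The struct itself, its `start`/`stop` members, and `finish()` are hypothetical. Throughput is simply `events_processed / elapsed_seconds()`, measured with a monotonic clock.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>

// Hypothetical statistics object mirroring the names used above.
struct Statistics {
    std::atomic<long> events_processed{0};  // incremented by worker threads
    std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now();
    std::chrono::steady_clock::time_point stop = start;

    void finish() { stop = std::chrono::steady_clock::now(); }

    double elapsed_seconds() const {
        return std::chrono::duration<double>(stop - start).count();
    }
    // Events per second; returns 0 when no time has elapsed.
    double throughput() const {
        double s = elapsed_seconds();
        return s > 0.0 ? events_processed.load() / s : 0.0;
    }
};
```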

File-Level Operations

Run a callback once per file, receiving that file's reader and metadata:

ch.for_each_file([](hipo::reader& r, const hipo::FileInfo& info) {
    printf("File %s: %ld events, %s\n",
           info.filename.c_str(),
           info.total_events,
           info.size_string().c_str());
});
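The example prints a human-readable file size via `size_string()`. The sketch below shows one plausible way to produce such a string, using 1024-based units with one decimal place. The function is hypothetical, and the library's actual units and precision may differ.

```cpp
#include <cassert>
#include <cstdint>
#include <cstdio>
#include <string>

// Hypothetical formatter: bytes -> "1.5 MB", using 1024-based units.
std::string size_string(std::uint64_t bytes) {
    const char* units[] = {"B", "KB", "MB", "GB", "TB"};
    double value = static_cast<double>(bytes);
    int u = 0;
    while (value >= 1024.0 && u < 4) {  // stop at TB
        value /= 1024.0;
        ++u;
    }
    char buf[32];
    if (u == 0) {
        std::snprintf(buf, sizeof buf, "%.0f %s", value, units[u]);  // whole bytes
    } else {
        std::snprintf(buf, sizeof buf, "%.1f %s", value, units[u]);
    }
    return std::string(buf);
}
```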