# Architecture

## Layer Overview
| Layer | Components |
|---|---|
| User-Facing | chain, hipoeventfile, fusion, reaction, tuple |
| I/O | reader, writer, readerIndex, readerstream |
| Data Model | event, bank, structure, composite, node |
| Schema | dictionary, schema, schemaEntry_t |
| Record | record, recordbuilder, dataframe |
| Infrastructure | datastream, ThreadPool, ProgressTracker, benchmark, Parser, utils, json/jsonutil |
## Data Flow: Reading

```mermaid
flowchart TB
    A[File on disk] --> B["reader.open — read 72 B file header"]
    B --> C["readDictionary — load schemas from first record"]
    C --> D["readerIndex — record offsets + cumulative event counts"]
    D --> E["reader.next — advance to next event"]
    E --> F["record — LZ4 decompress on demand"]
    F --> G["event.read(bank) — scan structures by (group, item)"]
    G --> H["bank.getInt / bank.getFloat — typed columnar access"]
```
### Detailed Read Pipeline
- `reader.open(filename)`
    - Opens the file via `datastream` (local `ifstream` or XRootD)
    - Reads 80 bytes from offset 0, detects endianness via the magic number
    - Seeks to `trailerPosition`, reads the trailer record, populates `readerIndex` with record positions and cumulative event counts
- `reader.readDictionary(dict)`
    - Reads the first record (immediately after the file header)
    - Extracts structures with (group=120, item=2) -- each is a schema string
    - Parses each schema string into the dictionary
- `reader.next()`
    - Advances `readerIndex` to the next event
    - If the event is in a different record: loads and decompresses the new record via LZ4
    - If the event is in the same record: no disk I/O (the record is already decompressed)
    - Extracts the current event from the decompressed record buffer
- `event.read(bank)`
    - Linear scan through the event buffer, matching structure headers by (group, item)
    - Copies the matching structure into the bank's internal buffer
    - Sets the row count from `totalSize / rowLength`
- `bank.getFloat("px", row)`
    - Resolves the column name to an index via the schema's name-to-index map
    - Computes the byte offset: `columnOffset * nrows + row * typeSize`
    - Reads the bytes from the bank's data buffer
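Put together, the pipeline corresponds to a short user-level loop. The sketch below is illustrative only: `reader.open`, `readDictionary`, `next()`, `event.read(bank)`, and `bank.getFloat` come from the steps above, while the header names, the `hipo` namespace, `dict.getSchema(...)`, `reader.read(event)`, `getRows()`, and the bank name `REC::Particle` are assumptions made for the example.

```cpp
#include <cstdio>
#include "reader.h"   // assumed header names
#include "bank.h"

int main() {
    hipo::reader reader;
    reader.open("input.hipo");               // read file header + trailer index

    hipo::dictionary dict;
    reader.readDictionary(dict);              // load schemas from the first record

    // Bank name and getSchema() accessor are assumptions for illustration.
    hipo::bank particles(dict.getSchema("REC::Particle"));
    hipo::event event;

    double sum = 0.0;
    while (reader.next()) {                   // advance; records load/decompress lazily
        reader.read(event);                   // copy current event out of the record buffer (assumed accessor)
        event.read(particles);                // scan structures by (group, item)
        for (int row = 0; row < particles.getRows(); ++row) {
            sum += particles.getFloat("px", row);   // typed columnar access
        }
    }
    std::printf("sum of px over all events: %f\n", sum);
    return 0;
}
```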
Lazy Record Loading
Records are loaded on demand. If `next()` is called 1000 times and all events are in the same record, only one disk read + decompression occurs.
## Data Flow: Writing

```mermaid
flowchart TB
    A["User builds schema + bank with data"] --> B["event.addStructure(bank) — serialize into event buffer"]
    B --> C["writer.addEvent(event)"]
    C --> D["recordbuilder accumulates events"]
    D --> E{"Full? (100k events OR 8 MB)"}
    E -- no --> C
    E -- yes --> F["LZ4 compress"]
    F --> G["Flush compressed record to disk"]
    G --> C
    C --> H["writer.close — flush remainder, write trailer index"]
```
### Detailed Write Pipeline
- Schema registration -- add schemas to `writer.getDictionary()` before `open()`
- `writer.open(filename)` -- builds the dictionary record, writes the file header + dictionary
- `writer.addEvent(event)` -- routes the event to `recordBuilder`; flushes when full
- `writer.close()` -- flushes remaining events, writes the trailer with the record index, patches the file header with the trailer position
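A hedged end-to-end sketch of the writing side is shown below. `getDictionary()`, `open()`, `addStructure()`, `addEvent()`, and `close()` come from the pipeline above; the `hipo::schema` constructor arguments, `schema.parse(...)`, `addSchema(...)`, `bank.putFloat(...)`, `event.reset()`, and the bank name are assumptions used for illustration.

```cpp
#include "writer.h"   // assumed header names
#include "bank.h"

int main() {
    hipo::writer writer;

    // 1. Schema registration -- must happen before open().
    //    Bank name, (group, item), and the column-definition string are hypothetical.
    hipo::schema schema("DATA::particle", 100, 1);
    schema.parse("px/F,py/F,pz/F");
    writer.getDictionary().addSchema(schema);

    // 2. open() builds the dictionary record and writes file header + dictionary.
    writer.open("output.hipo");

    hipo::event event;
    for (int i = 0; i < 10; ++i) {
        hipo::bank particles(schema, 2);      // a bank with 2 rows for this event
        particles.putFloat("px", 0, 0.10f);
        particles.putFloat("py", 0, 0.20f);
        particles.putFloat("pz", 0, 0.30f);
        particles.putFloat("px", 1, 0.40f);
        particles.putFloat("py", 1, 0.50f);
        particles.putFloat("pz", 1, 0.60f);

        event.reset();
        event.addStructure(particles);        // serialize bank into the event buffer
        writer.addEvent(event);               // recordbuilder flushes when full
    }

    // 3. close() flushes the remainder, writes the trailer index, patches the header.
    writer.close();
    return 0;
}
```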
Record Flushing
A record is flushed when either the event count reaches 100,000 or accumulated data reaches 8 MB, whichever comes first.
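In other words, the flush check inside `addEvent` amounts to something like the condition below; the constant and function names are illustrative, not the library's actual identifiers, and only the thresholds come from the text above.

```cpp
#include <cstddef>

constexpr int         kMaxEvents = 100000;            // 100k events
constexpr std::size_t kMaxBytes  = 8 * 1024 * 1024;   // 8 MB

// Flush the current record when either threshold is reached, whichever comes first.
bool shouldFlush(int eventCount, std::size_t bufferedBytes) {
    return eventCount >= kMaxEvents || bufferedBytes >= kMaxBytes;
}
```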
## I/O Abstraction

The `datastream` class hierarchy abstracts file I/O:

```
datastream (abstract base)
 +-- datastreamLocalFile   wraps std::ifstream
 +-- datastreamXrootd      wraps XrdClient (compile with -D__XROOTD__)
```
This allows the reader to work identically with local files and remote XRootD files.
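Conceptually this is a classic strategy interface: the reader holds a `datastream*` and never cares which concrete class is behind it. The sketch below is a simplified illustration under that assumption; the method names and signatures are not the library's actual declarations.

```cpp
#include <fstream>
#include <string>

// Simplified illustration of the datastream strategy interface.
// Method names and signatures here are assumptions, not the actual API.
class datastream {
public:
    virtual ~datastream() = default;
    virtual bool open(const std::string &filename) = 0;
    virtual long size() = 0;
    virtual void read(char *buffer, long nbytes, long position) = 0;
};

// Local-file strategy: wraps std::ifstream.
class datastreamLocalFile : public datastream {
public:
    bool open(const std::string &filename) override {
        stream.open(filename, std::ios::binary);
        return stream.is_open();
    }
    long size() override {
        stream.seekg(0, std::ios::end);
        return static_cast<long>(stream.tellg());
    }
    void read(char *buffer, long nbytes, long position) override {
        stream.seekg(position);
        stream.read(buffer, nbytes);
    }
private:
    std::ifstream stream;
};

// datastreamXrootd would implement the same interface on top of XrdClient
// (enabled with -D__XROOTD__); the reader only ever talks to datastream*.
```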
## Design Patterns
| Pattern | Usage |
|---|---|
| Columnar Storage (SOA) | Banks store data column-major for cache-friendly access |
| Flyweight / Reusable Buffers | bank objects are reused across events; event.read(bank) fills in-place |
| Builder | recordbuilder accumulates events before compression/flush |
| Iterator | hipoeventfile, chain, bank::rowlist support range-based for loops |
| Strategy | datastream abstracts local vs. XRootD file access |
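The "Columnar Storage (SOA)" row ties back to the byte-offset formula in the read pipeline: all `nrows` values of a column are stored contiguously. A small worked illustration follows; the function and variable names are hypothetical, not library identifiers.

```cpp
#include <cstddef>
#include <cassert>

// Column-major addressing as described in the read pipeline:
// element (column, row) lives at columnOffset * nrows + row * typeSize,
// where columnOffset is the summed byte size of all preceding columns.
std::size_t elementOffset(std::size_t columnOffset, std::size_t typeSize,
                          std::size_t nrows, std::size_t row) {
    return columnOffset * nrows + row * typeSize;
}

int main() {
    // Hypothetical bank with columns px/F, py/F, pz/F (4 bytes each) and 10 rows:
    // "py" has columnOffset = 4, so py of row 3 starts at 4*10 + 3*4 = 52.
    assert(elementOffset(4, 4, 10, 3) == 52);
    return 0;
}
```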