Developing a Distributed Event Store: Avro + Parquet + Java Reflection

Developing a Distributed Event Store: Avro + Parquet + Java Reflection

by Nick Whalen and Ethan Geil

Bitsight collects, stores, and processes billions of security-related events every day. In our last post, we discussed reasons why we're moving our massive event store from HBase to S3. Today we’ll take a close look at one key component to the event store: our data storage format.

Every data store design has to decide upon a serialization format—that is, how records are converted to and from byte streams, in order to be stored on disk or transmitted across the network. That choice can have an enormous impact on performance, reliability, and ease of development and maintenance.

There are countless serialization frameworks, each with various advantages and disadvantages. For Java projects (like ours), the simplest option is to use Java’s built-in serialization. Unfortunately, Java serialization tends to suffer from relatively poor performance (both in time, and in the size of serialized objects), in part because of the extensive type information and checks that are carried around. Frameworks like Kryo attempt to improve upon this by registering all classes to be serialized/deserialized ahead of time and storing type information as a compact enumeration, among other tricks.

Hadoop-ecosystem projects (again, like ours) commonly use Hadoop Writable serialization (in fact, this is what we previously used for much of our data storage). Writable serialization is fast and efficient, but very low-level: each class must provide a matched pair of methods, write() and readFields(), which sequentially serialize/deserialize each of the class’s fields, either as primitive types, or by delegating to the field type’s own write/readFields methods. The two methods must (de)serialize fields in exactly the same order, and it is surprisingly easy to introduce subtle bugs in them—especially because (again, for performance) objects are often recycled, rather than being created from scratch for each new record. Any changes to the class require developers to carefully make matching updates to write/readFields. Worse, updating the class typically makes it impossible to read records serialized using the old version, so it’s necessary to rewrite the entire data store.

Data serialized using any of the above Java-class-based methods requires access to the original Java classes to read, which hampers interoperability with other analysis tools, and creates difficulties for non-developers (e.g. data scientists). At the opposite end of the spectrum are text-based formats like JSON or XML. Libraries such as Jackson and Gson make it relatively easy to (de)serialize Java objects to/from JSON, and a vast number of tools and languages can ingest JSON. Unfortunately, JSON-serialized objects tend to be large in size (although compression can help), and parsing JSON is relatively slow. Furthermore, the lack of an enforced schema leads to complex validation logic in code that reads JSON.

Frameworks like Avro, Thrift, and Protocol Buffers attempt to find a middle ground by efficiently encoding primitive objects in a binary format, but also including a well-defined, strongly-typed object model (roughly, a schema), which allows for interoperability between various languages and tools.

In addition to an object serialization format, it’s also necessary to define a file format—that is, how collections of objects are assembled into a physical file. The naive solution is to simply write out one object after another, but this is not necessarily optimal: for one, accessing any field of any object requires reading the entire file.

Apache Parquet—our format of choice—is a much more sophisticated solution. It stores records in a columnar format: all the values of a particular field, or column, of a record group are serialized together. This has two major advantages: First, queries are often interested in only a subset of fields, and a columnar format makes it possible to efficiently read just those, rather than the entire file. Second, grouping by column tends to improve compression, because similar objects (often with small or zero deltas from each other) are stored together. Parquet files include a schema definition, and so are self-describing and readable anywhere; support is available in a large (and growing) set of tools, including Spark SQL, Impala, and even Python.

Parquet doesn’t specify a particular object model, but comes with out-of-the-box support for Thrift and Avro schemas via the parquet-mr module; we chose Avro because of its better support for schema evolution. Initially, we used the provided AvroParquetWriter to convert our Java objects to Parquet records, but there were several downsides to that approach.

First, it requires all objects to implement the IndexedRecord interface:

Interface IndexedRecord

Object get(int i) // Get value of indexed fieldvoid

void put(int i, Object v) // Set value of indexed field

The class’s fields must be numbered (indexed) and accessed by number. This suffers from the same sort of difficulties as Hadoop Writable serialization; it requires the get and put methods to be kept in sync with each other and with the class’s fields, and it makes schema evolution difficult. It’s also necessary to store the Avro schema (which is encoded in JSON) somewhere, either as a static variable or in a separate file, and to keep this up-to-date with get/put. Avro includes a tool for automatically generating the schema and get/put, but we found this cumbersome to use, especially in the presence of class hierarchies (necessary for much of our existing business logic). For example, generated Java code puts all inherited fields into the child class to make them accessible, which destroys encapsulation. Additionally, the generated code almost always needed hand-editing to work well. All of this made changing definitions a lengthy and error-prone process.

We eventually settled on a different approach: we dynamically generate the class schemas at runtime, using Avro's ReflectData, and we use Java reflection to get/set fields by name, rather than by index or sequentially. We thus avoid the need for code generation, and our Java classes need only light annotation (e.g. marking fields as nullable), and don’t have to implement any particular interface. Schema evolution is straightforward; as long as newly-added fields are nullable, no special effort is required (removed fields are simply ignored upon deserialization). Since Parquet files already include a schema definition, we don’t have to maintain a separate schema anywhere.

Java reflection has a (deserved) reputation for being slow. We work around this, however, by simply caching the field names and accessors the first time that particular class is encountered during (de)serialization. Informal benchmarking of complex objects indicated that, to our surprise, reflection-based serialization was almost 10% faster than using IndexedRecords.

This approach did require some up-front work. We had to implement our own AvroWriteSupport, including support for serializing types with no direct Avro equivalents (such as java.net.InetAddress). The effort was worthwhile, however; we’ve found the combination of Java reflection, Avro, and Parquet to be a powerful, efficient, and relatively developer-friendly data storage framework.