BT

Facilitating the Spread of Knowledge and Innovation in Professional Software Development

Write for InfoQ

Topics

Choose your language

InfoQ Homepage Articles Columnar Databases and Vectorization

Columnar Databases and Vectorization

Key Takeaways

  • Columnar databases help with Online Analytical Processing (OLAP) workloads since the queries touch across a subset of columns but a large number of rows for those columns.
  • Columnar storage format allows us to use several lightweight compression algorithms on a per column basis.
  • Vectorized data processing helps with developing faster analytical query engines by making efficient utilization of CPU cache.
  • Arrow's columnar format allows to use lightweight schemes like dictionary encoding, bit packing, and run length encoding, which favor query performance over compression ratio

A columnar database organizes the values for a given column contiguously on disk or in-memory. Column oriented storage format benefits Online Analytical Processing (OLAP) workloads since these workloads have queries that touch across a subset of columns but a large number of rows for those columns. Using columnar format for such queries significantly reduces the amount of data transferred from disk into memory and subsequently from memory into registers. This results in efficient utilization of bandwidth throughout the storage hierarchy. Furthermore, columnar format allows us to use several lightweight compression algorithms on a per column basis. Compression algorithms perform better in such cases since the input data to the compression engine is of the same type and is likely to be compressed better and faster.

Since it was popularized by MonetDB-X100 (Vectorwise), vectorized processing is the standard for speeding up data processing on modern hardware for building highly efficient analytical query engines. This model requires columnar representation of the data to write highly optimized query processing algorithms. Vectorized processing is a significant departure from conventional tuple based query processing model. Rewriting query processing algorithms to work on columns as opposed to rows/tuples is the key difference between the two approaches. A column is stored in a contiguous fashion and the in-memory representation is generally referred to as a vector which contains a fixed number of values from the column.

The second difference between the vectorized model and the traditional model is that instead of pushing one tuple at a time up the query plan tree, you basically push a block. A block comprises of fixed set of tuples (records) which are represented by a set of vectors with one-to-one correspondence between vector and column/field in the schema. The block of vectors is the fundamental unit of data that flows through the execution plan tree from one operator to another.

 

 

Figure 1: Traditional tuple-at-a-time processing v/s Vectorized Processing  

In Figure 1, on the left hand side we have the traditional model of the tuple-at-a-time processing. The scan operator starts getting the input data and starts pushing tuples through the filter operator. Then, the filter operator pushes the qualifying tuples to the aggregation operator. Operator keeps calling next on the operator downstream in the query plan tree. The output of that is that the operator down in the tree starts pushing the tuple up to the operator located above in the tree. That's how the query execution proceeds.

Now, there is a severe performance overhead in this execution because of large number of function calls and the small amount of data being processed/transferred across each function call from one operator to another. Secondly, we are transporting a tuple where as you may need to work only with a subset of columns in that tuple.

On the right hand side now we show the vectorized model where we push a block of vectors, and each vector has a set of records or column values. There are as many vectors as there are columns in the dataset. You just keep pushing a batch of these vectors up the query plan tree and this will be the input and output of different operators in the query plan. This approach is far more efficient than the other approach because you amortize the cost associated with function calls between different operators. Secondly we operate on columns as opposed to rows/tuples.

Vectorized code makes efficient utilization of CPU cache. For example, consider a row of data with 10 columns and a query plan that needs to operate only on a single column. In a row-oriented query processing model, nine columns would occupy cache unnecessarily, limiting the number of values that can fit into cache. With column oriented processing, only the values from particular column of interest would be read into cache, allowing for many more values to be processed together and efficient usage of CPU-memory bandwidth.

The main idea behind vectorized processing is to work on columns (or columnar data) and defer the materialization of values from multiple columns into tuples (or rows) till very late in the query plan -- mostly when we need to project the resultset back to user. This is why query execution algorithms are typically rewritten to do “column based processing”. If we store data in columnar format but our processing code is written for row-oriented format then as soon as we read column(s), we have to stitch together values from multiple columns to form a tuple and then feed the tuple into a query operator that does conventional row-by-row processing. Forming tuples sooner during execution prevents running highly optimized query processing logic on columnar data.

Modern processors have extended instruction sets that extend the concept of vectorized execution to multiple column values in parallel in a single instruction. Single Instruction Multiple Data (SIMD) instructions became mainstream in desktop computing in the 1990s to improve the performance of multimedia applications, such as gaming.

SIMD is especially beneficial for applying the same change to many values, such as adjusting the brightness of an image. Each pixel’s brightness is defined by the values for red (R), green (G) and blue (B). To change the brightness, the R, G and B values are read from memory, the values are adjusted, and the results are written back out to memory. Without SIMD, pixel RGB values are read into memory individually. With SIMD, blocks of pixel RGB values can be processed together in a single instruction. This is vastly more efficient.

These concepts are very applicable to processing data in the world of analytics. SIMD exploits data level parallelism independent of concurrency. SIMD instructions allow us to execute the same instruction on multiple column values at a time in the same clock cycle, literally multiplying by 4 or more the throughput of execution. Columnar format of data is amenable to SIMD processing since we can potentially store the column values in well-aligned densely packed arrays in memory which can then be loaded into a fixed-width SIMD register. Modern Intel compilers are equipped with AVX-512 (Advanced Vector Extensions) instruction sets which have increased the width of SIMD register to 512 bits. In other words, this allows us to operate on 16 4-byte integer column values in parallel. Other SIMD instruction sets are SSE, SSE2, AVX, AVX2.

To take advantage of vectorized processing and SIMD it is essential to correctly organize data for maximum benefit. An open source framework now exists for in-memory processing called Apache Arrow. Arrow makes sure values are properly aligned in memory to take maximum advantage of vectorization and SIMD instructions when possible.  

Apache Arrow

The Arrow project is a top level open source project from the Apache Software Foundation. Arrow defines a standard way for representing data in-memory for efficient processing, as well as bindings in many popular programming languages including Java, C++, and Python.

Arrow was announced two years ago and has seen rapid growth. Developers from over a dozen major open source projects have been contributing to the Arrow community since its inception. Arrow benefits many different types of projects, and simplifies the exchange of data between processes. The benefits of using Arrow can be quite significant.

Vectorized Query Processing Using Apache Arrow

The main focus with Arrow is CPU and GPU efficiency. Arrow provides language independent standardized format for columnar representations of data (flat or nested) and corresponding libraries that allow us to efficiently run analytical workloads on modern hardware.

Data warehouse workloads and analytical queries benefit greatly from columnar formats since such queries generally involve a subset of columns but a large fraction of rows across those columns. Some examples of queries in analytical workloads are large aggregations, scans and complex joins.

Columnar format allows us to write very simple and efficient query processing code to accelerate analytical operations. We can write tight for-loop code that quickly runs through the column values and do the necessary operations like FILTER, COUNT, SUM, MIN etc. This approach is very CPU friendly because cache lines are filled with related data -- a set of values from column and all of them need to be processed. Similarly when we read columnar data from disk into memory, we only need to read the necessary columns. So query algorithms written around columnar format turn out to be far more efficient consumers of disk I/O and CPU-memory bandwidth as compared to their row-based counterparts.

In addition, the compiler helps us a lot along with this data format because such tight-loop code can be converted into vectorized instructions automatically by the compiler if it sees such optimization opportunities during compilation. These opportunities would not be available to us when we write query processing algorithm for row oriented data.

Arrow’s in-memory format for columnar data actually allows us to access any cell value in the column in constant time. Similarly, the columnar format also allows us to use CPU-efficient compression schemes that are extremely lightweight and actually favor the performance of query processing rather than actually favoring the compression ratio, which can hurt your CPU efficiency tremendously.

Query processing with Arrow in Dremio

Dremio is an open source platform for self-service data. Our core execution engine is called Sabot and it's built entirely on top of Arrow’s libraries.

Let's talk about the memory management inside Dremio with respect to Arrow. Arrow actually includes a chunk-based managed allocator which is built entirely on top of of Netty's JEMalloc implementation. The main memory management model or allocation model is a tree-based model where we start with the root allocator. We can then create multiple child allocators under the root allocator. Each allocator has an initial reservation (imposed at the time of creation of allocator) and a maximum allocation limit. Reservation doesn’t imply pre-allocation. It means the amount of memory reserved will likely be available to the operator for allocation during the lifetime of allocator.  

 

 

Figure 2: Tree based allocation system

We use off-heap buffers as the underlying memory for our in-memory columnar data structures inside our execution engine. We avoid using JVM heap to eliminate the overhead associated with garbage collection in Java.

Now, let's talk about how we use this tree-based allocation model as well as the initial reservation and memory limit semantics inside Dremio. Each operator in the query plan tree gets its own allocator (we call it parent allocator). Each operator then creates one or more child allocators (with initial reservation and limit) for different individual pieces of work inside the operator.

Let's take the example of the external sort operator. This is the operator responsible for handling sort queries gracefully during out of memory conditions. It has the ability to spill data whenever we are running under low memory conditions.

 

 

Figure 3: Tree based allocation for external sort operator

At the top level we have a root allocator for the operator. Before query execution begins, we setup the operator with its own allocator. There are two main sub-components of the sort operator. One is Memory-Run, and the second is Disk-Run and each sub-component creates its own child allocator off operator’s allocator.

Memory-Run is basically responsible for consuming and sorting batches of data that are arriving into the operator. At the end when all the input has been processed you have all the data sorted and the operator can start outputting the data out of the memory run sub-component.

We also have a Disk-Run component which manages spilling. In case we run out of memory, we need to spill (one or more times) the in-memory data sorted data. Then once the data has been spilled, we again need to do some sort of processing to load multiple sorted streams of data from disk into memory, do an in-memory merge to finish processing, and then pump the data out of the operator. The code that processes spilled data has to guarantee there is enough memory to load 2 or more sets (or batches) of spilled records into memory to continue with in-memory merge processing. DiskRun component keeps tracks of multiple spill cycles (or iterations) and size of largest spilled batch in each cycle. The child allocator reserves enough memory to be able to load a spilled batch from each spilled iteration.

In Dremio the data flows in pipelined fashion from one operator to another operator as a set of vectors. We call this a record batch. A record batch comprises of column vector(s) (Arrow data structure for columnar representation) with a fixed number of records. Record batch is the unit of work inside Dremio’s execution engine.

 

 

Figure 4: Pipelined data flow from one query operator to another without expensive copy

In this example we have two operators: scan and aggregation. The dataset happens to have three columns and has one Arrow vector for each of those columns, so three vectors total. The above diagram shows how the scan operator’s output (records as vectors) is being fed as input into the aggregation operator.

During some operations, such as some types of joins and some aggregations, it may be necessary to transpose from a column oriented format to a row oriented format. Through performance experiments we discovered that columnar representation of data is not efficient for hash table insertion, lookup for algorithms like hash join and hash aggregation. So the implementation of these algorithms partially (mostly hash table code) operates on row-wise data as we pivot the key columns from incoming record batch into corresponding row-wise representation before inserting into hashtable for both aggregation and join.

 

 

Figure 5: Vectorized Hash Aggregation with row-wise representation of key columns

Here is an overview of the vectorized code we use to perform the pivot from column-oriented to row-oriented data:

Code example of Vectorized columnar pivot to row-wise representation for efficient insertion/lookup in hash table - used in vectorized hash aggregation and join. This is done on GROUP BY key columns or join columns for hash aggregation and hash join

static void pivot4Bytes(
     VectorPivotDef def,
     FixedBlockVector fixedBlock,
     final int count) {
   /* source column vector to pivot */
   final FieldVector field =def.getIncomingVector();
   /* source column vector buffers */
   final List<ArrowBuf> buffers = field.getFieldBuffers();

   final int blockLength = fixedBlock.getBlockWidth();
   final int bitOffset = def.getNullBitOffset();
   
   /* validity buffer of source vector */
   long srcBitsAddr = buffers.get(0).memoryAddress();
   /* data buffer of source vector */
   long srcDataAddr = buffers.get(1).memoryAddress();

   /* target memory region to store pivoted (row-wise) representation */
   long targetAddr = fixedBlock.getMemoryAddress();

   /* determine number of null values to work through a word at a time */
   final int remainCount = count % WORD_BITS;
   final int wordCount = (count - remainCount) / WORD_BITS;
   final long finalWordAddr = srcDataAddr + (wordCount * WORD_BITS * FOUR_BYTE);

   long bitTargetAddr = targetAddr + def.getNullByteOffset();
   long valueTargetAddr = targetAddr + def.getOffset();

   // decode word at a time -- 64 column values
   while (srcDataAddr < finalWordAddr) {
     final long bitValues = PlatformDependent.getLong(srcBitsAddr);

     if (bitValues == NONE_SET) {
       // noop (all nulls).
       bitTargetAddr += (WORD_BITS * blockLength);
       valueTargetAddr += (WORD_BITS * blockLength);
       srcDataAddr += (WORD_BITS * FOUR_BYTE);

     } else if (bitValues == ALL_SET) {
       // all set, set the bit values using a constant AND.
       // Independently set the data values without transformation.
       final int bitVal = 1 << bitOffset;
       for (int i = 0; i < WORD_BITS; i++, bitTargetAddr += blockLength) {
         PlatformDependent.putInt(bitTargetAddr,
          PlatformDependent.getInt(bitTargetAddr) | bitVal);
       }

       for (int i = 0; i < WORD_BITS; i++, valueTargetAddr += blockLength, srcDataAddr += FOUR_BYTE) {
         PlatformDependent.putInt(valueTargetAddr, PlatformDependent.getInt(srcDataAddr));
       }
     } else {
       // some nulls, some not, update each value to zero or the value, depending on the null bit.
       for (int i = 0; i < WORD_BITS; i++, bitTargetAddr += blockLength, valueTargetAddr += blockLength, srcDataAddr += FOUR_BYTE) {
         final int bitVal = ((int) (bitValues >>> i)) & 1;
         PlatformDependent.putInt(bitTargetAddr, 
PlatformDependent.getInt(bitTargetAddr) | (bitVal << bitOffset));
         PlatformDependent.putInt(valueTargetAddr, PlatformDependent.getInt(srcDataAddr) * bitVal);
       }
     }
     srcBitsAddr += WORD_BYTES;
   }

  if(remainCount > 0) {
// do the remaining bits..
  }

Code Example: of Vectorized copy from one column vector (source) to another (target) using 2 byte selection vector. Very efficient tight-loop C/C++ style code written to directly interact with underlying memory (example is specific to fixed width 4 byte column):

Example Use Case: SELECT C1 from FOO where C2 > 1000;

We first do efficient filter processing on C2 in a tight for-loop vectorized code and construct a selection vector that stores the offsets of column values that passed the filter. Now run another loop that uses these offsets to index the values to be projected from C1.

static class FourByteCopier extends FieldBufferCopier {
    private static final int SIZE = 4;
    private final FieldVector source;
    private final FieldVector target;
    private final FixedWidthVector targetAlt;

    public FourByteCopier(FieldVector source, FieldVector target) {
      this.source = source;
      this.target = target;
      this.targetAlt = (FixedWidthVector) target;
    }

    @Override
    public void copy(long offsetAddr, int count) {
      targetAlt.allocateNew(count);
      final long max = offsetAddr + count * 2;
      final long srcAddr = source.getDataBufferAddress();
      long dstAddr = target.getDataBufferAddress();
      for(long addr = offsetAddr; addr < max; addr += STEP_SIZE, dstAddr += SIZE){
        PlatformDependent.putInt(dstAddr, 
PlatformDependent.getInt(srcAddr + ((char)PlatformDependent.getShort(addr)) * SIZE));
      }
    }

Determining Batch Size of Vectors

Let's talk about how we actually size our vectors. A unit of work in Dremio is called as a record batch or data batch or a set which comprises of Arrow in-memory vectors. Each vector is for a field or a column in your dataset and consists of fixed number of records.

 

 

Figure 6: Record batch transfer from query operator to another

Let's say you have a million record dataset. What we will work on at a time is record batch of around 4000 records. The record batch is the unit of data that flows through the pipeline from one operator to another operator. Now, there are different kinds of mechanics associated with how to fix the number of records in a data batch. It could be one or all the way up to 64 thousand.

We have seen that the large batch size like 8000 or 16,000 actually improves the performance because the unit of work increases and the number of times you'll repeat the processing goes down. But, the larger batch also causes pipelining problems because you are actually sending large amounts of data between operators. Whereas if you use a smaller batch size like 128 or 256, although the processing on an individual batch will be faster and the amount of the data transferred between the operators is going to be much slower, but because of the sheer number of times that the processing has to be repeated and the volume of objects that will be constructed, the heap overhead of a query just shoots up. This is why the standard record batch size that we have been working with at Dremio is configurable, and in most cases it is a batch size of 4096 records.

With batch size we actually control the amount of memory that we have allocated for the vectors. In operators like external sort or aggregation or join where we really need to be conscious of the amount of memory that the operator has to work with, we cannot really rely on default memory allocation of vectors provided by Arrow APIs.

Carefully configuring the batch size for vectors and then allocating memory appropriately just for those number of records allows us to write robust algorithms that work nicely in memory constrained environments.

Compression Columnar format allows us to leverage very lightweight and CPU efficient compression schemes as opposed to traditional general purpose compression schemes which are used heavily in databases and other systems like LZO, ZLIB ETC. The general purpose compression algorithms give better compression ratios but they hurt CPU efficiency as the cost of compression and decompressions adds to the overall elapsed time of query.

Arrow’s columnar format allows us to use several lightweight schemes like dictionary encoding, bit packing, and run length encoding, which favor query performance over compression ratio. Secondly they allow us to directly operate on the compressed columnar data, which improves the performance of queries by an order of magnitude because you don't have to decompress all the column data upfront before starting the processing.

Let's take an example on how we use dictionary encoding on the variable width column values.

 

 

Figure 7: Using dictionary encoding with SIMD for efficient predicate evaluation on Strings

You have COUNTRY column with values United States, China, India, France, United Kingdom etc which are variable width in nature. You need to write a query which does a FILTER on COUNTRY. Dictionary encoding the column allows us to efficiently do the filter processing by rewriting filter on variable width strings as filter on fixed width dictionary encoded values.

SELECT C1, C2 FROM FOO WHERE COUNTRY=’FRANCE’

We first consult the dictionary to get the dictionary encoded value of “FRANCE” which is four. Load the dictionary value 4 into SIMD register and then load all the encoded column values followed by parallel comparison of encoded column values with 4 to find out the offsets (or indexes) of cells where COUNTRY column value is “FRANCE”.

This is the power of dictionary encoding. You can actually compress your variable width column values into fixed-width dense arrays of dictionary values and then rewrite your query processing algorithms that can just quickly loop through those compressed column values in a very efficient manner.

Data Reflections

Dremio uses a feature called Data Reflections to optimize data in order to accelerate queries. Data Reflections are stored in columnar format on disk using Apache Parquet. When we read data from a Data Reflection, we load the data from Parquet to the corresponding columnar format in memory into Arrow for processing in our execution engine.

Our initial reader for Data Reflections was actually based on a row oriented format. It was basically a row wise reader where it was not at all taking advantage of the fact that both source (disk) and target (memory) data formats are columnar.

So we rewrote the reader to be completely vectorized where the processing is column oriented. This has improved the efficiency of code that reads Parquet pages (compressed or uncompressed) from disk into memory and reconstruct Arrow column vectors.  

We also support filter push down for Parquet scans. The predicates indicated in the query can directly be pushed down to the Parquet scan code such that we only load the necessary column data into memory when reconstructing the Arrow in-memory vectors.

Summary

Dremio is an open source data processing framework based on Apache Arrow with vectorization capabilities. In this article we discussed some of the key aspects of these capabilities and a detailed tech talk was given at Strata Conference this year in San Jose. In Dremio we make extensive use of Arrow throughout our in-memory execution engine. We recently revamped most parts of Java implementation in Arrow for improved performance and heap usage. Some Tpch queries exhibited as much as a 60% reduction in latency. We have additional enhancements we plan to make, including native SIMD acceleration libraries which we believe will provide significant additional improvements in processing efficiency.

About the Author

Siddharth Teotia is a software engineer at Dremio and a contributor to Apache Arrow project. Previously, Siddharth was on the database kernel team at Oracle, where he worked on storage, indexing, and the in-memory columnar query processing layers of Oracle RDBMS. He holds an MS in software engineering from CMU and a BS in information systems from BITS Pilani, India. During his studies, Siddharth focused on distributed systems, databases, and software architecture.

Rate this Article

Adoption
Style

BT