Jun 22, 2017

ClickHouse in a General Analytical Workload (Based on a Star Schema Benchmark)

In this blog post, we’ll look at how ClickHouse performs in a general analytical workload using the star schema benchmark test.

We have mentioned ClickHouse in some recent posts (ClickHouse: New Open Source Columnar Database, Column Store Database Benchmarks: MariaDB ColumnStore vs. Clickhouse vs. Apache Spark), where it showed excellent results. ClickHouse by itself seems to be an event-oriented RDBMS, as its name suggests (clicks). Its primary use, powering Yandex Metrica (a system similar to Google Analytics), also points to an event-based nature. We can also see there is a requirement for date-stamped columns.

It is possible, however, to use ClickHouse in a general analytical workload. This blog post shares my findings. For these tests, I used a Star Schema benchmark, slightly modified to handle ClickHouse specifics.

First, let’s talk about schemas. We need to adjust to ClickHouse data types. For example, the biggest fact table in SSB is “lineorder”. Below is how it is defined for Amazon RedShift (as taken from https://docs.aws.amazon.com/redshift/latest/dg/tutorial-tuning-tables-create-test-data.html):

CREATE TABLE lineorder
(
  lo_orderkey          INTEGER NOT NULL,
  lo_linenumber        INTEGER NOT NULL,
  lo_custkey           INTEGER NOT NULL,
  lo_partkey           INTEGER NOT NULL,
  lo_suppkey           INTEGER NOT NULL,
  lo_orderdate         INTEGER NOT NULL,
  lo_orderpriority     VARCHAR(15) NOT NULL,
  lo_shippriority      VARCHAR(1) NOT NULL,
  lo_quantity          INTEGER NOT NULL,
  lo_extendedprice     INTEGER NOT NULL,
  lo_ordertotalprice   INTEGER NOT NULL,
  lo_discount          INTEGER NOT NULL,
  lo_revenue           INTEGER NOT NULL,
  lo_supplycost        INTEGER NOT NULL,
  lo_tax               INTEGER NOT NULL,
  lo_commitdate        INTEGER NOT NULL,
  lo_shipmode          VARCHAR(10) NOT NULL
);

For ClickHouse, the table definition looks like this:

CREATE TABLE lineorderfull (
        LO_ORDERKEY             UInt32,
        LO_LINENUMBER           UInt8,
        LO_CUSTKEY              UInt32,
        LO_PARTKEY              UInt32,
        LO_SUPPKEY              UInt32,
        LO_ORDERDATE            Date,
        LO_ORDERPRIORITY        String,
        LO_SHIPPRIORITY         UInt8,
        LO_QUANTITY             UInt8,
        LO_EXTENDEDPRICE        UInt32,
        LO_ORDTOTALPRICE        UInt32,
        LO_DISCOUNT             UInt8,
        LO_REVENUE              UInt32,
        LO_SUPPLYCOST           UInt32,
        LO_TAX                  UInt8,
        LO_COMMITDATE           Date,
        LO_SHIPMODE             String
)Engine=MergeTree(LO_ORDERDATE,(LO_ORDERKEY,LO_LINENUMBER),8192);

From this we can see we need to use datatypes like UInt8 and UInt32, which are somewhat unusual in the database world.

The second table (RedShift definition):

CREATE TABLE customer
(
  c_custkey      INTEGER NOT NULL,
  c_name         VARCHAR(25) NOT NULL,
  c_address      VARCHAR(25) NOT NULL,
  c_city         VARCHAR(10) NOT NULL,
  c_nation       VARCHAR(15) NOT NULL,
  c_region       VARCHAR(12) NOT NULL,
  c_phone        VARCHAR(15) NOT NULL,
  c_mktsegment   VARCHAR(10) NOT NULL
);

For ClickHouse, I defined it as:

CREATE TABLE customerfull (
        C_CUSTKEY       UInt32,
        C_NAME          String,
        C_ADDRESS       String,
        C_CITY          String,
        C_NATION        String,
        C_REGION        String,
        C_PHONE         String,
        C_MKTSEGMENT    String,
        C_FAKEDATE      Date
)Engine=MergeTree(C_FAKEDATE,(C_CUSTKEY),8192);

For reference, the full schema for the benchmark is here: https://github.com/vadimtk/ssb-clickhouse/blob/master/create.sql.

For this table, we need to define a dummy column, C_FAKEDATE Date, in order to use ClickHouse’s most advanced engine (MergeTree). I was told by the ClickHouse team that they plan to remove this limitation in the future.

To generate data acceptable to ClickHouse, I made modifications to ssb-dbgen. You can find my version here: https://github.com/vadimtk/ssb-dbgen. The most notable change is that ClickHouse can’t accept dates in CSV files formatted as “19971125”. They have to be formatted as “1997-11-25”. This is something to keep in mind when loading data into ClickHouse.

It is possible to do some preformatting on the load, but I don’t have experience with that. A common approach is to create a staging table with datatypes that match the loaded data, and then convert them using SQL functions when inserting into the main table.
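
As a rough illustration of the date reformatting described above, the following Python sketch rewrites a compact “19971125” value as “1997-11-25” before the data is written to CSV. The function name and sample value are hypothetical, and this is not taken from the modified ssb-dbgen:

```python
from datetime import datetime


def to_clickhouse_date(compact: str) -> str:
    """Convert a compact YYYYMMDD string to the YYYY-MM-DD form."""
    # strptime also validates the value, so "19971345" raises ValueError.
    return datetime.strptime(compact, "%Y%m%d").strftime("%Y-%m-%d")


print(to_clickhouse_date("19971125"))  # prints 1997-11-25
```

Parsing the value instead of slicing the string means malformed dates fail early, before the load.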

Hardware Setup

One of the goals of this benchmark is to see how ClickHouse scales on multiple nodes. I used a setup of one node, and then compared it to a setup of three nodes. Each node has 24 cores of “Intel(R) Xeon(R) CPU E5-2643 v2 @ 3.50GHz” CPUs, and the data is located on very fast PCIe Flash storage.

For the SSB benchmark I use a scale factor of 2500, which provides (in raw data):

  • Table lineorder: 15 billion rows, raw size 1.7TB
  • Table customer: 75 million rows

When loaded into ClickHouse, the table lineorder takes 464GB, which corresponds to a 3.7x compression ratio.
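
The compression ratio follows from the two sizes quoted above. A minimal sketch of the arithmetic, assuming decimal units (1TB = 1000GB), which is my assumption and not something stated in the measurements:

```python
RAW_SIZE_GB = 1.7 * 1000  # 1.7TB of raw data, assuming 1TB = 1000GB
STORED_SIZE_GB = 464      # size of lineorder once loaded into ClickHouse

compression_ratio = RAW_SIZE_GB / STORED_SIZE_GB
print(f"{compression_ratio:.1f}x")  # prints 3.7x
```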

We compare a one-node (table names lineorderfull, customerfull) setup vs. a three-node (table names lineorderd, customerd) setup.

Single Table Operations

Query:

SELECT
    toYear(LO_ORDERDATE) AS yod,
    sum(LO_REVENUE)
FROM lineorderfull
GROUP BY yod

One node:

7 rows in set. Elapsed: 9.741 sec. Processed 15.00 billion rows, 90.00 GB (1.54 billion rows/s., 9.24 GB/s.)

Three nodes:

7 rows in set. Elapsed: 3.258 sec. Processed 15.00 billion rows, 90.00 GB (4.60 billion rows/s., 27.63 GB/s.)

We see a speed up of practically three times. Handling 4.6 billion rows/s is blazingly fast!
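
The “practically three times” figure can be checked from the two elapsed times reported above. A minimal Python sketch (the function name is hypothetical):

```python
def speedup(one_node_sec: float, multi_node_sec: float) -> float:
    """How many times faster the multi-node run finished."""
    return one_node_sec / multi_node_sec


NODES = 3
s = speedup(9.741, 3.258)  # elapsed times from the single-table query
print(f"speedup: {s:.2f}x, scaling efficiency: {s / NODES:.1%}")
# prints: speedup: 2.99x, scaling efficiency: 99.7%
```

The same calculation can be applied to the elapsed times of the other queries in this post to compare how well each one scales.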

One Table with Filtering

SELECT sum(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue
FROM lineorderfull
WHERE (toYear(LO_ORDERDATE) = 1993) AND ((LO_DISCOUNT >= 1) AND (LO_DISCOUNT <= 3)) AND (LO_QUANTITY < 25)

One node:

1 rows in set. Elapsed: 3.175 sec. Processed 2.28 billion rows, 18.20 GB (716.60 million rows/s., 5.73 GB/s.)

Three nodes:

1 rows in set. Elapsed: 1.295 sec. Processed 2.28 billion rows, 18.20 GB (1.76 billion rows/s., 14.06 GB/s.)

It’s worth mentioning that during the execution of this query, ClickHouse was able to use ALL 24 cores on each box. This confirms that ClickHouse is a massively parallel processing system.

Two Tables (Independent Subquery)

In this case, I want to show how ClickHouse handles independent subqueries:

SELECT sum(LO_REVENUE)
FROM lineorderfull
WHERE LO_CUSTKEY IN
(
    SELECT C_CUSTKEY AS LO_CUSTKEY
    FROM customerfull
    WHERE C_REGION = 'ASIA'
)

One node:

1 rows in set. Elapsed: 28.934 sec. Processed 15.00 billion rows, 120.00 GB (518.43 million rows/s., 4.15 GB/s.)

Three nodes:

1 rows in set. Elapsed: 14.189 sec. Processed 15.12 billion rows, 121.67 GB (1.07 billion rows/s., 8.57 GB/s.)

However, we do not see close to a 3x speedup on three nodes (it is about 2x), because of the data transfer required to match LO_CUSTKEY with C_CUSTKEY.

Two Tables JOIN

When columns from the subquery are needed in the result, or in GROUP BY, things get more complicated. In this case we want to GROUP BY a column from the second table.

First, ClickHouse doesn’t support traditional subquery syntax, so we need to use JOIN. For JOINs, ClickHouse also strictly prescribes how they must be written (a limitation that will also change in the future). Our JOIN should look like:

SELECT
    C_REGION,
    sum(LO_EXTENDEDPRICE * LO_DISCOUNT)
FROM lineorderfull
ANY INNER JOIN
(
    SELECT
        C_REGION,
        C_CUSTKEY AS LO_CUSTKEY
    FROM customerfull
) USING (LO_CUSTKEY)
WHERE (toYear(LO_ORDERDATE) = 1993) AND ((LO_DISCOUNT >= 1) AND (LO_DISCOUNT <= 3)) AND (LO_QUANTITY < 25)
GROUP BY C_REGION

One node:

5 rows in set. Elapsed: 31.443 sec. Processed 2.35 billion rows, 28.79 GB (74.75 million rows/s., 915.65 MB/s.)

Three nodes:

5 rows in set. Elapsed: 25.160 sec. Processed 2.58 billion rows, 33.25 GB (102.36 million rows/s., 1.32 GB/s.)

In this case the speedup is not even two times. This is a consequence of the random data distribution for the tables lineorderd and customerd. Both tables were defined as:

CREATE TABLE lineorderd AS lineorder ENGINE = Distributed(3shards, default, lineorder, rand());
CREATE TABLE customerd AS customer ENGINE = Distributed(3shards, default, customer, rand());

Here rand() specifies that records are distributed randomly across the three nodes. When we perform a JOIN by LO_CUSTKEY=C_CUSTKEY, matching records might be located on different nodes. One way to deal with this is to keep matching records local to the same node by distributing both tables on the join key. For example:

CREATE TABLE lineorderLD AS lineorderL ENGINE = Distributed(3shards, default, lineorderL, LO_CUSTKEY);
CREATE TABLE customerLD AS customerL ENGINE = Distributed(3shards, default, customerL, C_CUSTKEY);
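
To illustrate why the sharding key matters for a JOIN, here is a simplified Python model. It assumes three equally weighted shards and that a row’s shard is the remainder of the sharding expression divided by the number of shards. The sample data and all function names are hypothetical:

```python
import random

NUM_SHARDS = 3  # matches the three-node setup


def shard_by_key(key: int) -> int:
    """Simplified model: the shard is the key modulo the shard count."""
    return key % NUM_SHARDS


def local_join_fraction(order_cust_keys, customer_shard, order_shards):
    """Fraction of orders stored on the same shard as their customer row."""
    local = sum(
        1
        for key, shard in zip(order_cust_keys, order_shards)
        if customer_shard[key] == shard
    )
    return local / len(order_cust_keys)


rng = random.Random(0)  # fixed seed so the run is repeatable
customer_keys = list(range(1, 1001))
order_cust_keys = [rng.choice(customer_keys) for _ in range(10_000)]

# rand()-style placement: both tables are spread with no regard to the key.
random_customer_shard = {c: rng.randrange(NUM_SHARDS) for c in customer_keys}
random_order_shards = [rng.randrange(NUM_SHARDS) for _ in order_cust_keys]
random_fraction = local_join_fraction(
    order_cust_keys, random_customer_shard, random_order_shards
)

# Key-based placement: both tables are sharded on the customer key.
keyed_customer_shard = {c: shard_by_key(c) for c in customer_keys}
keyed_order_shards = [shard_by_key(k) for k in order_cust_keys]
keyed_fraction = local_join_fraction(
    order_cust_keys, keyed_customer_shard, keyed_order_shards
)

print(f"random: {random_fraction:.2f}, keyed: {keyed_fraction:.2f}")
```

With random placement, only about one third of the orders end up next to their customer row. With key-based placement, every order does, so the match can be done without moving data between nodes.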

Three Tables JOIN

This is where it becomes very complicated. Let’s consider the query that you would normally write:

SELECT sum(LO_REVENUE), P_MFGR, toYear(LO_ORDERDATE) yod
FROM lineorderfull, customerfull, partfull
WHERE C_REGION = 'ASIA' AND LO_CUSTKEY = C_CUSTKEY AND P_PARTKEY = LO_PARTKEY
GROUP BY P_MFGR, yod ORDER BY P_MFGR, yod;

With ClickHouse’s limitations on JOIN syntax, the query becomes:

SELECT
    sum(LO_REVENUE),
    P_MFGR,
    toYear(LO_ORDERDATE) AS yod
FROM
(
    SELECT
        LO_PARTKEY,
        LO_ORDERDATE,
        LO_REVENUE
    FROM lineorderfull
    ALL INNER JOIN
    (
        SELECT
            C_REGION,
            C_CUSTKEY AS LO_CUSTKEY
        FROM customerfull
    ) USING (LO_CUSTKEY)
    WHERE C_REGION = 'ASIA'
)
ALL INNER JOIN
(
    SELECT
        P_MFGR,
        P_PARTKEY AS LO_PARTKEY
    FROM partfull
) USING (LO_PARTKEY)
GROUP BY
    P_MFGR,
    yod
ORDER BY
    P_MFGR ASC,
    yod ASC

By writing queries this way, we force ClickHouse to use the prescribed JOIN order — at this moment there is no optimizer in ClickHouse and it is totally unaware of data distribution.

There is also not much speedup when we compare one node vs. three nodes:

One node execution time:

35 rows in set. Elapsed: 697.806 sec. Processed 15.08 billion rows, 211.53 GB (21.61 million rows/s., 303.14 MB/s.)

Three nodes execution time:

35 rows in set. Elapsed: 622.536 sec. Processed 15.12 billion rows, 211.71 GB (24.29 million rows/s., 340.08 MB/s.)

There is a way to make the query faster for this 3-way JOIN, however. (Thanks to Alexander Zaytsev from https://www.altinity.com/ for help!)

Optimized query:

SELECT
    sum(revenue),
    P_MFGR,
    yod
FROM
(
    SELECT
        LO_PARTKEY AS P_PARTKEY,
        toYear(LO_ORDERDATE) AS yod,
        SUM(LO_REVENUE) AS revenue
    FROM lineorderfull
    WHERE LO_CUSTKEY IN
    (
        SELECT C_CUSTKEY
        FROM customerfull
        WHERE C_REGION = 'ASIA'
    )
    GROUP BY
        P_PARTKEY,
        yod
)
ANY INNER JOIN partfull USING (P_PARTKEY)
GROUP BY
    P_MFGR,
    yod
ORDER BY
    P_MFGR ASC,
    yod ASC

Optimized query time:

One node:

35 rows in set. Elapsed: 106.732 sec. Processed 15.00 billion rows, 210.05 GB (140.56 million rows/s., 1.97 GB/s.)

Three nodes:

35 rows in set. Elapsed: 75.854 sec. Processed 15.12 billion rows, 211.71 GB (199.36 million rows/s., 2.79 GB/s.)

On one node, that’s an improvement of about 6.5 times compared to the original query (about 8.2 times on three nodes). This shows the importance of understanding data distribution, and writing the optimal query to process the data.
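
One part of the rewrite is aggregating by part key before joining to the part table, so the JOIN handles far fewer rows. My reading is that this contributes to the gain, though the measurements above do not isolate it. The Python sketch below shows on hypothetical toy data that both orderings give the same totals:

```python
from collections import defaultdict

# Hypothetical toy data: (part key, year, revenue) fact rows
# and a part key -> manufacturer lookup.
fact_rows = [
    (1, 1993, 100),
    (1, 1993, 50),
    (2, 1993, 70),
    (2, 1994, 30),
    (3, 1994, 20),
]
part_mfgr = {1: "MFGR#1", 2: "MFGR#1", 3: "MFGR#2"}


def join_then_aggregate(rows, parts):
    """Look up the manufacturer for every fact row, then sum."""
    totals = defaultdict(int)
    for part_key, year, revenue in rows:
        totals[(parts[part_key], year)] += revenue
    return dict(totals)


def aggregate_then_join(rows, parts):
    """Sum per (part key, year) first, then join the smaller result."""
    per_part = defaultdict(int)
    for part_key, year, revenue in rows:
        per_part[(part_key, year)] += revenue
    totals = defaultdict(int)
    for (part_key, year), revenue in per_part.items():
        totals[(parts[part_key], year)] += revenue
    return dict(totals)


result = aggregate_then_join(fact_rows, part_mfgr)
assert result == join_then_aggregate(fact_rows, part_mfgr)
print(result)
# prints {('MFGR#1', 1993): 220, ('MFGR#1', 1994): 30, ('MFGR#2', 1994): 20}
```

In the toy data the second approach performs four lookups instead of five. On a fact table with billions of rows and far fewer distinct (part, year) pairs, the reduction would be much larger.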

Another option for dealing with JOIN complexity, and to improve performance, is to use ClickHouse’s dictionaries. These dictionaries are described here: https://www.altinity.com/blog/2017/4/12/dictionaries-explained.

I will review dictionary performance in future posts.

Another traditional way to deal with JOIN complexity in an analytics workload is to use denormalization. We can move some columns (for example, P_MFGR from the last query) to the fact table (lineorder).

Observations

  • ClickHouse can handle general analytical queries (it requires special schema design and considerations, however)
  • Linear speedup is possible, but it depends on query design and requires advanced planning — proper speedup depends on data locality
  • ClickHouse is blazingly fast (beyond what I’ve seen before) because it can use all available CPU cores for a query, as shown above using 24 cores for a single server and 72 cores for three nodes
  • Multi-table JOINs are cumbersome and require manual work to achieve better performance, so consider using dictionaries or denormalization

Mar 15, 2017

Percona Live Featured Session with Evan Elias: Automatic MySQL Schema Management with Skeema

Welcome to another post in the series of Percona Live featured session blogs! In these blogs, we’ll highlight some of the session speakers that will be at this year’s Percona Live conference. We’ll also discuss how these sessions can help you improve your database environment. Make sure to read to the end to get a special Percona Live 2017 registration bonus!

In this Percona Live featured session, we’ll meet Evan Elias, Director of Engineering, Tumblr. His session is Automatic MySQL Schema Management with Skeema. Skeema is a new open source CLI tool for managing MySQL schemas and migrations. It allows you to easily track your schemas in a repository, supporting a pull-request-based workflow for schema change submission, review, and execution.

I had a chance to speak with Evan about Skeema:

Percona: How did you get into database technology? What do you love about it?

Evan: I first started using MySQL at a college IT job in 2003, and over the years I eventually began tackling much larger-scale deployments at Tumblr and Facebook. I’ve spent most of the past decade working on social networks, where massive high-volume database technology is fundamental to the product. I love the technical challenges present in that type of environment, as well as the huge potential impact of database automation and tooling. In companies with giant databases and many engineers, a well-designed automation system can provide a truly enormous increase in productivity.

Percona: Your talk is called Automatic MySQL Schema Management with Skeema. What is Skeema, and how is it helpful for engineers and DBAs?

Evan: Skeema is an open source tool for managing MySQL schemas and migrations. It allows users to diff, push or pull schema definitions between the local filesystem and one or more databases. It can be configured to support multiple environments (e.g. development/staging/production), external online schema change tools, sharding, and service discovery. Once configured, an engineer or DBA can use Skeema to execute an online schema change on many shards concurrently simply by editing a CREATE TABLE statement in a file and then running “skeema push”.

Percona: What are the benefits of storing schemas in a repository?

Evan: The whole industry is moving towards infrastructure-as-code solutions, providing automated configuration which is reproducible across multiple environments. In extending this concept to database schemas, a file repository stores the desired state of each table, and a schema change is tied to simply changing these files. A few large companies like Facebook have internal closed-source tools to tie MySQL schemas to a git repo, allowing schema changes to be powered by pull requests (without any manual DBA effort). There hasn’t previously been an open source, general-purpose tool for managing schemas and migrations in this way, however. I developed Skeema to fill this gap.

Percona: What do you want attendees to take away from your session? Why should they attend?

Evan: In this session, MySQL DBAs will learn how to automate their schema change workflow to reduce manual operational work, while software engineers will discover how Skeema permits easy online migrations even in frameworks like Rails or Django. Skeema is a brand new tool, and this is the first conference session to introduce it. At this relatively early stage, feedback and feature requests from attendees will greatly influence the direction and prioritization of future development.

Percona: What are you most looking forward to at Percona Live 2017?

Evan: Percona Live is my favorite technical conference. It’s the best place to learn about all of the recent developments in the database world, and meet the top experts in the field. This is my fifth year attending in Santa Clara. I’m looking forward to reconnecting with old friends and making some new ones as well!

Register for Percona Live Data Performance Conference 2017, and see Evan present his session on Automatic MySQL Schema Management with Skeema. Use the code FeaturedTalk and receive $100 off the current registration price!

Percona Live Data Performance Conference 2017 is the premier open source event for the data performance ecosystem. It is the place to be for the open source community as well as businesses that thrive in the MySQL, NoSQL, cloud, big data and Internet of Things (IoT) marketplaces. Attendees include DBAs, sysadmins, developers, architects, CTOs, CEOs, and vendors from around the world.

The Percona Live Data Performance Conference will be April 24-27, 2017 at the Hyatt Regency Santa Clara & The Santa Clara Convention Center.
