Feb
05
2019
--

Backed by Benchmark, Blue Hexagon just raised $31 million for its deep learning cybersecurity software

Nayeem Islam spent nearly 11 years with chipmaker Qualcomm, where he founded its Silicon Valley-based R&D facility, recruited its entire team and oversaw research on all aspects of security, including applying machine learning on mobile devices and in the network to detect threats early.

Islam was nothing if not prolific, developing a system for on-device machine learning for malware detection, libraries for optimizing deep learning algorithms on mobile devices and systems for parallel compute on mobile devices, among other things.

In fact, because of his work, he also saw a big opportunity in better protecting enterprises from cyberthreats through deep neural networks that are capable of processing every raw byte within a file and that can uncover complex relations within data sets. So two years ago, Islam and Saumitra Das, a former Qualcomm engineer with 330 patents to his name and another 450 pending, struck out on their own to create Blue Hexagon, a now 30-person Sunnyvale, Calif.-based company that is today disclosing it has raised $31 million in funding from Benchmark and Altimeter.

The funding comes roughly one year after Benchmark quietly led a $6 million Series A round for the firm.

So what has investors so bullish on the company’s prospects, aside from its credentialed founders? In a word, speed, seemingly. According to Islam, Blue Hexagon has created a real-time, cybersecurity platform that he says can detect known and unknown threats at first encounter, then block them in “sub seconds” so the malware doesn’t have time to spread.

The industry has to move to real-time detection, he says, explaining that four new and unique malware samples are released every second, and arguing that traditional security methods can’t keep pace. He says that sandboxes, for example, meaning restricted environments that quarantine cyberthreats and keep them from breaching sensitive files, are no longer state of the art. The same is true of signatures, which are mathematical techniques used to validate the authenticity and integrity of a message, software or digital document but are being bypassed by rapidly evolving new malware.

Only time will tell if Blue Hexagon is far more capable of identifying and stopping attackers, as Islam insists is the case. It is not the only startup to apply deep learning to cybersecurity, though it’s certainly one of the first. Critics, some who are protecting their own corporate interests, also worry that hackers can foil security algorithms by targeting the warning flags they look for.

Still, with its technology, its team and its pitch, Blue Hexagon is starting to persuade not only top investors of its merits, but a growing — and broad — base of customers, says Islam. “Everyone has this issue, from large banks, insurance companies, state and local governments. Nowhere do you find someone who doesn’t need to be protected.”

Blue Hexagon can even help customers that are already under attack, Islam says, even if it isn’t ideal. “Our goal is to catch an attack as early in the kill chain as possible. But if someone is already being attacked, we’ll see that activity and pinpoint it and be able to turn it off.”

Some damage may already be done, of course. It’s another reason to plan ahead, he says. “With automated attacks, you need automated techniques.” Deep learning, he insists, “is one way of leveling the playing field against attackers.”

Dec
06
2018
--

Contentful raises $33.5M for its headless CMS platform

Contentful, a Berlin- and San Francisco-based startup that provides content management infrastructure for companies like Spotify, Nike, Lyft and others, today announced that it has raised a $33.5 million Series D funding round led by Sapphire Ventures, with participation from OMERS Ventures and Salesforce Ventures, as well as existing investors General Catalyst, Benchmark, Balderton Capital and Hercules. In total, the company has now raised $78.3 million.

It’s been less than a year since the company raised its Series C round and, as Contentful co-founder and CEO Sascha Konietzke told me, the company didn’t really need to raise right now. “We had just raised our last round about a year ago. We still had plenty of cash in our bank account and we didn’t need to raise as of now,” said Konietzke. “But we saw a lot of economic uncertainty, so we thought it might be a good moment in time to recharge. And at the same time, we already had some interesting conversations ongoing with Sapphire [formerly SAP Ventures] and Salesforce. So we saw the opportunity to add more funding and also start getting into a tight relationship with both of these players.”

The original plan for Contentful was to focus almost explicitly on mobile. As it turns out, though, the company’s customers also wanted to use the service to handle its web-based applications and these days, Contentful happily supports both. “What we’re seeing is that everything is becoming an application,” he told me. “We started with native mobile application, but even the websites nowadays are often an application.”

In its early days, Contentful focused only on developers. Now, however, that’s changing, and having these connections to large enterprise players like SAP and Salesforce surely isn’t going to hurt the company as it looks to bring on larger enterprise accounts.

Currently, the company’s focus is very much on Europe and North America, which account for about 80 percent of its customers. For now, Contentful plans to continue to focus on these regions, though it obviously supports customers anywhere in the world.

Contentful only exists as a hosted platform. As of now, the company doesn’t have any plans for offering a self-hosted version, though Konietzke noted that he does occasionally get requests for this.

What the company is planning to do in the near future, though, is to enable more integrations with existing enterprise tools. “Customers are asking for deeper integrations into their enterprise stack,” Konietzke said. “And that’s what we’re beginning to focus on and where we’re building a lot of capabilities around that.” In addition, support for GraphQL and an expanded rich text editing experience is coming up. The company also recently launched a new editing experience.

Sep
05
2018
--

Elastic’s IPO filing is here

Elastic, the provider of subscription-based data search software used by Dell, Netflix, The New York Times and others, has unveiled its IPO filing after confidentially submitting paperwork to the SEC in June. The company will be the latest in a line of enterprise SaaS businesses to hit the public markets in 2018.

Headquartered in Mountain View, Elastic plans to raise $100 million in its NYSE listing, though that’s likely a placeholder amount. The timing of the filing suggests the company will transition to the public markets this fall; we’ve reached out to the company for more details. 

Elastic will trade under the symbol ESTC.

The business is known for its core product, an open-source search tool called ElasticSearch. It also offers a range of analytics and visualization tools meant to help businesses organize large data sets, competing directly with companies like Splunk and even Amazon — a name it mentions 14 times in the filing.

Amazon offers some of our open source features as part of its Amazon Web Services offering. As such, Amazon competes with us for potential customers, and while Amazon cannot provide our proprietary software, the pricing of Amazon’s offerings may limit our ability to adjust,” the company wrote in the filing, which also lists Endeca, FAST, Autonomy and several others as key competitors.

This is our first look at Elastic’s financials. The company brought in $159.9 million in revenue in the 12 months ended July 30, 2018, up roughly 100 percent from $88.1 million the year prior. Losses are growing at about the same rate. Elastic reported a net loss of $18.5 million in the second quarter of 2018. That’s an increase from $9.9 million in the same period in 2017.

Founded in 2012, the company has raised about $100 million in venture capital funding, garnering a $700 million valuation the last time it raised VC, which was all the way back in 2014. Its investors include Benchmark, NEA and Future Fund, which each retain a 17.8 percent, 10.2 percent and 8.2 percent pre-IPO stake, respectively.

A flurry of business software companies have opted to go public this year. Domo, a business analytics company based in Utah, went public in June raising $193 million in the process. On top of that, subscription biller Zuora had a positive debut in April in what was a “clear sign post on the road to SaaS maturation,” according to TechCrunch’s Ron Miller. DocuSign and Smartsheet are also recent examples of both high-profile and successful SaaS IPOs.

Apr
21
2018
--

Timescale is leading the next wave of NYC database tech

Data is the lifeblood of the modern corporation, yet acquiring, storing, processing, and analyzing it remains a remarkably challenging and expensive project. Every time data infrastructure finally catches up with the streams of information pouring in, another source and more demanding decision-making makes the existing technology obsolete.

Few cities rely on data the same way as New York City, nor has any other city so shaped the technology that underpins our data infrastructure. Back in the 1960s, banks and accounting firms helped to drive much of the original computation industry with their massive finance applications. Today, that industry has been supplanted by finance and advertising, both of which need to make microsecond decisions based on petabyte datasets and complex statistical models.

Unsurprisingly, the city’s hunger for data has led to waves of database companies finding their home in the city.

As web applications became increasingly popular in the mid-aughts, SQL databases came under increasing strain to scale, while also proving to be inflexible in terms of their data schemas for the fast-moving startups they served. That problem spawned Manhattan-based MongoDB, whose flexible “NoSQL” schemas and horizontal scaling capabilities made it the default choice for a generation of startups. The company would go on to raise $311 million according to Crunchbase, and debuted late last year on NASDAQ, trading today with a market cap of $2 billion.

At the same time that the NoSQL movement was hitting its stride, academic researchers and entrepreneurs were exploring how to evolve SQL to scale like its NoSQL competitors, while retaining the kinds of features (joining tables, transactions) that make SQL so convenient for developers.

One leading company in this next generation of database tech is New York-based Cockroach Labs, which was founded in 2015 by a trio of former Square, Viewfinder, and Google engineers. The company has gone on to raise more than $50 million according to Crunchbase from a luminary list of investors including Peter Fenton at Benchmark, Mike Volpi at Index, and Satish Dharmaraj at Redpoint, along with GV and Sequoia.

While web applications have their own peculiar data needs, the rise of the internet of things (IoT) created a whole new set of data challenges. How can streams of data from potentially millions of devices be stored in an easily analyzable manner? How could companies build real-time systems to respond to that data?

Mike Freedman and Ajay Kulkarni saw that problem increasingly manifesting itself in 2015. The two had been roommates at MIT in the late 90s, and then went on separate paths into academia and industry respectively. Freedman went to Stanford for a PhD in computer science, and nearly joined the spinout of Nicira, which sold to VMware in 2012 for $1.26 billion. Kulkarni joked that “Mike made the financially wise decision of not joining them,” and Freedman eventually went to Princeton as an assistant professor, and was awarded tenure in 2013. Kulkarni founded and worked at a variety of startups including GroupMe, as well as receiving an MBA from MIT.

The two had startup dreams, and tried building an IoT platform. As they started building it though, they realized they would need a real-time database to process the data streams coming in from devices. “There are a lot of time series databases, [so] let’s grab one off the shelf, and then we evaluated a few,” Kulkarni explained. They realized what they needed was a hybrid of SQL and NoSQL, and nothing they could find offered the feature set they required to power their platform. That challenge became the problem to be solved, and Timescale was born.

In many ways, Timescale is how you build a database in 2018. Rather than starting de novo, the team decided to build on top of Postgres, a popular open-source SQL database. “By building on top of Postgres, we became the more reliable option,” Kulkarni said of their thinking. In addition, the company opted to make the database fully open source. “In this day and age, in order to get wide adoption, you have to be an open source database company,” he said.

Since the project’s first public git commit on October 18, 2016, the company’s database has received nearly 4,500 stars on Github, and it has raised $16.1 million from Benchmark and NEA .

Far more important though are their customers, who are definitely not the typical tech startup roster and include companies from oil and gas, mining, and telecommunications. “You don’t think of them as early adopters, but they have a need, and because we built it on top of Postgres, it integrates into an ecosystem that they know,” Freedman explained. Kulkarni continued, “And the problem they have is that they have all of this time series data, and it isn’t sitting in the corner, it is integrated with their core service.”

New York has been a strong home for the two founders. Freedman continues to be a professor at Princeton, where he has built a pipeline of potential grads for the company. More widely, Kulkarni said, “Some of the most experienced people in databases are in the financial industry, and that’s here.” That’s evident in one of their investors, hedge fund Two Sigma. “Two Sigma had been the only venture firm that we talked to that already had built out their own time series database,” Kulkarni noted.

The two also benefit from paying customers. “I think the Bay Area is great for open source adoption, but a lot of Bay Area companies, they develop their own database tech, or they use an open source project and never pay for it,” Kulkarni said. Being in New York has meant closer collaboration with customers, and ultimately more revenues.

Open source plus revenues. It’s the database way, and the next wave of innovation in the NYC enterprise infrastructure ecosystem.

Nov
28
2017
--

Best Practices for Percona XtraDB Cluster on AWS

Percona XtraDB Cluster on AWS 2 small

In this blog post I’ll look at the performance of Percona XtraDB Cluster on AWS using different service instances, and recommend some best practices for maximizing performance.

You can use Percona XtraDB Cluster in AWS environments. We often get questions about how best to deploy it, and how to optimize both performance and spend when doing so. I decided to look into it with some benchmark testing.

For these benchmark tests, I used the following configuration:

  • Region:
    • Availability zones: US East – 1, zones: b, c, d
    • Sysbench 1.0.8
    • ProxySQL 1.4.3
    • 10 tables, 40mln records – ~95GB dataset
    • Percona XtraDB Cluster 5.7.18
    • Amazon Linux AMI

We evaluated different AWS instances to provide the best recommendation to run Percona XtraDB Cluster. We used instances

  • With General Purpose storage volumes, 200GB each
  • With IO provisioned volumes, 200GB, 10000 IOS
  • I3 instances with local attached NVMe storage.

We also used different instance sizes:

Instance vCPU Memory
r4.large 2 15.25
r4.xlarge 4 30.5
r4.2xlarge 8 61
r4.4xlarge 16 122
i3.large 2 15.25
i3.xlarge 4 30.5
i3.2xlarge 8 61
i3.4xlarge 16 122

 

While I3 instances with NVMe storage do not provide the same functionality for handling shared storage and snapshots as General Purpose and IO provisioned volumes, since Percona XtraDB Cluster provides data duplication by itself we think it is still valid to include them in this comparison.

We ran benchmarks in the US East 1 (N. Virginia) Region, and we used different availability zones for each of the Percona XtraDB Cluster zones (mostly zones “b”, “c” and “d”):

Percona XtraDB Cluster on AWS 1

The client was directly connected and used ProxySQL, so we were able to measure ProxySQL’s performance overhead as well.

ProxySQL is an advanced method to access Percona XtraDB Cluster. It can perform a health check of the nodes and route the traffic to the ONLINE node. It can also split read and write traffic and route read traffic to different nodes (although we didn’t use this capability in our benchmark).

In our benchmarks, we used 1,4, 16, 64 and 256 user threads. For this detailed review, however, we’ll look at the 64 thread case. 

Results

First, let’s review the average throughput (higher is better) and latency (lower is better) results (we measured 99% percentile with one-second resolution):

Percona XtraDB Cluster on AWS 2

Results summary, raw performance:

The performance for Percona XtraDB Cluster running on GP2 volumes is often pretty slow, so it is hard to recommend this volume type for the serious workloads.

IO provisioned volumes perform much better, and should be considered as the primary target for Percona XtraDB Cluster deployments. I3 instances show even better performance.

I3 instances use locally attached volumes and do not provide equal functionality as EBS IO provisioned volumes — although some of these limitations are covered by Percona XtraDB Cluster’s ability to keep copies of data on each node.

Results summary for jitter:

Along with average throughput and latency, it is important to take into account “jitter” — how stable is the performance during the runs?

Percona XtraDB Cluster on AWS 3

Latency variation for GP2 volumes is significant — practically not acceptable for serious usage. Let’s review the latency for only IO provisioning and NVMe volumes. The following chart provides better scale for just these two:

Percona XtraDB Cluster on AWS 4

At this scale, we see that NVMe provides a 99% better response time and is more stable. There is still variation for IO provisioned volumes.

Results summary, cost

When speaking about instance and volume types, it would be impractical to avoid mentioning of the instance costs. We need to analyze how much we need to pay to achieve the better performance. So we prepared data how much does it cost to produce throughput of 1000 transactions per second.

We compare on-demand and reserved instances pricing (reserved for one year / all upfront / tenancy-default):

Percona XtraDB Cluster on AWS 5

Because IO provisioned instances give much better performance, the price performance is comparable if not better than GP2 instances.

I3 instances are a clear winner.

It is also interesting to compare the raw cost of benchmarked instances:

Percona XtraDB Cluster on AWS 6

We can see that IO provisioned instances are the most expensive, and using reserved instances does not provide much savings. To understand the reason for this, let’s take a look at how cost is calculated for components:

Percona XtraDB Cluster on AWS 7

So for IO provisioned volumes, the majority of the cost comes from IO provisioning (which is the same for both on-demand and reserved instances).

Percona XtraDB Cluster scalability

Another interesting effort is looking at how Percona XtraDB Cluster performance scales with the instance size. As we double resources (both CPU and Memory) while increasing the instance size, how does it affect Percona XtraDB Cluster?

So let’s take a look at throughput:

Percona XtraDB Cluster on AWS 8

Throughput improves with increasing the instance size. Let’s calculate speedup with increasing instance size for IO provisioned and I3 instances:

Speedup X Times to Large Instance IO1 i3
large 1 1
xlarge 2.67 2.11
2xlarge 5.38 4.31
4xlarge 5.96 7.83

 

Percona XtraDB Cluster can scale (improve performance) with increasing instance size. Keep in mind, however, that it depends significantly on the workload. You may not get the same performance speedup as in this benchmark.

ProxySQL overhead

As mentioned above, ProxySQL adds additional functionality to the cluster. It can also add overhead, however. We would like to understand the expected performance penalty, so we compared the throughput and latency with and without ProxySQL.

Out of box, the ProxySQL performance was not great and required additional tuning. 

ProxySQL specific configuration:

  • Use connection through TCP-IP address, not through local socket
  • Adjust  mysql-max_stmts_per_connection variable for optimal value (default:50) – optimal – 1000
  • Ensure that “monitor@<host>” user has permissions as it’s important for proper handling of prepared statement.
    • CREATE USER ‘monitor’@‘172.30.%.%’ IDENTIFIED BY ‘monitor’;

Throughput:

Percona XtraDB Cluster on AWS 9

Response time:

Percona XtraDB Cluster on AWS 10

ProxySQL performance penalty in throughput

ProxySQL performance penalty IO1 i3
large 0.97 0.98
xlarge 1.03 0.97
2xlarge 0.95 0.95
4xlarge 0.96 0.93

 

It appears that ProxySQL adds 3-7% overhead. I wouldn’t consider this a significant penalty for additional functionality.

Summary

Amazon instances

First, the results show that instances based on General Purpose volumes do not provide acceptable performance and should be avoided in general for serious production usage. The choice is between IO provisioned instances and NVMe based instances.

IO provisioned instances are more expensive, but offer much better performance than General Purpose volumes. If we also look at price/performance metric, IO provisioned volumes are comparable with General Purpose volumes. You should use IO provisioned volumes if you are looking for the functionality provided by EBS volumes.

If you do not need EBS volumes, however, then i3 instances with NVMe volumes are a better choice. Both are cheaper and provide better performance than IO provisioned instances. Percona XtraDB Cluster provides data duplication on its own, which mitigates the need for EBS volumes to some extent.

ProxySQL overhead

We recommend using Percona XtraDB Cluster in combination with ProxySQL, as ProxySQL provides additional management and routing functionality. In general, the overhead for ProxySQL is not significant. But in our experience, however, ProxySQL has to be properly tuned — otherwise the performance penalty could be a bottleneck.

Percona XtraDB Cluster scalability

AWS has great capability to increase the instance size (both CPU and memory) if we exceed the capacity of the current instance. From our experiments, we see that Percona XtraDB Cluster can scale along with and benefit from increased instance size.

Below is a chart showing the speedup in relation to the instance size:

Percona XtraDB Cluster on AWS 11

So increasing the instance size is a feasible strategy for improving Percona XtraDB Cluster performance in an AWS environment.

Thanks for reading this benchmark! Put any questions or thoughts in the comments below.

Jul
12
2017
--

Gh-ost benchmark against pt-online-schema-change performance

gh-ost-benchmark

In this blog post, I will run a gh-ost benchmark against the performance of pt-online-schema-change.

When gh-ost came out, I was very excited. As MySQL ROW replication became commonplace, you could use it to track changes instead of triggers. This practice is cleaner and safer compared to Percona Toolkit’s pt-online-schema-change. Since gh-ost doesn’t need triggers, I assumed it would generate lower overhead and work faster. I frequently called it “pt-online-schema-change on steroids” in my talks. Finally, I’ve found some time to check my theoretical claims with some benchmarks.

DISCLAIMER: These benchmarks correspond to one specific ALTER TABLE on the table of one specific structure and hardware configuration. I have not set up a broad set of tests. If you have other results – please comment!

Benchmark Setup Details

  • pt-online-schema-change from Percona Toolkit 3.0.3
  • gh-ost 1.0.36
  • Percona Server 5.7.18 on Ubuntu 16.04 LTS
  • Hardware: 28CPU cores/56 Threads.  128GB Memory.   Samsung 960 Pro 512GB
  • Sysbench 1.0.7

Prepare the table by running:

sysbench --threads=40 --rate=0 --report-interval=1 --percentile=99 --events=0 --time=0 --db-ps-mode=auto --mysql-user=sbtest --mysql-password=sbtest  /usr/share/sysbench/oltp_read_write.lua --table_size=10000000 prepare

The table size is about 3GB (completely fitting to innodb_buffer_pool).

Run the benchmark in “full ACID” mode with:

  • sync_binlog=1
  • innodb_flush_log_at_trx_commit=1
  • innodb_doublewrite=1

This is important as this workload is heavily commit-bound, and extensively relies on group commit.

This is the pt-online-schema-change command to alter table:

time pt-online-schema-change --execute --alter "ADD COLUMN c1 INT" D=sbtest,t=sbtest1

This the gh-ost command to alter table:

time ./gh-ost  --user="sbtest" --password="sbtest" --host=localhost --allow-on-master --database="sbtest" --table="sbtest1"  --alter="ADD COLUMN c1 INT" --execute

Tests Details

For each test the old sysbench table was dropped and a new one prepared. I tested alter table in three different cases:

  • When nothing else was running (“Idle Load”)   
  • When the system handled about 2% of load it can handle at full capacity (“Light Background Load”)
  • When the system handled about 40% of the possible load, with sysbench injected about 25% of the transactions/sec the system could handle at full load (“Heavy Background Load”)

I measured the alter table completion times for all cases, as well as the overhead generated by the alter (in other words, how much peak throughput is reduced by running alter table through the tools).

Idle Load

gh-ost benchmark 1

For the Idle Load test, pt-online-schema-change completed nearly twice as fast as gh-ost. This was a big surprise for me. I haven’t looked into the reasons or details yet, though I can see most of the CPU usage for gh-ost is on the MySQL server side. Perhaps the differences relate to the SQL used to perform non-blocking alter tables.

Light Background Load

I generated the Light Background Load by running the sysbench command below. It corresponds to a roughly 4% load, as the system can handle some 2500 transactions/sec at this concurrency under full load. Adjust the 

--rate

 value to scale it for your system.

time sysbench --threads=40 --rate=100 --report-interval=1 --percentile=99 --events=0 --time=0 --db-ps-mode=auto --mysql-user=sbtest --mysql-password=sbtest  /usr/share/sysbench/oltp_read_write.lua --table_size=10000000 run

gh-ost benchmark 2

The numbers changed (as expected), but pt-online-schema-change is still approximately twice as fast as gh-ost.

What is really interesting in this case is how a relatively light background load affects the process completion time. It took both pt-online-schema-change and gh-ost about 2.7x times longer to finish! 

Heavy Background Load

I generated the Heavy Background Load running the sysbench command below. It corresponds to a roughly 40% load, as the system can handle some 2500 transactions/sec at this concurrency under full load. Adjust

--rate

 value to scale it for your system.

time sysbench --threads=40 --rate=1000 --report-interval=1 --percentile=99 --events=0 --time=0 --db-ps-mode=auto --mysql-user=sbtest --mysql-password=sbtest  /usr/share/sysbench/oltp_read_write.lua --table_size=10000000 run

gh-ost benchmark 3

What happened in this case? When the load gets higher, gh-ost can’t keep up with binary log processing, and just never finishes at all. While this may be surprising at first, it makes sense if you think more about how these tools work. pt-online-schema-change uses triggers, and while they have a lot of limitations and overhead they can execute in parallel. gh-ost, on the other hand, processes the binary log in a single thread and might not be able to keep up.   

In MySQL 5.6 we didn’t have parallel replication, which applies writes to the same table in parallel. For that version the gh-ost limitation probably isn’t as big a deal, as such a heavy load would also cause replication lag. MySQL 5.7 has parallel replication. This makes it much easier to quickly replicate workloads that are too heavy for gh-ost to handle.

I should note that the workload being simulated in this benchmark is a rather extreme case. The table being altered by gh-ost here is at the same time handling a background load so high it can’t be replicated in a single thread.

Future versions of gh-ost could improve this issue by applying binlog events in parallel, similar to what MySQL replicas do.

An excerpt from the gh-ost log shows how it is totally backed up trying to apply the binary log:

root@rocky:/tmp# time ./gh-ost  --user="sbtest" --password="sbtest" --host=localhost --allow-on-master --database="sbtest" --table="sbtest1"  --alter="ADD COLUMN c1 INT" --execute
2017/06/25 19:16:05 binlogsyncer.go:75: [info] create BinlogSyncer with config &{99999 mysql localhost 3306 sbtest sbtest  false false <nil>}
2017/06/25 19:16:05 binlogsyncer.go:241: [info] begin to sync binlog from position (rocky-bin.000018, 640881773)
2017/06/25 19:16:05 binlogsyncer.go:134: [info] register slave for master server localhost:3306
2017/06/25 19:16:05 binlogsyncer.go:568: [info] rotate to (rocky-bin.000018, 640881773)
2017-06-25 19:16:05 ERROR parsing time "" as "2006-01-02T15:04:05.999999999Z07:00": cannot parse "" as "2006"
# Migrating `sbtest`.`sbtest1`; Ghost table is `sbtest`.`_sbtest1_gho`
# Migrating rocky:3306; inspecting rocky:3306; executing on rocky
# Migration started at Sun Jun 25 19:16:05 -0400 2017
# chunk-size: 1000; max-lag-millis: 1500ms; max-load: ; critical-load: ; nice-ratio: 0.000000
# throttle-additional-flag-file: /tmp/gh-ost.throttle
# Serving on unix socket: /tmp/gh-ost.sbtest.sbtest1.sock
Copy: 0/9872432 0.0%; Applied: 0; Backlog: 0/100; Time: 0s(total), 0s(copy); streamer: rocky-bin.000018:641578191; State: migrating; ETA: N/A
Copy: 0/9872432 0.0%; Applied: 0; Backlog: 100/100; Time: 1s(total), 1s(copy); streamer: rocky-bin.000018:641626699; State: migrating; ETA: N/A
Copy: 0/9872432 0.0%; Applied: 640; Backlog: 100/100; Time: 2s(total), 2s(copy); streamer: rocky-bin.000018:641896215; State: migrating; ETA: N/A
Copy: 0/9872432 0.0%; Applied: 1310; Backlog: 100/100; Time: 3s(total), 3s(copy); streamer: rocky-bin.000018:642178659; State: migrating; ETA: N/A
Copy: 0/9872432 0.0%; Applied: 1920; Backlog: 100/100; Time: 4s(total), 4s(copy); streamer: rocky-bin.000018:642436043; State: migrating; ETA: N/A
Copy: 0/9872432 0.0%; Applied: 2600; Backlog: 100/100; Time: 5s(total), 5s(copy); streamer: rocky-bin.000018:642722777; State:
...
Copy: 0/9872432 0.0%; Applied: 120240; Backlog: 100/100; Time: 3m0s(total), 3m0s(copy); streamer: rocky-bin.000018:694142377; State: migrating; ETA: N/A
Copy: 0/9872432 0.0%; Applied: 140330; Backlog: 100/100; Time: 3m30s(total), 3m30s(copy); streamer: rocky-bin.000018:702948219; State: migrating; ETA: N/A
Copy: 0/9872432 0.0%; Applied: 160450; Backlog: 100/100; Time: 4m0s(total), 4m0s(copy); streamer: rocky-bin.000018:711775662; State: migrating; ETA: N/A
Copy: 0/9872432 0.0%; Applied: 180600; Backlog: 100/100; Time: 4m30s(total), 4m30s(copy); streamer: rocky-bin.000018:720626338; State: migrating; ETA: N/A
Copy: 0/9872432 0.0%; Applied: 200770; Backlog: 100/100; Time: 5m0s(total), 5m0s(copy); streamer: rocky-bin.000018:729509960; State: migrating; ETA: N/A

Online Schema Change Performance Impact

For this test I started the alter table, waited 60 seconds and then ran sysbench at full speed for five minutes. Then I measured how much the performance was impacted by running the tool:

sysbench --threads=40 --rate=0 --report-interval=1 --percentile=99 --events=0 --time=300 --db-ps-mode=auto --mysql-user=sbtest --mysql-password=sbtest  /usr/share/sysbench/oltp_read_write.lua --table_size=10000000 run

gh-ost benchmark 4

As we can see, gh-ost has negligible overhead in this case. pt-online-schema-change on the other hand, had peformance reduced by 12%. It is worth noting though that pt-online-schema-change still makes progress in this case (though slowly), while gh-ost would never complete.

If anything, I was surprised at how little impact the pt-online-schema-change run had on sysbench performance.

It’s important to note that in this case we only measured the overhead for the “copy” stage of the online schema change. Another thing you should worry about is the impact to performance during “table rotation” (which I have not measured).

Summary

While gh-ost introduces a number of design advantages, and gives better results in some situation, I wouldn’t call it always superior the tried and true pt-online-schema-change. At least in some cases, pt-online-schema-change offers better performance than gh-ost and completes a schema change when gh-ost is unable to keep up. Consider trying out both tools and see what works best in your situation.

Jun
22
2017
--

ClickHouse in a General Analytical Workload (Based on a Star Schema Benchmark)

ClickHouse

ClickHouseIn this blog post, we’ll look at how ClickHouse performs in a general analytical workload using the star schema benchmark test.

We have mentioned ClickHouse in some recent posts (ClickHouse: New Open Source Columnar Database, Column Store Database Benchmarks: MariaDB ColumnStore vs. Clickhouse vs. Apache Spark), where it showed excellent results. ClickHouse by itself seems to be event-oriented RDBMS, as its name suggests (clicks). Its primary purpose, using Yandex Metrica (the system similar to Google Analytics), also points to an event-based nature. We also can see there is a requirement for date-stamped columns.

It is possible, however, to use ClickHouse in a general analytical workload. This blog post shares my findings. For these tests, I used a Star Schema benchmark — slightly-modified so that able to handle ClickHouse specifics.

First, let’s talk about schemas. We need to adjust to ClickHouse data types. For example, the biggest fact table in SSB is “lineorder”. Below is how it is defined for Amazon RedShift (as taken from https://docs.aws.amazon.com/redshift/latest/dg/tutorial-tuning-tables-create-test-data.html):

CREATE TABLE lineorder
(
  lo_orderkey          INTEGER NOT NULL,
  lo_linenumber        INTEGER NOT NULL,
  lo_custkey           INTEGER NOT NULL,
  lo_partkey           INTEGER NOT NULL,
  lo_suppkey           INTEGER NOT NULL,
  lo_orderdate         INTEGER NOT NULL,
  lo_orderpriority     VARCHAR(15) NOT NULL,
  lo_shippriority      VARCHAR(1) NOT NULL,
  lo_quantity          INTEGER NOT NULL,
  lo_extendedprice     INTEGER NOT NULL,
  lo_ordertotalprice   INTEGER NOT NULL,
  lo_discount          INTEGER NOT NULL,
  lo_revenue           INTEGER NOT NULL,
  lo_supplycost        INTEGER NOT NULL,
  lo_tax               INTEGER NOT NULL,
  lo_commitdate        INTEGER NOT NULL,
  lo_shipmode          VARCHAR(10) NOT NULL
);

For ClickHouse, the table definition looks like this:

CREATE TABLE lineorderfull (
        LO_ORDERKEY             UInt32,
        LO_LINENUMBER           UInt8,
        LO_CUSTKEY              UInt32,
        LO_PARTKEY              UInt32,
        LO_SUPPKEY              UInt32,
        LO_ORDERDATE            Date,
        LO_ORDERPRIORITY        String,
        LO_SHIPPRIORITY         UInt8,
        LO_QUANTITY             UInt8,
        LO_EXTENDEDPRICE        UInt32,
        LO_ORDTOTALPRICE        UInt32,
        LO_DISCOUNT             UInt8,
        LO_REVENUE              UInt32,
        LO_SUPPLYCOST           UInt32,
        LO_TAX                  UInt8,
        LO_COMMITDATE           Date,
        LO_SHIPMODE             String
)Engine=MergeTree(LO_ORDERDATE,(LO_ORDERKEY,LO_LINENUMBER),8192);

From this we can see we need to use datatypes like UInt8 and UInt32, which are somewhat unusual for database world datatypes.

The second table (RedShift definition):

CREATE TABLE customer
(
  c_custkey      INTEGER NOT NULL,
  c_name         VARCHAR(25) NOT NULL,
  c_address      VARCHAR(25) NOT NULL,
  c_city         VARCHAR(10) NOT NULL,
  c_nation       VARCHAR(15) NOT NULL,
  c_region       VARCHAR(12) NOT NULL,
  c_phone        VARCHAR(15) NOT NULL,
  c_mktsegment   VARCHAR(10) NOT NULL
);

For ClickHouse, I defined as:

CREATE TABLE customerfull (
        C_CUSTKEY       UInt32,
        C_NAME          String,
        C_ADDRESS       String,
        C_CITY          String,
        C_NATION        String,
        C_REGION        String,
        C_PHONE         String,
        C_MKTSEGMENT    String,
        C_FAKEDATE      Date
)Engine=MergeTree(C_FAKEDATE,(C_CUSTKEY),8192);

For reference, the full schema for the benchmark is here: https://github.com/vadimtk/ssb-clickhouse/blob/master/create.sql.

For this table, we need to define a rudimentary column C_FAKEDATE Date in order to use ClickHouse’s most advanced engine (MergeTree). I was told by the ClickHouse team that they plan to remove this limitation in the future.

To generate data acceptable by ClickHouse, I made modifications to ssb-dbgen. You can find my version here: https://github.com/vadimtk/ssb-dbgen. The most notable change is that ClickHouse can’t accept dates in CSV files formatted as “19971125”. It has to be “1997-11-25”. This is something to keep in mind when loading data into ClickHouse.

It is possible to do some preformating on the load, but I don’t have experience with that. A common approach is to create the staging table with datatypes that match loaded data, and then convert them using SQL functions when inserting to the main table.

Hardware Setup

One of the goals of this benchmark to see how ClickHouse scales on multiple nodes. I used a setup of one node, and then compared to a setup of three nodes. Each node is 24 cores of “Intel(R) Xeon(R) CPU E5-2643 v2 @ 3.50GHz” CPUs, and the data is located on a very fast PCIe Flash storage.

For the SSB benchmark I use a scale factor of 2500, which provides (in raw data):

Table lineorder – 15 bln rows, raw size 1.7TB, Table customer – 75 mln rows

When loaded into ClickHouse, the table lineorder takes 464GB, which corresponds to a 3.7x compression ratio.

We compare a one-node (table names lineorderfull, customerfull) setup vs. a three-node (table names lineorderd, customerd) setup.

Single Table Operations

Query:

SELECT
    toYear(LO_ORDERDATE) AS yod,
    sum(LO_REVENUE)
FROM lineorderfull
GROUP BY yod

One node:

7 rows in set. Elapsed: 9.741 sec. Processed 15.00 billion rows, 90.00 GB (1.54 billion rows/s., 9.24 GB/s.)

Three nodes:

7 rows in set. Elapsed: 3.258 sec. Processed 15.00 billion rows, 90.00 GB (4.60 billion rows/s., 27.63 GB/s.)

We see a speed up of practically three times. Handling 4.6 billion rows/s is blazingly fast!

One Table with Filtering

SELECT sum(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue
FROM lineorderfull
WHERE (toYear(LO_ORDERDATE) = 1993) AND ((LO_DISCOUNT >= 1) AND (LO_DISCOUNT <= 3)) AND (LO_QUANTITY < 25)

One node:

1 rows in set. Elapsed: 3.175 sec. Processed 2.28 billion rows, 18.20 GB (716.60 million rows/s., 5.73 GB/s.)

Three nodes:

1 rows in set. Elapsed: 1.295 sec. Processed 2.28 billion rows, 18.20 GB (1.76 billion rows/s., 14.06 GB/s.)

It’s worth mentioning that during the execution of this query, ClickHouse was able to use ALL 24 cores on each box. This confirms that ClickHouse is a massively parallel processing system.

Two Tables (Independent Subquery)

In this case, I want to show how Clickhouse handles independent subqueries:

SELECT sum(LO_REVENUE)
FROM lineorderfull
WHERE LO_CUSTKEY IN
(
    SELECT C_CUSTKEY AS LO_CUSTKEY
    FROM customerfull
    WHERE C_REGION = 'ASIA'
)

One node:

1 rows in set. Elapsed: 28.934 sec. Processed 15.00 billion rows, 120.00 GB (518.43 million rows/s., 4.15 GB/s.)

Three nodes:

1 rows in set. Elapsed: 14.189 sec. Processed 15.12 billion rows, 121.67 GB (1.07 billion rows/s., 8.57 GB/s.)

We  do not see, however, the close to 3x speedup on three nodes, because of the required data transfer to perform the match LO_CUSTKEY with C_CUSTKEY

Two Tables JOIN

With a subquery using columns to return results, or for GROUP BY, things get more complicated. In this case we want to GROUP BY the column from the second table.

First, ClickHouse doesn’t support traditional subquery syntax, so we need to use JOIN. For JOINs, ClickHouse also strictly prescribes how it must be written (a limitation that will also get changed in the future). Our JOIN should look like:

SELECT
    C_REGION,
    sum(LO_EXTENDEDPRICE * LO_DISCOUNT)
FROM lineorderfull
ANY INNER JOIN
(
    SELECT
        C_REGION,
        C_CUSTKEY AS LO_CUSTKEY
    FROM customerfull
) USING (LO_CUSTKEY)
WHERE (toYear(LO_ORDERDATE) = 1993) AND ((LO_DISCOUNT >= 1) AND (LO_DISCOUNT <= 3)) AND (LO_QUANTITY < 25)
GROUP BY C_REGION

One node:

5 rows in set. Elapsed: 31.443 sec. Processed 2.35 billion rows, 28.79 GB (74.75 million rows/s., 915.65 MB/s.)

Three nodes:

5 rows in set. Elapsed: 25.160 sec. Processed 2.58 billion rows, 33.25 GB (102.36 million rows/s., 1.32 GB/s.)

In this case the speedup is not even two times. This corresponds to the fact of the random data distribution for the tables lineorderd and customerd. Both tables were defines as:

CREATE TABLE lineorderd AS lineorder ENGINE = Distributed(3shards, default, lineorder, rand());
CREATE TABLE customerd AS customer ENGINE = Distributed(3shards, default, customer, rand());

Where  rand() defines that records are distributed randomly across three nodes. When we perform a JOIN by LO_CUSTKEY=C_CUSTKEY, records might be located on different nodes. One way to deal with this is to define data locally. For example:

CREATE TABLE lineorderLD AS lineorderL ENGINE = Distributed(3shards, default, lineorderL, LO_CUSTKEY);
CREATE TABLE customerLD AS customerL ENGINE = Distributed(3shards, default, customerL, C_CUSTKEY);

Three Tables JOIN

This is where it becomes very complicated. Let’s consider the query that you would normally write:

SELECT sum(LO_REVENUE),P_MFGR, toYear(LO_ORDERDATE) yod FROM lineorderfull ,customerfull,partfull WHERE C_REGION = 'ASIA' and
LO_CUSTKEY=C_CUSTKEY and P_PARTKEY=LO_PARTKEY GROUP BY P_MFGR,yod ORDER BY P_MFGR,yod;

With Clickhouse’s limitations on JOINs syntax, the query becomes:

SELECT
    sum(LO_REVENUE),
    P_MFGR,
    toYear(LO_ORDERDATE) AS yod
FROM
(
    SELECT
        LO_PARTKEY,
        LO_ORDERDATE,
        LO_REVENUE
    FROM lineorderfull
    ALL INNER JOIN
    (
        SELECT
            C_REGION,
            C_CUSTKEY AS LO_CUSTKEY
        FROM customerfull
    ) USING (LO_CUSTKEY)
    WHERE C_REGION = 'ASIA'
)
ALL INNER JOIN
(
    SELECT
        P_MFGR,
        P_PARTKEY AS LO_PARTKEY
    FROM partfull
) USING (LO_PARTKEY)
GROUP BY
    P_MFGR,
    yod
ORDER BY
    P_MFGR ASC,
    yod ASC

By writing queries this way, we force ClickHouse to use the prescribed JOIN order — at this moment there is no optimizer in ClickHouse and it is totally unaware of data distribution.

There is also not much speedup when we compare one node vs. three nodes:

One node execution time:

35 rows in set. Elapsed: 697.806 sec. Processed 15.08 billion rows, 211.53 GB (21.61 million rows/s., 303.14 MB/s.)

Three nodes execution time:

35 rows in set. Elapsed: 622.536 sec. Processed 15.12 billion rows, 211.71 GB (24.29 million rows/s., 340.08 MB/s.)

There is a way to make the query faster for this 3-way JOIN, however. (Thanks to Alexander Zaytsev from https://www.altinity.com/ for help!)

Optimized query:

SELECT
    sum(revenue),
    P_MFGR,
    yod
FROM
(
    SELECT
        LO_PARTKEY AS P_PARTKEY,
        toYear(LO_ORDERDATE) AS yod,
        SUM(LO_REVENUE) AS revenue
    FROM lineorderfull
    WHERE LO_CUSTKEY IN
    (
        SELECT C_CUSTKEY
        FROM customerfull
        WHERE C_REGION = 'ASIA'
    )
    GROUP BY
        P_PARTKEY,
        yod
)
ANY INNER JOIN partfull USING (P_PARTKEY)
GROUP BY
    P_MFGR,
    yod
ORDER BY
    P_MFGR ASC,
    yod ASC

Optimized query time:

One node:

35 rows in set. Elapsed: 106.732 sec. Processed 15.00 billion rows, 210.05 GB (140.56 million rows/s., 1.97 GB/s.)

Three nodes:

35 rows in set. Elapsed: 75.854 sec. Processed 15.12 billion rows, 211.71 GB (199.36 million rows/s., 2.79 GB/s.

That’s an improvement of about 6.5 times compared to the original query. This shows the importance of understanding data distribution, and writing the optimal query to process the data.

Another option for dealing with JOIN complexity, and to improve performance, is to use ClickHouse’s dictionaries. These dictionaries are described here: https://www.altinity.com/blog/2017/4/12/dictionaries-explained.

I will review dictionary performance in future posts.

Another traditional way to deal with JOIN complexity in an analytics workload is to use denormalization. We can move some columns (for example, P_MFGR from the last query) to the facts table (lineorder).

Observations

  • ClickHouse can handle general analytical queries (it requires special schema design and considerations, however)
  • Linear speedup is possible, but it depends on query design and requires advanced planning — proper speedup depends on data locality
  • ClickHouse is blazingly fast (beyond what I’ve seen before) because it can use all available CPU cores for query, as shown above using 24 cores for single server and 72 cores for three nodes
  • Multi-table JOINs are cumbersome and require manual work to achieve better performance, so consider using dictionaries or denormalization
May
10
2017
--

Cockroach Labs announces $27M Series B and enterprise tier for its reliable database

 “A database that replicates itself and is meant to survive” — that was the connection that Cockroach Labs CEO Spencer Kimball made between the startup’s memorable name and its value proposition. Despite entering a crowded market, Cockroach has been able to gain the favor of some of the best known VCs in tech. Today’s $27 million Series B is being led by… Read More

Mar
17
2017
--

Column Store Database Benchmarks: MariaDB ColumnStore vs. Clickhouse vs. Apache Spark

Column Store Database

This blog shares some column store database benchmark results, and compares the query performance of MariaDB ColumnStore v. 1.0.7 (based on InfiniDB), Clickhouse and Apache Spark.

I’ve already written about ClickHouse (Column Store database).

The purpose of the benchmark is to see how these three solutions work on a single big server, with many CPU cores and large amounts of RAM. Both systems are massively parallel (MPP) database systems, so they should use many cores for SELECT queries.

For the benchmarks, I chose three datasets:

  1. Wikipedia page Counts, loaded full with the year 2008, ~26 billion rows
  2. Query analytics data from Percona Monitoring and Management
  3. Online shop orders

This blog post shares the results for the Wikipedia page counts (same queries as for the Clickhouse benchmark). In the following posts I will use other datasets to compare the performance.

Databases, Versions and Storage Engines Tested

  • MariaDB ColumnStore v. 1.0.7, ColumnStore storage engine
  • Yandex ClickHouse v. 1.1.54164, MergeTree storage engine
  • Apache Spark v. 2.1.0, Parquet files and ORC files

Although all of the above solutions can run in a “cluster” mode (with multiple nodes), I’ve only used one server.

Hardware

This time I’m using newer and faster hardware:

  • CPU: physical = 2, cores = 32, virtual = 64, hyperthreading = yes
  • RAM: 256Gb
  • Disk: Samsung SSD 960 PRO 1TB, NVMe card

Data Sizes

I’ve loaded the above data into Clickhouse, ColumnStore and MySQL (for MySQL the data included a primary key; Wikistat was not loaded to MySQL due to the size). MySQL tables are InnoDB with a primary key.

Dataset Size (GB) Column Store Clickhouse MySQL Spark / Parquet Spark / ORC file
Wikistat 374.24 Gb 211.3 Gb n/a (> 2 Tb) 395 Gb 273 Gb
Query metrics 61.23 Gb 28.35 Gb 520 Gb
Store Orders 9.3 Gb 4.01 Gb 46.55 Gb

 

Query Performance

Wikipedia page counts queries

Test type (warm) Spark Clickhouse ColumnStore
Query 1: count(*) 5.37 2.14 30.77
Query 2: group by month 205.75 16.36 259.09
Query 3: top 100 wiki pages by hits (group by path) 750.35 171.22 1640.7

Test type (cold) Spark Clickhouse ColumnStore
Query 1: count(*) 21.93 8.01 139.01
Query 2: group by month 217.88 16.65 420.77
Query 3: top 100 wiki pages by hits (group by path) 887.434 182.56 1703.19


Partitioning and Primary Keys

All of the solutions have the ability to take advantage of data “partitioning,” and only scan needed rows.

Clickhouse has “primary keys” (for the MergeTree storage engine) and scans only the needed chunks of data (similar to partition “pruning” in MySQL). No changes to SQL or table definitions is needed when working with Clickhouse.

Clickhouse example:

:) select count(*), toMonth(date) as mon
:-] from wikistat where toYear(date)=2008
:-] and toMonth(date) = 1
:-] group by mon
:-] order by mon;
SELECT
    count(*),
    toMonth(date) AS mon
FROM wikistat
WHERE (toYear(date) = 2008) AND (toMonth(date) = 1)
GROUP BY mon
ORDER BY mon ASC
?????count()???mon??
? 2077594099 ?   1 ?
????????????????????
1 rows in set. Elapsed: 0.787 sec. Processed 2.08 billion rows, 4.16 GB (2.64 billion rows/s., 5.28 GB/s.)
:) select count(*), toMonth(date) as mon from wikistat where toYear(date)=2008 and toMonth(date) between 1 and 10 group by mon order by mon;
SELECT
    count(*),
    toMonth(date) AS mon
FROM wikistat
WHERE (toYear(date) = 2008) AND ((toMonth(date) >= 1) AND (toMonth(date) <= 10))
GROUP BY mon
ORDER BY mon ASC
?????count()???mon??
? 2077594099 ?   1 ?
? 1969757069 ?   2 ?
? 2081371530 ?   3 ?
? 2156878512 ?   4 ?
? 2476890621 ?   5 ?
? 2526662896 ?   6 ?
? 2460873213 ?   7 ?
? 2480356358 ?   8 ?
? 2522746544 ?   9 ?
? 2614372352 ?  10 ?
????????????????????
10 rows in set. Elapsed: 13.426 sec. Processed 23.37 billion rows, 46.74 GB (1.74 billion rows/s., 3.48 GB/s.)

As we can see here, ClickHouse has processed ~two billion rows for one month of data, and ~23 billion rows for ten months of data. Queries that only select one month of data are much faster.

For ColumnStore we need to re-write the SQL query and use “between ‘2008-01-01’ and 2008-01-10′” so it can take advantage of partition elimination (as long as the data is loaded in approximate time order). When using functions (i.e., year(dt) or month(dt)), the current implementation does not use this optimization. (This is similar to MySQL, in that if the WHERE clause has month(dt) or any other functions, MySQL can’t use an index on the dt field.)

ColumnStore example:

MariaDB [wikistat]> select count(*), month(date) as mon
    -> from wikistat where year(date)=2008
    -> and month(date) = 1
    -> group by mon
    -> order by mon;
+------------+------+
| count(*)   | mon  |
+------------+------+
| 2077594099 |    1 |
+------------+------+
1 row in set (2 min 12.34 sec)
MariaDB [wikistat]> select count(*), month(date) as mon
from wikistat
where date between '2008-01-01' and '2008-01-31'
group by mon order by mon;
+------------+------+
| count(*)   | mon  |
+------------+------+
| 2077594099 |    1 |
+------------+------+
1 row in set (12.46 sec)

Apache Spark does have partitioning however. It requires the use of partitioning with parquet format in the table definition. Without declaring partitions, even the modified query (“select count(*), month(date) as mon from wikistat where date between ‘2008-01-01’ and ‘2008-01-31’ group by mon order by mon”) will have to scan all the data.

The following table and graph shows the performance of the updated query:

Test type / updated query Spark Clickhouse ColumnStore
group by month, one month, updated syntax 205.75 0.93 12.46
group by month, ten months, updated syntax 205.75 8.84 170.81

 

Working with Large Datasets

With 1Tb uncompressed data, doing a “GROUP BY” requires lots of memory to store the intermediate results (unlike MySQL, ColumnStore, Clickhouse and Apache Spark use hash tables to store groups by “buckets”). For example, this query requires a very large hash table:

SELECT
path,
count(*),
sum(hits) AS sum_hits,
round(sum(hits) / count(*), 2) AS hit_ratio
FROM wikistat
WHERE project = 'en'
GROUP BY path
ORDER BY sum_hits DESC
LIMIT 100

As “path” is actually a URL (without the hostname), it takes a lot of memory to store the intermediate results (hash table) for GROUP BY.

MariaDB ColumnStore does not allow us to “spill” data on disk for now (only disk-based joins are implemented). If you need to GROUP BY on a large text field, you can decrease the disk block cache setting in Columnstore.xml (i.e., set disk cache to 10% of RAM) to make room for an intermediate GROUP BY:

<DBBC>
                <!-- The percentage of RAM to use for the disk block cache. Defaults to 86% -->
                <NumBlocksPct>10</NumBlocksPct>

In addition, as the query has an ORDER BY, we need to

increase max_length_for_sort_data

 in MySQL:

ERROR 1815 (HY000): Internal error: IDB-2015: Sorting length exceeded. Session variable max_length_for_sort_data needs to be set higher.
mysql> set global max_length_for_sort_data=8*1024*1024;

SQL Support

SQL Spark* Clickhouse ColumnStore
INSERT … VALUES ? yes ? yes ? yes
INSERT SELECT / BULK INSERT ? yes ? yes ? yes
UPDATE ? no ? no ? yes
DELETE ? no ? no ? yes
ALTER … ADD/DROP/MODIFY COLUMN ? no ? yes ? yes
ALTER … change paritions ? yes ? yes ? yes
SELECT with WINDOW functions ? yes ? no ? yes

 

*Spark does not support UPDATE/DELETE. However, Hive supports ACID transactions with UPDATE and DELETE statements. BEGIN, COMMIT, and ROLLBACK are not yet supported (only the ORC file format is supported).

ColumnStore is the only database out of the three that supports a full set of DML and DDL (almost all of the MySQL’s implementation of SQL is supported).

Comparing ColumnStore to Clickhouse and Apache Spark

 Solution  Advantages  Disadvantages
MariaDB ColumnStore
  • MySQL frontend (make it easy to migrate from MySQL)
  • UPDATE and DELETE are supported
  • Window functions support
  • Select queries are slower
  • No replication from normal MySQL server (planned for the future versions)
  • No support for GROUP BY on disk
Yandex ClickHouse
  • Fastest performance
  • Better compression
  • Primary keys
  • Disk-based GROUP BY, etc.
  • No MySQL protocol support
Apache Spark
  • Flexible storage options
  • Machine learning integration (i.e., pyspark ML libraries run inside spark nodes)
  • No MySQL protocol support
  • Slower select queries (compared to ClickHouse)


Conclusion

Yandex ClickHouse is an absolute winner in this benchmark: it shows both better performance (>10x) and better compression than
MariaDB ColumnStore and Apache Spark. If you are looking for the best performance and compression, ClickHouse looks very good.

At the same time, ColumnStore provides a MySQL endpoint (MySQL protocol and syntax), so it is a good option if you are migrating from MySQL. Right now, it can’t replicate directly from MySQL but if this option is available in the future we can attach a ColumnStore replication slave to any MySQL master and use the slave for reporting queries (i.e., BI or data science teams can use a ColumnStore database, which is updated very close to realtime).

Table Structure and List of Queries

Table structure (MySQL / Columnstore version):

CREATE TABLE `wikistat` (
  `date` date DEFAULT NULL,
  `time` datetime DEFAULT NULL,
  `project` varchar(20) DEFAULT NULL,
  `subproject` varchar(2) DEFAULT NULL,
  `path` varchar(1024) DEFAULT NULL,
  `hits` bigint(20) DEFAULT NULL,
  `size` bigint(20) DEFAULT NULL
) ENGINE=Columnstore DEFAULT CHARSET=utf8

Query 1:

select count(*) from wikistat

Query 2a (full scan):

select count(*), month(dt) as mon
from wikistat where year(dt)=2008
and month(dt) between 1 and 10
group by month(dt)
order by month(dt)

Query 2b (for partitioning test)

select count(*), month(date) as mon
from wikistat where
date between '2008-01-01' and '2008-10-31'
group by mon
order by mon;

Query 3:

SELECT
path,
count(*),
sum(hits) AS sum_hits,
round(sum(hits) / count(*), 2) AS hit_ratio
FROM wikistat
WHERE project = 'en'
GROUP BY path
ORDER BY sum_hits DESC
LIMIT 100;

 

Feb
13
2017
--

ClickHouse: New Open Source Columnar Database

Clickhouse

For this blog post, I’ve decided to try ClickHouse: an open source column-oriented database management system developed by Yandex (it currently powers Yandex.Metrica, the world’s second-largest web analytics platform).

In my previous set of posts, I tested Apache Spark for big data analysis and used Wikipedia page statistics as a data source. I’ve used the same data as in the Apache Spark blog post: Wikipedia Page Counts. This allows me to compare ClickHouse’s performance to Spark’s.

ClickHouse

I’ve spent some time testing ClickHouse for relatively large volumes of data (1.2Tb uncompressed). Here is a list of ClickHouse advantages and disadvantages that I saw:

ClickHouseClickHouse advantages

  • Parallel processing for single query (utilizing multiple cores)
  • Distributed processing on multiple servers
  • Very fast scans (see benchmarks below) that can be used for real-time queries
  • Column storage is great for working with “wide” / “denormalized” tables (many columns)
  • Good compression
  • SQL support (with limitations)
  • Good set of functions, including support for approximated calculations
  • Different storage engines (disk storage format)
  • Great for structural log/event data as well as time series data (engine MergeTree requires date field)
  • Index support (primary key only, not all storage engines)
  • Nice command line interface with user-friendly progress bar and formatting

Here is a full list of ClickHouse features

ClickHouse disadvantages

  • No real delete/update support, and no transactions (same as Spark and most of the big data systems)
  • No secondary keys (same as Spark and most of the big data systems)
  • Own protocol (no MySQL protocol support)
  • Limited SQL support, and the joins implementation is different. If you are migrating from MySQL or Spark, you will probably have to re-write all queries with joins.
  • No window functions

Full list of ClickHouse limitations

Group by: in-memory vs. on-disk

Running out of memory is one of the potential problems you may encounter when working with large datasets in ClickHouse:

SELECT
    min(toMonth(date)),
    max(toMonth(date)),
    path,
    count(*),
    sum(hits),
    sum(hits) / count(*) AS hit_ratio
FROM wikistat
WHERE (project = 'en')
GROUP BY path
ORDER BY hit_ratio DESC
LIMIT 10
? Progress: 1.83 billion rows, 85.31 GB (68.80 million rows/s., 3.21 GB/s.) ???????????                                                                                                                                                    6%Received exception from server:
Code: 241. DB::Exception: Received from localhost:9000, 127.0.0.1.
DB::Exception: Memory limit (for query) exceeded: would use 9.31 GiB (attempt to allocate chunk of 1048576 bytes), maximum: 9.31 GiB:
(while reading column hits):

By default, ClickHouse limits the amount of memory for group by (it uses a hash table for group by). This is easily fixed – if you have free memory, increase this parameter:

SET max_memory_usage = 128000000000; #128G

If you don’t have that much memory available, ClickHouse can “spill” data to disk by setting this:

set max_bytes_before_external_group_by=20000000000; #20G
set max_memory_usage=40000000000; #40G

According to the documentation, if you need to use

max_bytes_before_external_group_by

 it is recommended to set

max_memory_usage

 to be ~2x of the size of

max_bytes_before_external_group_by

.

(The reason for this is that the aggregation is performed in two phases: (1) reading and building an intermediate data, and (2) merging the intermediate data. The spill to disk can only happen during the first phase. If there won’t be spill, ClickHouse might need the same amount of RAM for stage 1 and 2.)

Benchmarks: ClickHouse vs. Spark

Both ClickHouse and Spark can be distributed. However, for the purpose of this test I’ve run a single node for both ClickHouse and Spark. The results are quite impressive.

Benchmark summary

 Size / compression  Spark v. 2.0.2  ClickHouse
 Data storage format  Parquet, compressed: snappy   Internal storage, compressed 
 Size (uncompressed: 1.2TB)   395G  212G

Clickhouse

 

 Test  Spark v. 2.0.2  ClickHouse   Diff
 Query 1: count (warm)  7.37 sec (no disk IO)  6.61 sec   ~same
 Query 2: simple group (warm)   792.55 sec (no disk IO)   37.45 sec  21x better
 Query 3: complex group by   2522.9 sec  398.55 sec  6.3x better

 

ClickHouse vs. MySQL

I wanted to see how ClickHouse compared to MySQL. Obviously, we can’t compare some workloads. For example:

  • Storing terabytes of data and querying (“crunching” would be a better word here) data without an index. It would take weeks (or even months) to load data and build the indexes. That is a much more suitable workload for ClickHouse or Spark.
  • Real-time updates / OLTP. ClickHouse does not support real-time updates / deletes.

Usually big data systems provide us with real-time queries. Systems based on map/reduce (i.e., Hive on top of HDFS) are just too slow for real-time queries, as it takes a long time to initialize the map/reduce job and send the code to all nodes.

Potentially, you can use ClickHouse for real-time queries. It does not support secondary indexes, however. This means it will probably scan lots of rows, but it can do it very quickly.

To do this test, I’m using the data from the Percona Monitoring and Management system. The table I’m using has 150 columns, so it is good for column storage. The size in MySQL is ~250G:

mysql> show table status like 'query_class_metrics'G
*************************** 1. row ***************************
           Name: query_class_metrics
         Engine: InnoDB
        Version: 10
     Row_format: Compact
           Rows: 364184844
 Avg_row_length: 599
    Data_length: 218191888384
Max_data_length: 0
   Index_length: 18590056448
      Data_free: 6291456
 Auto_increment: 416994305

Scanning the whole table is significantly faster in ClickHouse. Retrieving just ten rows by key is faster in MySQL (especially from memory).

But what if we only need to scan limited amount of rows and do a group by? In this case, ClickHouse may be faster. Here is the example (real query used to create sparklines):

MySQL

SELECT
	(1480888800 - UNIX_TIMESTAMP(start_ts)) / 11520 as point,
	FROM_UNIXTIME(1480888800 - (SELECT point) * 11520) AS ts,
	COALESCE(SUM(query_count), 0) / 11520 AS query_count_per_sec,
	COALESCE(SUM(Query_time_sum), 0) / 11520 AS query_time_sum_per_sec,
	COALESCE(SUM(Lock_time_sum), 0) / 11520 AS lock_time_sum_per_sec,
	COALESCE(SUM(Rows_sent_sum), 0) / 11520 AS rows_sent_sum_per_sec,
	COALESCE(SUM(Rows_examined_sum), 0) / 11520 AS rows_examined_sum_per_sec
FROM  query_class_metrics
WHERE  query_class_id = 7 AND instance_id = 1259 AND (start_ts >= '2014-11-27 00:00:00'
AND start_ts < '2014-12-05 00:00:00')
GROUP BY point;
...
61 rows in set (0.10 sec)
# Query_time: 0.101203  Lock_time: 0.000407  Rows_sent: 61  Rows_examined: 11639  Rows_affected: 0
explain SELECT ...
*************************** 1. row ***************************
           id: 1
  select_type: PRIMARY
        table: query_class_metrics
   partitions: NULL
         type: range
possible_keys: agent_class_ts,agent_ts
          key: agent_class_ts
      key_len: 12
          ref: NULL
         rows: 21686
     filtered: 100.00
        Extra: Using index condition; Using temporary; Using filesort
*************************** 2. row ***************************
           id: 2
  select_type: DEPENDENT SUBQUERY
        table: NULL
   partitions: NULL
         type: NULL
possible_keys: NULL
          key: NULL
      key_len: NULL
          ref: NULL
         rows: NULL
     filtered: NULL
        Extra: No tables used
2 rows in set, 2 warnings (0.00 sec)

It is relatively fast.

ClickHouse (some functions are different, so we will have to rewrite the query):

SELECT
    intDiv(1480888800 - toRelativeSecondNum(start_ts), 11520) AS point,
    toDateTime(1480888800 - (point * 11520)) AS ts,
    SUM(query_count) / 11520 AS query_count_per_sec,
    SUM(Query_time_sum) / 11520 AS query_time_sum_per_sec,
    SUM(Lock_time_sum) / 11520 AS lock_time_sum_per_sec,
    SUM(Rows_sent_sum) / 11520 AS rows_sent_sum_per_sec,
    SUM(Rows_examined_sum) / 11520 AS rows_examined_sum_per_sec,
    SUM(Rows_affected_sum) / 11520 AS rows_affected_sum_per_sec
FROM query_class_metrics
WHERE (query_class_id = 7) AND (instance_id = 1259) AND ((start_ts >= '2014-11-27 00:00:00')
AND (start_ts < '2014-12-05 00:00:00'))
GROUP BY point;
61 rows in set. Elapsed: 0.017 sec. Processed 270.34 thousand rows, 14.06 MB (15.73 million rows/s., 817.98 MB/s.)

As we can see, even though ClickHouse scans more rows (270K vs. 11K – over 20x more) it is faster to execute the ClickHouse query (0.10 seconds in MySQL compared to 0.01 second in ClickHouse). The column store format helps a lot here, as MySQL has to read all 150 columns (stored inside InnoDB pages) and ClickHouse only needs to read seven columns.

Wikipedia trending article of the month

Inspired by the article about finding trending topics using Google Books n-grams data, I decided to implement the same algorithm on top of the Wikipedia page visit statistics data. My goal here is to find the “article trending this month,” which has significantly more visits this month compared to the previous month. As I was implementing the algorithm, I came across another ClickHouse limitation: join syntax is limited. In ClickHouse, you can only do join with the “using” keyword. This means that the fields you’re joining need to have the same name. If the field name is different, we have to use a subquery.

Below is an example.

First, create a temporary table to aggregate the visits per month per page:

CREATE TABLE wikistat_by_month ENGINE = Memory AS
SELECT
    path,
    mon,
    sum(hits) / total_hits AS ratio
FROM
(
    SELECT
        path,
        hits,
        toMonth(date) AS mon
    FROM wikistat
    WHERE (project = 'en') AND (lower(path) NOT LIKE '%special%') AND (lower(path) NOT LIKE '%page%') AND (lower(path) NOT LIKE '%test%') AND (lower(path) NOT LIKE '%wiki%') AND (lower(path) NOT LIKE '%index.html%')
) AS a
ANY INNER JOIN
(
    SELECT
        toMonth(date) AS mon,
        sum(hits) AS total_hits
    FROM wikistat
    WHERE (project = 'en') AND (lower(path) NOT LIKE '%special%') AND (lower(path) NOT LIKE '%page%') AND (lower(path) NOT LIKE '%test%') AND (lower(path) NOT LIKE '%wiki%') AND (lower(path) NOT LIKE '%index.html%')
    GROUP BY toMonth(date)
) AS b USING (mon)
GROUP BY
    path,
    mon,
    total_hits
ORDER BY ratio DESC
Ok.
0 rows in set. Elapsed: 543.607 sec. Processed 53.77 billion rows, 2.57 TB (98.91 million rows/s., 4.73 GB/s.)

Second, calculate the actual list:

SELECT
    path,
    mon + 1,
    a_ratio AS ratio,
    a_ratio / b_ratio AS increase
FROM
(
    SELECT
        path,
        mon,
        ratio AS a_ratio
    FROM wikistat_by_month
    WHERE ratio > 0.0001
) AS a
ALL INNER JOIN
(
    SELECT
        path,
        CAST((mon - 1) AS UInt8) AS mon,
        ratio AS b_ratio
    FROM wikistat_by_month
    WHERE ratio > 0.0001
) AS b USING (path, mon)
WHERE (mon > 0) AND (increase > 2)
ORDER BY
    mon ASC,
    increase DESC
LIMIT 100
??path?????????????????????????????????????????????????plus(mon, 1)????????????????????ratio?????????????increase??
? Heath_Ledger                                       ?            2 ?  0.0008467223172121601 ?  6.853825241458039 ?
? Cloverfield                                        ?            2 ?  0.0009372609760313347 ?  3.758937474560766 ?
? The_Dark_Knight_(film)                             ?            2 ?  0.0003508532447770276 ? 2.8858100355450484 ?
? Scientology                                        ?            2 ?  0.0003300109101992719 ?   2.52497180013816 ?
? Barack_Obama                                       ?            3 ?  0.0005786473399980557 ?  2.323409928527576 ?
? Canine_reproduction                                ?            3 ?  0.0004836300843539438 ? 2.0058985801174662 ?
? Iron_Man                                           ?            6 ?    0.00036261003907049 ? 3.5301196568303888 ?
? Iron_Man_(film)                                    ?            6 ? 0.00035634745198422497 ? 3.3815325090507193 ?
? Grand_Theft_Auto_IV                                ?            6 ?  0.0004036713142943461 ? 3.2112732008504885 ?
? Indiana_Jones_and_the_Kingdom_of_the_Crystal_Skull ?            6 ?  0.0002856570195547951 ?  2.683443198030021 ?
? Tha_Carter_III                                     ?            7 ? 0.00033954377342889735 ?  2.820114216429247 ?
? EBay                                               ?            7 ?  0.0006575000133427979 ? 2.5483158977946787 ?
? Bebo                                               ?            7 ?  0.0003958340022793501 ? 2.3260912792668162 ?
? Facebook                                           ?            7 ?   0.001683658379576915 ?   2.16460972864883 ?
? Yahoo!_Mail                                        ?            7 ?  0.0002190640575012259 ? 2.1075879062784737 ?
? MySpace                                            ?            7 ?   0.001395608643577507 ?  2.103263660621813 ?
? Gmail                                              ?            7 ?  0.0005449834079575953 ? 2.0675919337716757 ?
? Hotmail                                            ?            7 ?  0.0009126863121737026 ?  2.052471735190232 ?
? Google                                             ?            7 ?   0.000601645849087389 ? 2.0155448612416644 ?
? Barack_Obama                                       ?            7 ? 0.00027336526076130943 ? 2.0031305241832302 ?
? Facebook                                           ?            8 ?  0.0007778115183044431 ?  2.543477658022576 ?
? MySpace                                            ?            8 ?   0.000663544314346641 ?  2.534512981232934 ?
? Two-Face                                           ?            8 ? 0.00026975137404447024 ? 2.4171743959768803 ?
? YouTube                                            ?            8 ?   0.001482456447101451 ? 2.3884527929836152 ?
? Hotmail                                            ?            8 ? 0.00044467667764940547 ? 2.2265750216262954 ?
? The_Dark_Knight_(film)                             ?            8 ?  0.0010482536106662156 ?  2.190078096294301 ?
? Google                                             ?            8 ?  0.0002985028319919154 ? 2.0028812075734637 ?
? Joe_Biden                                          ?            9 ? 0.00045067411455437264 ?  2.692262662620829 ?
? The_Dark_Knight_(film)                             ?            9 ? 0.00047863754833213585 ?  2.420864550676665 ?
? Sarah_Palin                                        ?           10 ?  0.0012459220318907518 ?  2.607063205782761 ?
? Barack_Obama                                       ?           12 ?  0.0034487235202817087 ? 15.615409029600414 ?
? George_W._Bush                                     ?           12 ? 0.00042708730873936023 ? 3.6303098900144937 ?
? Fallout_3                                          ?           12 ?  0.0003568429236849597 ? 2.6193094036745155 ?
???????????????????????????????????????????????????????????????????????????????????????????????????????????????????
34 rows in set. Elapsed: 1.062 sec. Processed 1.22 billion rows, 49.03 GB (1.14 billion rows/s., 46.16 GB/s.)

Their response time is really good, considering the amount of data it needed to scan (the first query scanned 2.57 TB of data).

Conclusion

The ClickHouse column-oriented database looks promising for data analytics, as well as for storing and processing structural event data and time series data. ClickHouse can be ~10x faster than Spark for some workloads.

Appendix: Benchmark details

Hardware

  • CPU: 24xIntel(R) Xeon(R) CPU L5639 @ 2.13GHz (physical = 2, cores = 12, virtual = 24, hyperthreading = yes)
  • Disk: 2 consumer grade SSD in software RAID 0 (mdraid)

Query 1

select count(*) from wikistat

ClickHouse:

:)  select count(*) from wikistat;
SELECT count(*)
FROM wikistat
??????count()??
? 26935251789 ?
???????????????
1 rows in set. Elapsed: 6.610 sec. Processed 26.88 billion rows, 53.77 GB (4.07 billion rows/s., 8.13 GB/s.)

Spark:

spark-sql> select count(*) from wikistat;
26935251789
Time taken: 7.369 seconds, Fetched 1 row(s)

Query 2

select count(*), month(dt) as mon
from wikistat where year(dt)=2008
and month(dt) between 1 and 10
group by month(dt)
order by month(dt);

ClickHouse:

:) select count(*), toMonth(date) as mon from wikistat
where toYear(date)=2008 and toMonth(date) between 1 and 10 group by mon;
SELECT
    count(*),
    toMonth(date) AS mon
FROM wikistat
WHERE (toYear(date) = 2008) AND ((toMonth(date) >= 1) AND (toMonth(date) <= 10))
GROUP BY mon
?????count()???mon??
? 2100162604 ?   1 ?
? 1969757069 ?   2 ?
? 2081371530 ?   3 ?
? 2156878512 ?   4 ?
? 2476890621 ?   5 ?
? 2526662896 ?   6 ?
? 2489723244 ?   7 ?
? 2480356358 ?   8 ?
? 2522746544 ?   9 ?
? 2614372352 ?  10 ?
????????????????????
10 rows in set. Elapsed: 37.450 sec. Processed 23.37 billion rows, 46.74 GB (623.97 million rows/s., 1.25 GB/s.)

Spark:

spark-sql> select count(*), month(dt) as mon from wikistat where year(dt)=2008 and month(dt) between 1 and 10 group by month(dt) order by month(dt);
2100162604      1
1969757069      2
2081371530      3
2156878512      4
2476890621      5
2526662896      6
2489723244      7
2480356358      8
2522746544      9
2614372352      10
Time taken: 792.552 seconds, Fetched 10 row(s)

Query 3

SELECT
path,
count(*),
sum(hits) AS sum_hits,
round(sum(hits) / count(*), 2) AS hit_ratio
FROM wikistat
WHERE project = 'en'
GROUP BY path
ORDER BY sum_hits DESC
LIMIT 100;

ClickHouse:

:) SELECT
:-]     path,
:-]     count(*),
:-]     sum(hits) AS sum_hits,
:-]     round(sum(hits) / count(*), 2) AS hit_ratio
:-] FROM wikistat
:-] WHERE (project = 'en')
:-] GROUP BY path
:-] ORDER BY sum_hits DESC
:-] LIMIT 100;
SELECT
    path,
    count(*),
    sum(hits) AS sum_hits,
    round(sum(hits) / count(*), 2) AS hit_ratio
FROM wikistat
WHERE project = 'en'
GROUP BY path
ORDER BY sum_hits DESC
LIMIT 100
??path??????????????????????????????????????????????????count()?????sum_hits???hit_ratio??
? Special:Search                                      ?   44795 ? 4544605711 ? 101453.41 ?
? Main_Page                                           ?   31930 ? 2115896977 ?  66266.74 ?
? Special:Random                                      ?   30159 ?  533830534 ?  17700.54 ?
? Wiki                                                ?   10237 ?   40488416 ?   3955.11 ?
? Special:Watchlist                                   ?   38206 ?   37200069 ?    973.67 ?
? YouTube                                             ?    9960 ?   34349804 ?   3448.78 ?
? Special:Randompage                                  ?    8085 ?   28959624 ?    3581.9 ?
? Special:AutoLogin                                   ?   34413 ?   24436845 ?    710.11 ?
? Facebook                                            ?    7153 ?   18263353 ?   2553.24 ?
? Wikipedia                                           ?   23732 ?   17848385 ?    752.08 ?
? Barack_Obama                                        ?   13832 ?   16965775 ?   1226.56 ?
? index.html                                          ?    6658 ?   16921583 ?   2541.54 ?
…
100 rows in set. Elapsed: 398.550 sec. Processed 26.88 billion rows, 1.24 TB (67.45 million rows/s., 3.10 GB/s.)

Spark:

spark-sql> SELECT
         >     path,
         >     count(*),
         >     sum(hits) AS sum_hits,
         >     round(sum(hits) / count(*), 2) AS hit_ratio
         > FROM wikistat
         > WHERE (project = 'en')
         > GROUP BY path
         > ORDER BY sum_hits DESC
         > LIMIT 100;
...
Time taken: 2522.903 seconds, Fetched 100 row(s)

 

Powered by WordPress | Theme: Aeros 2.0 by TheBuckmaker.com