Nov
12
2020
--

Databricks launches SQL Analytics

AI and data analytics company Databricks today announced the launch of SQL Analytics, a new service that makes it easier for data analysts to run their standard SQL queries directly on data lakes. And with that, enterprises can now easily connect their business intelligence tools like Tableau and Microsoft’s Power BI to these data repositories as well.

SQL Analytics will be available in public preview on November 18.

In many ways, SQL Analytics is the product Databricks has long been looking to build and that brings its concept of a “lake house” to life. It combines the performance of a data warehouse, where you store data after it has already been transformed and cleaned, with a data lake, where you store all of your data in its raw form. The data in the data lake, a concept that Databricks’ co-founder and CEO Ali Ghodsi has long championed, is typically only transformed when it gets used. That makes data lakes cheaper, but also a bit harder to handle for users.

“We’ve been saying Unified Data Analytics, which means unify the data with the analytics. So data processing and analytics, those two should be merged. But no one picked that up,” Ghodsi told me. But “lake house” caught on as a term.

“Databricks has always offered data science, machine learning. We’ve talked about that for years. And with Spark, we provide the data processing capability. You can do [extract, transform, load]. That has always been possible. SQL Analytics enables you to now do the data warehousing workloads directly, and concretely, the business intelligence and reporting workloads, directly on the data lake.”

The general idea here is that with just one copy of the data, you can enable both traditional data analyst use cases (think BI) and the data science workloads (think AI) Databricks was already known for. Ideally, that makes both use cases cheaper and simpler.

The service sits on top of an optimized version of Databricks’ open-source Delta Lake storage layer, which lets it complete queries quickly. Delta Lake also provides auto-scaling endpoints to keep the query latency consistent, even under high loads.

While data analysts can query these data sets directly, using standard SQL, the company also built a set of connectors to BI tools. Its BI partners include Tableau, Qlik, Looker and ThoughtSpot, and its ingest partners include Fivetran, Fishtown Analytics, Talend and Matillion.

“Now more than ever, organizations need a data strategy that enables speed and agility to be adaptable,” said Francois Ajenstat, chief product officer at Tableau. “As organizations are rapidly moving their data to the cloud, we’re seeing growing interest in doing analytics on the data lake. The introduction of SQL Analytics delivers an entirely new experience for customers to tap into insights from massive volumes of data with the performance, reliability and scale they need.”

In a demo, Ghodsi showed me what the new SQL Analytics workspace looks like. It’s essentially a stripped-down version of the standard code-heavy experience with which Databricks users are familiar. Unsurprisingly, SQL Analytics provides a more graphical experience that focuses more on visualizations and not Python code.

While there are already some data analysts on the Databricks platform, this obviously opens up a large new market for the company — something that would surely bolster its plans for an IPO next year.

Oct
22
2019
--

Databricks announces $400M round on $6.2B valuation as analytics platform continues to grow

Databricks is a SaaS business built on top of a bunch of open-source tools, and apparently it’s been going pretty well on the business side of things. In fact, the company claims to be one of the fastest growing enterprise cloud companies ever. Today the company announced a massive $400 million Series F funding round on a hefty $6.2 billion valuation. Today’s funding brings the total raised to almost $900 million.

Andreessen Horowitz’s Late Stage Venture Fund led the round with new investors BlackRock, Inc., T. Rowe Price Associates, Inc. and Tiger Global Management also participating. The institutional investors are particularly interesting here because as a late-stage startup, Databricks likely has its eye on a future IPO, and having those investors on board already could give them a head start.

CEO Ali Ghodsi was coy when it came to the IPO, but it sure sounded like that’s a direction he wants to go. “We are one of the fastest growing cloud enterprise software companies on record, which means we have a lot of access to capital as this fundraise shows. The revenue is growing gangbusters, and the brand is also really well known. So an IPO is not something that we’re optimizing for, but it’s something that’s definitely going to happen down the line in the not-too-distant future,” Ghodsi told TechCrunch.

The company announced that as of Q3 it’s on a $200 million run rate, and it has a platform that consists of four products, all built on foundational open source: Delta Lake, an open-source data lake product; MLflow, an open-source project that helps data teams operationalize machine learning; Koalas, which creates a single machine framework for Spark and pandas, greatly simplifying working with the two tools; and, finally, Spark, the open-source analytics engine.

You can download the open-source version of all of these tools for free, but they are not easy to use or manage. The way that Databricks makes money is by offering each of these tools in the form of Software as a Service. They handle all of the management headaches associated with using these tools and they charge you a subscription price.

It’s a model that seems to be working, as the company is growing like crazy. It raised $250 million just last February on a $2.75 billion valuation. Apparently the investors saw room for a lot more growth in the intervening eight months, as today’s $6.2 billion valuation shows.

Oct
15
2019
--

Databricks brings its Delta Lake project to the Linux Foundation

Databricks, the big data analytics service founded by the original developers of Apache Spark, today announced that it is bringing its Delta Lake open-source project for building data lakes to the Linux Foundation under an open governance model. The company announced the launch of Delta Lake earlier this year, and, even though it’s still a relatively new project, it has already been adopted by many organizations and has found backing from companies like Intel, Alibaba and Booz Allen Hamilton.

“In 2013, we had a small project where we added SQL to Spark at Databricks […] and donated it to the Apache Foundation,” Databricks CEO and co-founder Ali Ghodsi told me. “Over the years, slowly people have changed how they actually leverage Spark and only in the last year or so it really started to dawn upon us that there’s a new pattern that’s emerging and Spark is being used in a completely different way than maybe we had planned initially.”

This pattern, he said, is that companies are taking all of their data and putting it into data lakes and then doing a couple of things with this data, machine learning and data science being the obvious ones. But they are also doing things that are more traditionally associated with data warehouses, like business intelligence and reporting. The term Ghodsi uses for this kind of usage is “Lake House.” More and more, Databricks is seeing that Spark is being used for this purpose and not just to replace Hadoop and do ETL (extract, transform, load). “This kind of Lake House patterns we’ve seen emerge more and more and we wanted to double down on it.”

Spark 3.0, which is launching soon, enables more of these use cases and speeds them up significantly, in addition to the launch of a new feature that enables you to add a pluggable data catalog to Spark.

Delta Lake, Ghodsi said, is essentially the data layer of the Lake House pattern. It brings support for ACID transactions to data lakes, scalable metadata handling and data versioning, for example. All the data is stored in the Apache Parquet format and users can enforce schemas (and change them with relative ease if necessary).

It’s interesting to see Databricks choose the Linux Foundation for this project, given that its roots are in the Apache Foundation. “We’re super excited to partner with them,” Ghodsi said about why the company chose the Linux Foundation. “They run the biggest projects on the planet, including the Linux project but also a lot of cloud projects. The cloud-native stuff is all in the Linux Foundation.”

“Bringing Delta Lake under the neutral home of the Linux Foundation will help the open-source community dependent on the project develop the technology addressing how big data is stored and processed, both on-prem and in the cloud,” said Michael Dolan, VP of Strategic Programs at the Linux Foundation. “The Linux Foundation helps open-source communities leverage an open governance model to enable broad industry contribution and consensus building, which will improve the state of the art for data storage and reliability.”

Feb
05
2019
--

Databricks raises $250M at a $2.75B valuation for its analytics platform

Databricks, the company founded by the original team behind the Apache Spark big data analytics engine, today announced that it has raised a $250 million Series E round led by Andreessen Horowitz. Coatue Management, Green Bay Ventures, Microsoft and NEA also participated in this round, which brings the company’s total funding to $498.5 million. Microsoft’s involvement here is probably a bit of a surprise, but it’s worth noting that it also worked with Databricks on the launch of Azure Databricks as a first-party service on the platform, something that’s still a rarity in the Azure cloud.

As Databricks also today announced, its annual recurring revenue now exceeds $100 million. The company didn’t share whether it’s cash flow-positive at this point, but Databricks CEO and co-founder Ali Ghodsi shared that the company’s valuation is now $2.75 billion.

Current customers, which the company says number around 2,000, include the likes of Nielsen, Hotels.com, Overstock, Bechtel, Shell and HP.

“What Ali and the Databricks team have built is truly phenomenal,” Green Bay Ventures co-founder Anthony Schiller told me. “Their success is a testament to product innovation at the highest level. Databricks is without question best-in-class and their impact on the industry proves it. We were thrilled to participate in this round.”

While Databricks is obviously known for its contributions to Apache Spark, the company itself monetizes that work by offering its Unified Analytics platform on top of it. This platform allows enterprises to build their data pipelines across data storage systems and prepare data sets for data scientists and engineers. To do this, Databricks offers shared notebooks and tools for building, managing and monitoring data pipelines, and then uses that data to build machine learning models, for example. Indeed, training and deploying these models is one of the company’s focus areas these days, which makes sense, given that this is one of the main use cases for big data, after all.

On top of that, Databricks also offers a fully managed service for hosting all of these tools.

“Databricks is the clear winner in the big data platform race,” said Ben Horowitz, co-founder and general partner at Andreessen Horowitz, in today’s announcement. “In addition, they have created a new category atop their world-beating Apache Spark platform called Unified Analytics that is growing even faster. As a result, we are thrilled to invest in this round.”

Ghodsi told me that Horowitz was also instrumental in getting the company to re-focus on growth. The company was already growing fast, of course, but Horowitz asked him why Databricks wasn’t growing faster. Unsurprisingly, given that it’s an enterprise company, that means aggressively hiring a larger sales force — and that’s costly. Hence the company’s need to raise at this point.

As Ghodsi told me, one of the areas the company wants to focus on is the Asia Pacific region, where overall cloud usage is growing fast. The other area the company is focusing on is support for more verticals like mass media and entertainment, federal agencies and fintech firms, which also comes with its own cost, given that the experts there don’t come cheap.

Ghodsi likes to call this “boring AI,” since it’s not as exciting as self-driving cars. In his view, though, the enterprise companies that don’t start using machine learning now will inevitably be left behind in the long run. “If you don’t get there, there’ll be no place for you in the next 20 years,” he said.

Engineering, of course, will also get a chunk of this new funding, with an emphasis on relatively new products like MLflow and Delta, two tools that Databricks recently developed and that make it easier to manage the life cycle of machine learning models and build the necessary data pipelines to feed them.

Jan
15
2018
--

Sneak Peek of the Percona Live 2018 Open Source Database Conference Breakout Sessions!

Take a look at the sneak peek of the breakout sessions for the Percona Live 2018 Open Source Database Conference, taking place April 23-25, 2018 at the Santa Clara Convention Center in Santa Clara, California. Early Bird registration discounts are available until February 4, 2018, and sponsorship opportunities are still available.

Conference breakout sessions will feature a range of in-depth talks related to each of the key areas. Breakout session examples include:

  • Database Security as a Function: Scaling to Your Organization’s Needs – Laine Campbell, Fastly
  • How to Use JSON in MySQL Wrong – Bill Karwin, Square
  • Scaling a High Traffic Database: Moving Tables Across Clusters – Bryana Knight, GitHub
  • MySQL: How to Save Bandwidth – Georgi Kodinov, Oracle
  • MyRocks Roadmaps and Production Deployment at Facebook – Yoshinori Matsunobu, Facebook
  • Securing Your Data on PostgreSQL – Payal Singh, OmniTI Computer Consulting, Inc.
  • The Accidental DBA – Jenni Snyder, Yelp
  • How Microsoft Built MySQL, PostgreSQL and MariaDB for the Cloud – Jun Su, Microsoft
  • MongoDB Cluster Topology, Management and Optimization – Steven Wang, Tesla
  • Ghostferry: A Data Migration Tool for Incompatible Cloud Platforms – Shuhao Wu, Shopify, Inc.

Percona Live Open Source Database Conference 2018 is the premier open source database event. The theme for the upcoming conference is “Championing Open Source Databases,” with a range of topics on MySQL, MongoDB and other open source databases, including time series databases, PostgreSQL and RocksDB. Session tracks include Developers, Operations and Business/Case Studies. A major conference focus will be providing strategies to help attendees meet their business goals by deploying the right mix of database solutions to obtain the performance they need while managing complexity.

Hyatt Regency Santa Clara & The Santa Clara Convention Center

The Percona Live 2018 Open Source Database Conference will be held at the Hyatt Regency Santa Clara & The Santa Clara Convention Center, at 5101 Great America Parkway, Santa Clara, CA 95054.

The Hyatt Regency Santa Clara & The Santa Clara Convention Center is a prime location in the heart of the Silicon Valley. Enjoy this spacious venue with complimentary wifi, on-site expert staff and three great restaurants offering Tuscan cuisine, classic American or tantalizing Sushi. Staying for a couple of extra days? Take time to enjoy the Bay Area and enjoy a day in San Francisco located only an hour away. You can reserve a room by booking through the Hyatt’s dedicated Percona Live reservation site.

Book your hotel using Percona’s special room block rate!

Sponsorships

Sponsorship opportunities for the Percona Live 2018 Open Source Database Conference are available and offer the opportunity to interact with the DBAs, sysadmins, developers, CTOs, CEOs, business managers, technology evangelists, solution vendors, and entrepreneurs who typically attend the event. Contact live@percona.com for sponsorship details.

 

Jun
06
2017
--

Databricks releases serverless platform for Apache Spark along with new library supporting deep learning

Today to kick off Spark Summit, Databricks announced a Serverless Platform for Apache Spark, welcome news for developers looking to reduce time spent on cluster management. The move to simplify developer experiences is set to be a major theme of the event overall. In addition to Serverless, the company also introduced Deep Learning Pipelines, a library that makes it easy to mix…

Mar
17
2017
--

Column Store Database Benchmarks: MariaDB ColumnStore vs. Clickhouse vs. Apache Spark

This blog shares some column store database benchmark results, and compares the query performance of MariaDB ColumnStore v. 1.0.7 (based on InfiniDB), Clickhouse and Apache Spark.

I’ve already written about ClickHouse (Column Store database).

The purpose of the benchmark is to see how these three solutions work on a single big server, with many CPU cores and large amounts of RAM. All three are massively parallel (MPP) systems, so they should use many cores for SELECT queries.

For the benchmarks, I chose three datasets:

  1. Wikipedia page counts, loaded in full for the year 2008 (~26 billion rows)
  2. Query analytics data from Percona Monitoring and Management
  3. Online shop orders

This blog post shares the results for the Wikipedia page counts (same queries as for the Clickhouse benchmark). In the following posts I will use other datasets to compare the performance.

Databases, Versions and Storage Engines Tested

  • MariaDB ColumnStore v. 1.0.7, ColumnStore storage engine
  • Yandex ClickHouse v. 1.1.54164, MergeTree storage engine
  • Apache Spark v. 2.1.0, Parquet files and ORC files

Although all of the above solutions can run in a “cluster” mode (with multiple nodes), I’ve only used one server.

Hardware

This time I’m using newer and faster hardware:

  • CPU: physical = 2, cores = 32, virtual = 64, hyperthreading = yes
  • RAM: 256Gb
  • Disk: Samsung SSD 960 PRO 1TB, NVMe card

Data Sizes

I’ve loaded the above data into Clickhouse, ColumnStore and MySQL (for MySQL the data included a primary key; Wikistat was not loaded to MySQL due to the size). MySQL tables are InnoDB with a primary key.

| Dataset | ColumnStore | ClickHouse | MySQL | Spark / Parquet | Spark / ORC file |
|---|---|---|---|---|---|
| Wikistat | 374.24 GB | 211.3 GB | n/a (> 2 TB) | 395 GB | 273 GB |
| Query metrics | 61.23 GB | 28.35 GB | 520 GB | | |
| Store Orders | 9.3 GB | 4.01 GB | 46.55 GB | | |

 

Query Performance

Wikipedia page counts queries

| Test type (warm), time in sec | Spark | ClickHouse | ColumnStore |
|---|---|---|---|
| Query 1: count(*) | 5.37 | 2.14 | 30.77 |
| Query 2: group by month | 205.75 | 16.36 | 259.09 |
| Query 3: top 100 wiki pages by hits (group by path) | 750.35 | 171.22 | 1640.7 |

| Test type (cold), time in sec | Spark | ClickHouse | ColumnStore |
|---|---|---|---|
| Query 1: count(*) | 21.93 | 8.01 | 139.01 |
| Query 2: group by month | 217.88 | 16.65 | 420.77 |
| Query 3: top 100 wiki pages by hits (group by path) | 887.434 | 182.56 | 1703.19 |
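To make the gaps in the warm-run results easier to compare, the short Python sketch below divides each timing by the ClickHouse timing for the same query. The numbers are copied from the warm table; the dictionary layout and the function name are only illustrative, and the timings are assumed to be in seconds.

```python
# Warm-run timings copied from the benchmark table (assumed to be seconds).
warm = {
    "Query 1": {"Spark": 5.37, "ClickHouse": 2.14, "ColumnStore": 30.77},
    "Query 2": {"Spark": 205.75, "ClickHouse": 16.36, "ColumnStore": 259.09},
    "Query 3": {"Spark": 750.35, "ClickHouse": 171.22, "ColumnStore": 1640.7},
}


def slowdown_vs_clickhouse(timings):
    """Return how many times slower each system is than ClickHouse."""
    base = timings["ClickHouse"]
    return {
        name: round(seconds / base, 1)
        for name, seconds in timings.items()
        if name != "ClickHouse"
    }


ratios = {query: slowdown_vs_clickhouse(t) for query, t in warm.items()}
for query, per_system in ratios.items():
    print(query, per_system)
```

The ratio differs noticeably from query to query, so it is worth looking at each query separately rather than at a single headline number.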


Partitioning and Primary Keys

All of the solutions have the ability to take advantage of data “partitioning,” and only scan needed rows.

ClickHouse has “primary keys” (for the MergeTree storage engine) and scans only the needed chunks of data (similar to partition “pruning” in MySQL). No changes to SQL or table definitions are needed when working with ClickHouse.

Clickhouse example:

:) select count(*), toMonth(date) as mon
:-] from wikistat where toYear(date)=2008
:-] and toMonth(date) = 1
:-] group by mon
:-] order by mon;
SELECT
    count(*),
    toMonth(date) AS mon
FROM wikistat
WHERE (toYear(date) = 2008) AND (toMonth(date) = 1)
GROUP BY mon
ORDER BY mon ASC
┌────count()─┬─mon─┐
│ 2077594099 │   1 │
└────────────┴─────┘
1 rows in set. Elapsed: 0.787 sec. Processed 2.08 billion rows, 4.16 GB (2.64 billion rows/s., 5.28 GB/s.)
:) select count(*), toMonth(date) as mon from wikistat where toYear(date)=2008 and toMonth(date) between 1 and 10 group by mon order by mon;
SELECT
    count(*),
    toMonth(date) AS mon
FROM wikistat
WHERE (toYear(date) = 2008) AND ((toMonth(date) >= 1) AND (toMonth(date) <= 10))
GROUP BY mon
ORDER BY mon ASC
┌────count()─┬─mon─┐
│ 2077594099 │   1 │
│ 1969757069 │   2 │
│ 2081371530 │   3 │
│ 2156878512 │   4 │
│ 2476890621 │   5 │
│ 2526662896 │   6 │
│ 2460873213 │   7 │
│ 2480356358 │   8 │
│ 2522746544 │   9 │
│ 2614372352 │  10 │
└────────────┴─────┘
10 rows in set. Elapsed: 13.426 sec. Processed 23.37 billion rows, 46.74 GB (1.74 billion rows/s., 3.48 GB/s.)

As we can see here, ClickHouse has processed ~two billion rows for one month of data, and ~23 billion rows for ten months of data. Queries that only select one month of data are much faster.
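The throughput figures that ClickHouse prints can be checked with a little arithmetic. The sketch below is only a sanity check on the numbers in the output above; the helper names are made up for this example.

```python
def rows_per_second(rows, elapsed_sec):
    """Average scan rate over the whole query."""
    return rows / elapsed_sec


def bytes_per_row(total_bytes, rows):
    """Average number of bytes read per processed row."""
    return total_bytes / rows


# One month: 2.08 billion rows, 4.16 GB, 0.787 sec
one_month_rate = rows_per_second(2.08e9, 0.787)
# Ten months: 23.37 billion rows, 46.74 GB, 13.426 sec
ten_month_rate = rows_per_second(23.37e9, 13.426)

print(f"{one_month_rate / 1e9:.2f} billion rows/s")
print(f"{ten_month_rate / 1e9:.2f} billion rows/s")
print(bytes_per_row(4.16e9, 2.08e9), "bytes per row (one month)")
print(bytes_per_row(46.74e9, 23.37e9), "bytes per row (ten months)")
```

Both queries read about two bytes per row, which is consistent with only the date column being scanned.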

For ColumnStore we need to rewrite the SQL query and use “between '2008-01-01' and '2008-01-31'” so it can take advantage of partition elimination (as long as the data is loaded in approximate time order). When using functions (e.g., year(dt) or month(dt)), the current implementation does not use this optimization. (This is similar to MySQL, in that if the WHERE clause has month(dt) or any other function, MySQL can’t use an index on the dt field.)

ColumnStore example:

MariaDB [wikistat]> select count(*), month(date) as mon
    -> from wikistat where year(date)=2008
    -> and month(date) = 1
    -> group by mon
    -> order by mon;
+------------+------+
| count(*)   | mon  |
+------------+------+
| 2077594099 |    1 |
+------------+------+
1 row in set (2 min 12.34 sec)
MariaDB [wikistat]> select count(*), month(date) as mon
from wikistat
where date between '2008-01-01' and '2008-01-31'
group by mon order by mon;
+------------+------+
| count(*)   | mon  |
+------------+------+
| 2077594099 |    1 |
+------------+------+
1 row in set (12.46 sec)
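The difference between the two ColumnStore queries can be illustrated with a toy model of partition elimination. The Python sketch below is not ColumnStore code: it simply assumes that each block of rows (called an "extent" here) records the minimum and maximum date it contains, and that the engine does not try to reason about functions applied to the column.

```python
from datetime import date

# Hypothetical extents, one per month of 2008. Each records the min and max
# date it holds, which works when data is loaded in approximate time order.
extents = [
    {"id": month, "min": date(2008, month, 1), "max": date(2008, month, 28)}
    for month in range(1, 13)
]


def extents_for_range(extents, lo, hi):
    """Range predicate on the raw column: skip extents that cannot match."""
    return [e["id"] for e in extents if e["max"] >= lo and e["min"] <= hi]


def extents_for_function(extents):
    """Predicate such as month(date) = 1: the min/max of the raw column says
    nothing about the function result, so every extent is scanned."""
    return [e["id"] for e in extents]


pruned = extents_for_range(extents, date(2008, 1, 1), date(2008, 1, 31))
full = extents_for_function(extents)

print(len(pruned), "extent(s) scanned with BETWEEN")
print(len(full), "extent(s) scanned with month(date) = 1")
```

In this model the BETWEEN form touches a single extent while the function form touches all twelve, which matches the direction of the timings above (12.46 sec versus more than two minutes).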

Apache Spark does support partitioning; however, the partitions must be declared, together with the Parquet format, in the table definition. Without declared partitions, even the modified query (“select count(*), month(date) as mon from wikistat where date between '2008-01-01' and '2008-01-31' group by mon order by mon”) will have to scan all the data.

The following table shows the performance of the updated query:

| Test type / updated query, time in sec | Spark | ClickHouse | ColumnStore |
|---|---|---|---|
| group by month, one month, updated syntax | 205.75 | 0.93 | 12.46 |
| group by month, ten months, updated syntax | 205.75 | 8.84 | 170.81 |

 

Working with Large Datasets

With 1 TB of uncompressed data, doing a “GROUP BY” requires lots of memory to store the intermediate results (unlike MySQL, ColumnStore, ClickHouse and Apache Spark use hash tables to store groups by “buckets”). For example, this query requires a very large hash table:

SELECT
path,
count(*),
sum(hits) AS sum_hits,
round(sum(hits) / count(*), 2) AS hit_ratio
FROM wikistat
WHERE project = 'en'
GROUP BY path
ORDER BY sum_hits DESC
LIMIT 100

As “path” is actually a URL (without the hostname), it takes a lot of memory to store the intermediate results (hash table) for GROUP BY.
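A minimal Python sketch of hash aggregation shows why memory grows with the number of distinct paths. This is an illustration of the general technique, not the code of any of the three engines; the function name and the sample rows are made up.

```python
import heapq
from collections import defaultdict


def top_paths(rows, project="en", limit=100):
    """Hash-aggregate (project, path, hits) rows, then keep the top `limit`."""
    # One hash table entry per distinct path: path -> [count, sum_hits].
    # With long URL-like keys and many distinct values, this table is what
    # consumes the memory.
    groups = defaultdict(lambda: [0, 0])
    for row_project, path, hits in rows:
        if row_project != project:
            continue
        entry = groups[path]
        entry[0] += 1
        entry[1] += hits
    # ORDER BY sum_hits DESC LIMIT n, without sorting every group.
    top = heapq.nlargest(limit, groups.items(), key=lambda item: item[1][1])
    return [(path, cnt, total, round(total / cnt, 2)) for path, (cnt, total) in top]


sample_rows = [
    ("en", "/wiki/Main_Page", 100),
    ("en", "/wiki/Main_Page", 50),
    ("en", "/wiki/Spark", 30),
    ("de", "/wiki/Hauptseite", 999),
]
result = top_paths(sample_rows, limit=2)
print(result)
```

The final top-100 step is cheap; the expensive part is that every distinct path must stay in the hash table until the whole input has been read.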

MariaDB ColumnStore does not allow us to “spill” data to disk for now (only disk-based joins are implemented). If you need to GROUP BY on a large text field, you can decrease the disk block cache setting in Columnstore.xml (e.g., set disk cache to 10% of RAM) to make room for an intermediate GROUP BY:

<DBBC>
                <!-- The percentage of RAM to use for the disk block cache. Defaults to 86% -->
                <NumBlocksPct>10</NumBlocksPct>

In addition, as the query has an ORDER BY, we need to increase max_length_for_sort_data in MySQL:

ERROR 1815 (HY000): Internal error: IDB-2015: Sorting length exceeded. Session variable max_length_for_sort_data needs to be set higher.
mysql> set global max_length_for_sort_data=8*1024*1024;

SQL Support

| SQL | Spark* | ClickHouse | ColumnStore |
|---|---|---|---|
| INSERT … VALUES | yes | yes | yes |
| INSERT SELECT / BULK INSERT | yes | yes | yes |
| UPDATE | no | no | yes |
| DELETE | no | no | yes |
| ALTER … ADD/DROP/MODIFY COLUMN | no | yes | yes |
| ALTER … change partitions | yes | yes | yes |
| SELECT with WINDOW functions | yes | no | yes |

 

*Spark does not support UPDATE/DELETE. However, Hive supports ACID transactions with UPDATE and DELETE statements. BEGIN, COMMIT, and ROLLBACK are not yet supported (only the ORC file format is supported).

ColumnStore is the only database out of the three that supports a full set of DML and DDL (almost all of the MySQL’s implementation of SQL is supported).

Comparing ColumnStore to Clickhouse and Apache Spark

MariaDB ColumnStore

Advantages:
  • MySQL frontend (makes it easy to migrate from MySQL)
  • UPDATE and DELETE are supported
  • Window functions support

Disadvantages:
  • Select queries are slower
  • No replication from normal MySQL server (planned for future versions)
  • No support for GROUP BY on disk

Yandex ClickHouse

Advantages:
  • Fastest performance
  • Better compression
  • Primary keys
  • Disk-based GROUP BY, etc.

Disadvantages:
  • No MySQL protocol support

Apache Spark

Advantages:
  • Flexible storage options
  • Machine learning integration (e.g., pyspark ML libraries run inside Spark nodes)

Disadvantages:
  • No MySQL protocol support
  • Slower select queries (compared to ClickHouse)


Conclusion

Yandex ClickHouse is the absolute winner in this benchmark: it shows both better performance (>10x) and better compression than MariaDB ColumnStore and Apache Spark. If you are looking for the best performance and compression, ClickHouse looks very good.

At the same time, ColumnStore provides a MySQL endpoint (MySQL protocol and syntax), so it is a good option if you are migrating from MySQL. Right now, it can’t replicate directly from MySQL but if this option is available in the future we can attach a ColumnStore replication slave to any MySQL master and use the slave for reporting queries (i.e., BI or data science teams can use a ColumnStore database, which is updated very close to realtime).

Table Structure and List of Queries

Table structure (MySQL / Columnstore version):

CREATE TABLE `wikistat` (
  `date` date DEFAULT NULL,
  `time` datetime DEFAULT NULL,
  `project` varchar(20) DEFAULT NULL,
  `subproject` varchar(2) DEFAULT NULL,
  `path` varchar(1024) DEFAULT NULL,
  `hits` bigint(20) DEFAULT NULL,
  `size` bigint(20) DEFAULT NULL
) ENGINE=Columnstore DEFAULT CHARSET=utf8

Query 1:

select count(*) from wikistat

Query 2a (full scan):

select count(*), month(date) as mon
from wikistat where year(date)=2008
and month(date) between 1 and 10
group by month(date)
order by month(date)

Query 2b (for partitioning test):

select count(*), month(date) as mon
from wikistat where
date between '2008-01-01' and '2008-10-31'
group by mon
order by mon;

Query 3:

SELECT
path,
count(*),
sum(hits) AS sum_hits,
round(sum(hits) / count(*), 2) AS hit_ratio
FROM wikistat
WHERE project = 'en'
GROUP BY path
ORDER BY sum_hits DESC
LIMIT 100;

 

Dec
16
2016
--

Percona Live 2017 Sneak Peek Schedule Up Now! See the Available Sessions!

We are excited to announce that the sneak peek schedule for the Percona Live 2017 Open Source Database Conference is up! The Percona Live Open Source Database Conference 2017 is April 24th – 27th, at the Hyatt Regency Santa Clara & The Santa Clara Convention Center.

The Percona Live Open Source Database Conference 2017 is the premier event for the rich and diverse MySQL, MongoDB and open source database ecosystems. This conference provides an opportunity to network with peers and technology professionals by bringing together accomplished DBAs, system architects and developers from around the world to share their knowledge and experience.

Below are some of our top picks for MySQL, MongoDB and open source database sessions:

Tutorials

MySQL 101 Tracks

MongoDB 101 Tracks

Breakout Talks

Register for the Percona Live Open Source Database Conference here.

Early Bird Discounts

Just a reminder to everyone out there: our Early Bird discount rate for the Percona Live Open Source Database Conference 2017 is only available ‘til January 8, 2017, 11:30 pm PST! This rate gets you all the excellent and amazing opportunities that Percona Live offers, at a very reasonable price!

Sponsor Percona Live

Become a conference sponsor! We have sponsorship opportunities available for this annual MySQL, MongoDB and open source database event. Sponsors become a part of a dynamic and growing ecosystem and interact with hundreds of DBAs, sysadmins, developers, CTOs, CEOs, business managers, technology evangelists, solutions vendors, and entrepreneurs who attend the event.

Sep
27
2016
--

IBM releases DataWorks to give enterprise data a home and a brain

While the gears of research are turning fast developing new methods of machine intelligence, another, perhaps more impactful, trend is brewing in the field. Open source frameworks like Apache Spark are hitting their stride at the ideal time to put data analytics in the hands of the business development analyst without forgetting about the needs of the data scientist. IBM’s new…

Aug
17
2016
--

How Apache Spark makes your slow MySQL queries 10x faster (or more)

In this blog post, we’ll discuss how to improve the performance of slow MySQL queries using Apache Spark.

Introduction

In my previous blog post, I wrote about using Apache Spark with MySQL for data analysis and showed how to transform and analyze a large volume of data (text files) with Apache Spark. Vadim also performed a benchmark comparing performance of MySQL and Spark with Parquet columnar format (using Air traffic performance data). That works great, but what if we don’t want to move our data from MySQL to another storage (e.g., columnar format), and instead want to use “ad hoc” queries on top of an existing MySQL server? Apache Spark can help here as well.

TL;DR version:

Using Apache Spark on top of the existing MySQL server(s) (without the need to export or even stream data to Spark or Hadoop), we can increase query performance more than ten times. Using multiple MySQL servers (replication or Percona XtraDB Cluster) gives us an additional performance increase for some queries. You can also use the Spark cache function to cache the whole MySQL query results table.

The idea is simple: Spark can read MySQL data via JDBC and can also execute SQL queries, so we can connect it directly to MySQL and run the queries. Why is this faster? For long running (i.e., reporting or BI) queries, it can be much faster as Spark is a massively parallel system. MySQL can only use one CPU core per query, whereas Spark can use all cores on all cluster nodes. In my examples below, MySQL queries are executed inside Spark and run 5-10 times faster (on top of the same MySQL data).

In addition, Spark can add “cluster” level parallelism. In the case of MySQL replication or Percona XtraDB Cluster, Spark can split the query into a set of smaller queries (for a partitioned table, for example, it will run one query per partition) and run those in parallel across multiple slave servers or multiple Percona XtraDB Cluster nodes. Finally, it will use map/reduce-type processing to aggregate the results.
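To give a feel for how one query becomes many, here is a simplified Python model of range splitting driven by the partitionColumn, lowerBound, upperBound and numPartitions options that appear in the JDBC setup later in this post. It is a sketch under stated assumptions, not Spark's actual code: the function name is made up, and the exact stride and boundary handling in Spark may differ. It does reflect the documented behavior that the bounds only decide the stride and do not filter rows.

```python
def partition_predicates(column, lower, upper, num_partitions):
    """Build one WHERE clause per partition for an integer column.

    Simplified model: the first and last ranges are left open-ended so that
    rows outside [lower, upper) and NULLs are still read by some partition.
    """
    if num_partitions <= 1:
        return ["1=1"]  # a single query that reads everything
    stride = upper // num_partitions - lower // num_partitions
    predicates = []
    current = lower
    for i in range(num_partitions):
        lo = f"{column} >= {current}" if i > 0 else None
        current += stride
        hi = f"{column} < {current}" if i < num_partitions - 1 else None
        if lo and hi:
            predicates.append(f"{lo} AND {hi}")
        elif hi:
            predicates.append(f"{hi} OR {column} IS NULL")
        else:
            predicates.append(lo)
    return predicates


# Values taken from the JDBC options used in this post.
preds = partition_predicates("yeard", 1988, 2016, 28)
for p in preds[:3]:
    print(p)
print("...")
print(preds[-1])
```

Each predicate can be appended to the original query and sent to MySQL as an independent statement, so 28 ranges can keep up to 28 MySQL connections (and CPU cores) busy at once.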

I’ve used the same “Airlines On-Time Performance” database as in previous posts. Vadim created some scripts to download data and upload it to MySQL. You can find the scripts here: https://github.com/Percona-Lab/ontime-airline-performance. I’ve also used Apache Spark 2.0, which was released July 26, 2016.

Apache Spark Setup

Starting Apache Spark in standalone mode is easy. To recap:

  1. Download Apache Spark 2.0 and unpack it (for example, to /usr/local/spark)
  2. Start master
  3. Start slave (worker) and attach it to the master
  4. Start the app (in this case spark-shell or spark-sql)

Example:

root@thor:~/spark# ./sbin/start-master.sh
less ../logs/spark-root-org.apache.spark.deploy.master.Master-1-thor.out
15/08/25 11:21:21 INFO Master: Starting Spark master at spark://thor:7077
15/08/25 11:21:21 INFO Utils: Successfully started service 'MasterUI' on port 8080.
15/08/25 11:21:21 INFO MasterWebUI: Started MasterWebUI at http://10.60.23.188:8080
root@thor:~/spark# ./sbin/start-slave.sh spark://thor:7077

To connect to Spark we can use spark-shell (Scala), pyspark (Python) or spark-sql. Since spark-sql is similar to the MySQL CLI, using it would be the easiest option (even “show tables” works). I also wanted to work with Scala in interactive mode, so I’ve used spark-shell as well. In all the examples I’m using the same SQL query in MySQL and Spark, so working with Spark is not that different.

To work with MySQL server in Spark we need Connector/J for MySQL. Download the package and copy the mysql-connector-java-5.1.39-bin.jar to the spark directory, then add the class path to the conf/spark-defaults.conf:

spark.driver.extraClassPath = /usr/local/spark/mysql-connector-java-5.1.39-bin.jar
spark.executor.extraClassPath = /usr/local/spark/mysql-connector-java-5.1.39-bin.jar

Running MySQL queries via Apache Spark

For this test I was using one physical server with 12 CPU cores (older Intel(R) Xeon(R) CPU L5639 @ 2.13GHz) and 48G of RAM, SSD disks. I’ve installed MySQL and started spark master and spark slave on the same box.

Now we are ready to run MySQL queries inside Spark. First, start the shell (from the Spark directory, /usr/local/spark in my case):

$ ./bin/spark-shell --driver-memory 4G --master spark://server1:7077

Then we will need to connect to MySQL from spark and register the temporary view:

val jdbcDF = spark.read.format("jdbc").options(
  Map("url" ->  "jdbc:mysql://localhost:3306/ontime?user=root&password=",
  "dbtable" -> "ontime.ontime_part",
  "fetchSize" -> "10000",
  "partitionColumn" -> "yeard", "lowerBound" -> "1988", "upperBound" -> "2016", "numPartitions" -> "28"
  )).load()
jdbcDF.createOrReplaceTempView("ontime")

So we have created a “datasource” for Spark (in other words, a “link” from Spark to MySQL). The Spark table name is “ontime” (linked to the MySQL ontime.ontime_part table), and we can run SQL queries in Spark, which in turn parses them and translates them into MySQL queries.

“partitionColumn” is very important here. It tells Spark to run multiple queries in parallel, one query per partition.

Now we can run the query:

val sqlDF = sql("select min(year), max(year) as max_year, Carrier, count(*) as cnt, sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed, round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate FROM ontime WHERE DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI') and DestState not in ('AK', 'HI', 'PR', 'VI') and (origin = 'RDU' or dest = 'RDU') GROUP by carrier HAVING cnt > 100000 and max_year > '1990' ORDER by rate DESC, cnt desc LIMIT  10")
sqlDF.show()

MySQL Query Example

Let’s go back to MySQL for a second and look at the query example. I’ve chosen the following query (from my older blog post):

select min(year), max(year) as max_year, Carrier, count(*) as cnt,
sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed,
round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate
FROM ontime
WHERE
DayOfWeek not in (6,7)
and OriginState not in ('AK', 'HI', 'PR', 'VI')
and DestState not in ('AK', 'HI', 'PR', 'VI')
GROUP by carrier HAVING cnt > 100000 and max_year > '1990'
ORDER by rate DESC, cnt desc
LIMIT  10

The query will find the total number of delayed flights for each airline. In addition, the query will calculate the smart “ontime” rating, taking into consideration the number of flights (we do not want to compare smaller air carriers with the large ones, and we want to exclude the older airlines that are no longer in business).

The main reason I’ve chosen this query is that it is hard to optimize in MySQL. The conditions in the “where” clause are not selective: ~70% of the rows still match (only ~30% are filtered out). I’ve done a basic calculation:

mysql> select count(*) FROM ontime WHERE DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI') and DestState not in ('AK', 'HI', 'PR', 'VI');
+-----------+
| count(*)  |
+-----------+
| 108776741 |
+-----------+
mysql> select count(*) FROM ontime;
+-----------+
| count(*)  |
+-----------+
| 152657276 |
+-----------+
mysql> select round((108776741/152657276)*100, 2);
+-------------------------------------+
| round((108776741/152657276)*100, 2) |
+-------------------------------------+
|                               71.26 |
+-------------------------------------+

Table structure:

CREATE TABLE `ontime_part` (
  `YearD` int(11) NOT NULL,
  `Quarter` tinyint(4) DEFAULT NULL,
  `MonthD` tinyint(4) DEFAULT NULL,
  `DayofMonth` tinyint(4) DEFAULT NULL,
  `DayOfWeek` tinyint(4) DEFAULT NULL,
  `FlightDate` date DEFAULT NULL,
  `UniqueCarrier` char(7) DEFAULT NULL,
  `AirlineID` int(11) DEFAULT NULL,
  `Carrier` char(2) DEFAULT NULL,
  `TailNum` varchar(50) DEFAULT NULL,
...
  `id` int(11) NOT NULL AUTO_INCREMENT,
  PRIMARY KEY (`id`,`YearD`),
  KEY `covered` (`DayOfWeek`,`OriginState`,`DestState`,`Carrier`,`YearD`,`ArrDelayMinutes`)
) ENGINE=InnoDB AUTO_INCREMENT=162668935 DEFAULT CHARSET=latin1
/*!50100 PARTITION BY RANGE (YearD)
(PARTITION p1987 VALUES LESS THAN (1988) ENGINE = InnoDB,
 PARTITION p1988 VALUES LESS THAN (1989) ENGINE = InnoDB,
 PARTITION p1989 VALUES LESS THAN (1990) ENGINE = InnoDB,
 PARTITION p1990 VALUES LESS THAN (1991) ENGINE = InnoDB,
 PARTITION p1991 VALUES LESS THAN (1992) ENGINE = InnoDB,
 PARTITION p1992 VALUES LESS THAN (1993) ENGINE = InnoDB,
 PARTITION p1993 VALUES LESS THAN (1994) ENGINE = InnoDB,
 PARTITION p1994 VALUES LESS THAN (1995) ENGINE = InnoDB,
 PARTITION p1995 VALUES LESS THAN (1996) ENGINE = InnoDB,
 PARTITION p1996 VALUES LESS THAN (1997) ENGINE = InnoDB,
 PARTITION p1997 VALUES LESS THAN (1998) ENGINE = InnoDB,
 PARTITION p1998 VALUES LESS THAN (1999) ENGINE = InnoDB,
 PARTITION p1999 VALUES LESS THAN (2000) ENGINE = InnoDB,
 PARTITION p2000 VALUES LESS THAN (2001) ENGINE = InnoDB,
 PARTITION p2001 VALUES LESS THAN (2002) ENGINE = InnoDB,
 PARTITION p2002 VALUES LESS THAN (2003) ENGINE = InnoDB,
 PARTITION p2003 VALUES LESS THAN (2004) ENGINE = InnoDB,
 PARTITION p2004 VALUES LESS THAN (2005) ENGINE = InnoDB,
 PARTITION p2005 VALUES LESS THAN (2006) ENGINE = InnoDB,
 PARTITION p2006 VALUES LESS THAN (2007) ENGINE = InnoDB,
 PARTITION p2007 VALUES LESS THAN (2008) ENGINE = InnoDB,
 PARTITION p2008 VALUES LESS THAN (2009) ENGINE = InnoDB,
 PARTITION p2009 VALUES LESS THAN (2010) ENGINE = InnoDB,
 PARTITION p2010 VALUES LESS THAN (2011) ENGINE = InnoDB,
 PARTITION p2011 VALUES LESS THAN (2012) ENGINE = InnoDB,
 PARTITION p2012 VALUES LESS THAN (2013) ENGINE = InnoDB,
 PARTITION p2013 VALUES LESS THAN (2014) ENGINE = InnoDB,
 PARTITION p2014 VALUES LESS THAN (2015) ENGINE = InnoDB,
 PARTITION p2015 VALUES LESS THAN (2016) ENGINE = InnoDB,
 PARTITION p_new VALUES LESS THAN MAXVALUE ENGINE = InnoDB) */

Even with a “covered” index, MySQL will have to scan ~70M-100M rows and create a temporary table:

mysql>  explain select min(yearD), max(yearD) as max_year, Carrier, count(*) as cnt, sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed, round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate FROM ontime_part WHERE DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI') and DestState not in ('AK', 'HI', 'PR', 'VI') GROUP by carrier HAVING cnt > 1000 and max_year > '1990' ORDER by rate DESC, cnt desc LIMIT  10\G
*************************** 1. row ***************************
           id: 1
  select_type: SIMPLE
        table: ontime_part
         type: range
possible_keys: covered
          key: covered
      key_len: 2
          ref: NULL
         rows: 70483364
        Extra: Using where; Using index; Using temporary; Using filesort
1 row in set (0.00 sec)

Here is the query response time in MySQL:

mysql> select min(yearD), max(yearD) as max_year, Carrier, count(*) as cnt, sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed, round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate FROM ontime_part WHERE DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI') and DestState not in ('AK', 'HI', 'PR', 'VI') GROUP by carrier HAVING cnt > 1000 and max_year > '1990' ORDER by rate DESC, cnt desc LIMIT  10;
+------------+----------+---------+----------+-----------------+------+
| min(yearD) | max_year | Carrier | cnt      | flights_delayed | rate |
+------------+----------+---------+----------+-----------------+------+
|       2003 |     2013 | EV      |  2962008 |          464264 | 0.16 |
|       2003 |     2013 | B6      |  1237400 |          187863 | 0.15 |
|       2006 |     2011 | XE      |  1615266 |          230977 | 0.14 |
|       2003 |     2005 | DH      |   501056 |           69833 | 0.14 |
|       2001 |     2013 | MQ      |  4518106 |          605698 | 0.13 |
|       2003 |     2013 | FL      |  1692887 |          212069 | 0.13 |
|       2004 |     2010 | OH      |  1307404 |          175258 | 0.13 |
|       2006 |     2013 | YV      |  1121025 |          143597 | 0.13 |
|       2003 |     2006 | RU      |  1007248 |          126733 | 0.13 |
|       1988 |     2013 | UA      | 10717383 |         1327196 | 0.12 |
+------------+----------+---------+----------+-----------------+------+
10 rows in set (19 min 16.58 sec)

19 minutes is definitely not great.

SQL in Spark

Now we want to run the same query inside Spark and let Spark read data from MySQL. We will create a “datasource” and execute the query:

scala> val jdbcDF = spark.read.format("jdbc").options(
     |   Map("url" ->  "jdbc:mysql://localhost:3306/ontime?user=root&password=mysql",
     |   "dbtable" -> "ontime.ontime_sm",
     |   "fetchSize" -> "10000",
     |   "partitionColumn" -> "yeard", "lowerBound" -> "1988", "upperBound" -> "2015", "numPartitions" -> "48"
     |   )).load()
16/08/02 23:24:12 WARN JDBCRelation: The number of partitions is reduced because the specified number of partitions is less than the difference between upper bound and lower bound. Updated number of partitions: 27; Input number of partitions: 48; Lower bound: 1988; Upper bound: 2015.
jdbcDF: org.apache.spark.sql.DataFrame = [id: int, YearD: date ... 19 more fields]
scala> jdbcDF.createOrReplaceTempView("ontime")
scala> val sqlDF = sql("select min(yearD), max(yearD) as max_year, Carrier, count(*) as cnt, sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed, round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate FROM ontime WHERE OriginState not in ('AK', 'HI', 'PR', 'VI') and DestState not in ('AK', 'HI', 'PR', 'VI') GROUP by carrier HAVING cnt > 1000 and max_year > '1990' ORDER by rate DESC, cnt desc LIMIT  10")
sqlDF: org.apache.spark.sql.DataFrame = [min(yearD): date, max_year: date ... 4 more fields]
scala> sqlDF.show()
+----------+--------+-------+--------+---------------+----+
|min(yearD)|max_year|Carrier|     cnt|flights_delayed|rate|
+----------+--------+-------+--------+---------------+----+
|      2003|    2013|     EV| 2962008|         464264|0.16|
|      2003|    2013|     B6| 1237400|         187863|0.15|
|      2006|    2011|     XE| 1615266|         230977|0.14|
|      2003|    2005|     DH|  501056|          69833|0.14|
|      2001|    2013|     MQ| 4518106|         605698|0.13|
|      2003|    2013|     FL| 1692887|         212069|0.13|
|      2004|    2010|     OH| 1307404|         175258|0.13|
|      2006|    2013|     YV| 1121025|         143597|0.13|
|      2003|    2006|     RU| 1007248|         126733|0.13|
|      1988|    2013|     UA|10717383|        1327196|0.12|
+----------+--------+-------+--------+---------------+----+

spark-shell does not show the query time. It can be retrieved from the Web UI or from spark-sql. I’ve re-run the same query in spark-sql:

./bin/spark-sql --driver-memory 4G  --master spark://thor:7077
spark-sql> CREATE TEMPORARY VIEW ontime
         > USING org.apache.spark.sql.jdbc
         > OPTIONS (
         >      url  "jdbc:mysql://localhost:3306/ontime?user=root&password=",
         >      dbtable "ontime.ontime_part",
         >      fetchSize "1000",
         >      partitionColumn "yearD", lowerBound "1988", upperBound "2014", numPartitions "48"
         > );
16/08/04 01:44:27 WARN JDBCRelation: The number of partitions is reduced because the specified number of partitions is less than the difference between upper bound and lower bound. Updated number of partitions: 26; Input number of partitions: 48; Lower bound: 1988; Upper bound: 2014.
Time taken: 3.864 seconds
spark-sql> select min(yearD), max(yearD) as max_year, Carrier, count(*) as cnt, sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed, round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate FROM ontime WHERE DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI') and DestState not in ('AK', 'HI', 'PR', 'VI') GROUP by carrier HAVING cnt > 1000 and max_year > '1990' ORDER by rate DESC, cnt desc LIMIT  10;
16/08/04 01:45:13 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
2003    2013    EV      2962008 464264  0.16
2003    2013    B6      1237400 187863  0.15
2006    2011    XE      1615266 230977  0.14
2003    2005    DH      501056  69833   0.14
2001    2013    MQ      4518106 605698  0.13
2003    2013    FL      1692887 212069  0.13
2004    2010    OH      1307404 175258  0.13
2006    2013    YV      1121025 143597  0.13
2003    2006    RU      1007248 126733  0.13
1988    2013    UA      10717383        1327196 0.12
Time taken: 139.628 seconds, Fetched 10 row(s)

So the same query runs about 8x faster in Spark (139.6 seconds versus 19 minutes 16 seconds) on the same server, just one box. But how was this query translated to MySQL queries, and why is it so much faster? Here is what is happening inside MySQL:

Inside MySQL

Spark:

scala> sqlDF.show()
[Stage 4:>                                                        (0 + 26) / 26]

MySQL:

mysql> select id, info from information_schema.processlist where info is not NULL and info not like '%information_schema%';
+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| id    | info                                                                                                                                                                                                                                                    |
+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| 10948 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2001 AND yearD < 2002) |
| 10965 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2007 AND yearD < 2008) |
| 10966 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1991 AND yearD < 1992) |
| 10967 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1994 AND yearD < 1995) |
| 10968 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1998 AND yearD < 1999) |
| 10969 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2010 AND yearD < 2011) |
| 10970 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2002 AND yearD < 2003) |
| 10971 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2006 AND yearD < 2007) |
| 10972 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1990 AND yearD < 1991) |
| 10953 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2009 AND yearD < 2010) |
| 10947 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1993 AND yearD < 1994) |
| 10956 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD < 1989 or yearD is null)  |
| 10951 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2005 AND yearD < 2006) |
| 10954 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1996 AND yearD < 1997) |
| 10955 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2008 AND yearD < 2009) |
| 10961 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1999 AND yearD < 2000) |
| 10962 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2011 AND yearD < 2012) |
| 10963 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2003 AND yearD < 2004) |
| 10964 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1995 AND yearD < 1996) |
| 10957 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2004 AND yearD < 2005) |
| 10949 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1989 AND yearD < 1990) |
| 10950 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1997 AND yearD < 1998) |
| 10952 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2013)                  |
| 10958 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1992 AND yearD < 1993) |
| 10960 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2000 AND yearD < 2001) |
| 10959 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2012 AND yearD < 2013) |
+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
26 rows in set (0.00 sec)

Spark is running 26 queries in parallel, which is great. As the table is partitioned it only uses one partition per query, but scans the whole partition:

mysql> explain partitions SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2001 AND yearD < 2002)\G
*************************** 1. row ***************************
           id: 1
  select_type: SIMPLE
        table: ontime_part
   partitions: p2001
         type: ALL
possible_keys: NULL
          key: NULL
      key_len: NULL
          ref: NULL
         rows: 5814106
        Extra: Using where
1 row in set (0.00 sec)

In this case, as the box has 12 CPU cores / 24 threads, it efficiently executes 26 queries in parallel, and the partitioned table helps to avoid contention issues (I wish MySQL could scan partitions in parallel, but it can’t at the time of writing).
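The 26 range predicates visible in the process list above can be reproduced with a short model. The Python sketch below is only an illustration inferred from the queries and the JDBCRelation warnings shown in this post, not Spark's actual source code; the function name jdbc_partition_predicates is hypothetical.

```python
def jdbc_partition_predicates(column, lower, upper, num_partitions):
    """Model how a numeric range is split into per-partition WHERE clauses.

    This is an assumption-based sketch of the observed behavior, not Spark code.
    """
    # The warning in the transcript shows the partition count being reduced
    # when it exceeds the distance between the bounds.
    if upper - lower < num_partitions:
        num_partitions = upper - lower
    if num_partitions <= 1:
        return []  # a single query with no extra range predicate

    stride = upper // num_partitions - lower // num_partitions
    predicates = []
    current = lower
    for i in range(num_partitions):
        lower_clause = f"{column} >= {current}" if i != 0 else None
        current += stride
        upper_clause = f"{column} < {current}" if i != num_partitions - 1 else None
        if lower_clause is None:
            # First partition is open-ended downward and also collects NULLs.
            predicates.append(f"{upper_clause} or {column} is null")
        elif upper_clause is None:
            # Last partition is open-ended upward.
            predicates.append(lower_clause)
        else:
            predicates.append(f"{lower_clause} AND {upper_clause}")
    return predicates


# Same options as the spark-sql session: bounds 1988..2014, 48 requested partitions.
predicates = jdbc_partition_predicates("yearD", 1988, 2014, 48)
print(len(predicates))
for predicate in predicates[:3]:
    print(predicate)
```

Note that the bounds only decide how the range is cut; the first and last predicates are open-ended, so rows outside 1988..2014 are still read rather than dropped.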

Another interesting thing is that Spark can “push down” some of the conditions to MySQL, but only those inside the “where” clause. All group by/order by/aggregations are done inside Spark: it needs to retrieve the data from MySQL to satisfy those conditions and will not push down group by/order by/etc. to MySQL.
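To make that division of labor concrete, here is a small runnable sketch. It is a toy model only: SQLite stands in for MySQL, a thread pool stands in for the Spark executors, and the table contents and function names are made up for illustration. The filter runs inside the database, one range query per year, while the grouping and the delayed-flight rate are computed outside it.

```python
import os
import sqlite3
import tempfile
from collections import Counter
from concurrent.futures import ThreadPoolExecutor

# Made-up sample rows: (yeard, dayofweek, carrier, arrdelayminutes)
ROWS = [
    (2012, 1, "EV", 45),
    (2012, 6, "EV", 50),   # weekend: removed by the pushed-down WHERE
    (2012, 2, "UA", 5),
    (2013, 3, "EV", 10),
    (2013, 4, "UA", 90),
    (2013, 7, "UA", 120),  # weekend: removed by the pushed-down WHERE
]


def build_demo_db():
    """Create a throwaway SQLite file that plays the role of the MySQL table."""
    fd, path = tempfile.mkstemp(suffix=".db")
    os.close(fd)
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE ontime (yeard INT, dayofweek INT, carrier TEXT, arrdelayminutes INT)"
    )
    conn.executemany("INSERT INTO ontime VALUES (?, ?, ?, ?)", ROWS)
    conn.commit()
    conn.close()
    return path


def fetch_partition(db_path, year):
    """One per-partition query: only the WHERE conditions run in the database."""
    conn = sqlite3.connect(db_path)
    try:
        return conn.execute(
            "SELECT carrier, arrdelayminutes FROM ontime "
            "WHERE dayofweek NOT IN (6, 7) AND yeard >= ? AND yeard < ?",
            (year, year + 1),
        ).fetchall()
    finally:
        conn.close()


def delayed_rate_by_carrier(db_path, years):
    """Fetch all partitions in parallel, then group and aggregate client-side."""
    with ThreadPoolExecutor(max_workers=len(years)) as pool:
        partitions = list(pool.map(lambda y: fetch_partition(db_path, y), years))
    total, delayed = Counter(), Counter()
    for rows in partitions:
        for carrier, delay in rows:
            total[carrier] += 1
            if delay > 30:
                delayed[carrier] += 1
    return {carrier: round(delayed[carrier] / total[carrier], 2) for carrier in total}


db_path = build_demo_db()
rates = delayed_rate_by_carrier(db_path, [2012, 2013])
os.remove(db_path)
print(rates)
```

The point of the sketch is the data flow: every row that passes the filter crosses the connection before any aggregation happens, which is why a query without selective conditions transfers a lot of data.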

That also means that queries without “where” conditions (for example “select count(*) as cnt, carrier from ontime group by carrier order by cnt desc limit 10”) will have to retrieve all data from MySQL and load it into Spark (as opposed to MySQL doing all of the group by work internally). Running it in Spark might be slower or faster (depending on the amount of data and use of indexes), but it also requires more resources and potentially more memory dedicated to Spark. The above query is translated to 26 queries, each of which does a “select carrier from ontime_part where (yearD >= N AND yearD < N+1)”.

Pushing down the whole query into MySQL 

If we want to avoid sending all data from MySQL to Spark we have the option of creating a temporary table on top of a query (similar to MySQL’s create temporary table as select …). In Scala:

val tableQuery =
  "(select yeard, count(*) from ontime group by yeard) tmp"
val jdbcDFtmp = spark.read.format("jdbc").options(
  Map("url" ->  "jdbc:mysql://localhost:3306/ontime?user=root&password=",
  "dbtable" -> tableQuery,
  "fetchSize" -> "10000"
  )).load()
jdbcDFtmp.createOrReplaceTempView("ontime_tmp")

In Spark SQL:

CREATE TEMPORARY VIEW ontime_tmp
USING org.apache.spark.sql.jdbc
OPTIONS (
     url  "jdbc:mysql://localhost:3306/ontime?user=root&password=mysql",
     dbtable "(select yeard, count(*) from ontime_part group by yeard) tmp",
     fetchSize "1000"
);
select * from ontime_tmp;

Please note:

  1. We do not want to use “partitionColumn” here; otherwise we will see 26 queries like this in MySQL: “SELECT yeard, count(*) FROM (select yeard, count(*) from ontime_part group by yeard) tmp where (yearD >= N AND yearD < N+1)” (obviously not optimal).
  2. This is not a good use of Spark; it is more of a “hack.” The only good reason to do it is to be able to use the result of the query as the source for an additional query.

Query cache in Spark

Another option is to cache the result of the query (or even the whole table) and then use .filter in Scala for faster processing. This requires sufficient memory dedicated to Spark. The good news is that we can add nodes to Spark and get more memory for the Spark cluster.

Spark SQL example:

CREATE TEMPORARY VIEW ontime_latest
USING org.apache.spark.sql.jdbc
OPTIONS (
     url  "jdbc:mysql://localhost:3306/ontime?user=root&password=",
     dbtable "ontime.ontime_part partition (p2013, p2014)",
     fetchSize "1000",
     partitionColumn "yearD", lowerBound "1988", upperBound "2014", numPartitions "26"
);
cache table ontime_latest;
spark-sql> cache table ontime_latest;
Time taken: 465.076 seconds
spark-sql> select count(*) from ontime_latest;
5349447
Time taken: 0.526 seconds, Fetched 1 row(s)
spark-sql> select count(*), dayofweek from ontime_latest group by dayofweek;
790896  1
634664  6
795540  3
794667  5
808243  4
743282  7
782155  2
Time taken: 0.541 seconds, Fetched 7 row(s)
spark-sql> select min(yearD), max(yearD) as max_year, Carrier, count(*) as cnt, sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed, round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate FROM ontime_latest WHERE DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI') and DestState not in ('AK', 'HI', 'PR', 'VI') and (origin='RDU' or dest = 'RDU') GROUP by carrier HAVING cnt > 1000 and max_year > '1990' ORDER by rate DESC, cnt desc LIMIT  10;
2013    2013    MQ      9339    1734    0.19
2013    2013    B6      3302    516     0.16
2013    2013    EV      9225    1331    0.14
2013    2013    UA      1317    177     0.13
2013    2013    AA      5354    620     0.12
2013    2013    9E      5520    593     0.11
2013    2013    WN      10968   1130    0.1
2013    2013    US      5722    549     0.1
2013    2013    DL      6313    478     0.08
2013    2013    FL      2433    205     0.08
Time taken: 2.036 seconds, Fetched 10 row(s)

Here we cache partitions p2013 and p2014 in Spark. This retrieves the data from MySQL and loads it in Spark. After that all queries run on the cached data and will be much faster.

With Scala we can cache the result of a query and then use filters to only get the information we need:

val sqlDF = sql("SELECT flightdate, origin, dest, depdelayminutes, arrdelayminutes, carrier, TailNum, Cancelled, Diverted, Distance from ontime")
sqlDF.cache().show()
scala> sqlDF.filter("flightdate='1988-01-01'").count()
res5: Long = 862

Using Spark with Percona XtraDB Cluster

As Spark can be used in cluster mode and scales with more and more nodes, reading data from a single MySQL server is a bottleneck. We can use MySQL replication slave servers or Percona XtraDB Cluster (PXC) nodes as a Spark datasource. To test it out, I’ve provisioned Percona XtraDB Cluster with three nodes on AWS (I’ve used m4.2xlarge Ubuntu instances) and also started Apache Spark on each node:

  1. Node1 (pxc1): Percona Server + Spark Master + Spark worker node + Spark SQL running
  2. Node2 (pxc2): Percona Server + Spark worker node
  3. Node3 (pxc3): Percona Server + Spark worker node

All the Spark worker nodes use the memory configuration option:

cat conf/spark-env.sh
export SPARK_WORKER_MEMORY=24g

Then I can start spark-sql (the Connector/J JAR file also needs to be copied to all nodes):

$ ./bin/spark-sql --driver-memory 4G --master spark://pxc1:7077

When creating a table, I still use localhost to connect to MySQL (url “jdbc:mysql://localhost:3306/ontime?user=root&password=xxx”). As the Spark worker nodes are running on the same instances as the Percona XtraDB Cluster nodes, each worker will use the local connection. Running a Spark SQL query will then evenly distribute all 26 MySQL queries among the three MySQL nodes.

Alternatively, we can run the Spark cluster on separate hosts and connect it to HAProxy, which in turn will load balance selects across multiple Percona XtraDB Cluster nodes.

Query Performance Benchmark

Finally, here is the query response time test on the three AWS Percona XtraDB Cluster nodes:

Query 1:

select min(yearD), max(yearD) as max_year, Carrier, count(*) as cnt, sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed, round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate FROM ontime_part WHERE DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI') and DestState not in ('AK', 'HI', 'PR', 'VI') GROUP by carrier HAVING cnt > 1000 and max_year > '1990' ORDER by rate DESC, cnt desc LIMIT 10;

Query / Index type             | MySQL Time       | Spark Time (3 nodes) | Times Improvement
No covered index (partitioned) | 19 min 16.58 sec | 192.17 sec           | 6.02
Covered index (partitioned)    | 2 min 10.81 sec  | 48.38 sec            | 2.7
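The “Times Improvement” column is the MySQL time divided by the Spark time. As a quick check of the arithmetic, here is a small Python sketch; the helper names to_seconds and improvement are made up for this illustration.

```python
import re


def to_seconds(text):
    """Convert strings such as '19 min 16.58 sec' or '192.17 sec' to seconds."""
    match = re.fullmatch(r"\s*(?:(\d+)\s*min)?\s*([\d.]+)\s*sec\s*", text)
    if match is None:
        raise ValueError(f"unrecognized duration: {text!r}")
    minutes = int(match.group(1) or 0)
    return minutes * 60 + float(match.group(2))


def improvement(mysql_time, spark_time):
    """Ratio of MySQL response time to Spark response time, rounded to 2 places."""
    return round(to_seconds(mysql_time) / to_seconds(spark_time), 2)


print(improvement("19 min 16.58 sec", "192.17 sec"))  # no covered index
print(improvement("2 min 10.81 sec", "48.38 sec"))    # covered index
```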

 

Query 2: 

select dayofweek, count(*) from ontime_part group by dayofweek;

Query / Index type             | MySQL Time       | Spark Time (3 nodes) | Times Improvement
No covered index (partitioned) | 19 min 15.21 sec | 195.058 sec          | 5.92
Covered index (partitioned)    | 1 min 10.38 sec  | 27.323 sec           | 2.58

 

Now, this looks really good, but it can be better. With three nodes @ m4.2xlarge we will have 8*3 = 24 cores total (although they are shared between Spark and MySQL). We can expect 10x improvement, especially without a covered index.

However, on m4.2xlarge the amount of RAM did not allow me to run MySQL entirely from memory, so all reads were from EBS with non-provisioned IOPS, which only gave me ~120MB/sec. I’ve redone the test on a set of three dedicated servers:

  • 28 cores E5-2683 v3 @ 2.00GHz
  • 240GB of RAM
  • Samsung 850 PRO

The test was running completely off RAM:

Query 1 (from the above)

Query / Index type             | MySQL Time      | Spark Time (3 nodes) | Times Improvement
No covered index (partitioned) | 3 min 13.94 sec | 14.255 sec           | 13.61
Covered index (partitioned)    | 2 min 2.11 sec  | 9.035 sec            | 13.52

 

Query 2: 

select dayofweek, count(*) from ontime_part group by dayofweek;

Query / Index type             | MySQL Time     | Spark Time (3 nodes) | Times Improvement
No covered index (partitioned) | 2 min 0.36 sec | 7.055 sec            | 17.06
Covered index (partitioned)    | 1 min 6.85 sec | 4.514 sec            | 14.81

 

With this many cores and the data served entirely from RAM, we actually do not have enough concurrency, as the table only has 26 partitions. I’ve also tried the unpartitioned table with an ID primary key and used 128 partitions.

Note about partitioning

I’ve used a partitioned table (partitioned by year) in my tests to help reduce MySQL-level contention. At the same time, the “partitionColumn” option in Spark does not require the MySQL table to be partitioned. For example, if a table has a primary key, we can use this CREATE VIEW in Spark:

CREATE OR REPLACE TEMPORARY VIEW ontime
USING org.apache.spark.sql.jdbc
OPTIONS (
  url  "jdbc:mysql://127.0.0.1:3306/ontime?user=root&password=",
  dbtable "ontime.ontime",
  fetchSize "1000",
  partitionColumn "id", lowerBound "1", upperBound "162668934", numPartitions "128"
);

Assuming we have enough MySQL servers (i.e., nodes or slaves), we can increase the number of partitions and that can improve the parallelism (as opposed to only 26 partitions when running one partition by year). Actually, the above test gives us even better response time: 6.44 seconds for query 1.

Where Spark doesn’t work well

For faster queries (those that use indexes or can efficiently use an index) it does not make sense to use Spark. Retrieving data from MySQL and loading it into Spark is not free, and this overhead can be significant for faster queries. For example, a query like this:

select count(*) from ontime_part where YearD = 2013 and DayOfWeek = 7 and OriginState = 'NC' and DestState = 'NC';

will only scan 1300 rows and will return instantly (0.00 seconds reported by MySQL).

An even better example is this:

select max(id) from ontime_part

In MySQL, the query will use the index and all calculations will be done inside MySQL. Spark, on the other hand, will have to retrieve all IDs (select id from ontime_part) from MySQL and calculate the maximum. That took 24.267 seconds.

Conclusion

Using Apache Spark as an additional engine level on top of MySQL can help to speed up the slow reporting queries and add much-needed scalability for the long running select queries. In addition, Spark can help with query caching for frequent queries.

PS: Visual explain plan with Spark

The Spark Web GUI provides lots of ways of monitoring Spark jobs. For example, it shows the “job” progress:

[Image: spark_jobs]

And SQL visual explain details:

[Image: slow MySQL queries]
