Feb
05
2019
--

Databricks raises $250M at a $2.75B valuation for its analytics platform

Databricks, the company founded by the original team behind the Apache Spark big data analytics engine, today announced that it has raised a $250 million Series E round led by Andreessen Horowitz. Coatue Management, Green Bay Ventures, Microsoft and NEA also participated in this round, which brings the company’s total funding to $498.5 million. Microsoft’s involvement here is probably a bit of a surprise, but it’s worth noting that it also worked with Databricks on the launch of Azure Databricks as a first-party service on the platform, something that’s still a rarity in the Azure cloud.

As Databricks also today announced, its annual recurring revenue now exceeds $100 million. The company didn’t share whether it’s cash flow-positive at this point, but Databricks CEO and co-founder Ali Ghodsi shared that the company’s valuation is now $2.75 billion.

Current customers, which the company says number around 2,000, include the likes of Nielsen, Hotels.com, Overstock, Bechtel, Shell and HP.

“What Ali and the Databricks team have built is truly phenomenal,” Green Bay Ventures co-founder Anthony Schiller told me. “Their success is a testament to product innovation at the highest level. Databricks is without question best-in-class and their impact on the industry proves it. We were thrilled to participate in this round.”

While Databricks is obviously known for its contributions to Apache Spark, the company itself monetizes that work by offering its Unified Analytics platform on top of it. This platform allows enterprises to build their data pipelines across data storage systems and prepare data sets for data scientists and engineers. To do this, Databricks offers shared notebooks and tools for building, managing and monitoring data pipelines, which can then be used to build machine learning models, for example. Indeed, training and deploying these models is one of the company’s focus areas these days, which makes sense, given that this is one of the main use cases for big data.

On top of that, Databricks also offers a fully managed service for hosting all of these tools.

“Databricks is the clear winner in the big data platform race,” said Ben Horowitz, co-founder and general partner at Andreessen Horowitz, in today’s announcement. “In addition, they have created a new category atop their world-beating Apache Spark platform called Unified Analytics that is growing even faster. As a result, we are thrilled to invest in this round.”

Ghodsi told me that Horowitz was also instrumental in getting the company to re-focus on growth. The company was already growing fast, of course, but Horowitz asked him why Databricks wasn’t growing faster. Unsurprisingly, given that it’s an enterprise company, that means aggressively hiring a larger sales force — and that’s costly. Hence the company’s need to raise at this point.

As Ghodsi told me, one of the areas the company wants to focus on is the Asia Pacific region, where overall cloud usage is growing fast. The other area the company is focusing on is support for more verticals like mass media and entertainment, federal agencies and fintech firms, which also comes with its own cost, given that the experts there don’t come cheap.

Ghodsi likes to call this “boring AI,” since it’s not as exciting as self-driving cars. In his view, though, the enterprise companies that don’t start using machine learning now will inevitably be left behind in the long run. “If you don’t get there, there’ll be no place for you in the next 20 years,” he said.

Engineering, of course, will also get a chunk of this new funding, with an emphasis on relatively new products like MLflow and Delta, two tools Databricks recently developed that make it easier to manage the life cycle of machine learning models and to build the necessary data pipelines to feed them.

Jan
15
2018
--

Sneak Peek of the Percona Live 2018 Open Source Database Conference Breakout Sessions!

Take a look at the sneak peek of the breakout sessions for the Percona Live 2018 Open Source Database Conference, taking place April 23-25, 2018 at the Santa Clara Convention Center in Santa Clara, California. Early Bird registration discounts are available until February 4, 2018, and sponsorship opportunities are still available.

Conference breakout sessions will feature a range of in-depth talks related to each of the key areas. Breakout session examples include:

  • Database Security as a Function: Scaling to Your Organization’s Needs – Laine Campbell, Fastly
  • How to Use JSON in MySQL Wrong – Bill Karwin, Square
  • Scaling a High Traffic Database: Moving Tables Across Clusters – Bryana Knight, GitHub
  • MySQL: How to Save Bandwidth – Georgi Kodinov, Oracle
  • MyRocks Roadmaps and Production Deployment at Facebook – Yoshinori Matsunobu, Facebook
  • Securing Your Data on PostgreSQL – Payal Singh, OmniTI Computer Consulting, Inc.
  • The Accidental DBA – Jenni Snyder, Yelp
  • How Microsoft Built MySQL, PostgreSQL and MariaDB for the Cloud – Jun Su, Microsoft
  • MongoDB Cluster Topology, Management and Optimization – Steven Wang, Tesla
  • Ghostferry: A Data Migration Tool for Incompatible Cloud Platforms – Shuhao Wu, Shopify, Inc.

Percona Live Open Source Database Conference 2018 is the premier open source database event. The theme for the upcoming conference is “Championing Open Source Databases,” with a range of topics on MySQL, MongoDB and other open source databases, including time series databases, PostgreSQL and RocksDB. Session tracks include Developers, Operations and Business/Case Studies. A major conference focus will be providing strategies to help attendees meet their business goals by deploying the right mix of database solutions to obtain the performance they need while managing complexity.

Hyatt Regency Santa Clara & The Santa Clara Convention Center

The Percona Live 2018 Open Source Database Conference will be held at the Hyatt Regency Santa Clara & The Santa Clara Convention Center, at 5101 Great America Parkway, Santa Clara, CA 95054.

The Hyatt Regency Santa Clara & The Santa Clara Convention Center is a prime location in the heart of Silicon Valley. Enjoy this spacious venue with complimentary wifi, on-site expert staff and three great restaurants offering Tuscan cuisine, classic American fare or tantalizing sushi. Staying for a couple of extra days? Take time to enjoy the Bay Area and spend a day in San Francisco, located only an hour away. You can reserve a room by booking through the Hyatt’s dedicated Percona Live reservation site.

Book your hotel using Percona’s special room block rate!

Sponsorships

Sponsorship opportunities for the Percona Live 2018 Open Source Database Conference are available and offer the opportunity to interact with the DBAs, sysadmins, developers, CTOs, CEOs, business managers, technology evangelists, solution vendors, and entrepreneurs who typically attend the event. Contact live@percona.com for sponsorship details.

Jun
06
2017
--

Databricks releases serverless platform for Apache Spark along with new library supporting deep learning

Today, to kick off Spark Summit, Databricks announced a Serverless Platform for Apache Spark — welcome news for developers looking to reduce time spent on cluster management. The move to simplify developer experiences is set to be a major theme of the event overall. In addition to Serverless, the company also introduced Deep Learning Pipelines, a library that makes it easy to mix… Read More

Mar
17
2017
--

Column Store Database Benchmarks: MariaDB ColumnStore vs. Clickhouse vs. Apache Spark

This blog shares some column store database benchmark results, and compares the query performance of MariaDB ColumnStore v. 1.0.7 (based on InfiniDB), Clickhouse and Apache Spark.

I’ve already written about ClickHouse (Column Store database).

The purpose of the benchmark is to see how these three solutions work on a single big server, with many CPU cores and a large amount of RAM. All three systems are massively parallel (MPP) database systems, so they should be able to use many cores for SELECT queries.

For the benchmarks, I chose three datasets:

  1. Wikipedia page counts, loaded in full for the year 2008 (~26 billion rows)
  2. Query analytics data from Percona Monitoring and Management
  3. Online shop orders

This blog post shares the results for the Wikipedia page counts (same queries as for the Clickhouse benchmark). In the following posts I will use other datasets to compare the performance.

Databases, Versions and Storage Engines Tested

  • MariaDB ColumnStore v. 1.0.7, ColumnStore storage engine
  • Yandex ClickHouse v. 1.1.54164, MergeTree storage engine
  • Apache Spark v. 2.1.0, Parquet files and ORC files

Although all of the above solutions can run in a “cluster” mode (with multiple nodes), I’ve only used one server.

Hardware

This time I’m using newer and faster hardware:

  • CPU: physical = 2, cores = 32, virtual = 64, hyperthreading = yes
  • RAM: 256GB
  • Disk: Samsung SSD 960 PRO 1TB, NVMe card

Data Sizes

I’ve loaded the above data into ClickHouse, ColumnStore and MySQL. The MySQL tables are InnoDB with a primary key; the Wikistat dataset was not loaded into MySQL due to its size.

Dataset       | ColumnStore | ClickHouse | MySQL        | Spark / Parquet | Spark / ORC file
Wikistat      | 374.24 GB   | 211.3 GB   | n/a (> 2 TB) | 395 GB          | 273 GB
Query metrics | 61.23 GB    | 28.35 GB   | 520 GB       |                 |
Store Orders  | 9.3 GB      | 4.01 GB    | 46.55 GB     |                 |

Query Performance

Wikipedia page counts queries

Test type (warm), seconds                           | Spark  | ClickHouse | ColumnStore
Query 1: count(*)                                   | 5.37   | 2.14       | 30.77
Query 2: group by month                             | 205.75 | 16.36      | 259.09
Query 3: top 100 wiki pages by hits (group by path) | 750.35 | 171.22     | 1640.7

Test type (cold), seconds                           | Spark   | ClickHouse | ColumnStore
Query 1: count(*)                                   | 21.93   | 8.01       | 139.01
Query 2: group by month                             | 217.88  | 16.65      | 420.77
Query 3: top 100 wiki pages by hits (group by path) | 887.434 | 182.56     | 1703.19


Partitioning and Primary Keys

All of the solutions have the ability to take advantage of data “partitioning,” and only scan needed rows.

Clickhouse has “primary keys” (for the MergeTree storage engine) and scans only the needed chunks of data (similar to partition “pruning” in MySQL). No changes to SQL or table definitions are needed when working with ClickHouse.

Clickhouse example:

:) select count(*), toMonth(date) as mon
:-] from wikistat where toYear(date)=2008
:-] and toMonth(date) = 1
:-] group by mon
:-] order by mon;
SELECT
    count(*),
    toMonth(date) AS mon
FROM wikistat
WHERE (toYear(date) = 2008) AND (toMonth(date) = 1)
GROUP BY mon
ORDER BY mon ASC
┌────count()─┬─mon─┐
│ 2077594099 │   1 │
└────────────┴─────┘
1 rows in set. Elapsed: 0.787 sec. Processed 2.08 billion rows, 4.16 GB (2.64 billion rows/s., 5.28 GB/s.)
:) select count(*), toMonth(date) as mon from wikistat where toYear(date)=2008 and toMonth(date) between 1 and 10 group by mon order by mon;
SELECT
    count(*),
    toMonth(date) AS mon
FROM wikistat
WHERE (toYear(date) = 2008) AND ((toMonth(date) >= 1) AND (toMonth(date) <= 10))
GROUP BY mon
ORDER BY mon ASC
┌────count()─┬─mon─┐
│ 2077594099 │   1 │
│ 1969757069 │   2 │
│ 2081371530 │   3 │
│ 2156878512 │   4 │
│ 2476890621 │   5 │
│ 2526662896 │   6 │
│ 2460873213 │   7 │
│ 2480356358 │   8 │
│ 2522746544 │   9 │
│ 2614372352 │  10 │
└────────────┴─────┘
10 rows in set. Elapsed: 13.426 sec. Processed 23.37 billion rows, 46.74 GB (1.74 billion rows/s., 3.48 GB/s.)

As we can see here, ClickHouse has processed ~two billion rows for one month of data, and ~23 billion rows for ten months of data. Queries that only select one month of data are much faster.

For ColumnStore we need to re-write the SQL query and use “between ‘2008-01-01’ and ‘2008-01-31’” so it can take advantage of partition elimination (as long as the data is loaded in approximate time order). When using functions (i.e., year(dt) or month(dt)), the current implementation does not use this optimization. (This is similar to MySQL, in that if the WHERE clause has month(dt) or any other function, MySQL can’t use an index on the dt field.)

ColumnStore example:

MariaDB [wikistat]> select count(*), month(date) as mon
    -> from wikistat where year(date)=2008
    -> and month(date) = 1
    -> group by mon
    -> order by mon;
+------------+------+
| count(*)   | mon  |
+------------+------+
| 2077594099 |    1 |
+------------+------+
1 row in set (2 min 12.34 sec)
MariaDB [wikistat]> select count(*), month(date) as mon
from wikistat
where date between '2008-01-01' and '2008-01-31'
group by mon order by mon;
+------------+------+
| count(*)   | mon  |
+------------+------+
| 2077594099 |    1 |
+------------+------+
1 row in set (12.46 sec)

Apache Spark does support partitioning, but it has to be declared with the Parquet format in the table definition. Without declared partitions, even the modified query (“select count(*), month(date) as mon from wikistat where date between ‘2008-01-01’ and ‘2008-01-31’ group by mon order by mon”) will have to scan all the data.
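
As a minimal sketch of what declaring such partitioning could look like (the output path and the derived mon column are assumptions for illustration; this mirrors the df.write.partitionBy() approach used in the Air ontime posts below):

// Sketch: rewrite wikistat as Parquet partitioned by month, so that a
// predicate on the partition column can prune partitions instead of
// scanning everything. Path, table and column names are assumptions.
import org.apache.spark.sql.functions.{col, month}

spark.table("wikistat")
  .withColumn("mon", month(col("date")))
  .write
  .partitionBy("mon")
  .parquet("/data/spark/wikistat_by_month")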

The following table shows the performance of the updated query (in seconds):

Test type / updated query                  | Spark  | ClickHouse | ColumnStore
group by month, one month, updated syntax  | 205.75 | 0.93       | 12.46
group by month, ten months, updated syntax | 205.75 | 8.84       | 170.81

Working with Large Datasets

With 1TB of uncompressed data, doing a “GROUP BY” requires lots of memory to store the intermediate results (unlike MySQL; ColumnStore, ClickHouse and Apache Spark use hash tables to store GROUP BY “buckets”). For example, this query requires a very large hash table:

SELECT
path,
count(*),
sum(hits) AS sum_hits,
round(sum(hits) / count(*), 2) AS hit_ratio
FROM wikistat
WHERE project = 'en'
GROUP BY path
ORDER BY sum_hits DESC
LIMIT 100

As “path” is actually a URL (without the hostname), it takes a lot of memory to store the intermediate results (hash table) for GROUP BY.

MariaDB ColumnStore does not currently allow “spilling” data to disk (only disk-based joins are implemented). If you need to GROUP BY on a large text field, you can decrease the disk block cache setting in Columnstore.xml (i.e., set the disk cache to 10% of RAM) to make room for the intermediate GROUP BY:

<DBBC>
        <!-- The percentage of RAM to use for the disk block cache. Defaults to 86% -->
        <NumBlocksPct>10</NumBlocksPct>
</DBBC>

In addition, as the query has an ORDER BY, we need to increase max_length_for_sort_data in MySQL:

ERROR 1815 (HY000): Internal error: IDB-2015: Sorting length exceeded. Session variable max_length_for_sort_data needs to be set higher.
mysql> set global max_length_for_sort_data=8*1024*1024;

SQL Support

SQL                            | Spark* | ClickHouse | ColumnStore
INSERT … VALUES                | yes    | yes        | yes
INSERT SELECT / BULK INSERT    | yes    | yes        | yes
UPDATE                         | no     | no         | yes
DELETE                         | no     | no         | yes
ALTER … ADD/DROP/MODIFY COLUMN | no     | yes        | yes
ALTER … change partitions      | yes    | yes        | yes
SELECT with WINDOW functions   | yes    | no         | yes

*Spark does not support UPDATE/DELETE. However, Hive supports ACID transactions with UPDATE and DELETE statements. BEGIN, COMMIT, and ROLLBACK are not yet supported (only the ORC file format is supported).

ColumnStore is the only database of the three that supports a full set of DML and DDL (almost all of MySQL’s SQL implementation is supported).

Comparing ColumnStore to Clickhouse and Apache Spark

MariaDB ColumnStore

Advantages:
  • MySQL frontend (makes it easy to migrate from MySQL)
  • UPDATE and DELETE are supported
  • Window functions support

Disadvantages:
  • SELECT queries are slower
  • No replication from a normal MySQL server (planned for future versions)
  • No support for GROUP BY on disk

Yandex ClickHouse

Advantages:
  • Fastest performance
  • Better compression
  • Primary keys
  • Disk-based GROUP BY, etc.

Disadvantages:
  • No MySQL protocol support

Apache Spark

Advantages:
  • Flexible storage options
  • Machine learning integration (i.e., pyspark ML libraries run inside Spark nodes)

Disadvantages:
  • No MySQL protocol support
  • Slower SELECT queries (compared to ClickHouse)


Conclusion

Yandex ClickHouse is the absolute winner in this benchmark: it shows both better performance (>10x) and better compression than MariaDB ColumnStore and Apache Spark. If you are looking for the best performance and compression, ClickHouse looks very good.

At the same time, ColumnStore provides a MySQL endpoint (MySQL protocol and syntax), so it is a good option if you are migrating from MySQL. Right now it can’t replicate directly from MySQL, but if this option becomes available in the future, we could attach a ColumnStore replication slave to any MySQL master and use the slave for reporting queries (i.e., BI or data science teams can use a ColumnStore database, which is updated very close to real time).

Table Structure and List of Queries

Table structure (MySQL / Columnstore version):

CREATE TABLE `wikistat` (
  `date` date DEFAULT NULL,
  `time` datetime DEFAULT NULL,
  `project` varchar(20) DEFAULT NULL,
  `subproject` varchar(2) DEFAULT NULL,
  `path` varchar(1024) DEFAULT NULL,
  `hits` bigint(20) DEFAULT NULL,
  `size` bigint(20) DEFAULT NULL
) ENGINE=Columnstore DEFAULT CHARSET=utf8

Query 1:

select count(*) from wikistat

Query 2a (full scan):

select count(*), month(date) as mon
from wikistat where year(date)=2008
and month(date) between 1 and 10
group by month(date)
order by month(date)

Query 2b (for partitioning test):

select count(*), month(date) as mon
from wikistat where
date between '2008-01-01' and '2008-10-31'
group by mon
order by mon;

Query 3:

SELECT
path,
count(*),
sum(hits) AS sum_hits,
round(sum(hits) / count(*), 2) AS hit_ratio
FROM wikistat
WHERE project = 'en'
GROUP BY path
ORDER BY sum_hits DESC
LIMIT 100;

Dec
16
2016
--

Percona Live 2017 Sneak Peek Schedule Up Now! See the Available Sessions!

We are excited to announce that the sneak peek schedule for the Percona Live 2017 Open Source Database Conference is up! The Percona Live Open Source Database Conference 2017 is April 24th – 27th, at the Hyatt Regency Santa Clara & The Santa Clara Convention Center.

The Percona Live Open Source Database Conference 2017 is the premier event for the rich and diverse MySQL, MongoDB and open source database ecosystems. This conference provides an opportunity to network with peers and technology professionals by bringing together accomplished DBAs, system architects and developers from around the world to share their knowledge and experience.

Below are some of our top picks for MySQL, MongoDB and open source database sessions:

Tutorials

MySQL 101 Tracks

MongoDB 101 Tracks

Breakout Talks

Register for the Percona Live Open Source Database Conference here.

Early Bird Discounts

Just a reminder to everyone out there: our Early Bird discount rate for the Percona Live Open Source Database Conference 2017 is only available ‘til January 8, 2017, 11:30 pm PST! This rate gets you all the excellent and amazing opportunities that Percona Live offers, at a very reasonable price!

Sponsor Percona Live

Become a conference sponsor! We have sponsorship opportunities available for this annual MySQL, MongoDB and open source database event. Sponsors become a part of a dynamic and growing ecosystem and interact with hundreds of DBAs, sysadmins, developers, CTOs, CEOs, business managers, technology evangelists, solutions vendors, and entrepreneurs who attend the event.

Sep
27
2016
--

IBM releases DataWorks to give enterprise data a home and a brain

While the gears of research are turning fast, developing new methods of machine intelligence, another, perhaps more impactful, trend is brewing in the field. Open source frameworks like Apache Spark are hitting their stride at the ideal time to put data analytics in the hands of the business development analyst without forgetting about the needs of the data scientist. IBM’s new… Read More

Aug
17
2016
--

How Apache Spark makes your slow MySQL queries 10x faster (or more)

In this blog post, we’ll discuss how to improve the performance of slow MySQL queries using Apache Spark.

Introduction

In my previous blog post, I wrote about using Apache Spark with MySQL for data analysis and showed how to transform and analyze a large volume of data (text files) with Apache Spark. Vadim also performed a benchmark comparing the performance of MySQL and Spark with the Parquet columnar format (using Air traffic performance data). That works great, but what if we don’t want to move our data from MySQL to another storage system (i.e., columnar format), and instead want to run “ad hoc” queries on top of an existing MySQL server? Apache Spark can help here as well.

TL;DR version:

Using Apache Spark on top of the existing MySQL server(s) (without the need to export or even stream data to Spark or Hadoop), we can increase query performance more than ten times. Using multiple MySQL servers (replication or Percona XtraDB Cluster) gives us an additional performance increase for some queries. You can also use the Spark cache function to cache the whole MySQL query results table.

The idea is simple: Spark can read MySQL data via JDBC and can also execute SQL queries, so we can connect it directly to MySQL and run the queries. Why is this faster? For long running (i.e., reporting or BI) queries, it can be much faster as Spark is a massively parallel system. MySQL can only use one CPU core per query, whereas Spark can use all cores on all cluster nodes. In my examples below, MySQL queries are executed inside Spark and run 5-10 times faster (on top of the same MySQL data).

In addition, Spark can add “cluster” level parallelism. In the case of MySQL replication or Percona XtraDB Cluster, Spark can split the query into a set of smaller queries (in the case of a partitioned table it will run one query per partition, for example) and run those in parallel across multiple slave servers or multiple Percona XtraDB Cluster nodes. Finally, it will use map/reduce-type processing to aggregate the results.

I’ve used the same “Airlines On-Time Performance” database as in previous posts. Vadim created some scripts to download data and upload it to MySQL. You can find the scripts here: https://github.com/Percona-Lab/ontime-airline-performance. I’ve also used Apache Spark 2.0, which was released July 26, 2016.

Apache Spark Setup

Starting Apache Spark in standalone mode is easy. To recap:

  1. Download Apache Spark 2.0 and place it somewhere.
  2. Start master
  3. Start slave (worker) and attach it to the master
  4. Start the app (in this case spark-shell or spark-sql)

Example:

root@thor:~/spark# ./sbin/start-master.sh
less ../logs/spark-root-org.apache.spark.deploy.master.Master-1-thor.out
15/08/25 11:21:21 INFO Master: Starting Spark master at spark://thor:7077
15/08/25 11:21:21 INFO Utils: Successfully started service 'MasterUI' on port 8080.
15/08/25 11:21:21 INFO MasterWebUI: Started MasterWebUI at http://10.60.23.188:8080
root@thor:~/spark# ./sbin/start-slave.sh spark://thor:7077

To connect to Spark we can use spark-shell (Scala), pyspark (Python) or spark-sql. Since spark-sql is similar to the MySQL CLI, using it would be the easiest option (even “show tables” works). I also wanted to work with Scala in interactive mode, so I’ve used spark-shell as well. In all the examples I’m using the same SQL query in MySQL and Spark, so working with Spark is not that different.

To work with MySQL server in Spark we need Connector/J for MySQL. Download the package and copy the mysql-connector-java-5.1.39-bin.jar to the spark directory, then add the class path to the conf/spark-defaults.conf:

spark.driver.extraClassPath = /usr/local/spark/mysql-connector-java-5.1.39-bin.jar
spark.executor.extraClassPath = /usr/local/spark/mysql-connector-java-5.1.39-bin.jar

Running MySQL queries via Apache Spark

For this test I was using one physical server with 12 CPU cores (an older Intel(R) Xeon(R) CPU L5639 @ 2.13GHz), 48GB of RAM and SSD disks. I’ve installed MySQL and started the Spark master and a Spark slave on the same box.

Now we are ready to run MySQL queries inside Spark. First, start the shell (from the Spark directory, /usr/local/spark in my case):

$ ./bin/spark-shell --driver-memory 4G --master spark://server1:7077

Then we will need to connect to MySQL from Spark and register the temporary view:

val jdbcDF = spark.read.format("jdbc").options(
  Map("url" ->  "jdbc:mysql://localhost:3306/ontime?user=root&password=",
  "dbtable" -> "ontime.ontime_part",
  "fetchSize" -> "10000",
  "partitionColumn" -> "yeard", "lowerBound" -> "1988", "upperBound" -> "2016", "numPartitions" -> "28"
  )).load()
jdbcDF.createOrReplaceTempView("ontime")

So we have created a “datasource” for Spark (or in other words, a “link” from Spark to MySQL). The Spark table name is “ontime” (linked to the MySQL ontime.ontime_part table), and we can run SQL queries in Spark, which in turn parses them and translates them into MySQL queries.

“partitionColumn” is very important here. It tells Spark to run multiple queries in parallel, one query per partition.

Now we can run the query:

val sqlDF = sql("select min(year), max(year) as max_year, Carrier, count(*) as cnt, sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed, round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate FROM ontime WHERE DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI') and DestState not in ('AK', 'HI', 'PR', 'VI') and (origin = 'RDU' or dest = 'RDU') GROUP by carrier HAVING cnt > 100000 and max_year > '1990' ORDER by rate DESC, cnt desc LIMIT  10")
sqlDF.show()

MySQL Query Example

Let’s go back to MySQL for a second and look at the query example. I’ve chosen the following query (from my older blog post):

select min(year), max(year) as max_year, Carrier, count(*) as cnt,
sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed,
round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate
FROM ontime
WHERE
DayOfWeek not in (6,7)
and OriginState not in ('AK', 'HI', 'PR', 'VI')
and DestState not in ('AK', 'HI', 'PR', 'VI')
GROUP by carrier HAVING cnt > 100000 and max_year > '1990'
ORDER by rate DESC, cnt desc
LIMIT  10

The query will find the total number of delayed flights per airline. In addition, the query will calculate a smart “ontime” rating, taking into consideration the number of flights (we do not want to compare smaller air carriers with the large ones, and we want to exclude the older airlines that are not in business anymore).

The main reason I’ve chosen this query is that it is hard to optimize in MySQL. The conditions in the “where” clause only filter out ~30% of the rows, so MySQL still has to scan ~71% of the table. I’ve done a basic calculation:

mysql> select count(*) FROM ontime WHERE DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI') and DestState not in ('AK', 'HI', 'PR', 'VI');
+-----------+
| count(*)  |
+-----------+
| 108776741 |
+-----------+
mysql> select count(*) FROM ontime;
+-----------+
| count(*)  |
+-----------+
| 152657276 |
+-----------+
mysql> select round((108776741/152657276)*100, 2);
+-------------------------------------+
| round((108776741/152657276)*100, 2) |
+-------------------------------------+
|                               71.26 |
+-------------------------------------+

Table structure:

CREATE TABLE `ontime_part` (
  `YearD` int(11) NOT NULL,
  `Quarter` tinyint(4) DEFAULT NULL,
  `MonthD` tinyint(4) DEFAULT NULL,
  `DayofMonth` tinyint(4) DEFAULT NULL,
  `DayOfWeek` tinyint(4) DEFAULT NULL,
  `FlightDate` date DEFAULT NULL,
  `UniqueCarrier` char(7) DEFAULT NULL,
  `AirlineID` int(11) DEFAULT NULL,
  `Carrier` char(2) DEFAULT NULL,
  `TailNum` varchar(50) DEFAULT NULL,
...
  `id` int(11) NOT NULL AUTO_INCREMENT,
  PRIMARY KEY (`id`,`YearD`),
  KEY `covered` (`DayOfWeek`,`OriginState`,`DestState`,`Carrier`,`YearD`,`ArrDelayMinutes`)
) ENGINE=InnoDB AUTO_INCREMENT=162668935 DEFAULT CHARSET=latin1
/*!50100 PARTITION BY RANGE (YearD)
(PARTITION p1987 VALUES LESS THAN (1988) ENGINE = InnoDB,
 PARTITION p1988 VALUES LESS THAN (1989) ENGINE = InnoDB,
 PARTITION p1989 VALUES LESS THAN (1990) ENGINE = InnoDB,
 PARTITION p1990 VALUES LESS THAN (1991) ENGINE = InnoDB,
 PARTITION p1991 VALUES LESS THAN (1992) ENGINE = InnoDB,
 PARTITION p1992 VALUES LESS THAN (1993) ENGINE = InnoDB,
 PARTITION p1993 VALUES LESS THAN (1994) ENGINE = InnoDB,
 PARTITION p1994 VALUES LESS THAN (1995) ENGINE = InnoDB,
 PARTITION p1995 VALUES LESS THAN (1996) ENGINE = InnoDB,
 PARTITION p1996 VALUES LESS THAN (1997) ENGINE = InnoDB,
 PARTITION p1997 VALUES LESS THAN (1998) ENGINE = InnoDB,
 PARTITION p1998 VALUES LESS THAN (1999) ENGINE = InnoDB,
 PARTITION p1999 VALUES LESS THAN (2000) ENGINE = InnoDB,
 PARTITION p2000 VALUES LESS THAN (2001) ENGINE = InnoDB,
 PARTITION p2001 VALUES LESS THAN (2002) ENGINE = InnoDB,
 PARTITION p2002 VALUES LESS THAN (2003) ENGINE = InnoDB,
 PARTITION p2003 VALUES LESS THAN (2004) ENGINE = InnoDB,
 PARTITION p2004 VALUES LESS THAN (2005) ENGINE = InnoDB,
 PARTITION p2005 VALUES LESS THAN (2006) ENGINE = InnoDB,
 PARTITION p2006 VALUES LESS THAN (2007) ENGINE = InnoDB,
 PARTITION p2007 VALUES LESS THAN (2008) ENGINE = InnoDB,
 PARTITION p2008 VALUES LESS THAN (2009) ENGINE = InnoDB,
 PARTITION p2009 VALUES LESS THAN (2010) ENGINE = InnoDB,
 PARTITION p2010 VALUES LESS THAN (2011) ENGINE = InnoDB,
 PARTITION p2011 VALUES LESS THAN (2012) ENGINE = InnoDB,
 PARTITION p2012 VALUES LESS THAN (2013) ENGINE = InnoDB,
 PARTITION p2013 VALUES LESS THAN (2014) ENGINE = InnoDB,
 PARTITION p2014 VALUES LESS THAN (2015) ENGINE = InnoDB,
 PARTITION p2015 VALUES LESS THAN (2016) ENGINE = InnoDB,
 PARTITION p_new VALUES LESS THAN MAXVALUE ENGINE = InnoDB) */

Even with a “covered” index, MySQL will have to scan ~70M-100M rows and create a temporary table:

mysql>  explain select min(yearD), max(yearD) as max_year, Carrier, count(*) as cnt, sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed, round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate FROM ontime_part WHERE DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI') and DestState not in ('AK', 'HI', 'PR', 'VI') GROUP by carrier HAVING cnt > 1000 and max_year > '1990' ORDER by rate DESC, cnt desc LIMIT  10\G
*************************** 1. row ***************************
           id: 1
  select_type: SIMPLE
        table: ontime_part
         type: range
possible_keys: covered
          key: covered
      key_len: 2
          ref: NULL
         rows: 70483364
        Extra: Using where; Using index; Using temporary; Using filesort
1 row in set (0.00 sec)

Here is the query response time in MySQL:

mysql> select min(yearD), max(yearD) as max_year, Carrier, count(*) as cnt, sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed, round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate FROM ontime_part WHERE DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI') and DestState not in ('AK', 'HI', 'PR', 'VI') GROUP by carrier HAVING cnt > 1000 and max_year > '1990' ORDER by rate DESC, cnt desc LIMIT  10;
+------------+----------+---------+----------+-----------------+------+
| min(yearD) | max_year | Carrier | cnt      | flights_delayed | rate |
+------------+----------+---------+----------+-----------------+------+
|       2003 |     2013 | EV      |  2962008 |          464264 | 0.16 |
|       2003 |     2013 | B6      |  1237400 |          187863 | 0.15 |
|       2006 |     2011 | XE      |  1615266 |          230977 | 0.14 |
|       2003 |     2005 | DH      |   501056 |           69833 | 0.14 |
|       2001 |     2013 | MQ      |  4518106 |          605698 | 0.13 |
|       2003 |     2013 | FL      |  1692887 |          212069 | 0.13 |
|       2004 |     2010 | OH      |  1307404 |          175258 | 0.13 |
|       2006 |     2013 | YV      |  1121025 |          143597 | 0.13 |
|       2003 |     2006 | RU      |  1007248 |          126733 | 0.13 |
|       1988 |     2013 | UA      | 10717383 |         1327196 | 0.12 |
+------------+----------+---------+----------+-----------------+------+
10 rows in set (19 min 16.58 sec)

19 minutes is definitely not great.

SQL in Spark

Now we want to run the same query inside Spark and let Spark read data from MySQL. We will create a “datasource” and execute the query:

scala> val jdbcDF = spark.read.format("jdbc").options(
     |   Map("url" ->  "jdbc:mysql://localhost:3306/ontime?user=root&password=mysql",
     |   "dbtable" -> "ontime.ontime_sm",
     |   "fetchSize" -> "10000",
     |   "partitionColumn" -> "yeard", "lowerBound" -> "1988", "upperBound" -> "2015", "numPartitions" -> "48"
     |   )).load()
16/08/02 23:24:12 WARN JDBCRelation: The number of partitions is reduced because the specified number of partitions is less than the difference between upper bound and lower bound. Updated number of partitions: 27; Input number of partitions: 48; Lower bound: 1988; Upper bound: 2015.
jdbcDF: org.apache.spark.sql.DataFrame = [id: int, YearD: date ... 19 more fields]
scala> jdbcDF.createOrReplaceTempView("ontime")
scala> val sqlDF = sql("select min(yearD), max(yearD) as max_year, Carrier, count(*) as cnt, sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed, round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate FROM ontime WHERE OriginState not in ('AK', 'HI', 'PR', 'VI') and DestState not in ('AK', 'HI', 'PR', 'VI') GROUP by carrier HAVING cnt > 1000 and max_year > '1990' ORDER by rate DESC, cnt desc LIMIT  10")
sqlDF: org.apache.spark.sql.DataFrame = [min(yearD): date, max_year: date ... 4 more fields]
scala> sqlDF.show()
+----------+--------+-------+--------+---------------+----+
|min(yearD)|max_year|Carrier|     cnt|flights_delayed|rate|
+----------+--------+-------+--------+---------------+----+
|      2003|    2013|     EV| 2962008|         464264|0.16|
|      2003|    2013|     B6| 1237400|         187863|0.15|
|      2006|    2011|     XE| 1615266|         230977|0.14|
|      2003|    2005|     DH|  501056|          69833|0.14|
|      2001|    2013|     MQ| 4518106|         605698|0.13|
|      2003|    2013|     FL| 1692887|         212069|0.13|
|      2004|    2010|     OH| 1307404|         175258|0.13|
|      2006|    2013|     YV| 1121025|         143597|0.13|
|      2003|    2006|     RU| 1007248|         126733|0.13|
|      1988|    2013|     UA|10717383|        1327196|0.12|
+----------+--------+-------+--------+---------------+----+

spark-shell does not show the query time. This can be retrieved from the Web UI or from spark-sql. I’ve re-run the same query in spark-sql:

./bin/spark-sql --driver-memory 4G  --master spark://thor:7077
spark-sql> CREATE TEMPORARY VIEW ontime
         > USING org.apache.spark.sql.jdbc
         > OPTIONS (
         >      url  "jdbc:mysql://localhost:3306/ontime?user=root&password=",
         >      dbtable "ontime.ontime_part",
         >      fetchSize "1000",
         >      partitionColumn "yearD", lowerBound "1988", upperBound "2014", numPartitions "48"
         > );
16/08/04 01:44:27 WARN JDBCRelation: The number of partitions is reduced because the specified number of partitions is less than the difference between upper bound and lower bound. Updated number of partitions: 26; Input number of partitions: 48; Lower bound: 1988; Upper bound: 2014.
Time taken: 3.864 seconds
spark-sql> select min(yearD), max(yearD) as max_year, Carrier, count(*) as cnt, sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed, round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate FROM ontime WHERE DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI') and DestState not in ('AK', 'HI', 'PR', 'VI') GROUP by carrier HAVING cnt > 1000 and max_year > '1990' ORDER by rate DESC, cnt desc LIMIT  10;
16/08/04 01:45:13 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
2003    2013    EV      2962008 464264  0.16
2003    2013    B6      1237400 187863  0.15
2006    2011    XE      1615266 230977  0.14
2003    2005    DH      501056  69833   0.14
2001    2013    MQ      4518106 605698  0.13
2003    2013    FL      1692887 212069  0.13
2004    2010    OH      1307404 175258  0.13
2006    2013    YV      1121025 143597  0.13
2003    2006    RU      1007248 126733  0.13
1988    2013    UA      10717383        1327196 0.12
Time taken: 139.628 seconds, Fetched 10 row(s)

So the response time of the same query is almost 10x faster (on the same server, just one box). But how was this query translated into MySQL queries, and why is it so much faster? Here is what is happening inside MySQL:

Inside MySQL

Spark:

scala> sqlDF.show()
[Stage 4:>                                                        (0 + 26) / 26]

MySQL:

mysql> select id, info from information_schema.processlist where info is not NULL and info not like '%information_schema%';
+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| id    | info                                                                                                                                                                                                                                                    |
+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| 10948 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2001 AND yearD < 2002) |
| 10965 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2007 AND yearD < 2008) |
| 10966 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1991 AND yearD < 1992) |
| 10967 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1994 AND yearD < 1995) |
| 10968 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1998 AND yearD < 1999) |
| 10969 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2010 AND yearD < 2011) |
| 10970 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2002 AND yearD < 2003) |
| 10971 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2006 AND yearD < 2007) |
| 10972 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1990 AND yearD < 1991) |
| 10953 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2009 AND yearD < 2010) |
| 10947 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1993 AND yearD < 1994) |
| 10956 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD < 1989 or yearD is null)  |
| 10951 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2005 AND yearD < 2006) |
| 10954 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1996 AND yearD < 1997) |
| 10955 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2008 AND yearD < 2009) |
| 10961 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1999 AND yearD < 2000) |
| 10962 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2011 AND yearD < 2012) |
| 10963 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2003 AND yearD < 2004) |
| 10964 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1995 AND yearD < 1996) |
| 10957 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2004 AND yearD < 2005) |
| 10949 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1989 AND yearD < 1990) |
| 10950 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1997 AND yearD < 1998) |
| 10952 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2013)                  |
| 10958 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 1992 AND yearD < 1993) |
| 10960 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2000 AND yearD < 2001) |
| 10959 | SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2012 AND yearD < 2013) |
+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
26 rows in set (0.00 sec)

Spark is running 26 queries in parallel, which is great. As the table is partitioned, each query only uses one partition, but scans that whole partition:

mysql> explain partitions SELECT `YearD`,`ArrDelayMinutes`,`Carrier` FROM ontime.ontime_part WHERE (((NOT (DayOfWeek IN (6, 7)))) AND ((NOT (OriginState IN ('AK', 'HI', 'PR', 'VI')))) AND ((NOT (DestState IN ('AK', 'HI', 'PR', 'VI'))))) AND (yearD >= 2001 AND yearD < 2002)\G
*************************** 1. row ***************************
           id: 1
  select_type: SIMPLE
        table: ontime_part
   partitions: p2001
         type: ALL
possible_keys: NULL
          key: NULL
      key_len: NULL
          ref: NULL
         rows: 5814106
        Extra: Using where
1 row in set (0.00 sec)

In this case, as the box has 12 CPU cores / 24 threads, it efficiently executes 26 queries in parallel, and the partitioned table helps to avoid contention issues (I wish MySQL could scan partitions in parallel, but it can’t at the time of writing).

Another interesting thing is that Spark can “push down” some of the conditions to MySQL, but only those inside the “where” clause. All group by/order by/aggregations are done inside Spark, so it needs to retrieve the underlying data from MySQL to compute them; it will not push down group by/order by/etc. to MySQL.

That also means that queries without “where” conditions (for example, “select count(*) as cnt, carrier from ontime group by carrier order by cnt desc limit 10”) will have to retrieve all the data from MySQL and load it into Spark (whereas MySQL would do all the group by work internally). Running it in Spark might be slower or faster (depending on the amount of data and use of indexes), but it also requires more resources and potentially more memory dedicated to Spark. The above query is translated into 26 queries, each of which does a “select carrier from ontime_part where (yearD >= N AND yearD < N+1)”.
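
One way to see what actually gets pushed down (a sketch, using the sqlDF DataFrame defined above) is Spark’s own plan output; WHERE-clause predicates show up on the JDBC scan node, while aggregations stay in Spark’s plan:

// Sketch: print the physical plan for the query defined earlier.
// Predicates pushed into MySQL appear on the JDBC scan node; the
// aggregation and sort operators remain on the Spark side.
sqlDF.explain()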

Pushing down the whole query into MySQL 

If we want to avoid sending all data from MySQL to Spark we have the option of creating a temporary table on top of a query (similar to MySQL’s create temporary table as select …). In Scala:

val tableQuery =
  "(select yeard, count(*) from ontime group by yeard) tmp"
val jdbcDFtmp = spark.read.format("jdbc").options(
  Map("url" ->  "jdbc:mysql://localhost:3306/ontime?user=root&password=",
  "dbtable" -> tableQuery,
  "fetchSize" -> "10000"
  )).load()
jdbcDFtmp.createOrReplaceTempView("ontime_tmp")

In Spark SQL:

CREATE TEMPORARY VIEW ontime_tmp
USING org.apache.spark.sql.jdbc
OPTIONS (
     url  "jdbc:mysql://localhost:3306/ontime?user=root&password=mysql",
     dbtable "(select yeard, count(*) from ontime_part group by yeard) tmp",
     fetchSize "1000"
);
select * from ontime_tmp;

Please note:

  1. We do not want to use “partitionColumn” here, otherwise we will see 26 queries like this in MySQL: “SELECT yeard, count(*) FROM (select yeard, count(*) from ontime_part group by yeard) tmp where (yearD >= N AND yearD < N+1)” (obviously not optimal).
  2. This is not a good use of Spark, more like a “hack.” The only good reason to do it is to be able to have the result of the query as a source of an additional query.

Query cache in Spark

Another option is to cache the result of the query (or even the whole table) and then use .filter in Scala for faster processing. This requires sufficient memory dedicated to Spark. The good news is that we can add additional nodes to Spark and get more memory for the Spark cluster.

Spark SQL example:

CREATE TEMPORARY VIEW ontime_latest
USING org.apache.spark.sql.jdbc
OPTIONS (
     url  "jdbc:mysql://localhost:3306/ontime?user=root&password=",
     dbtable "ontime.ontime_part partition (p2013, p2014)",
     fetchSize "1000",
     partitionColumn "yearD", lowerBound "1988", upperBound "2014", numPartitions "26"
);
cache table ontime_latest;
spark-sql> cache table ontime_latest;
Time taken: 465.076 seconds
spark-sql> select count(*) from ontime_latest;
5349447
Time taken: 0.526 seconds, Fetched 1 row(s)
spark-sql> select count(*), dayofweek from ontime_latest group by dayofweek;
790896  1
634664  6
795540  3
794667  5
808243  4
743282  7
782155  2
Time taken: 0.541 seconds, Fetched 7 row(s)
spark-sql> select min(yearD), max(yearD) as max_year, Carrier, count(*) as cnt, sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed, round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate FROM ontime_latest WHERE DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI') and DestState not in ('AK', 'HI', 'PR', 'VI') and (origin='RDU' or dest = 'RDU') GROUP by carrier HAVING cnt > 1000 and max_year > '1990' ORDER by rate DESC, cnt desc LIMIT  10;
2013    2013    MQ      9339    1734    0.19
2013    2013    B6      3302    516     0.16
2013    2013    EV      9225    1331    0.14
2013    2013    UA      1317    177     0.13
2013    2013    AA      5354    620     0.12
2013    2013    9E      5520    593     0.11
2013    2013    WN      10968   1130    0.1
2013    2013    US      5722    549     0.1
2013    2013    DL      6313    478     0.08
2013    2013    FL      2433    205     0.08
Time taken: 2.036 seconds, Fetched 10 row(s)

Here we cache partitions p2013 and p2014 in Spark. This retrieves the data from MySQL and loads it in Spark. After that all queries run on the cached data and will be much faster.

With Scala we can cache the result of a query and then use filters to only get the information we need:

val sqlDF = sql("SELECT flightdate, origin, dest, depdelayminutes, arrdelayminutes, carrier, TailNum, Cancelled, Diverted, Distance from ontime")
sqlDF.cache().show()
scala> sqlDF.filter("flightdate='1988-01-01'").count()
res5: Long = 862

Using Spark with Percona XtraDB Cluster

As Spark can be used in a cluster mode and scale with more and more nodes, reading data from a single MySQL is a bottleneck. We can use MySQL replication slave servers or Percona XtraDB Cluster (PXC) nodes as a Spark datasource. To test it out, I’ve provisioned Percona XtraDB Cluster with three nodes on AWS (I’ve used m4.2xlarge Ubuntu instances) and also started Apache Spark on each node:

  1. Node1 (pxc1): Percona Server + Spark Master + Spark worker node + Spark SQL running
  2. Node2 (pxc2): Percona Server + Spark worker node
  3. Node3 (pxc3): Percona Server + Spark worker node

All the Spark worker nodes use the memory configuration option:

cat conf/spark-env.sh
export SPARK_WORKER_MEMORY=24g

Then I can start spark-sql (also need to have connector/J JAR file copied to all nodes):

$ ./bin/spark-sql --driver-memory 4G --master spark://pxc1:7077

When creating a table, I still use localhost to connect to MySQL (url “jdbc:mysql://localhost:3306/ontime?user=root&password=xxx”). As the Spark worker nodes are running on the same instances as the Percona XtraDB Cluster nodes, they will use the local connections. Then running a Spark SQL query will evenly distribute all 26 MySQL queries among the three MySQL nodes.

Alternatively, we can run the Spark cluster on a separate host and connect it to HAProxy, which in turn will load balance selects across multiple Percona XtraDB Cluster nodes.
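
As a sketch of that alternative (the haproxy-host hostname is an assumption; everything else mirrors the datasource definition used earlier), only the JDBC URL changes:

// Hypothetical: the same JDBC datasource as before, pointed at an
// HAProxy endpoint ("haproxy-host" is an assumed hostname) that
// balances connections across the Percona XtraDB Cluster nodes.
val jdbcDF = spark.read.format("jdbc").options(
  Map("url" ->  "jdbc:mysql://haproxy-host:3306/ontime?user=root&password=",
  "dbtable" -> "ontime.ontime_part",
  "fetchSize" -> "10000",
  "partitionColumn" -> "yeard", "lowerBound" -> "1988", "upperBound" -> "2016", "numPartitions" -> "26"
  )).load()
jdbcDF.createOrReplaceTempView("ontime")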

Query Performance Benchmark

Finally, here is the query response time test on the three AWS Percona XtraDB Cluster nodes:

Query 1:

select min(yearD), max(yearD) as max_year, Carrier, count(*) as cnt, sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed, round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate FROM ontime_part WHERE DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI') and DestState not in ('AK', 'HI', 'PR', 'VI') GROUP by carrier HAVING cnt > 1000 and max_year > '1990' ORDER by rate DESC, cnt desc LIMIT 10;

Query / Index type             | MySQL Time       | Spark Time (3 nodes) | Improvement
No covered index (partitioned) | 19 min 16.58 sec | 192.17 sec           | 6.02x
Covered index (partitioned)    | 2 min 10.81 sec  | 48.38 sec            | 2.7x

Query 2: 

select dayofweek, count(*) from ontime_part group by dayofweek;

Query / Index type             | MySQL Time       | Spark Time (3 nodes) | Improvement
No covered index (partitioned) | 19 min 15.21 sec | 195.058 sec          | 5.92x
Covered index (partitioned)    | 1 min 10.38 sec  | 27.323 sec           | 2.58x

Now, this looks really good, but it can be better. With three m4.2xlarge nodes we have 8*3 = 24 cores total (although they are shared between Spark and MySQL). We could expect a 10x improvement, especially without a covered index.

However, on m4.2xlarge the amount of RAM did not allow me to serve the MySQL workload from memory, so all reads were from EBS non-provisioned IOPS, which only gave me ~120MB/sec. I’ve redone the test on a set of three dedicated servers:

  • 28 cores E5-2683 v3 @ 2.00GHz
  • 240GB of RAM
  • Samsung 850 PRO

The test was running completely off RAM:

Query 1 (from the above)

Query / Index type             | MySQL Time      | Spark Time (3 nodes) | Improvement
No covered index (partitioned) | 3 min 13.94 sec | 14.255 sec           | 13.61x
Covered index (partitioned)    | 2 min 2.11 sec  | 9.035 sec            | 13.52x

Query 2: 

select dayofweek, count(*) from ontime_part group by dayofweek;

Query / Index type             | MySQL Time     | Spark Time (3 nodes) | Improvement
No covered index (partitioned) | 2 min 0.36 sec | 7.055 sec            | 17.06x
Covered index (partitioned)    | 1 min 6.85 sec | 4.514 sec            | 14.81x

With this number of cores, and running from RAM, we actually do not have enough concurrency, as the table only has 26 partitions. So I also tried the unpartitioned table with the id primary key, using 128 Spark partitions.

Note about partitioning

I’ve used a partitioned table (partitioned by year) in my tests to help reduce MySQL-level contention. At the same time, the “partitionColumn” option in Spark does not require that the MySQL table be partitioned. For example, if a table has a primary key, we can use this CREATE VIEW in Spark:

CREATE OR REPLACE TEMPORARY VIEW ontime
USING org.apache.spark.sql.jdbc
OPTIONS (
  url  "jdbc:mysql://127.0.0.1:3306/ontime?user=root&password=",
  dbtable "ontime.ontime",
  fetchSize "1000",
  partitionColumn "id", lowerBound "1", upperBound "162668934", numPartitions "128"
);

Assuming we have enough MySQL servers (i.e., nodes or slaves), we can increase the number of partitions, and that can improve the parallelism (as opposed to only 26 partitions when partitioning by year). Actually, the above test gives us an even better response time: 6.44 seconds for query 1.

Where Spark doesn’t work well

For faster queries (those that can use an index efficiently) it does not make sense to use Spark. Retrieving data from MySQL and loading it into Spark is not free: this overhead can be significant for fast queries. For example, a query like this

select count(*) from ontime_part where YearD = 2013 and DayOfWeek = 7 and OriginState = 'NC' and DestState = 'NC';

will only scan 1,300 rows and return instantly (0.00 seconds reported by MySQL).

An even better example is this:

select max(id) from ontime_part

In MySQL, the query will use the index and all calculations will be done inside MySQL. Spark, on the other hand, will have to retrieve all IDs (select id from ontime_part) from MySQL and calculate the maximum. That took 24.267 seconds.
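
A possible workaround (a sketch, not benchmarked here) is the same derived-table trick shown above: wrap the aggregation in the dbtable option so MySQL computes max(id) itself and Spark only fetches the one-row result:

// Sketch: push the aggregation down to MySQL via a derived table,
// so MySQL computes max(id) with its index and Spark fetches one row.
val maxQuery = "(select max(id) as max_id from ontime_part) tmp"
val jdbcDFmax = spark.read.format("jdbc").options(
  Map("url" ->  "jdbc:mysql://localhost:3306/ontime?user=root&password=",
  "dbtable" -> maxQuery
  )).load()
jdbcDFmax.show()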

Conclusion

Using Apache Spark as an additional engine on top of MySQL can help to speed up slow reporting queries and add much-needed scalability for long-running select queries. In addition, Spark can help with query caching for frequent queries.

PS: Visual explain plan with Spark

The Spark Web GUI provides lots of ways to monitor Spark jobs. For example, it shows the “job” progress:

[screenshot: Spark job progress]

And SQL visual explain details:

[screenshot: Spark SQL visual explain details]

Jan
15
2016
--

Making Apache Spark Four Times Faster

This is a follow-up to my previous post, Apache Spark with Air ontime performance data.

To recap an interesting point in that post: when using 48 cores with the server, the result was worse than with 12 cores. I wanted to understand why that was, so I started digging. My primary suspicion was that Java (I never trust Java) does not deal well with 100GB of memory.

There are a few links pointing to potential issues with a huge heap:

http://stackoverflow.com/questions/214362/java-very-large-heap-sizes
https://blog.codecentric.de/en/2014/02/35gb-heap-less-32gb-java-jvm-memory-oddities/

Following the last article’s advice, I ran four instances of Spark slaves (workers) on the same server. This is an old technique to better utilize resources, as a single instance often doesn’t scale well (as is well known from old MySQL times).

I added the following to the config:

export SPARK_WORKER_INSTANCES=4   # four worker instances per server
export SPARK_WORKER_CORES=12      # 12 cores each (4 x 12 = all 48 cores)
export SPARK_WORKER_MEMORY=25g    # a 25GB heap each (4 x 25GB = 100GB total)

The full description of the test can be found in my previous post, Apache Spark with Air ontime performance data.

The results:

[Chart: execution time vs. number of cores, with four worker instances]

Although the results for four instances still don’t scale much beyond 12 cores, at least there is no extra penalty for using more.

It could be that the dataset is just not big enough to show the setup’s full potential.

I think this is a clear indication that with a 25GB heap size, Java performs much better than with 100GB, at least with Oracle’s JDK (there are comments suggesting that a third-party commercial JDK may handle this better).

This is something to keep in mind when working with Java-based servers (like Apache Spark) on high-end hardware.

Jan
07
2016
--

Apache Spark with Air ontime performance data

There is growing interest in Apache Spark, so I wanted to play with it (especially after Alexander Rubin’s Using Apache Spark post).

To start, I used the recently released Apache Spark 1.6.0 for this experiment, and I played with the “Airlines On-Time Performance” database from
http://www.transtats.bts.gov/DL_SelectFields.asp?Table_ID=236&DB_Short_Name=On-Time. You can find the scripts I used here: https://github.com/Percona-Lab/ontime-airline-performance. The uncompressed dataset is about 70GB, which is not really that huge overall, but quite convenient to play with.

As a first step, I converted it to the Parquet format. It’s a column-based format suitable for parallel processing, and it supports partitioning.

The script I used was the following:

# bin/spark-shell --packages com.databricks:spark-csv_2.11:1.3.0
// read all of the .csv files, inferring the schema from the header row
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("/data/opt/otp/On_Time_On_Time_Performance_*.csv")
// compress the Parquet output with snappy and partition it by year
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
df.write.partitionBy("Year").parquet("/data/flash/spark/otp")

Conveniently, with just two commands (three if you count setting the compression codec, “snappy” in this case) we can convert ALL of the .csv files into Parquet, in parallel.

The data size after compression is only 3.5GB, which is an impressive compression factor of 20x. I’m guessing the columnar format with repetitive data allows this compression level.
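To sanity-check the conversion, we can read the Parquet directory back in the same spark-shell; a quick sketch:

val check = sqlContext.read.parquet("/data/flash/spark/otp")
check.printSchema()  // "Year" appears as a regular column thanks to partition discovery
check.count()        // should match the row count of the source .csv files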

In general, Apache Spark makes it very easy to handle the Extract, Transform and Load (ETL) process.

Another of Spark’s attractive features is that it automatically uses all CPU cores and executes query stages in parallel (something MySQL still can’t do). So I wanted to understand how fast it can execute a query compared to MySQL, and how efficiently it uses multiple cores.

For this I decided to use a query such as:
"SELECT avg(cnt) FROM (SELECT Year,Month,COUNT(*) FROM otp WHERE DepDel15=1 GROUP BY Year,Month) t1"

Which translates to the following Spark DataFrame manipulation:

(pFile.filter("DepDel15=1").groupBy("Year","Month").count()).agg(avg("count")).show()

I should note that Spark is perfectly capable of executing this as a SQL query, but I wanted to learn more about DataFrame manipulation.
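For reference, a sketch of the SQL route, assuming the pFile DataFrame defined in the full script below:

pFile.registerTempTable("otp")
// the same aggregation expressed as plain SQL
sqlContext.sql("SELECT avg(cnt) FROM (SELECT Year, Month, COUNT(*) AS cnt FROM otp WHERE DepDel15 = 1 GROUP BY Year, Month) t1").show()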

The full script I executed is:

val pFile = sqlContext.read.parquet("/mnt/i3600/spark/otp1").cache();
// run the query six times: one cold run plus five hot runs, timing each
for( a <- 1 to 6 ){
  println("Try: " + a)
  val t1 = System.currentTimeMillis;
  (pFile.filter("DepDel15=1").groupBy("Year","Month").count()).agg(avg("count")).show();
  val t2 = System.currentTimeMillis;
  println("Try: " + a + " Time: " + (t2 - t1))
}
exit

And I used the following command line to call the script:

for i in `seq 2 2 48` ; do bin/spark-shell --executor-cores $i -i run.scala  | tee -a $i.schema.res ; done

which basically tells it to use from 2 to 48 cores (the server I used has 48 CPU cores), in steps of two.

I executed this same query six times. The first time is a cold run, and data is read from the disk. The rest are hot runs, and the query should be executed from memory (this server has 128GB of RAM, and I allocated 100GB to the Spark executor).

I measured the execution time in cold and hot runs, and how it changed as more cores were added.

There was a lot of variance in the execution time of the hot runs, so I show all the results to demonstrate any trends.

Cold runs:

[Chart: cold-run execution time vs. number of cores]

More cores seem to help, but only up to a point.

Hot runs:

[Chart: hot-run execution time vs. number of cores]

The best execution time came with 14-22 cores; adding more cores after that actually seems to make things worse. My guess is that the data size is small enough that the communication and coordination overhead exceeded the benefit of additional parallel processing cores.

Comparing to MySQL

Just to have some points for comparison, I executed the same query in MySQL 5.7 using the following table schema: https://github.com/Percona-Lab/ontime-airline-performance/blob/master/mysql/create_table.sql

The hot execution time for the same query in MySQL (which can use only one CPU core per query) is 350 seconds (or 350,000ms, to compare with the data on the charts) when using the table without indexes. This is about 11 times worse than the best execution time in Spark.

If we use a small trick and create a covering index in MySQL designed for this query:

"ALTER TABLE ontime ADD KEY (Year,Month,DepDel15)"

then we can improve the execution time to 90 seconds. This is still worse than Spark, but the difference is not as big. We can’t, however, create an index for every ad-hoc query, while Spark is capable of processing a variety of queries without them.

In conclusion, I can say that Spark is indeed an attractive option for data analytics queries (and in fact it can do much more). It is worth keeping in mind, however, that in this experiment it did not scale well with multiple CPU cores. I wonder if the same problem appears when we use multiple server nodes.

If you have recommendations on how I can improve the results, please post them in the comments.

Spark configuration I used (in Standalone cluster setup):

export MASTER=spark://`hostname`:7077          # standalone master URL
export SPARK_MEM=100g                          # memory available to Spark
export SPARK_DAEMON_MEMORY=2g
export SPARK_LOCAL_DIRS=/mnt/i3600/spark/tmp   # scratch space for shuffle data
export SPARK_WORKER_DIR=/mnt/i3600/spark/tmp

Oct
07
2015
--

Using Apache Spark and MySQL for Data Analysis

What is Spark

Apache Spark is a cluster computing framework, similar to Apache Hadoop. Wikipedia has a great description of it:

Apache Spark is an open source cluster computing framework originally developed in the AMPLab at University of California, Berkeley but was later donated to the Apache Software Foundation where it remains today. In contrast to Hadoop’s two-stage disk-based MapReduce paradigm, Spark’s multi-stage in-memory primitives provides performance up to 100 times faster for certain applications. By allowing user programs to load data into a cluster’s memory and query it repeatedly, Spark is well-suited to machine learning algorithms.


Contrary to popular belief, Spark does not require all data to fit into memory; it uses caching to speed up operations (just like MySQL). Spark can also run in standalone mode, without Hadoop, and it can run on a single server (or even a laptop or desktop) and utilize all your CPU cores.

Starting it in distributed mode is really easy. Start the “master” first; you can run the “slave” on the same node:

root@thor:~/spark# ./sbin/start-master.sh
less ../logs/spark-root-org.apache.spark.deploy.master.Master-1-thor.out
15/08/25 11:21:21 INFO Master: Starting Spark master at spark://thor:7077
15/08/25 11:21:21 INFO Master: Running Spark version 1.4.1
15/08/25 11:21:21 INFO Utils: Successfully started service 'MasterUI' on port 8080.
15/08/25 11:21:21 INFO MasterWebUI: Started MasterWebUI at http://10.60.23.188:8080
root@thor:~/spark# ./sbin/start-slave.sh spark://thor:7077

Then run Spark Worker on any additional nodes (make sure to add the hostname to /etc/hosts or use DNS):

root@d31:~/spark# ./sbin/start-slave.sh spark://thor:7077

Why Spark and Not MySQL?

There are a number of tasks where MySQL (out of the box) does not show great performance. One MySQL limitation is: 1 query = 1 CPU core. That means that even if you have 48 fast cores and a large dataset to process (i.e., group by, sort, etc.), a single query will not utilize the full computing power. Spark, on the contrary, will be able to utilize all your CPU cores.

Another difference between MySQL and Spark:

  • MySQL uses so-called “schema on write” – the data needs to be converted into MySQL’s format first. If the data is not inside MySQL, you can’t use SQL to query it.
  • Spark (and Hadoop/Hive as well) uses “schema on read” – it can apply a table structure on top of, for example, a compressed text file (or any other supported input format) and treat it as a table; then we can use SQL to query that “table.”

In other words, MySQL is storage+processing, while Spark’s job is processing only, and it can pipe data directly from/to external datasets: Hadoop, Amazon S3, local files, JDBC (MySQL or other databases). Spark supports text files (compressed), SequenceFiles and any other Hadoop InputFormat, as well as Parquet columnar storage. Spark is more flexible in this regard than Hadoop: Spark can read data directly from MySQL, for example.
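To illustrate “schema on read,” here is a minimal Scala sketch (the same idea as the Python script later in this post); the file path and column names are illustrative:

// impose a structure on a raw gzipped text file (Spark 1.x, spark-shell)
import sqlContext.implicits._
case class PageCount(project: String, title: String, requests: Long, size: Long)
val raw = sc.textFile("/data/wikistats/pagecounts-20080101-000000.gz")  // gzip is decompressed transparently
val df  = raw.map(_.split(" ")).filter(_.length > 3)
             .map(p => PageCount(p(0), p(1), p(2).toLong, p(3).toLong)).toDF()
df.registerTempTable("pagecounts")
sqlContext.sql("SELECT project, SUM(requests) FROM pagecounts GROUP BY project").show()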

The typical pipeline to load external data to MySQL is:

  1. Uncompress (typically the external data is in compressed text files)
  2. Load it into MySQL’s staging table with “LOAD DATA INFILE”
  3. Only then can we filter/group by and save the result in another table

That causes additional overhead: in many cases we do not need the “raw” data, but we still have to load it into MySQL.

Why Spark Together With MySQL

At the same time, the result of our analysis (i.e., the aggregated data) should end up in MySQL. It does not have to, but it is much more convenient to store the result of your analysis in MySQL. Let’s say you want to analyze a big dataset (i.e., a year-to-year sales comparison) and present it as a table or graph. The result set will be significantly smaller, as it is aggregated, and it is much easier to store in MySQL, where many standard applications can work with it.
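As an illustration of that workflow, here is a minimal Scala sketch of saving an aggregated DataFrame into MySQL over JDBC (the aggDf name and connection details are placeholders):

import java.util.Properties
val props = new Properties()
props.setProperty("user", "wikistats")      // placeholder credentials
props.setProperty("password", "wikistats")
// append the (small) aggregated result set into a MySQL table
aggDf.write.mode("append").jdbc("jdbc:mysql://thor:3306/wikistats", "wikistats_by_day_spark", props)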

Real World Test Case

One of the interesting free datasets is Wikipedia Page Counts (>1TB compressed, available since 2008). The data can be downloaded as gzipped, space-delimited text files, and a limited dataset is also available on AWS. The data is aggregated by hour and has the following fields:

  • project (i.e. “en”, “fr”, etc, which is usually a language)
  • title of the page (uri), urlencoded
  • number of requests
  • size of the content returned

(the date is encoded in the filename; there is one file per hour)

Our goal is to find the top 10 pages by number of requests per day in the English Wikipedia, but also to support searching for an arbitrary word, so that we can show, for example, how the number of requests for the Wikipedia article about “myspace” compares to the article about “facebook” (2008 to 2015).

To do that in MySQL, we would have to load the data as-is into MySQL. The files are distributed with the date encoded in the filename. The uncompressed size of all files is >10TB. Here are the possible steps (as per our typical MySQL pipeline):

  1. Uncompress the file and run “LOAD DATA INFILE” into a staging (temporary) table:
    load data local infile '$file'
    into table wikistats.wikistats_full CHARACTER SET latin1
    FIELDS TERMINATED BY ' '
    (project_name, title, num_requests, content_size)
    set request_date = STR_TO_DATE('$datestr', '%Y%m%d %H%i%S');
  2. Aggregate with “insert into” a final table
    insert into wikistats.wikistats_by_day
    select date(request_date) as request_day, title, count(*), sum(num_requests)
    from wikistats.wikistats_full
    group by request_day, title;
  3. Somehow URL-decode the title (perhaps using a UDF).

This is a lot of overhead. We uncompress and transform the data into MySQL just to discard most of it.

According to my calculations, it should take >1 month to run the whole pipeline for 6 years of data (this does not include the uncompress time, nor the load-time degradation as the table gets bigger and bigger and indexes need to be updated). There are lots of things we can do here to speed it up, of course: load into different MySQL instances, load into a MEMORY table first and then group by into InnoDB, etc.

But one of the easiest options here is to use Apache Spark with a Python script (pyspark). Pyspark can read the original gzipped text files, query those text files with SQL, apply any filters and functions (i.e., urldecode), group by day and save the result set into MySQL.

Here is the Python script to perform those actions:

from pyspark import SparkContext
sc=SparkContext()
# sc is an existing SparkContext.
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)
import urllib
from datetime import timedelta, date
def load_day(filename, mydate):
    # Load a text file and convert each line to a Row.
    lines = sc.textFile(filename)
    parts = lines.map(lambda l: l.split(" ")).filter(lambda line: line[0]=="en").filter(lambda line: len(line)>3).cache()
    wiki = parts.map(lambda p: Row(project=p[0],  url=urllib.unquote(p[1]).lower(), num_requests=int(p[2]), content_size=int(p[3])))
    #wiki.count()
    # Infer the schema, and register the DataFrame as a table.
    schemaWiki = sqlContext.createDataFrame(wiki)
    schemaWiki.registerTempTable("wikistats")
    group_res = sqlContext.sql("SELECT '"+ mydate + "' as mydate, url, count(*) as cnt, sum(num_requests) as tot_visits FROM wikistats group by url")
    # Save to MySQL
    mysql_url="jdbc:mysql://thor?user=wikistats&password=wikistats"
    group_res.write.jdbc(url=mysql_url, table="wikistats.wikistats_by_day_spark", mode="append")
    # Write to parquet file - if needed
    group_res.saveAsParquetFile("/ssd/wikistats_parquet_bydate/mydate=" + mydate)
mount = "/data/wikistats/"
d= date(2008, 1, 1)
end_date = date(2008, 2, 1)
delta = timedelta(days=1)
while d < end_date:
    print d.strftime("%Y-%m-%d")
    filename=mount + "wikistats//dumps.wikimedia.org/other/pagecounts-raw/2008/2008-01/pagecounts-200801" + d.strftime("%d") + "-*.gz"
    print(filename)
    load_day(filename, d.strftime("%Y-%m-%d"))
    d += delta

In the script, I used Spark to read the original gzip files (one day at a time). We can use a directory or a list of files as input. I then use Resilient Distributed Dataset (RDD) transformations: Python lambda functions with map and filter, which allow us to split the “input files” and filter them.

The next step is to apply the schema (declare the fields); here we can also apply any other functions; for example, I use urllib.unquote to decode the title (urldecode). Finally, we register the temp table and then use familiar SQL to do the group by.

The script will normally utilize all CPU cores. In addition, it is very easy to run it in distributed mode, even without Hadoop: just copy the files to all machines in the Spark cluster or use NFS/external storage.

The script took about an hour on three boxes to process one month of data and load the aggregated data into MySQL (a single instance). We can estimate that loading all 6 years (aggregated) into MySQL would take ~3 days (72 months at roughly one hour each).

You may now ask: why is this significantly faster (while we still load the result into the same MySQL instance)? The answer is that it is a different, more efficient pipeline. In our original MySQL pipeline (which would probably take months), we load the raw data into MySQL. Here we filter and group on read, and write only what we need to MySQL.

Another question may come up here: do we actually need this whole “pipeline”? Can we simply run our analytical queries on top of the “raw” data? Well, that is possible, but it would probably require a 1,000-node Spark cluster to do efficiently, as it would need to scan through 5TB of data (see “more reading” below).

Multi-threaded Performance for MySQL Inserts

When using group_res.write.jdbc(url=mysql_url, table="wikistats.wikistats_by_day_spark", mode="append"), Spark uses multiple threads to insert into MySQL, as the processlist below shows:

+------+-----------+------------+-----------+---------+------+--------+--------------------------------------------------------------------------------------------------------+-----------+---------------+
 | Id   | User      | Host       | db        | Command | Time | State  | Info                                                                                                   | Rows_sent | Rows_examined |
 +------+-----------+------------+-----------+---------+------+--------+--------------------------------------------------------------------------------------------------------+-----------+---------------+
 | 1050 | root      | localhost  | wikistats | Query   |    0 | init   | show processlist                                                                                       |         0 |             0 |
 | 2133 | wikistats | thor:40994 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Colegio+san+ignacio', 1, 1)        |         0 |             0 |
 | 2134 | wikistats | thor:40995 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Miloš_Crnjanski', 2, 3)            |         0 |             0 |
 | 2135 | wikistats | thor:40996 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Robert_Edgar', 6, 7)               |         0 |             0 |
 | 2136 | wikistats | thor:40997 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Eastern_Orange_Tip', 6, 7)         |         0 |             0 |
 | 2137 | wikistats | thor:40998 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Image:Dresden_Augustusbrücke_Al   |         0 |             0 |
 | 2138 | wikistats | thor:40999 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Diamond_and_pearl', 11, 24)        |         0 |             0 |
 | 2139 | wikistats | thor:41000 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Operation_polo', 2, 2)             |         0 |             0 |
 | 2140 | wikistats | thor:41001 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Template_talk:Edit-first-section   |         0 |             0 |
 | 2141 | wikistats | thor:41002 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Bertha_of_Artois', 1, 1)           |         0 |             0 |
 | 2142 | wikistats | thor:41003 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'A Change of Pace', 1, 1)           |         0 |             0 |
 | 2143 | wikistats | thor:41005 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'FAIRCHILD-REPUBLIC A-10 THUNDERB   |         0 |             0 |
 | 2144 | wikistats | thor:41006 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Special:Recentchangeslinked/Wiki   |         0 |             0 |
 | 2145 | wikistats | thor:41007 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Image:Carl-sassenrath-sp-1982.jp   |         0 |             0 |
 | 2146 | wikistats | thor:41008 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'List_of_Fleet_Air_Arm_aircraft_s   |         0 |             0 |
 | 2147 | wikistats | thor:41009 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Systemic_sclerosis', 17, 29)       |         0 |             0 |
 | 2148 | wikistats | thor:41011 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'tataviam', 1, 1)                   |         0 |             0 |
 | 2149 | wikistats | thor:41010 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'The_Devil_Wears_Prada_(film)#_no   |         0 |             0 |
 | 2150 | wikistats | thor:41013 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Seaford_High_School', 5, 7)        |         0 |             0 |
 | 2151 | wikistats | thor:41014 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Talk:Shocker_(hand_gesture)', 3,   |         0 |             0 |
 | 2152 | wikistats | thor:41015 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Paul_Szabo', 14, 23)               |         0 |             0 |
 | 2153 | wikistats | thor:41016 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'ausgereift', 1, 1)                 |         0 |             0 |
 | 2154 | wikistats | thor:41017 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Category:March_2005_news', 1, 2)   |         0 |             0 |
 | 2155 | wikistats | thor:41018 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Foot_Locker_Inc', 10, 10)          |         0 |             0 |
 | 2156 | wikistats | thor:41019 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Abbey_Park,_Nottinghamshire', 3,   |         0 |             0 |
 +------+-----------+------------+-----------+---------+------+--------+--------------------------------------------------------------------------------------------------------+-----------+---------------+
 25 rows in set (0.00 sec)

Monitoring your jobs

Spark provides a web interface to monitor and manage your jobs. Here is an example; I’m running the wikistats.py application:

[Screenshot: Spark web interface]

Result: Using Parquet Columnar Format vs MySQL InnoDB table

Spark supports the Apache Parquet columnar format, so we can save an RDD as a Parquet file (it can be saved to a local directory or to HDFS):

group_res.saveAsParquetFile("/ssd/wikistats_parquet_bydate/mydate=" + mydate)

Here we save the result of our pipeline (the aggregated data) to Parquet. I also partition it by day (“mydate=2008-01-01”), and Spark can auto-discover partitions in this format.
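Because the directory layout encodes the partition, Spark can prune partitions when we filter on mydate. A sketch in Scala (the date value is illustrative):

val ws = sqlContext.read.parquet("/ssd/wikistats_parquet_bydate")  // discovers the mydate= partitions
ws.filter("mydate = '2008-01-04'")                                 // reads only that day’s files
  .groupBy("url").sum("tot_visits")
  .show()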

Once I have my results, I want to query them. Let’s say I want to find the top 10 most frequently visited wiki pages in January 2008. I can do this query with MySQL (I need to filter out the main page and search pages):

mysql> SELECT lower(url) as lurl, sum(tot_visits) as max_visits , count(*) FROM wikistats_by_day_spark where lower(url) not like '%special%' and lower(url) not like '%page%' and lower(url) not like '%test%' and lower(url) not like '%wiki%' group by lower(url) order by max_visits desc limit 10;
+--------------------------------------------------------+------------+----------+
| lurl                                                   | max_visits | count(*) |
+--------------------------------------------------------+------------+----------+
| heath_ledger                                           |    4247338 |      131 |
| cloverfield                                            |    3846404 |      131 |
| barack_obama                                           |    2238406 |      153 |
| 1925_in_baseball#negro_league_baseball_final_standings |    1791341 |       11 |
| the_dark_knight_(film)                                 |    1417186 |       64 |
| martin_luther_king,_jr.                                |    1394934 |      136 |
| deaths_in_2008                                         |    1372510 |       67 |
| united_states                                          |    1357253 |      167 |
| scientology                                            |    1349654 |      108 |
| portal:current_events                                  |    1261538 |      125 |
+--------------------------------------------------------+------------+----------+
10 rows in set (1 hour 22 min 10.02 sec)

Please note that here we are querying our already aggregated (summarized by day) table, not the “raw” data.

As we can see, the query took 1 hour 22 minutes. I also saved the same results to Parquet (see the script), so now I can use them with Spark SQL:

./bin/spark-sql --master local

This starts a local version of spark-sql, using one host only.

spark-sql> CREATE TEMPORARY TABLE wikistats_parquet
USING org.apache.spark.sql.parquet
OPTIONS (
  path "/ssd/wikistats_parquet_bydate"
);
Time taken: 3.466 seconds
spark-sql> select count(*) from wikistats_parquet;
select count(*) from wikistats_parquet;
227846039
Time taken: 82.185 seconds, Fetched 1 row(s)
spark-sql> SELECT lower(url) as lurl, sum(tot_visits) as max_visits , count(*) FROM wikistats_parquet where lower(url) not like '%special%' and lower(url) not like '%page%' and lower(url) not like '%test%' and lower(url) not like '%wiki%' group by lower(url) order by max_visits desc limit 10;
heath_ledger    4247335 42
cloverfield     3846400 42
barack_obama    2238402 53
1925_in_baseball#negro_league_baseball_final_standings  1791341 11
the_dark_knight_(film)  1417183 36
martin_luther_king,_jr. 1394934 46
deaths_in_2008  1372510 38
united_states   1357251 55
scientology     1349650 44
portal:current_events   1261305 44
Time taken: 1239.014 seconds, Fetched 10 row(s)

That took ~20 minutes, which is much faster.

Conclusion

Apache Spark provides a great and easy way to analyze and aggregate data. What I love about Spark compared to other big data and analytical frameworks:

  • Open-source and actively developed
  • No dependency on other tools, i.e., the input and output data do not have to be in Hadoop
  • Standalone mode for quick start, easy to deploy
  • Massively parallel, easy to add nodes
  • Support for a variety of input and output formats, i.e., it can read from/write to MySQL (via a JDBC driver) and the Parquet columnar format

However, there are a number of drawbacks:

  • It is still new, so you can expect some bugs and undocumented behavior. Many of the errors are hard to explain.
  • It requires Java; Spark 1.5 only supports Java 7 and higher. That also means it will require additional memory, which is reasonable nowadays.
  • You will need to run jobs through “spark-submit”

I believe Apache Spark is a great tool and can complement MySQL for data analytical and BI purposes.

More reading

