Oct
17
2017
--

Feedzai closes $50M Series C to help banks and merchants identify fraud with AI

 Feedzai is announcing a $50 million Series C this morning led by an unnamed VC with additional capital from Sapphire Ventures. The six-year-old startup builds machine learning tools to help banks and merchants spot payment fraud. In today’s rapidly maturing world of fintech, Feedzai is trying to thread the needle between turnkey solution and customizable platform. With 60 clients… Read More

Sep
12
2017
--

Alteryx Promote puts data science to work across the company

 When Alteryx acquired Yhat in June, it was only a matter of time before the startup’s data-science management software began showing up in Alteryx. Just today, the company announced Alteryx Promote, a new tool based on Yhat’s product set.
The company made the announcement at the Alteryx Inspire Europe customer event taking place in London this week.
Alteryx Promote gives data… Read More

Sep
06
2017
--

Dataiku to enhance data tools with $28 million investment led by Battery Ventures

 Dataiku, a French startup that helps data analysts communicate with data scientists to build more meaningful data applications, announced a significant funding round today. The company scored a $28 million Series B investment led by Battery Ventures with help from FirstMark, Serena Capital and Alven. Today’s money brings the total raised to almost $45 million. Its most recent prior round… Read More

Nov
30
2016
--

Eric Schmidt-backed data science startup Civis Analytics raises $22M

Civis Analytics, a company founded by the chief analytics officer of Barack Obama’s 2012 re-election campaign, has raised $22 million in Series A funding. Founder and CEO Dan Wagner told me that after the election, he discovered that many organizations — whether they were in politics, government or business — were dealing with problems similar to the ones he faced during… Read More

Nov
19
2016
--

How data science and rocket science will get humans to Mars

President Obama recently re-affirmed America’s commitment to sending a manned mission to Mars. Think your data science challenges are complicated? Imagine the difficulties involved in mining data to understand the health impacts of a trip to Mars. When sending humans “where no one has gone before,” there are a multitude of variables to consider, and NASA is hard at work… Read More

Nov
19
2015
--

Apixio’s New Iris Platform Uses Your Doctor’s Notes To Derive Insights

Data science applications for healthcare are finally trying to catch up to the rest of the world, with one new effort coming from six-year-old Apixio in San Mateo, CA. This morning, the company is launching a cognitive computing platform called Iris that derives insights from clinical data and other information in the health system. More specifically, Iris uses a powerful data… Read More

Oct
07
2015
--

Using Apache Spark and MySQL for Data Analysis

What is Spark

Apache Spark is a cluster computing framework, similar to Apache Hadoop. Wikipedia has a great description of it:

Apache Spark is an open source cluster computing framework originally developed in the AMPLab at University of California, Berkeley but was later donated to the Apache Software Foundation where it remains today. In contrast to Hadoop’s two-stage disk-based MapReduce paradigm, Spark’s multi-stage in-memory primitives provides performance up to 100 times faster for certain applications. By allowing user programs to load data into a cluster’s memory and query it repeatedly, Spark is well-suited to machine learning algorithms.


Contrary to popular belief, Spark does not require all data to fit into memory; it will use caching to speed up operations (just like MySQL). Spark can also run in standalone mode and does not require Hadoop; it can even run on a single server (or a laptop or desktop) and utilize all of your CPU cores.
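
For example, here is a minimal local-mode sketch (an assumption on my part, not from the original post; it assumes Spark 1.x with pyspark available and an arbitrary application name) showing Spark using all cores of a single machine with no cluster setup:

# A minimal local-mode sketch (assumption: Spark 1.x with pyspark installed).
# "local[*]" tells Spark to use all CPU cores of the current machine;
# no Hadoop or cluster setup is required.
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local[*]").setAppName("local-test")
sc = SparkContext(conf=conf)
print(sc.parallelize(range(1000000)).sum())  # trivial job to verify the context works
sc.stop()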

Starting it in distributed mode is really easy: start the “master” first; you can run the “slave” on the same node:

root@thor:~/spark# ./sbin/start-master.sh
less ../logs/spark-root-org.apache.spark.deploy.master.Master-1-thor.out
15/08/25 11:21:21 INFO Master: Starting Spark master at spark://thor:7077
15/08/25 11:21:21 INFO Master: Running Spark version 1.4.1
15/08/25 11:21:21 INFO Utils: Successfully started service 'MasterUI' on port 8080.
15/08/25 11:21:21 INFO MasterWebUI: Started MasterWebUI at http://10.60.23.188:8080
root@thor:~/spark# ./sbin/start-slave.sh spark://thor:7077

Then run Spark Worker on any additional nodes (make sure to add the hostname to /etc/hosts or use DNS):

root@d31:~/spark# ./sbin/start-slave.sh spark://thor:7077

Why Spark and Not MySQL?

There are a number of tasks where MySQL (out of the box) does not show great performance. One of MySQL’s limitations is that one query uses only one CPU core. That means that even if you have 48 fast cores and a large dataset to process (e.g., GROUP BY, sort, etc.), a single query will not utilize the full computing power. Spark, on the other hand, is able to utilize all of your CPU cores.

Another difference between MySQL and Spark:

  • MySQL uses so-called “schema on write”: the data needs to be loaded (and converted) into MySQL before you can query it with SQL.
  • Spark (and Hadoop/Hive as well) uses “schema on read”: it can apply a table structure on top of a compressed text file, for example (or any other supported input format), and see it as a table; then we can use SQL to query this “table.”

In other words, MySQL is storage+processing, while Spark’s job is processing only, and it can pipe data directly from/to external datasets, e.g., Hadoop, Amazon S3, local files, JDBC (MySQL/other databases). Spark supports text files (compressed), SequenceFiles and any other Hadoop InputFormat, as well as Parquet columnar storage. Spark is more flexible in this regard compared to Hadoop: Spark can read data directly from MySQL, for example.
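
As an illustration of that flexibility, here is a minimal sketch (not part of the original pipeline; it assumes Spark 1.4+ with the MySQL Connector/J jar on the Spark classpath, and the table name is purely illustrative) of reading a MySQL table directly into a Spark DataFrame and querying it with SQL:

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext()
sqlContext = SQLContext(sc)

# Read an existing MySQL table straight into a DataFrame over JDBC
# (illustrative credentials/table; requires the MySQL JDBC driver jar).
df = sqlContext.read.jdbc(
    url="jdbc:mysql://thor?user=wikistats&password=wikistats",
    table="wikistats.wikistats_by_day")
df.registerTempTable("wikistats_by_day")
sqlContext.sql("SELECT count(*) FROM wikistats_by_day").show()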

The typical pipeline to load external data to MySQL is:

  1. Uncompress (typically the external data is in compressed text files)
  2. Load it into MySQL’s staging table with “LOAD DATA INFILE”
  3. Only then we can filter/group by and save the result in another table

That can cause additional overhead. In many cases we do not need the “raw” data but we still have to load it into MySQL.

Why Spark Together With MySQL

At the same time, the result of our analysis (i.e., the aggregated data) should go into MySQL. It does not have to, but it is much more convenient to store the result of your analysis in MySQL. Let’s say you want to analyze a big dataset (e.g., a year-to-year sales comparison) and present it as a table or a graph. The result set will be significantly smaller, as it is aggregated, and it is much easier to store it in MySQL, where many standard applications can work with it.

Real World Test Case

One of the interesting free datasets is the Wikipedia page counts dataset (>1TB compressed, available since 2008). The data can be downloaded (as gzipped, space-delimited text files) and is also available (as a limited dataset) on AWS. The data is aggregated by the hour and has the following fields:

  • project (e.g., “en”, “fr”, etc., which is usually the language)
  • title of the page (uri), urlencoded
  • number of requests
  • size of the content returned

(the date field is encoded inside the filename, 1 file per hour)
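
To make the format concrete, here is a hedged sketch of parsing one line of a pagecounts file into the four fields above (the sample values are made up for illustration):

# One space-delimited line per (project, title) pair; the values are illustrative only.
line = "en Barack_Obama 2386 72618357"
project, title, num_requests, content_size = line.split(" ")
print("%s %s %d %d" % (project, title, int(num_requests), int(content_size)))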

Our goal will be to find the top 10 pages by the number of requests per day in English Wikipedia, but also to support searching for an arbitrary word, so that we can show how, for example, the number of requests for the Wikipedia article about “myspace” compares to the number of requests for the article about “facebook” (2008 to 2015).
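
For the “myspace” vs. “facebook” comparison, something like the following sketch would work against the aggregated table produced below (assumptions on my part: the mysql-connector-python package is installed, and the column names match the wikistats_by_day_spark table created by the pyspark script later in this post):

import mysql.connector

# Connect to the MySQL instance that holds the aggregated results
# (host/credentials are the same illustrative ones used later in the post).
conn = mysql.connector.connect(host="thor", user="wikistats",
                               password="wikistats", database="wikistats")
cur = conn.cursor()
cur.execute("""
    SELECT mydate, url, tot_visits
    FROM wikistats_by_day_spark
    WHERE url IN ('myspace', 'facebook')
    ORDER BY mydate, url
""")
for mydate, url, tot_visits in cur:
    print("%s  %-10s  %d" % (mydate, url, tot_visits))
cur.close()
conn.close()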

To do that in MySQL, we would have to load the data as-is into MySQL. The files are distributed with the date encoded in the filename. The uncompressed size of all files is > 10TB. Here are the possible steps (as per our typical MySQL pipeline):

  1. Uncompress the file and run “LOAD DATA INFILE” into a staging (temporary) table:
    load data local infile '$file'
    into table wikistats.wikistats_full CHARACTER SET latin1
    FIELDS TERMINATED BY ' '
    (project_name, title, num_requests, content_size)
    set request_date = STR_TO_DATE('$datestr', '%Y%m%d %H%i%S');
  2. Aggregate with “insert into” a final table
    insert into wikistats.wikistats_by_day
    select date(request_date) as request_day, title, count(*), sum(num_requests)
    from wikistats.wikistats_full
    group by request_day, title;
  3. Somehow URL-decode the title (maybe using a UDF).

This is a lot of overhead: we uncompress and load the data into MySQL just to discard most of it.

According to my calculations, it should take > 1 month to run the whole pipeline for 6 years of data (this estimate does not include the uncompress time, nor the load-time degradation as the table gets bigger and bigger and indexes need to be updated). There are lots of things we can do here to speed it up, of course, e.g., load into different MySQL instances, load into a MEMORY table first and then group by into InnoDB, etc.

But one of the easiest approaches here is to use Apache Spark with a Python script (pyspark). Pyspark can read the original gzipped text files, query those text files with SQL, apply any filters and functions (e.g., urldecode), group by day and save the result set into MySQL.

Here is the Python script to perform those actions:

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
import urllib
from datetime import timedelta, date

# Create the SparkContext and the SQLContext used for DataFrames/SQL.
sc = SparkContext()
sqlContext = SQLContext(sc)

def load_day(filename, mydate):
    # Load the (gzipped) text file(s) and convert each line to a Row.
    lines = sc.textFile(filename)
    # Split on spaces, keep only English Wikipedia ("en") lines that have all 4 fields, and cache.
    parts = lines.map(lambda l: l.split(" ")).filter(lambda line: line[0]=="en").filter(lambda line: len(line)>3).cache()
    # Apply the schema: project, urldecoded/lowercased title, number of requests, content size.
    wiki = parts.map(lambda p: Row(project=p[0], url=urllib.unquote(p[1]).lower(), num_requests=int(p[2]), content_size=int(p[3])))
    # Infer the schema, and register the DataFrame as a table.
    schemaWiki = sqlContext.createDataFrame(wiki)
    schemaWiki.registerTempTable("wikistats")
    # Aggregate per URL for the given day.
    group_res = sqlContext.sql("SELECT '"+ mydate + "' as mydate, url, count(*) as cnt, sum(num_requests) as tot_visits FROM wikistats group by url")
    # Save to MySQL
    mysql_url="jdbc:mysql://thor?user=wikistats&password=wikistats"
    group_res.write.jdbc(url=mysql_url, table="wikistats.wikistats_by_day_spark", mode="append")
    # Write to a Parquet file (partitioned by day) - if needed
    group_res.saveAsParquetFile("/ssd/wikistats_parquet_bydate/mydate=" + mydate)

# Process one day (24 hourly files, matched by the wildcard) at a time.
mount = "/data/wikistats/"
d = date(2008, 1, 1)
end_date = date(2008, 2, 1)
delta = timedelta(days=1)
while d < end_date:
    filename = mount + "wikistats//dumps.wikimedia.org/other/pagecounts-raw/2008/2008-01/pagecounts-200801" + d.strftime("%d") + "-*.gz"
    print(filename)
    load_day(filename, d.strftime("%Y-%m-%d"))
    d += delta

In the script I use Spark to read the original gzip files (1 day at a time). We can use a directory as the “input” or a list of files. I then use Resilient Distributed Dataset (RDD) transformations; Python’s lambda functions with map and filter allow us to split the “input files” and filter them.

The next step is to apply the schema (declare the fields); here we can also apply any other functions, e.g., I use urllib.unquote to decode the title (urldecode). Finally, we register the temp table and then use familiar SQL to do the group by.

The script will normally utilize all CPU cores. In addition, it is very easy to run it in distributed mode even without Hadoop: just copy the files to all machines in the Spark cluster or use NFS/external storage.
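
For example, one way to point the script at the standalone master from the setup section (a sketch under the assumption that the master is running at spark://thor:7077; alternatively, you can pass --master to spark-submit) is to build the SparkContext with an explicit configuration:

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setMaster("spark://thor:7077")      # the standalone master started earlier
        .setAppName("wikistats")
        .set("spark.executor.memory", "4g")) # assumption: tune to your worker boxes
sc = SparkContext(conf=conf)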

The script took about an hour on 3 boxes to process 1 month of data and load the aggregated data into MySQL (a single instance). We can estimate that loading all 6 years (aggregated) into MySQL would take ~3 days (72 months at roughly 1 hour each is about 72 hours).

You may now ask, why is it significantly faster (and we still have the result loaded to the same MySQL instance)? The answer is, it is a different, more efficient pipeline. In our original MySQL pipeline (which will probably take months) we load the raw data to MySQL. Here we filter and group on read, and write only what we need to MySQL.

One question may also come up here: do we actually need this whole “pipeline?” Can we simply run our analytical queries on top of the “raw” data? Well, that is possible, but it would probably require a 1,000-node Spark cluster to do it efficiently, as it would need to scan through 5TB of data (see “More reading” below).

Multi-threaded Performance for MySQL Inserts

When using group_res.write.jdbc(url=mysql_url, table="wikistats.wikistats_by_day_spark", mode="append"), Spark will use multiple threads to insert into MySQL.
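
The number of concurrent INSERT connections roughly follows the number of partitions of the DataFrame, so if the MySQL server becomes the bottleneck, one option (a sketch, not from the original post; DataFrame.coalesce is available since Spark 1.4) is to reduce the partition count before writing:

# Limit the write to (at most) 4 parallel MySQL connections by
# collapsing the result into 4 partitions before the JDBC write.
group_res.coalesce(4).write.jdbc(
    url=mysql_url,
    table="wikistats.wikistats_by_day_spark",
    mode="append")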

+------+-----------+------------+-----------+---------+------+--------+--------------------------------------------------------------------------------------------------------+-----------+---------------+
 | Id   | User      | Host       | db        | Command | Time | State  | Info                                                                                                   | Rows_sent | Rows_examined |
 +------+-----------+------------+-----------+---------+------+--------+--------------------------------------------------------------------------------------------------------+-----------+---------------+
 | 1050 | root      | localhost  | wikistats | Query   |    0 | init   | show processlist                                                                                       |         0 |             0 |
 | 2133 | wikistats | thor:40994 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Colegio+san+ignacio', 1, 1)        |         0 |             0 |
 | 2134 | wikistats | thor:40995 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Miloš_Crnjanski', 2, 3)            |         0 |             0 |
 | 2135 | wikistats | thor:40996 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Robert_Edgar', 6, 7)               |         0 |             0 |
 | 2136 | wikistats | thor:40997 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Eastern_Orange_Tip', 6, 7)         |         0 |             0 |
 | 2137 | wikistats | thor:40998 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Image:Dresden_Augustusbrücke_Al   |         0 |             0 |
 | 2138 | wikistats | thor:40999 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Diamond_and_pearl', 11, 24)        |         0 |             0 |
 | 2139 | wikistats | thor:41000 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Operation_polo', 2, 2)             |         0 |             0 |
 | 2140 | wikistats | thor:41001 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Template_talk:Edit-first-section   |         0 |             0 |
 | 2141 | wikistats | thor:41002 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Bertha_of_Artois', 1, 1)           |         0 |             0 |
 | 2142 | wikistats | thor:41003 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'A Change of Pace', 1, 1)           |         0 |             0 |
 | 2143 | wikistats | thor:41005 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'FAIRCHILD-REPUBLIC A-10 THUNDERB   |         0 |             0 |
 | 2144 | wikistats | thor:41006 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Special:Recentchangeslinked/Wiki   |         0 |             0 |
 | 2145 | wikistats | thor:41007 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Image:Carl-sassenrath-sp-1982.jp   |         0 |             0 |
 | 2146 | wikistats | thor:41008 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'List_of_Fleet_Air_Arm_aircraft_s   |         0 |             0 |
 | 2147 | wikistats | thor:41009 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Systemic_sclerosis', 17, 29)       |         0 |             0 |
 | 2148 | wikistats | thor:41011 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'tataviam', 1, 1)                   |         0 |             0 |
 | 2149 | wikistats | thor:41010 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'The_Devil_Wears_Prada_(film)#_no   |         0 |             0 |
 | 2150 | wikistats | thor:41013 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Seaford_High_School', 5, 7)        |         0 |             0 |
 | 2151 | wikistats | thor:41014 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Talk:Shocker_(hand_gesture)', 3,   |         0 |             0 |
 | 2152 | wikistats | thor:41015 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Paul_Szabo', 14, 23)               |         0 |             0 |
 | 2153 | wikistats | thor:41016 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'ausgereift', 1, 1)                 |         0 |             0 |
 | 2154 | wikistats | thor:41017 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Category:March_2005_news', 1, 2)   |         0 |             0 |
 | 2155 | wikistats | thor:41018 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Foot_Locker_Inc', 10, 10)          |         0 |             0 |
 | 2156 | wikistats | thor:41019 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Abbey_Park,_Nottinghamshire', 3,   |         0 |             0 |
 +------+-----------+------------+-----------+---------+------+--------+--------------------------------------------------------------------------------------------------------+-----------+---------------+
 25 rows in set (0.00 sec)

Monitoring your jobs

Spark provides a web interface to monitor and manage your jobs. Here is an example: I’m running the wikistats.py application:

[Screenshot: Spark web interface showing the running wikistats.py application]

Result: Using Parquet Columnar Format vs MySQL InnoDB table

Spark supports the Apache Parquet columnar format, so we can save the RDD as a Parquet file (it can be saved to a local directory or to HDFS):

group_res.saveAsParquetFile("/ssd/wikistats_parquet_bydate/mydate=" + mydate)

Here we save the result of our pipeline (the aggregated data) to Parquet. I also use partitioning by day (“mydate=2008-01-01”), and Spark can auto-discover partitions in this format.
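
To read the partitioned data back from pyspark (a sketch, assuming Spark 1.4+ where sqlContext.read.parquet performs the partition discovery), you can point the reader at the top-level directory; the “mydate” partition key then appears as a regular column:

# Load the whole partitioned dataset; "mydate" is reconstructed from the
# directory names (mydate=2008-01-01, mydate=2008-01-02, ...).
wikistats = sqlContext.read.parquet("/ssd/wikistats_parquet_bydate")
wikistats.registerTempTable("wikistats_parquet")
sqlContext.sql(
    "SELECT mydate, sum(tot_visits) FROM wikistats_parquet GROUP BY mydate"
).show()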

When I have my results, I want to query them. Let’s say I want to find the top 10 most frequently requested wiki pages in January 2008. I can do this query with MySQL (I will need to filter out the main page and search pages):

mysql> SELECT lower(url) as lurl, sum(tot_visits) as max_visits , count(*) FROM wikistats_by_day_spark where lower(url) not like '%special%' and lower(url) not like '%page%' and lower(url) not like '%test%' and lower(url) not like '%wiki%' group by lower(url) order by max_visits desc limit 10;
+--------------------------------------------------------+------------+----------+
| lurl                                                   | max_visits | count(*) |
+--------------------------------------------------------+------------+----------+
| heath_ledger                                           |    4247338 |      131 |
| cloverfield                                            |    3846404 |      131 |
| barack_obama                                           |    2238406 |      153 |
| 1925_in_baseball#negro_league_baseball_final_standings |    1791341 |       11 |
| the_dark_knight_(film)                                 |    1417186 |       64 |
| martin_luther_king,_jr.                                |    1394934 |      136 |
| deaths_in_2008                                         |    1372510 |       67 |
| united_states                                          |    1357253 |      167 |
| scientology                                            |    1349654 |      108 |
| portal:current_events                                  |    1261538 |      125 |
+--------------------------------------------------------+------------+----------+
10 rows in set (1 hour 22 min 10.02 sec)

Please note that here we are using our already aggregated (summary by day) table, not the “raw” data.

As we can see, the query took 1 hour 22 mins. I have also saved the same results to Parquet (see the script), so now I can use it with Spark-SQL:

./bin/spark-sql --master local

This starts spark-sql in local mode, using only one host.

spark-sql> CREATE TEMPORARY TABLE wikistats_parquet
USING org.apache.spark.sql.parquet
OPTIONS (
  path "/ssd/wikistats_parquet_bydate"
);
Time taken: 3.466 seconds
spark-sql> select count(*) from wikistats_parquet;
select count(*) from wikistats_parquet;
227846039
Time taken: 82.185 seconds, Fetched 1 row(s)
spark-sql> SELECT lower(url) as lurl, sum(tot_visits) as max_visits , count(*) FROM wikistats_parquet where lower(url) not like '%special%' and lower(url) not like '%page%' and lower(url) not like '%test%' and lower(url) not like '%wiki%' group by lower(url) order by max_visits desc limit 10;
heath_ledger    4247335 42
cloverfield     3846400 42
barack_obama    2238402 53
1925_in_baseball#negro_league_baseball_final_standings  1791341 11
the_dark_knight_(film)  1417183 36
martin_luther_king,_jr. 1394934 46
deaths_in_2008  1372510 38
united_states   1357251 55
scientology     1349650 44
portal:current_events   1261305 44
Time taken: 1239.014 seconds, Fetched 10 row(s)

That took ~20 minutes, which is much faster.

Conclusion

Apache Spark provides a great and easy way to analyze and aggregate data. What I love about Spark compared to other big data and analytical frameworks:

  • Open-source and actively developed
  • No hard dependency on other tools, e.g., the input and output data do not have to be in Hadoop
  • Standalone mode for quick start, easy to deploy
  • Massively parallel, easy to add nodes
  • Support for a variety of input and output formats, e.g., it can read from/write to MySQL (via the JDBC driver) and the Parquet columnar format

However, there are a number of drawbacks:

  • It is still relatively new, so you can expect some bugs and undocumented behavior; many of the errors are hard to explain.
  • It requires Java; Spark 1.5 only supports Java 7 and higher.  That also means it will require additional memory, which is reasonable nowadays.
  • You will need to run jobs through “spark-submit”

I believe Apache Spark is a great tool and can complement MySQL for data analytics and BI purposes.

More reading

The post Using Apache Spark and MySQL for Data Analysis appeared first on MySQL Performance Blog.

Jul
14
2015
--

DataCamp Gets $1M Seed Round To Develop Data Science Learning Platform

DataCamp wants to teach data science skills to a generation of people, and it got a million in seed money to continue developing its online data-science learning platform.
The round was led by Chris Lynch at Accomplice, an early stage venture capital firm in Cambridge, Massachusetts. The company had raised $300,000 in seed money previously.
DataCamp is not unlike coding bootcamps such… Read More

May
30
2015
--

YC Grad Yhat Scores $1.5M In Second Seed Round

When Yhat, the company that has developed solutions to help organize data scientist teams, graduated from Y Combinator’s Winter 2015 class, the founders had a goal to raise a million dollars to keep growing the company when they returned to New York. They may have aimed too low. The team actually was able to raise $1.5 million in their oversubscribed round, thanks to the interest in… Read More

Apr
21
2014
--

Using Apache Hadoop and Impala together with MySQL for data analysis

Apache Hadoop is commonly used for data analysis. It is fast for data loads and scalable. In a previous post I showed how to integrate MySQL with Hadoop. In this post I will show how to export a table from MySQL to Hadoop, load the data into Cloudera Impala (columnar format) and run reporting on top of that. For the examples below I will use the “ontime flight performance” data from my previous post (Increasing MySQL performance with parallel query execution). I’ve used Cloudera Manager v4 to install Apache Hadoop and Impala. For this test I’ve (intentionally) used old hardware (servers from 2006) to show that Hadoop can utilize old hardware and still scale. The test cluster consists of 6 datanodes. Below are the specs:

Purpose | Server specs
Namenode, Hive metastore, etc. + Datanodes | 2x PowerEdge 2950, 2x L5335 CPU @ 2.00GHz, 8 cores, 16G RAM, RAID 10 with 8 SAS drives
Datanodes only | 4x PowerEdge SC1425, 2x Xeon CPU @ 3.00GHz, 2 cores, 8G RAM, single 4TB drive

As you can see, those are pretty old servers; the only thing I’ve changed is adding a 4TB drive to be able to store more data. Hadoop provides redundancy at the server level (it writes 3 copies of each block across the datanodes), so we do not need RAID on the datanodes (we still need redundancy for the namenode, though).

Data export

There are a couple of ways to export data from MySQL to Hadoop. For the purpose of this test I have simply exported the ontime table into a text file with:

select * into outfile '/tmp/ontime.psv' 
FIELDS TERMINATED BY ','
from ontime;

(You can use “|” or any other symbol as a delimiter.) Alternatively, you can download the data directly from the www.transtats.bts.gov site using this simple script:

for y in {1988..2013}
do
        for i in {1..12}
        do
                u="http://www.transtats.bts.gov/Download/On_Time_On_Time_Performance_${y}_${i}.zip"
                wget $u -o ontime.log
                unzip On_Time_On_Time_Performance_${y}_${i}.zip
        done
done

Load into Hadoop HDFS

The first thing we need to do is load the data into HDFS as a set of files. Hive or Impala will work with the directory into which you have imported your data and will concatenate all files inside that directory. In our case it is easiest to simply copy all our files into a directory inside HDFS:

$ hdfs dfs -mkdir /data/ontime/
$ hdfs dfs -copyFromLocal On_Time_On_Time_Performance_*.csv /data/ontime/

 Create external table in Impala

Now that we have all the data files loaded, we can create an external table:

CREATE EXTERNAL TABLE ontime_csv (
YearD int ,
Quarter tinyint ,
MonthD tinyint ,
DayofMonth tinyint ,
DayOfWeek tinyint ,
FlightDate string ,
UniqueCarrier string ,
AirlineID int ,
Carrier string ,
TailNum string ,
FlightNum string ,
OriginAirportID int ,
OriginAirportSeqID int ,
OriginCityMarketID int ,
Origin string ,
OriginCityName string ,
OriginState string ,
OriginStateFips string ,
OriginStateName string ,
OriginWac int ,
DestAirportID int ,
DestAirportSeqID int ,
DestCityMarketID int ,
Dest string ,
...
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE 
LOCATION '/data/ontime';

Note the “EXTERNAL” keyword and the LOCATION clause (LOCATION points to a directory inside HDFS, not a file). Impala will only create the table metadata (it will not modify or move the data). We can query this table right away; however, Impala will need to scan all files (a full scan) for every query.
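
Beyond the impala-shell, you can also query the table programmatically; here is a hedged sketch using the impyla Python package (an assumption on my part, it is not used in this post; the host and port are illustrative and depend on your Impala daemon configuration):

from impala.dbapi import connect

# Connect to an Impala daemon (the HiveServer2-compatible port is usually 21050).
conn = connect(host="d30.local", port=21050)
cur = conn.cursor()
cur.execute("SELECT yeard, count(*) FROM ontime_csv GROUP BY yeard")
for yeard, cnt in cur.fetchall():
    print("%s %d" % (yeard, cnt))
cur.close()
conn.close()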

Example:

[d30.local:21000] > select yeard, count(*) from ontime_csv  group by yeard;
Query: select yeard, count(*) from ontime_csv  group by yeard
+-------+----------+
| yeard | count(*) |
+-------+----------+
| 2010  | 6450117  |
| 2013  | 5349447  |
| 2009  | 6450285  |
| 2002  | 5271359  |
| 2004  | 7129270  |
| 1997  | 5411843  |
| 2012  | 6096762  |
| 2005  | 7140596  |
| 1999  | 5527884  |
| 2007  | 7455458  |
| 1994  | 5180048  |
| 2008  | 7009726  |
| 1988  | 5202096  |
| 2003  | 6488540  |
| 1996  | 5351983  |
| 1989  | 5041200  |
| 2011  | 6085281  |
| 1998  | 5384721  |
| 1991  | 5076925  |
| 2006  | 7141922  |
| 1993  | 5070501  |
| 2001  | 5967780  |
| 1995  | 5327435  |
| 1990  | 5270893  |
| 1992  | 5092157  |
| 2000  | 5683047  |
+-------+----------+
Returned 26 row(s) in 131.38s

(Note that “group by” will not sort the rows, unlike MySQL. To sort we will need to add “ORDER BY yeard”)

Explain plan:

Query: explain select yeard, count(*) from ontime_csv  group by yeard
+-----------------------------------------------------------+
| Explain String                                            |
+-----------------------------------------------------------+
| PLAN FRAGMENT 0                                           |
|   PARTITION: UNPARTITIONED                                |
|                                                           |
|   4:EXCHANGE                                              |
|                                                           |
| PLAN FRAGMENT 1                                           |
|   PARTITION: HASH_PARTITIONED: yeard                      |
|                                                           |
|   STREAM DATA SINK                                        |
|     EXCHANGE ID: 4                                        |
|     UNPARTITIONED                                         |
|                                                           |
|   3:AGGREGATE (merge finalize)                            |
|   |  output: SUM(COUNT(*))                                |
|   |  group by: yeard                                      |
|   |                                                       |
|   2:EXCHANGE                                              |
|                                                           |
| PLAN FRAGMENT 2                                           |
|   PARTITION: RANDOM                                       |
|                                                           |
|   STREAM DATA SINK                                        |
|     EXCHANGE ID: 2                                        |
|     HASH_PARTITIONED: yeard                               |
|                                                           |
|   1:AGGREGATE                                             |
|   |  output: COUNT(*)                                     |
|   |  group by: yeard                                      |
|   |                                                       |
|   0:SCAN HDFS                                             |
|      table=ontime.ontime_csv #partitions=1/1 size=45.68GB |
+-----------------------------------------------------------+
Returned 31 row(s) in 0.13s

As we can see, it will scan 45GB of data.

Impala with columnar format and compression

The great benefit of Impala is that it supports columnar formats and compression. I’ve tried the new “parquet” format with the “snappy” compression codec. As our table is very wide (and denormalized), using a columnar format helps a lot. To take advantage of the “parquet” format we will need to load the data into it, which is easy to do when we already have a table inside Impala and files inside HDFS:

[d30.local:21000] > set PARQUET_COMPRESSION_CODEC=snappy;
[d30.local:21000] > create table ontime_parquet_snappy LIKE ontime_csv STORED AS PARQUET;
[d30.local:21000] > insert into ontime_parquet_snappy select * from ontime_csv;
Query: insert into ontime_parquet_snappy select * from ontime_csv
Inserted 152657276 rows in 729.76s

Then we can test our query against the new table:

Query: explain select yeard, count(*) from ontime_parquet_snappy  group by yeard
+---------------------------------------------------------------------+
| Explain String                                                      |
+---------------------------------------------------------------------+
| PLAN FRAGMENT 0                                                     |
|   PARTITION: UNPARTITIONED                                          |
|                                                                     |
|   4:EXCHANGE                                                        |
|                                                                     |
| PLAN FRAGMENT 1                                                     |
|   PARTITION: HASH_PARTITIONED: yeard                                |
|                                                                     |
|   STREAM DATA SINK                                                  |
|     EXCHANGE ID: 4                                                  |
|     UNPARTITIONED                                                   |
|                                                                     |
|   3:AGGREGATE (merge finalize)                                      |
|   |  output: SUM(COUNT(*))                                          |
|   |  group by: yeard                                                |
|   |                                                                 |
|   2:EXCHANGE                                                        |
|                                                                     |
| PLAN FRAGMENT 2                                                     |
|   PARTITION: RANDOM                                                 |
|                                                                     |
|   STREAM DATA SINK                                                  |
|     EXCHANGE ID: 2                                                  |
|     HASH_PARTITIONED: yeard                                         |
|                                                                     |
|   1:AGGREGATE                                                       |
|   |  output: COUNT(*)                                               |
|   |  group by: yeard                                                |
|   |                                                                 |
|   0:SCAN HDFS                                                       |
|      table=ontime.ontime_parquet_snappy #partitions=1/1 size=3.95GB |
+---------------------------------------------------------------------+
Returned 31 row(s) in 0.02s

As we can see, it will scan a much smaller amount of data: 3.95GB (with compression) compared to 45GB.

Results:

Query: select yeard, count(*) from ontime_parquet_snappy  group by yeard
+-------+----------+
| yeard | count(*) |
+-------+----------+
| 2010  | 6450117  |
| 2013  | 5349447  |
| 2009  | 6450285  |
...
Returned 26 row(s) in 4.17s

And the response time is much better as well.

Impala complex query example

I’ve used the complex query from my previous post. I had to adapt it for use with Impala: it does not support the “sum(ArrDelayMinutes>30)” notation, but “sum(if(ArrDelayMinutes>30, 1, 0))” works fine.

select
   min(yeard), max(yeard), Carrier, count(*) as cnt,
   sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed,
   round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate
FROM ontime_parquet_snappy
WHERE
   DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI')
   and DestState not in ('AK', 'HI', 'PR', 'VI')
   and flightdate < '2010-01-01'
GROUP by carrier
HAVING cnt > 100000 and max(yeard) > 1990
ORDER by rate DESC
LIMIT 1000;

The query is intentionally designed so that it does not take advantage of indexes: most of the conditions filter out less than 30% of the data.

Impala results:

+------------+------------+---------+----------+-----------------+------+
| min(yeard) | max(yeard) | carrier | cnt      | flights_delayed | rate |
+------------+------------+---------+----------+-----------------+------+
| 2003       | 2009       | EV      | 1454777  | 237698          | 0.16 |
| 2003       | 2009       | FL      | 1082489  | 158748          | 0.15 |
| 2006       | 2009       | XE      | 1016010  | 152431          | 0.15 |
| 2003       | 2009       | B6      | 683874   | 103677          | 0.15 |
| 2006       | 2009       | YV      | 740608   | 110389          | 0.15 |
| 2003       | 2005       | DH      | 501056   | 69833           | 0.14 |
| 2001       | 2009       | MQ      | 3238137  | 448037          | 0.14 |
| 2004       | 2009       | OH      | 1195868  | 160071          | 0.13 |
| 2003       | 2006       | RU      | 1007248  | 126733          | 0.13 |
| 2003       | 2006       | TZ      | 136735   | 16496           | 0.12 |
| 1988       | 2009       | UA      | 9593284  | 1197053         | 0.12 |
| 1988       | 2009       | AA      | 10600509 | 1185343         | 0.11 |
| 1988       | 2001       | TW      | 2659963  | 280741          | 0.11 |
| 1988       | 2009       | CO      | 6029149  | 673863          | 0.11 |
| 2007       | 2009       | 9E      | 577244   | 59440           | 0.10 |
| 1988       | 2009       | US      | 10276941 | 991016          | 0.10 |
| 2003       | 2009       | OO      | 2654259  | 257069          | 0.10 |
| 1988       | 2009       | NW      | 7601727  | 725460          | 0.10 |
| 1988       | 2009       | DL      | 11869471 | 1156267         | 0.10 |
| 1988       | 2009       | AS      | 1506003  | 146920          | 0.10 |
| 1988       | 2005       | HP      | 2607603  | 235675          | 0.09 |
| 2005       | 2009       | F9      | 307569   | 28679           | 0.09 |
| 1988       | 1991       | PA      | 206841   | 19465           | 0.09 |
| 1988       | 2009       | WN      | 12722174 | 1107840         | 0.09 |
+------------+------------+---------+----------+-----------------+------+
Returned 24 row(s) in 15.28s

15.28 seconds is significantly faster than the original MySQL results (15 min 56.40 sec without parallel execution and 5 min 47 sec with parallel execution). However, this is not an “apples to apples” comparison:

  • MySQL will scan 45GB of data, while Impala with Parquet will only scan 3.95GB
  • MySQL will run on a single server, while Hadoop + Impala will run in parallel on 6 servers.

Nevertheless, Hadoop + Impala shows impressive performance and the ability to scale out of the box, which can help a lot with large-volume data analysis.

Conclusion

Hadoop + Impala gives us an easy way to analyze large datasets using SQL, with the ability to scale even on old hardware.

In my next posts I plan to explore:

As always, please share your thoughts in the comments.

The post Using Apache Hadoop and Impala together with MySQL for data analysis appeared first on MySQL Performance Blog.
