Apr 21, 2014

Using Apache Hadoop and Impala together with MySQL for data analysis

Apache Hadoop is commonly used for data analysis. It is fast for data loads and it scales well. In a previous post I showed how to integrate MySQL with Hadoop. In this post I will show how to export a table from MySQL to Hadoop, load the data into Cloudera Impala (in a columnar format) and run reports on top of it. For the examples below I will use the “ontime flight performance” data from my previous post (Increasing MySQL performance with parallel query execution).

I used Cloudera Manager v.4 to install Apache Hadoop and Impala. For this test I intentionally used old hardware (servers from 2006) to show that Hadoop can make use of old hardware and still scale. The test cluster consists of 6 datanodes. Below are the specs:

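+-------------------------------------------+----------------------------------------------------------------------------------------+
| Purpose                                   | Server specs                                                                           |
+-------------------------------------------+----------------------------------------------------------------------------------------+
| Namenode, Hive metastore, etc + Datanodes | 2x PowerEdge 2950, 2x L5335 CPU @ 2.00GHz, 8 cores, 16G RAM, RAID 10 with 8 SAS drives |
| Datanodes only                            | 4x PowerEdge SC1425, 2x Xeon CPU @ 3.00GHz, 2 cores, 8G RAM, single 4TB drive          |
+-------------------------------------------+----------------------------------------------------------------------------------------+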

As you can see, these are pretty old servers; the only thing I changed was adding a 4TB drive to be able to store more data. Hadoop provides redundancy at the server level (by default it writes 3 copies of each block to different datanodes), so we do not need RAID on the datanodes (we do need redundancy for the namenodes, though).

Data export

There are a couple of ways to export data from MySQL to Hadoop. For the purpose of this test I have simply exported the ontime table into a text file with:

select * into outfile '/tmp/ontime.psv' 
FIELDS TERMINATED BY ','
from ontime;

(You can use “|” or any other symbol as a delimiter.) Alternatively, you can download the data directly from the www.transtats.bts.gov site using this simple script:

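# Download and unpack one archive per month for 1988-2013.
for y in {1988..2013}
do
        for i in {1..12}
        do
                u="http://www.transtats.bts.gov/Download/On_Time_On_Time_Performance_${y}_${i}.zip"
                # -a appends to the log; -o would overwrite it on every iteration
                wget "$u" -a ontime.log
                unzip "On_Time_On_Time_Performance_${y}_${i}.zip"
        done
done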

Load into Hadoop HDFS

The first thing we need to do is load the data into HDFS as a set of files. Hive or Impala will work with the directory into which you have imported your data and concatenate all files inside this directory. In our case it is easy to simply copy all our files into a directory inside HDFS:

$ hdfs dfs -mkdir -p /data/ontime/
$ hdfs dfs -copyFromLocal On_Time_On_Time_Performance_*.csv /data/ontime/

 Create external table in Impala

Now that all the data files are loaded, we can create an external table:

CREATE EXTERNAL TABLE ontime_csv (
YearD int ,
Quarter tinyint ,
MonthD tinyint ,
DayofMonth tinyint ,
DayOfWeek tinyint ,
FlightDate string ,
UniqueCarrier string ,
AirlineID int ,
Carrier string ,
TailNum string ,
FlightNum string ,
OriginAirportID int ,
OriginAirportSeqID int ,
OriginCityMarketID int ,
Origin string ,
OriginCityName string ,
OriginState string ,
OriginStateFips string ,
OriginStateName string ,
OriginWac int ,
DestAirportID int ,
DestAirportSeqID int ,
DestCityMarketID int ,
Dest string ,
...
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE 
LOCATION '/data/ontime';

Note the “EXTERNAL” keyword and LOCATION (LOCATION points to a directory inside HDFS, not a file). Impala will create the metadata only (it will not modify the data files). We can query this table right away; however, Impala will need to scan all the files (a full scan) for every query.

Example:

[d30.local:21000] > select yeard, count(*) from ontime_csv  group by yeard;
Query: select yeard, count(*) from ontime_csv  group by yeard
+-------+----------+
| yeard | count(*) |
+-------+----------+
| 2010  | 6450117  |
| 2013  | 5349447  |
| 2009  | 6450285  |
| 2002  | 5271359  |
| 2004  | 7129270  |
| 1997  | 5411843  |
| 2012  | 6096762  |
| 2005  | 7140596  |
| 1999  | 5527884  |
| 2007  | 7455458  |
| 1994  | 5180048  |
| 2008  | 7009726  |
| 1988  | 5202096  |
| 2003  | 6488540  |
| 1996  | 5351983  |
| 1989  | 5041200  |
| 2011  | 6085281  |
| 1998  | 5384721  |
| 1991  | 5076925  |
| 2006  | 7141922  |
| 1993  | 5070501  |
| 2001  | 5967780  |
| 1995  | 5327435  |
| 1990  | 5270893  |
| 1992  | 5092157  |
| 2000  | 5683047  |
+-------+----------+
Returned 26 row(s) in 131.38s

(Note that “group by” will not sort the rows, unlike MySQL. To sort we will need to add “ORDER BY yeard”)
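A minimal sketch of running the same aggregate with an explicit sort from the command line. The host and port are taken from the prompt shown above. The LIMIT value of 100 is an arbitrary choice, added on the assumption that this Impala release (like other releases before 1.4) rejects ORDER BY without a LIMIT. The script only prints the query when impala-shell is not installed.

```shell
#!/usr/bin/env bash
# Impala daemon to connect to (host:port copied from the prompt in the post).
IMPALAD="d30.local:21000"

# ORDER BY makes the row order explicit; LIMIT 100 is an assumption for
# older Impala releases that require a LIMIT together with ORDER BY.
QUERY="select yeard, count(*) from ontime_csv group by yeard order by yeard limit 100"

if command -v impala-shell >/dev/null 2>&1; then
    # -i selects the impalad to connect to, -q runs a single query and exits
    impala-shell -i "$IMPALAD" -q "$QUERY"
else
    echo "impala-shell not found; would run: $QUERY"
fi
```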

Explain plan:

Query: explain select yeard, count(*) from ontime_csv  group by yeard
+-----------------------------------------------------------+
| Explain String                                            |
+-----------------------------------------------------------+
| PLAN FRAGMENT 0                                           |
|   PARTITION: UNPARTITIONED                                |
|                                                           |
|   4:EXCHANGE                                              |
|                                                           |
| PLAN FRAGMENT 1                                           |
|   PARTITION: HASH_PARTITIONED: yeard                      |
|                                                           |
|   STREAM DATA SINK                                        |
|     EXCHANGE ID: 4                                        |
|     UNPARTITIONED                                         |
|                                                           |
|   3:AGGREGATE (merge finalize)                            |
|   |  output: SUM(COUNT(*))                                |
|   |  group by: yeard                                      |
|   |                                                       |
|   2:EXCHANGE                                              |
|                                                           |
| PLAN FRAGMENT 2                                           |
|   PARTITION: RANDOM                                       |
|                                                           |
|   STREAM DATA SINK                                        |
|     EXCHANGE ID: 2                                        |
|     HASH_PARTITIONED: yeard                               |
|                                                           |
|   1:AGGREGATE                                             |
|   |  output: COUNT(*)                                     |
|   |  group by: yeard                                      |
|   |                                                       |
|   0:SCAN HDFS                                             |
|      table=ontime.ontime_csv #partitions=1/1 size=45.68GB |
+-----------------------------------------------------------+
Returned 31 row(s) in 0.13s

As we can see, it will scan 45.68GB of data.

Impala with columnar format and compression

A great benefit of Impala is that it supports columnar formats and compression. I tried the new “parquet” format with the “snappy” compression codec. As our table is very wide (and de-normalized), a columnar format helps a lot. To take advantage of the “parquet” format we need to load the data into it, which is easy to do when we already have a table inside Impala and files inside HDFS:

[d30.local:21000] > set PARQUET_COMPRESSION_CODEC=snappy;
[d30.local:21000] > create table ontime_parquet_snappy LIKE ontime_csv STORED AS PARQUET;
[d30.local:21000] > insert into ontime_parquet_snappy select * from ontime_csv;
Query: insert into ontime_parquet_snappy select * from ontime_csv
Inserted 152657276 rows in 729.76s

Then we can test our query against the new table:

Query: explain select yeard, count(*) from ontime_parquet_snappy  group by yeard
+---------------------------------------------------------------------+
| Explain String                                                      |
+---------------------------------------------------------------------+
| PLAN FRAGMENT 0                                                     |
|   PARTITION: UNPARTITIONED                                          |
|                                                                     |
|   4:EXCHANGE                                                        |
|                                                                     |
| PLAN FRAGMENT 1                                                     |
|   PARTITION: HASH_PARTITIONED: yeard                                |
|                                                                     |
|   STREAM DATA SINK                                                  |
|     EXCHANGE ID: 4                                                  |
|     UNPARTITIONED                                                   |
|                                                                     |
|   3:AGGREGATE (merge finalize)                                      |
|   |  output: SUM(COUNT(*))                                          |
|   |  group by: yeard                                                |
|   |                                                                 |
|   2:EXCHANGE                                                        |
|                                                                     |
| PLAN FRAGMENT 2                                                     |
|   PARTITION: RANDOM                                                 |
|                                                                     |
|   STREAM DATA SINK                                                  |
|     EXCHANGE ID: 2                                                  |
|     HASH_PARTITIONED: yeard                                         |
|                                                                     |
|   1:AGGREGATE                                                       |
|   |  output: COUNT(*)                                               |
|   |  group by: yeard                                                |
|   |                                                                 |
|   0:SCAN HDFS                                                       |
|      table=ontime.ontime_parquet_snappy #partitions=1/1 size=3.95GB |
+---------------------------------------------------------------------+
Returned 31 row(s) in 0.02s

As we can see, it will scan a much smaller amount of data: 3.95GB (with compression) compared to 45.68GB.

Results:

Query: select yeard, count(*) from ontime_parquet_snappy  group by yeard
+-------+----------+
| yeard | count(*) |
+-------+----------+
| 2010  | 6450117  |
| 2013  | 5349447  |
| 2009  | 6450285  |
...
Returned 26 row(s) in 4.17s

And the response time is much better as well.
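To put the two runs side by side, here is a small sketch that computes the ratios from the numbers reported above (table sizes from the explain plans, elapsed times from the query output). The helper name `ratio` is made up for this example.

```shell
#!/usr/bin/env bash
# Print A / B rounded to one decimal place.
ratio() {
    awk -v a="$1" -v b="$2" 'BEGIN { printf "%.1f\n", a / b }'
}

# Numbers copied from the explain plans and timings in the post.
echo "table size: text 45.68GB vs Parquet 3.95GB = $(ratio 45.68 3.95)x smaller"
echo "run time:   text 131.38s vs Parquet 4.17s  = $(ratio 131.38 4.17)x faster"
```

The explain plan reports the total table size, not the bytes actually read for this query, so the size ratio is only a rough indicator.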

Impala complex query example

I used the complex query from my previous post. I had to adapt it for use with Impala: it does not support the “sum(ArrDelayMinutes>30)” notation, but “sum(if(ArrDelayMinutes>30, 1, 0))” works fine.

select
   min(yeard), max(yeard), Carrier, count(*) as cnt,
   sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed,
   round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate
FROM ontime_parquet_snappy
WHERE
   DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI')
   and DestState not in ('AK', 'HI', 'PR', 'VI')
   and flightdate < '2010-01-01'
GROUP by carrier
HAVING cnt > 100000 and max(yeard) > 1990
ORDER by rate DESC
LIMIT 1000;

The query is intentionally designed so that it cannot take advantage of indexes: most of the conditions filter out less than 30% of the data.
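For readers unfamiliar with the conditional sum used in the query, the toy sketch below computes the same three columns (cnt, flights_delayed, rate) per carrier with awk. The sample rows are made up for illustration and are not taken from the ontime data; the function names are hypothetical.

```shell
#!/usr/bin/env bash
# Made-up sample rows in the form: carrier,ArrDelayMinutes
sample_delays() {
    printf '%s\n' 'AA,45' 'AA,10' 'AA,31' 'WN,5' 'WN,90' 'WN,0' 'WN,12'
}

# Per carrier: cnt     = count(*)
#              delayed = sum(if(ArrDelayMinutes > 30, 1, 0))
#              rate    = delayed / cnt, rounded to 2 decimals
delay_rate() {
    awk -F',' '
        { cnt[$1]++; if ($2 > 30) delayed[$1]++ }
        END {
            for (c in cnt)
                printf "%s %d %d %.2f\n", c, cnt[c], delayed[c], delayed[c] / cnt[c]
        }
    ' | sort
}

sample_delays | delay_rate
```

Each row contributes 1 to the sum when the condition holds and 0 otherwise, so the sum is simply the number of delayed flights.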

Impala results:

+------------+------------+---------+----------+-----------------+------+
| min(yeard) | max(yeard) | carrier | cnt      | flights_delayed | rate |
+------------+------------+---------+----------+-----------------+------+
| 2003       | 2009       | EV      | 1454777  | 237698          | 0.16 |
| 2003       | 2009       | FL      | 1082489  | 158748          | 0.15 |
| 2006       | 2009       | XE      | 1016010  | 152431          | 0.15 |
| 2003       | 2009       | B6      | 683874   | 103677          | 0.15 |
| 2006       | 2009       | YV      | 740608   | 110389          | 0.15 |
| 2003       | 2005       | DH      | 501056   | 69833           | 0.14 |
| 2001       | 2009       | MQ      | 3238137  | 448037          | 0.14 |
| 2004       | 2009       | OH      | 1195868  | 160071          | 0.13 |
| 2003       | 2006       | RU      | 1007248  | 126733          | 0.13 |
| 2003       | 2006       | TZ      | 136735   | 16496           | 0.12 |
| 1988       | 2009       | UA      | 9593284  | 1197053         | 0.12 |
| 1988       | 2009       | AA      | 10600509 | 1185343         | 0.11 |
| 1988       | 2001       | TW      | 2659963  | 280741          | 0.11 |
| 1988       | 2009       | CO      | 6029149  | 673863          | 0.11 |
| 2007       | 2009       | 9E      | 577244   | 59440           | 0.10 |
| 1988       | 2009       | US      | 10276941 | 991016          | 0.10 |
| 2003       | 2009       | OO      | 2654259  | 257069          | 0.10 |
| 1988       | 2009       | NW      | 7601727  | 725460          | 0.10 |
| 1988       | 2009       | DL      | 11869471 | 1156267         | 0.10 |
| 1988       | 2009       | AS      | 1506003  | 146920          | 0.10 |
| 1988       | 2005       | HP      | 2607603  | 235675          | 0.09 |
| 2005       | 2009       | F9      | 307569   | 28679           | 0.09 |
| 1988       | 1991       | PA      | 206841   | 19465           | 0.09 |
| 1988       | 2009       | WN      | 12722174 | 1107840         | 0.09 |
+------------+------------+---------+----------+-----------------+------+
Returned 24 row(s) in 15.28s

15.28 seconds is significantly faster than the original MySQL results (15 min 56.40 sec without parallel execution and 5 min 47 sec with parallel execution). However, this is not an “apples to apples” comparison:

  • MySQL will scan 45G of data, while Impala with Parquet will scan only 3.95G
  • MySQL will run on a single server, Hadoop + Impala will run in parallel on 6 servers.
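As a rough sketch of the raw speedups (keeping the caveats above in mind), the snippet below converts the MySQL timings to seconds and divides them by the Impala time. The helper names are made up for this example.

```shell
#!/usr/bin/env bash
# Convert "minutes seconds" into seconds.
to_seconds() {
    awk -v m="$1" -v s="$2" 'BEGIN { printf "%.2f\n", m * 60 + s }'
}

# Print how many times faster the second timing is than the first.
speedup() {
    awk -v slow="$1" -v fast="$2" 'BEGIN { printf "%.1f\n", slow / fast }'
}

IMPALA_SEC=15.28   # Impala result reported in the post

echo "vs MySQL without parallel execution: $(speedup "$(to_seconds 15 56.40)" "$IMPALA_SEC")x"
echo "vs MySQL with parallel execution:    $(speedup "$(to_seconds 5 47)" "$IMPALA_SEC")x"
```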

Nevertheless, Hadoop + Impala shows impressive performance and the ability to scale out of the box, which can help a lot with the analysis of large data volumes.

Conclusion

Hadoop + Impala gives us an easy way to analyze large datasets using SQL, with the ability to scale even on old hardware.

In my next posts I will plan to explore:

As always, please share your thoughts in the comments.

The post Using Apache Hadoop and Impala together with MySQL for data analysis appeared first on MySQL Performance Blog.

Aug 08, 2013

Big Data with MySQL and Hadoop at MySQL Connect 2013

I will be talking about Big Data with MySQL and Hadoop at MySQL Connect 2013 (Sept. 21-22) in San Francisco as well as at Percona University in Washington, DC (September 12, 2013). Apache Hadoop is a very popular Big Data solution and nowadays we can easily integrate it with MySQL. I will start with a brief introduction to Apache Hadoop and its components (HDFS, Map/Reduce, Hive, HBase/HCatalog, Flume, Sqoop, etc). Next I will show 2 major Big Data scenarios:

  • From file to Hadoop to MySQL. This is an example of an “ELT” process: Extract data from an external source; Load data into Hadoop; Transform/analyze data; Export the results to MySQL. It is similar to the original Data Warehouse ETL (Extract; Transform; Load) process; however, instead of “transforming” data before loading it into the Data Warehouse, we load it “as is” and then run the data analysis. As a result of this analysis (a map/reduce process) we can generate a report and load it into MySQL (using Sqoop export). To illustrate this process I will show 2 classic examples: Clickstream analysis and Twitter feed analysis. On top of those examples I will also show how to use MySQL / Full Text Search solutions to produce near real-time reports from HBase.

Picture 1: ELT pipeline, from File to Hadoop to MySQL


  • From OLTP MySQL to Hadoop to MySQL reporting. In this scenario we extract data (potentially close to real-time) from MySQL, load it into Hadoop for storage and analysis, and later generate reports and load them into another MySQL instance (reporting), which can be used to generate and display graphs.

Picture 2: From OLTP MySQL to Hadoop to MySQL reporting.


Note: The reason we need additional storage for MySQL reports is that it may take a long time to generate a Hive report (it is executed with Map/Reduce, which reads all the files and uses no indexes). So it makes sense to “offload” the results of common reports into separate storage (MySQL).

In both scenarios we will need a way to integrate Hadoop and MySQL. In my previous post, MySQL and Hadoop integration, I demonstrated how to integrate Hadoop and MySQL with Sqoop and Hadoop Applier for MySQL. This case is similar; however, we can use a different toolset. In scenario 1 (i.e. Clickstream) we can use Apache Flume to grab files (or read “events”) and load them into Hadoop. With Flume we can define a “source” and a “sink”. Flume supports a range of different sources including HTTP requests, Syslog, TCP, etc. The HTTP source is interesting, as we can convert all (or a number of) HTTP requests (“Source”) into “events” which can be loaded into Hadoop (“Sink”).

During my presentation I will show the exact configurations for the sample Clickstream process, including:

  1. Flume configuration
  2. HiveQL queries to generate a report
  3. Sqoop export queries to load the report into MySQL

See you at MySQL Connect 2013!

The post Big Data with MySQL and Hadoop at MySQL Connect 2013 appeared first on MySQL Performance Blog.

Jul 11, 2013

MySQL and Hadoop integration


Dolphin and Elephant: an Introduction

This post is intended for MySQL DBAs or sysadmins who need to start using Apache Hadoop and want to integrate the 2 solutions. In this post I will cover some basic information about Hadoop, focusing on Hive, as well as MySQL and Hadoop/Hive integration.

First of all, if you have been dealing with MySQL or any other relational database most of your professional life (like I have), Hadoop may look different. Very different. In many ways, Hadoop is the opposite of a relational database. Unlike a database, where we have a set of tables and indexes, Hadoop works with a set of text files. And… there are no indexes at all. And yes, this may be shocking, but all scans are sequential (full “table” scans in MySQL terms).

So, when does Hadoop make sense?

First, Hadoop is great if you need to store huge amounts of data (we are talking about petabytes now) and that data does not require real-time (millisecond) response times. Hadoop works as a cluster of nodes (similar to MySQL Cluster) and all data is spread across the cluster (with redundancy), so it provides both high availability (if implemented correctly) and scalability. The data retrieval process (map/reduce) is a parallel process, so the more data nodes you add to Hadoop, the faster the process will be.

Second, Hadoop may be very helpful if you need to store your historical data for a long period of time. For example: store the online orders for the last 3 years in MySQL, and store all orders (including mail and phone orders since 1986) in Hadoop for trend analysis and historical purposes.
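To make the map/reduce retrieval process mentioned above more concrete, here is a toy, single-machine sketch of the same idea as a shell pipeline: a map step emits a key for every row of a sequential scan, the rows are sorted by key, and a reduce step sums per key. Hadoop runs these steps in parallel across data nodes; this sketch does not. The sample rows and function names are made up for illustration.

```shell
#!/usr/bin/env bash
# Made-up sample rows in the form: year,carrier
sample_rows() {
    printf '%s\n' '2009,AA' '2010,DL' '2009,UA' '2010,AA' '2010,WN'
}

# Map: scan every row sequentially and emit "year<TAB>1".
map_step() {
    awk -F',' '{ print $1 "\t" 1 }'
}

# Reduce: input arrives sorted by key, so rows with the same key are
# adjacent; sum the values per key and print one line per key.
reduce_step() {
    awk -F'\t' '
        $1 != key { if (NR > 1) print key "\t" total; key = $1; total = 0 }
        { total += $2 }
        END { if (NR > 0) print key "\t" total }
    '
}

sample_rows | map_step | sort -k1,1 | reduce_step
```

This is the same shape as the “count rows per year” query: there is no index to consult, so every row is read once, which is why adding nodes (and splitting the scan between them) speeds things up.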

Integration

The next step after installing and configuring Hadoop is to implement a data flow between Hadoop and MySQL. If you have an OLTP system based on MySQL and you want to use Hadoop for data analysis (data science), you may want to add a constant data flow between Hadoop and MySQL. For example, one may want to implement data archiving, where old data is not deleted but rather placed into Hadoop and remains available for further analysis. There are 2 major ways of doing it:

  1. Non realtime: Sqoop
  2. Realtime: Hadoop Applier for MySQL

Using Apache Sqoop for MySQL and Hadoop integration

Apache Sqoop can be run from a cronjob to get the data from MySQL and load it into Hadoop. Apache Hive is probably the best way to store data in Hadoop as it uses a table concept and has an SQL-like language, HiveQL. Here is how we can import a whole table from MySQL to Hive:

$ sqoop import --connect jdbc:mysql://mysql_host/db_name --table ORDERS --hive-import

If you do not have BLOB or TEXT columns in your table you can use the “--direct” option, which will probably be faster (it will use mysqldump). Another useful option is “--default-character-set”; for example, for utf8 one can use “--default-character-set=utf8” (in direct mode it is passed through to mysqldump after a standalone “--”). The “--validate” option will help to check data integrity.

To repeatedly import only the new rows from the table we can use the “--where” option. For example:

$ sqoop import --connect jdbc:mysql://mysql_host/db_name --table ORDERS --hive-import --where "order_date > '2013-07-01'"
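Since Sqoop is meant to run from a cronjob here, the date in the “--where” clause has to move forward between runs. Below is a minimal dry-run sketch of one way to do that: it remembers the date of the previous run in a state file and prints the command it would execute. The state file path, the function name and the fallback date are assumptions made for this example; the connection string and table name are the ones used above.

```shell
#!/usr/bin/env bash
# Print the Sqoop import command for rows newer than the given date.
sqoop_import_cmd() {
    local since="$1"
    printf 'sqoop import --connect %s --table %s --hive-import --where "%s"\n' \
        'jdbc:mysql://mysql_host/db_name' 'ORDERS' "order_date > '${since}'"
}

# Hypothetical state file that remembers the date of the previous run.
STATE_FILE="${STATE_FILE:-/tmp/orders_last_import_date}"

# Fall back to the date from the example above on the very first run.
last_date="$(cat "$STATE_FILE" 2>/dev/null || echo '2013-07-01')"

sqoop_import_cmd "$last_date"     # dry run: print the command instead of running it
date +%Y-%m-%d > "$STATE_FILE"    # remember today's date for the next run
```

Filtering on a date alone can miss rows that arrive later on the day of the previous run. A timestamp or auto-increment column is a safer boundary, and Sqoop's own incremental mode (“--incremental append” with “--check-column” and “--last-value”) is worth considering for that case.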

The following picture illustrates the process:

[Picture: Sqoop]

Using MySQL Applier for Hadoop

Sqoop is great if you need to perform a “batch” import. For realtime data integration, we can use MySQL Applier for Hadoop. With MySQL Applier, Hadoop/Hive is integrated as if it were an additional MySQL slave. MySQL Applier reads binlog events from MySQL and “applies” them to our Hive table.

The following picture illustrates this process:

[Picture: applier]

Conclusion

In this post I have shown the ways to integrate MySQL and Hadoop (the big picture). In a subsequent post I will show how to implement data archiving with MySQL using Hadoop/Hive as a target.

The post MySQL and Hadoop integration appeared first on MySQL Performance Blog.
