Nov
01
2018
--

Rockset launches out of stealth with $21.5 M investment

Rockset, a startup that came out of stealth today, announced $21.5 million in previously raised funding and the launch of its new data platform, which is designed to strip away much of the data preparation work so customers can get to querying and application building faster.

As for the funding, it includes $3 million in seed money raised when the company was founded, and a more recent $18.5 million Series A led by Sequoia and Greylock.

Jerry Chen, a partner at Greylock, sees a team that understands the needs of modern developers and data scientists: one that was born in the cloud and can handle many of the tasks data scientists have traditionally had to do manually. “Rockset can ingest any data from anywhere and let developers and data scientists query it using standard SQL. No pipelines. No glue. Just real time operational apps,” he said.

Company co-founder and CEO Venkat Venkataramani is a former Facebook engineer, a role in which he learned a thing or two about processing data at scale. He wanted to start a company that would help data scientists get to insights more quickly.

Data typically requires a lot of massaging before data scientists and developers can make use of it, and Rockset has been designed to bypass much of that hard work, which can take days, weeks or even months to complete.

“We’re building out our service with innovative architecture and unique capabilities that allows full-featured fast SQL directly on raw data. And we’re offering this as a service. So developers and data scientists can go from useful data in any shape, any form to useful applications in a matter of minutes. And it would take months today,” Venkataramani explained.

To do this, you simply connect your data set, wherever it lives, to Rockset, and it handles the data ingestion, schema building and data cleaning. It also makes sure you have the right amount of infrastructure for the volume of data you are working with. In other words, it can potentially simplify highly complex data processing tasks so that you can start working with the raw data almost immediately using SQL queries.

To achieve the speed, Venkataramani says they use a number of indexing techniques. “Our indexing technology essentially tries to bring the best of search engines and columnar databases into one. When we index the data, we build more than one type of index behind the scenes so that a wide spectrum of pre-processing can be automatically fast out of the box,” he said. That takes the burden of processing and building data pipelines off of the user.

The company was founded in 2016. Chen and Sequoia partner Mike Vernal joined the Rockset board under the terms of the Series A funding, which closed last August.

Oct
12
2018
--

Anaplan hits the ground running with strong stock market debut up over 42 percent

You might think that Anaplan CEO Frank Calderoni would have had a few sleepless nights this week. His company picked a bad week to go public, as market instability rocked tech stocks. Still, he wasn’t worried, and today the company had, by any measure, a successful debut, with the stock soaring more than 42 percent. As of 4 pm ET, it hit $24.18, up from the IPO price of $17. Not a bad way to launch your company.

Stock Chart: Yahoo Finance

“I feel good because it really shows the quality of the company, the business model that we have and how we’ve been able to build a growing successful business, and I think it provides us with a tremendous amount of opportunity going forward,” Calderoni told TechCrunch.

Calderoni joined the company a couple of years ago, and seemed to emerge from Silicon Valley central casting: he was formerly CFO at Red Hat and Cisco, with stints at IBM and SanDisk. He said he often wished a tool like Anaplan had been around when he was in charge of a several-thousand-person planning operation at Cisco. He indicated that while they were successful, it could have been even more so with a tool like Anaplan.

“The planning phase has not had much change in several decades. I’ve been part of it and I’ve dealt with a lot of the pain. And so having something like Anaplan, I see it really being a disrupter in the planning space because of the breadth of the platform that we have. And then it goes across organizations to sales, supply chain, HR and finance, and as we say, really connects the data, the people and the plan to make for better decision making as a result of all that,” he said.

Calderoni describes Anaplan as a planning and data analysis tool. In his previous jobs, he says, he spent a ton of time just gathering data and making sure they had the right data, but precious little time on analysis. In his view, Anaplan lets companies concentrate more on the crucial analysis phase.

“Anaplan allows customers to really spend their time on what I call forward planning where they can start to run different scenarios and be much more predictive, and hopefully be able to, as we’ve seen a lot of our customers do, forecast more accurately,” he said.

Anaplan was founded in 2006 and raised almost $300 million along the way. It achieved a lofty valuation of $1.5 billion in its last round, a $60 million raise in 2017. The company has just under 1,000 customers, including Del Monte, VMware, Box and United.

Calderoni says that although the company has 40 percent of its business outside the US, there are plenty of markets left to conquer, and it hopes to use today’s cash infusion in part to continue expanding into a worldwide company.

Jun
13
2018
--

Tableau gets AI shot in the arm with Empirical Systems acquisition

When Tableau was founded back in 2003, not many people were thinking about artificial intelligence to drive analytics and visualization, but over the years the world has changed and the company recognized that it needed talent to keep up with new trends. Today, it announced it was acquiring Empirical Systems, an early stage startup with AI roots.

Tableau did not share the terms of the deal.

The startup was born just two years ago from research on automated statistics at the MIT Probabilistic Computing Project. According to the company website, “Empirical is an analytics engine that automatically models structured, tabular data (such as spreadsheets, tables, or csv files) and allows those models to be queried to uncover statistical insights in data.”

The product was still in private beta when Tableau bought the company. It is currently delivered as an engine embedded inside other applications, which sounds like something that could slip nicely into the Tableau analytics platform. What’s more, Tableau will be bringing the engineering team on board for its AI expertise, while taking advantage of the underlying technology.

Francois Ajenstat, Tableau’s chief product officer says this ability to automate findings could put analytics and trend analysis into the hands of more people inside a business. “Automatic insight generation will enable people without specialized data science skills to easily spot trends in their data, identify areas for further exploration, test different assumptions, and simulate hypothetical situations,” he said in a statement.

Richard Tibbetts, Empirical Systems CEO, says the two companies share this vision of democratizing data analysis. “We developed Empirical to make complex data modeling and sophisticated statistical analysis more accessible, so anyone trying to understand their data can make thoughtful, data-driven decisions based on sound analysis, regardless of their technical expertise,” Tibbetts said in a statement.

Instead of moving the team to Seattle, where Tableau has its headquarters, the company intends to leave the Empirical Systems team in place and establish an office in Cambridge, Massachusetts.

Empirical was founded in 2016 and has raised $2.5 million.

Apr
23
2018
--

Tableau gets new pricing plans and a data preparation tool

Data analytics platform Tableau today announced the launch of both a new data preparation product and a new subscription pricing plan.

Currently, Tableau offers desktop plans for users who want to analyze their data locally, a server plan for businesses that want to deploy the service on-premises or on a cloud platform, and a fully hosted online plan. Prices for these range from $35 to $70 per user per month. The new pricing plans don’t focus so much on where the data is analyzed but on the analyst’s role. The new Creator, Explorer and Viewer plans are tailored toward the different user experiences. They all include access to the new Tableau Prep data preparation tool, Tableau Desktop and new web authoring capabilities — and they are available both on premises and in the cloud.

Existing users can switch their server or desktop subscriptions to the new release today and then assign each user either a creator, explorer or viewer role. As the name indicates, the new viewer role is meant for users who mostly consume dashboards and visualizations, but don’t create their own. The explorer role is for those who need access to a pre-defined data set, and the creator role is for analysts and power users who need access to all of Tableau’s capabilities.

“Organizations are facing the urgent need to empower their entire workforce to help drive more revenue, reduce costs, provide better service, increase productivity, discover the next scientific breakthrough and even save lives,” said Adam Selipsky, CEO at Tableau, in today’s announcement. “Our new offerings will help entire organizations make analytics ubiquitous, enabling them to tailor the capabilities required for every employee.”

As for the new data preparation tool, the general idea here is to give users a visual way to shape and clean their data, something that’s especially important as businesses now often pull in data from a variety of sources. Tableau Prep can automate some of this, but the most important aspect of the service is that it gives users a visual interface for creating these kinds of workflows. Prep includes support for all the standard Tableau data connectors and lets users perform calculations, too.

“Our customers often tell us that they love working with Tableau, but struggle when data is in the wrong shape for analysis,” said Francois Ajenstat, Chief Product Officer at Tableau. “We believe data prep and data analysis are two sides of the same coin that should be deeply integrated and look forward to bringing fun, easy data prep to everyone regardless of technical skill set.”

Sep
06
2017
--

Dataiku to enhance data tools with $28 million investment led by Battery Ventures

 Dataiku, a French startup that helps data analysts communicate with data scientists to build more meaningful data applications, announced a significant funding round today. The company scored a $28 million Series B investment led by Battery Ventures with help from FirstMark, Serena Capital and Alven. Today’s money brings the total raised to almost $45 million. Its most recent prior round… Read More

Jun
12
2017
--

Microsoft’s Power BI business analytics tool learns new tricks

 Back in 2015, Microsoft launched a highly visual Power BI data exploration and interactive reporting tool into general availability. The service is now in active use at 200,000 different organizations and the team has shipped 400 updates and new features over the course of the last two years. Current users span a wide gamut and include the likes of the Seattle Seahawks, CA Technologies,… Read More

Jan
14
2016
--

Prometheus as an Engine for MySQL Monitoring

When I first discovered Graphite years ago, I was very impressed with its monitoring capabilities. Compared to many RRD-based tools that were popular at the time (like Cacti), Graphite separated the captured data and graphs, allowing you to do all kinds of math and transformations while visualizing data. For example, I could plot the relationship between system queries and disk IO, and capture how the system was getting more IO bound over time. It also had reasonably high performance, allowing me to capture high-resolution data for medium-sized systems.

Just last year I discovered Prometheus, and it also impressed me. I think it has the potential to take Graphite’s flexibility to the next level. Though I am in no way a Prometheus expert, I want to share my understanding and thoughts on it so far.

Data Model

The data model is perhaps what attracted me to Prometheus the most. While it’s not obvious at first, when you do figure it out it has fantastic flexibility.

In the data model used by Whisper and Carbon in Graphite, you will use something like this to store MySQL data:

myapp.store.mysql.db01.status.questions = 5000

You can set up any hierarchy you like, but it has to have a hierarchy.

What Prometheus does instead is allow you to use a set of key-value pairs. The same data shown above could be presented like this:

questions_total{app="myapp",subsystem="store",engine="mysql",host="db01", source="status"} = 5000

(You most likely wouldn’t use this exact structure, but it’s good for illustration.)

The difference between these approaches is that Prometheus gives you multiple dimensions on which you can filter and aggregate, and you can add those dimensions later as you need them (without needing to redesign your tree hierarchy).

These labels are very dynamic, and I can change them in a second. For example, a MySQL server reporting as a “Master” might start reporting as a “Slave” in the very next second, and its data will be aggregated differently.

This is especially important in the modern, often cloud-based and virtualized world. For example, using Prometheus it is very easy to tag servers by their region or availability zones. I can also do things like compute MySQL space usage by both the database and storage engine. The possibilities are endless.
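
To make the label model concrete, here is a minimal sketch of what exposing such a labelled metric could look like from Python. It assumes the prometheus_client library; the metric name and labels mirror the illustration above, and the value is hard-coded rather than read from MySQL.

from prometheus_client import Gauge, start_http_server
import time

# Same shape as the questions_total example above; a Gauge is used here only
# to keep the sketch short (a real exporter would mirror the server's counter).
QUESTIONS = Gauge('questions_total', 'Questions from SHOW GLOBAL STATUS',
                  ['app', 'subsystem', 'engine', 'host', 'source'])

start_http_server(8000)  # expose /metrics for Prometheus to scrape

while True:
    # In a real exporter this value would come from SHOW GLOBAL STATUS.
    QUESTIONS.labels(app='myapp', subsystem='store', engine='mysql',
                     host='db01', source='status').set(5000)
    time.sleep(1)

Adding another dimension later (say, a region label) is just one more label name; nothing resembling a tree hierarchy has to be redesigned.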

Data Capture

Unlike Graphite – where the main model is push, and the hosts themselves choose what information they want to push to the monitoring system and at which intervals – with Prometheus you set up “Exporters” that expose the data. It is up to the Prometheus server configuration to choose what data to sample and how frequently.

The clear benefit of Prometheus’ approach is that you can have as many servers as you like pulling the data, so it is very easy to create a development version of your system and play around with it – without affecting production. It also provides a simple pathway to high availability.

(Both the push and pull approaches have their benefits and drawbacks. Brian Brazil wrote an excellent article advertising the pull model of monitoring.)

Prometheus does create a few challenges for me. Unless I want to set up Service Discovery, it is a hassle to monitor any development/test VMs I might spin up (that would otherwise not be open to external access at all). While this isn’t the main use case for Prometheus, it is helpful for me to test the dashboard’s behavior with different operating systems, workloads, etc.

A more significant issue I discovered is dealing with data that can’t be captured by multiple collectors, because the act of capturing it causes the data to change.

Here is a specific example: if I look at the events_statements_summary_by_digest table in PERFORMANCE_SCHEMA, there is a MAX_TIMER_WAIT field that shows me the maximum query execution time for the query pattern. If I want to get the maximum query execution time for every minute, for example, I would need to “truncate” the table to reset the statistics and let the maximum value be computed again. If I don’t perform that operation, the data becomes meaningless. If I make the exporter reset the statistics during the poll, however, I can’t pull it from two Prometheus servers.

This is one instance where Prometheus’ pull-from-anywhere design doesn’t mesh well with how PERFORMANCE_SCHEMA works. I could set up a cron job or MySQL Event to clear out the statistics regularly and get a proper maximum value for every five minutes, but that isn’t an overly convenient solution.

Another issue I discovered is that Prometheus doesn’t have any protection from bad (long) samples, or a very good method of detecting them. Let’s imagine that I have a MySQL server and I’m sampling status data every second. For some reason the call to SHOW GLOBAL STATUS took five seconds to execute. The truth is we don’t really know where in those five seconds the SHOW GLOBAL STATUS output corresponds – it might be at the very start, it might be at the very end. As such, you don’t really know how to process the counters. Whatever you do, you’re likely to be wrong. My preference in this case is to simply discard such samples, because even missing one percent of the samples is unlikely to change the whole picture. Constantly questioning whether the QPS really spiked to ten times the normal rate for a couple of seconds, or whether it was just an invalid sample, is not something I want to waste a lot of time on!

My preferred approach is to configure the SHOW GLOBAL STATUS capture so that if it takes more than ten percent of the capture interval, the sample is discarded. For example, with a one-second capture interval I would allow 100ms for the capture. If the system can’t keep up with that, I would be better off not fooling myself and reducing the capture resolution to around five seconds.

The only protection Prometheus offers is the scrape_timeout configuration option, but unfortunately it is limited to one-second resolution at this point. This is too coarse for any high-resolution capture.
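
Here is a rough Python sketch of that guard, for illustration only: run_show_global_status is a hypothetical placeholder for whatever MySQL driver call the exporter makes, and the ten percent threshold matches the rule of thumb above.

import time

CAPTURE_INTERVAL = 1.0        # seconds between captures
MAX_CAPTURE_FRACTION = 0.10   # discard captures slower than 10% of the interval

def capture_status(run_show_global_status):
    """Run one SHOW GLOBAL STATUS capture; return None if it took too long."""
    started = time.time()
    rows = run_show_global_status()   # placeholder for the actual driver call
    elapsed = time.time() - started
    if elapsed > CAPTURE_INTERVAL * MAX_CAPTURE_FRACTION:
        # The counters could correspond to any point inside this long window,
        # so rather than guessing, drop the sample entirely.
        return None
    return rows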

Finally, it is also inconvenient to specify different resolutions for different data. In MySQL there is often a lot of data that I want to capture, but the resolution needed for each capture differs. For example, SHOW GLOBAL STATUS with one-second resolution is a must. At the same time, capturing table size information from INFORMATION_SCHEMA with one-second resolution would put too much load on MySQL, especially if there are a lot of tables. That level of resolution isn’t really needed in this case.

An attractive thing about Prometheus is that the Prometheus development team uses it a lot for MySQL monitoring, so the MySQL exporter is really good. Most MySQL monitoring plugins I find resort to reporting just a few basic statistics, which is not nearly enough for advanced diagnostics. The Prometheus MySQL exporter gets tons of stuff and has been adding more in every version.

I also very much like that the Prometheus exporters are built on the HTTP protocol. This means it is very easy to debug or see what kind of data they capture: they present it simply in a web browser:

(Screenshot: exporter output in a web browser)

Computational Model

I think the basic operations in Prometheus are pretty intuitive, but if you look at some of the advanced behaviors you’re going to find some inconveniences and some things that are likely to surprise you.

One inconvenience is that Prometheus is mainly designed for working with high-resolution data. If there are holes of more than five minutes (by default) in a time series, it can disappear from the graphs. As I mentioned, for MySQL there is quite a lot of information that makes sense to capture less frequently than once every five minutes.

Prometheus functions look only into the “past,” and are designed so that the value of a function at any time T, once it can be computed, is not going to change. It all looks clean and logical, but it causes issues when there are holes in the data capture.

As an example, let’s imagine the following five seconds, where we track the total number of questions since the start, and the scrape succeeded for some seconds but not others (due to a network issue, overload, etc.):

1  –  10
2  –  20
3  –  X
4  –  X
5  –  200

When we capture data of “counter” type, the most important value is not the actual counter value at a given time but the rate of change of the counter over different time intervals. In this case, for example, the query rate for the one-to-two-second interval was ten QPS, which can be clearly computed. But what was the query rate in the three-to-four-second interval? We don’t really have exact data, but that is fine: we know there have been 180 queries during the two-to-five-second interval, giving us 60 QPS (which we can use for the three-to-four-second interval).

This is NOT, however, how Prometheus will compute it if you use the irate() function (which is supposed to give you the highest resolution possible). When you evaluate irate() at T=4, it doesn’t have access to the T=5 value, even if it is in the database. Instead, it will look back, find the matching previous interval (one through two) and use the corresponding value of ten QPS.

I find this pretty puzzling and inconvenient.

There is also the rate() function, which can be used to get the average rate for a period. Unfortunately, it can’t estimate the rate for a smaller period based on the available data for a longer period. So, for example, if I ask the rate() function to compute the query rate at T=4, looking one second back, it will return no data. This isn’t a big deal when you’re working with data manually, but if you’re building zoomable dashboards it means you can zoom in to the point where the data disappears (rather than stopping at the best possible value available).
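
To make this concrete, here is a toy Python illustration of the “look only backwards at the last two samples” behaviour on the series above. It is a simplification for this example, not Prometheus’ actual irate() implementation.

# (timestamp, counter value); seconds 3 and 4 were not scraped
samples = [(1, 10), (2, 20), (5, 200)]

def irate_like(samples, t):
    """Rate computed from the last two samples at or before time t."""
    past = [s for s in samples if s[0] <= t]
    if len(past) < 2:
        return None
    (t1, v1), (t2, v2) = past[-2], past[-1]
    return (v2 - v1) / float(t2 - t1)

print(irate_like(samples, 4))  # 10.0 - only the 1s..2s interval is visible
print(irate_like(samples, 5))  # 60.0 - (200 - 20) / (5 - 2)

At T=4 the evaluation cannot see the T=5 sample, so it falls back to the one-to-two-second interval and reports ten QPS, even though the surrounding data implies 60 QPS.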

Storage

Prometheus has its own high-performance storage system, based in part on LevelDB. It is highly optimized for time series and can achieve a very high level of compression. Be ready, though: all your label combinations create separate time series at the low level, which requires a lot of files. This isn’t really a problem with SSD drives and modern file systems, but it is something to look out for.

The capture engine and storage systems are rather efficient. Even though Prometheus does not have built in clustering for “scaling out,” you can reportedly get more than 300K metrics per second captured on a single node. You can also use multiple Prometheus servers as needed.

The problem I found with Prometheus’ storage is that it is very self-contained: you can only use it from Prometheus or access it through the HTTP API. There are no tools at this point to export it for advanced analysis with R, or to dump the whole database into something like JSON so it can be loaded into a different database engine. Some of these features might already be on the roadmap.
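
That said, individual series can be pulled out through the HTTP API for ad-hoc analysis in a script. The sketch below assumes the requests library and the /api/v1/query_range endpoint of more recent Prometheus releases; the API of the 0.16 series described here may differ, and the server address is hypothetical.

import requests

# Hypothetical server address; the endpoint and parameters follow the newer
# /api/v1 query API, which may not match Prometheus 0.16.x exactly.
resp = requests.get(
    "http://prometheus.example.com:9090/api/v1/query_range",
    params={
        "query": 'rate(questions_total{host="db01"}[5m])',
        "start": "2016-01-01T00:00:00Z",
        "end":   "2016-01-02T00:00:00Z",
        "step":  "300s",
    })
for series in resp.json()["data"]["result"]:
    print(series["metric"], len(series["values"]), "points")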

Purging and Aggregation

Retention configuration in Prometheus is pretty spartan. You can set storage.local.retention to the length of time you want to store the data, but that’s it. You can’t configure it to purge different data at different times. You can run multiple Prometheus instances to achieve this, but it’s quite a hassle. It’s also not possible to instruct Prometheus to automatically build summaries in order to execute low-resolution queries faster.

For example, if I have MySQL’s query rate captured every second but want to view the data over a long time period (e.g., how it changed over the last three months to estimate growth trends), data aggregated at one-hour intervals would be enough for that purpose.

There is support for recording rules to help achieve some of this, but it is not explicit or convenient in my opinion.

Looking at the Prometheus roadmap, some of these issues might not be fixed in Prometheus but achieved through integrating other systems such as InfluxDB (where experimental support already exists).

Purpose

A lot of these limitations make sense if you look at the purpose for which Prometheus was created: getting high-resolution data and providing as much troubleshooting information as possible to its alerting engine. It is not really designed for storing extensive history. Too bad! I would very much like to get both of those properties in a single system!

Visualization

Prometheus comes with a built-in Expression Browser, which is great for debugging and interactive analyses. It also allows you to see what data you actually have in the database. It will disappoint you, however, if you’re looking for beautiful graphs!

(Screenshot: the Prometheus Expression Browser)

This shows I have the information about MySQL query rate from two servers, as well as the available and configured labels.

If I want to pick one server and look at the average rate of queries per five minutes, I can do this:

(Screenshot: a rate() graph in the Expression Browser)

There are some tools available in the graph view to choose the time range and resolution.

You should be aware that visualizing data with the rate() function often shows you things that do not exist. In this case, it looks like the number of queries was gradually creeping up. In reality, I had just started the benchmark, so the number of queries jumped almost immediately. This is what the real situation looks like (using irate()):

(Screenshot: the same data graphed with irate())

As I explained before, irate() does not handle missing data points very well, plus it behaves somewhat bizarrely when you “zoom out” – providing instant rate information at sparse intervals (e.g., the instant rate computed every one second over 60 seconds) rather than smoothing things to averages.

There is also the PromDash tool available for Prometheus, which gives you nicer looking dashboards and supports a lot of Prometheus’ features. Now that Grafana has official support for Prometheus, it is my preferred tool to build dashboards – especially since it supports multiple data sources besides Prometheus.

Summary

I’m very excited about Prometheus. It allows me to get a lot of information easily and use it for performance analysis in benchmarking or troubleshooting. It would be great if it also had a simple integrated solution for long-term storage and trending. I am also looking forward to better integration with Grafana and better documentation on how to create Prometheus-based dashboards – especially with some Prometheus-based examples!

Note: All of the above was written about Prometheus 0.16.1. Prometheus is rapidly evolving and may change in newer versions.

Oct
07
2015
--

Using Apache Spark and MySQL for Data Analysis

What is Spark

Apache Spark is a cluster computing framework, similar to Apache Hadoop. Wikipedia has a great description of it:

Apache Spark is an open source cluster computing framework originally developed in the AMPLab at University of California, Berkeley but was later donated to the Apache Software Foundation where it remains today. In contrast to Hadoop’s two-stage disk-based MapReduce paradigm, Spark’s multi-stage in-memory primitives provides performance up to 100 times faster for certain applications. By allowing user programs to load data into a cluster’s memory and query it repeatedly, Spark is well-suited to machine learning algorithms.

Contrary to popular belief, Spark does not require all data to fit into memory; it will use caching to speed up operations (just as MySQL does). Spark can also run in standalone mode and does not require Hadoop; it can even run on a single server (or a laptop or desktop) and utilize all your CPU cores.

Starting it in a distributed mode is really easy. Start the “master” first. You can run the “slave” on the same node:

root@thor:~/spark# ./sbin/start-master.sh
less ../logs/spark-root-org.apache.spark.deploy.master.Master-1-thor.out
15/08/25 11:21:21 INFO Master: Starting Spark master at spark://thor:7077
15/08/25 11:21:21 INFO Master: Running Spark version 1.4.1
15/08/25 11:21:21 INFO Utils: Successfully started service 'MasterUI' on port 8080.
15/08/25 11:21:21 INFO MasterWebUI: Started MasterWebUI at http://10.60.23.188:8080
root@thor:~/spark# ./sbin/start-slave.sh spark://thor:7077

Then run Spark Worker on any additional nodes (make sure to add the hostname to /etc/hosts or use DNS):

root@d31:~/spark# ./sbin/start-slave.sh spark://thor:7077

Why Spark and Not MySQL?

There are a number of tasks where MySQL (out of the box) does not show great performance. One MySQL limitation is: 1 query = 1 CPU core. That means that even if you have 48 fast cores and a large dataset to process (i.e., to group by, sort, etc.), it will not utilize the full computing power. Spark, on the other hand, will be able to utilize all your CPU cores.

Another difference between MySQL and Spark:

  • MySQL uses so-called “schema on write” – the data needs to be converted and loaded into MySQL first. If our data is not inside MySQL, you can’t use SQL to query it.
  • Spark (and Hadoop/Hive as well) uses “schema on read” – it can apply a table structure on top of a compressed text file, for example (or any other supported input format), and see it as a table; then we can use SQL to query this “table.”

In other words, MySQL is storage+processing while Spark’s job is processing only, and it can pipe data directly from/to external datasets, i.e., Hadoop, Amazon S3, local files, JDBC (MySQL/other databases). Spark supports text files (compressed), SequenceFiles, and any other Hadoop InputFormat as well as Parquet Columnar storage. Spark is more flexible in this regard compared to Hadoop: Spark can read data directly from MySQL, for example.
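
As a small illustration of schema on read, here is a sketch using the same Spark 1.4-era pyspark API as the full script further down; the file path and column layout are made up for the example.

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

sc = SparkContext()
sqlContext = SQLContext(sc)

# Spark reads the gzipped text file directly - no uncompress / LOAD DATA step
lines = sc.textFile("/data/sample/pagecounts-sample.gz")
rows = lines.map(lambda l: l.split(" ")) \
            .filter(lambda f: len(f) > 3) \
            .map(lambda f: Row(project=f[0], url=f[1], num_requests=int(f[2])))

# The "schema" is applied only at read time, then we can query it with SQL
sqlContext.createDataFrame(rows).registerTempTable("pagecounts")
sqlContext.sql("SELECT url, sum(num_requests) AS hits "
               "FROM pagecounts GROUP BY url ORDER BY hits DESC LIMIT 10").show()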

The typical pipeline to load external data to MySQL is:

  1. Uncompress (typically the external data is in compressed text files)
  2. Load it into MySQL’s staging table with “LOAD DATA INFILE”
  3. Only then can we filter/group by and save the result in another table

That can cause additional overhead. In many cases we do not need the “raw” data but we still have to load it into MySQL.

Why Spark Together With MySQL

At the same time, the result of our analysis (i.e., aggregated data) should end up in MySQL. It does not have to, but it is much more convenient to store the result of your analysis in MySQL. Let’s say you want to analyze a big dataset (e.g., a year-to-year sales comparison) and you need to present it in the form of a table or graph. The result set will be significantly smaller, as it will be aggregated, and it will be much easier to store it in MySQL, as many standard applications will work with that.

Real World Test Case

One of the interesting free datasets is the Wikipedia page counts dataset (>1TB compressed, available since 2008). This data can be downloaded (as gzipped, space-delimited text files) and is also available (as a limited dataset) on AWS. The data is aggregated by the hour and has the following fields:

  • project (i.e. “en”, “fr”, etc, which is usually a language)
  • title of the page (uri), urlencoded
  • number of requests
  • size of the content returned

(the date field is encoded inside the filename, 1 file per hour)

Our goal will be to find the top 10 pages by the number of requests per day in English Wikipedia, but also to support searching for an arbitrary word so we can show how, for example, the number of requests for the Wikipedia article about “myspace” compares to the article about “facebook” (2008 to 2015).

To do that in MySQL, we would have to load the data as-is into MySQL. The files are distributed with the date encoded in the filename. The uncompressed size of all files is > 10TB. Here are the possible steps (as per our typical MySQL pipeline):

  1. Uncompress the file and run “LOAD DATA INFILE” into a staging (temporary) table:
    load data local infile '$file'
    into table wikistats.wikistats_full CHARACTER SET latin1
    FIELDS TERMINATED BY ' '
    (project_name, title, num_requests, content_size)
    set request_date = STR_TO_DATE('$datestr', '%Y%m%d %H%i%S');
  2. Aggregate with “insert into” a final table
    insert into wikistats.wikistats_by_day
    select date(request_date) as request_day, title, count(*), sum(num_requests)
    from wikistats.wikistats_full
    group by request_day, title;
  3. Somehow URL-decode the title (maybe using a UDF).

This is a big overhead. We uncompress and transform the data into MySQL just to discard most of it.

According to my calculations, it should take > 1 month to run the whole pipeline for 6 years of data (this does not include the uncompress time, nor the load-time degradation as the table gets bigger and bigger and indexes need to be updated). There are lots of things we can do here to speed it up, of course, e.g., load into different MySQL instances, load into a MEMORY table first and then group by into InnoDB, etc.

But one of the easiest options here is to use Apache Spark with a Python script (pyspark). Pyspark can read the original gzipped text files, query those text files with SQL, apply any filters and functions (e.g., urldecode), group by day and save the result set into MySQL.

Here is the Python script to perform those actions:

from pyspark import SparkContext
sc=SparkContext()
# sc is an existing SparkContext.
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)
import urllib
from datetime import timedelta, date
def load_day(filename, mydate):
    # Load a text file and convert each line to a Row.
    lines = sc.textFile(filename)
    parts = lines.map(lambda l: l.split(" ")).filter(lambda line: line[0]=="en").filter(lambda line: len(line)>3).cache()
    wiki = parts.map(lambda p: Row(project=p[0],  url=urllib.unquote(p[1]).lower(), num_requests=int(p[2]), content_size=int(p[3])))
    #wiki.count()
    # Infer the schema, and register the DataFrame as a table.
    schemaWiki = sqlContext.createDataFrame(wiki)
    schemaWiki.registerTempTable("wikistats")
    group_res = sqlContext.sql("SELECT '"+ mydate + "' as mydate, url, count(*) as cnt, sum(num_requests) as tot_visits FROM wikistats group by url")
    # Save to MySQL
    mysql_url="jdbc:mysql://thor?user=wikistats&password=wikistats"
    group_res.write.jdbc(url=mysql_url, table="wikistats.wikistats_by_day_spark", mode="append")
    # Write to parquet file - if needed
    group_res.saveAsParquetFile("/ssd/wikistats_parquet_bydate/mydate=" + mydate)
mount = "/data/wikistats/"
d= date(2008, 1, 1)
end_date = date(2008, 2, 1)
delta = timedelta(days=1)
while d < end_date:
    print d.strftime("%Y-%m-%d")
    filename=mount + "wikistats//dumps.wikimedia.org/other/pagecounts-raw/2008/2008-01/pagecounts-200801" + d.strftime("%d") + "-*.gz"
    print(filename)
    load_day(filename, d.strftime("%Y-%m-%d"))
    d += delta

In the script I used Spark to read the original gzip files (one day at a time). We can use a directory as the “input,” or a list of files. I then use Resilient Distributed Dataset (RDD) transformations; Python lambda functions are passed to map and filter, which allows us to split the “input files” and filter them.

The next step is to apply the schema (declare the fields); here we can also apply any other functions, e.g., I use urllib.unquote to decode the title (urldecode). Finally, we can register the temp table and then use familiar SQL to do the group by.

The script will normally utilize all CPU cores. In addition, it is very easy to run it in distributed mode even without Hadoop: just copy the files to all machines in a Spark cluster or use NFS/external storage.

The script took about an hour on 3 boxes to process one month of data and load the aggregated data to MySQL (a single instance). We can estimate that loading all 6 years (aggregated) into MySQL would take ~3 days (72 months at roughly an hour each).

You may now ask, why is this significantly faster (and we still end up with the result loaded into the same MySQL instance)? The answer is that it is a different, more efficient pipeline. In our original MySQL pipeline (which would probably take months) we load the raw data into MySQL. Here we filter and group on read, and write only what we need to MySQL.

One question may also come up here: do we actually need this whole “pipeline?” Can we simply run our analytical queries on top of the “raw” data? Well, that is possible, but it will probably require a 1,000-node Spark cluster to do it efficiently, as it will need to scan through 5TB of data (see “more reading” below).

Multi-threaded Performance for MySQL Inserts

When using group_res.write.jdbc(url=mysql_url, table="wikistats.wikistats_by_day_spark", mode="append"), Spark will use multiple threads to insert into MySQL.
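
Each partition of the DataFrame is written over its own connection, so the number of parallel INSERT streams roughly follows the number of partitions. A hypothetical tweak (not part of the original script) to cap the number of MySQL connections would be to repartition before writing:

# Reduce the DataFrame to 4 partitions so the JDBC write opens ~4 connections
group_res.repartition(4).write.jdbc(
    url=mysql_url,
    table="wikistats.wikistats_by_day_spark",
    mode="append")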

+------+-----------+------------+-----------+---------+------+--------+--------------------------------------------------------------------------------------------------------+-----------+---------------+
 | Id   | User      | Host       | db        | Command | Time | State  | Info                                                                                                   | Rows_sent | Rows_examined |
 +------+-----------+------------+-----------+---------+------+--------+--------------------------------------------------------------------------------------------------------+-----------+---------------+
 | 1050 | root      | localhost  | wikistats | Query   |    0 | init   | show processlist                                                                                       |         0 |             0 |
 | 2133 | wikistats | thor:40994 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Colegio+san+ignacio', 1, 1)        |         0 |             0 |
 | 2134 | wikistats | thor:40995 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Miloš_Crnjanski', 2, 3)            |         0 |             0 |
 | 2135 | wikistats | thor:40996 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Robert_Edgar', 6, 7)               |         0 |             0 |
 | 2136 | wikistats | thor:40997 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Eastern_Orange_Tip', 6, 7)         |         0 |             0 |
 | 2137 | wikistats | thor:40998 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Image:Dresden_Augustusbrücke_Al   |         0 |             0 |
 | 2138 | wikistats | thor:40999 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Diamond_and_pearl', 11, 24)        |         0 |             0 |
 | 2139 | wikistats | thor:41000 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Operation_polo', 2, 2)             |         0 |             0 |
 | 2140 | wikistats | thor:41001 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Template_talk:Edit-first-section   |         0 |             0 |
 | 2141 | wikistats | thor:41002 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Bertha_of_Artois', 1, 1)           |         0 |             0 |
 | 2142 | wikistats | thor:41003 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'A Change of Pace', 1, 1)           |         0 |             0 |
 | 2143 | wikistats | thor:41005 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'FAIRCHILD-REPUBLIC A-10 THUNDERB   |         0 |             0 |
 | 2144 | wikistats | thor:41006 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Special:Recentchangeslinked/Wiki   |         0 |             0 |
 | 2145 | wikistats | thor:41007 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Image:Carl-sassenrath-sp-1982.jp   |         0 |             0 |
 | 2146 | wikistats | thor:41008 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'List_of_Fleet_Air_Arm_aircraft_s   |         0 |             0 |
 | 2147 | wikistats | thor:41009 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Systemic_sclerosis', 17, 29)       |         0 |             0 |
 | 2148 | wikistats | thor:41011 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'tataviam', 1, 1)                   |         0 |             0 |
 | 2149 | wikistats | thor:41010 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'The_Devil_Wears_Prada_(film)#_no   |         0 |             0 |
 | 2150 | wikistats | thor:41013 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Seaford_High_School', 5, 7)        |         0 |             0 |
 | 2151 | wikistats | thor:41014 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Talk:Shocker_(hand_gesture)', 3,   |         0 |             0 |
 | 2152 | wikistats | thor:41015 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Paul_Szabo', 14, 23)               |         0 |             0 |
 | 2153 | wikistats | thor:41016 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'ausgereift', 1, 1)                 |         0 |             0 |
 | 2154 | wikistats | thor:41017 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Category:March_2005_news', 1, 2)   |         0 |             0 |
 | 2155 | wikistats | thor:41018 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Foot_Locker_Inc', 10, 10)          |         0 |             0 |
 | 2156 | wikistats | thor:41019 | NULL      | Query   |    0 | update | INSERT INTO wikistats.wikistats_by_day_spark VALUES ('2008-01-04', 'Abbey_Park,_Nottinghamshire', 3,   |         0 |             0 |
 +------+-----------+------------+-----------+---------+------+--------+--------------------------------------------------------------------------------------------------------+-----------+---------------+
 25 rows in set (0.00 sec)

Monitoring your jobs

Spark provides a web interface to monitor and manage your jobs. Here is an example, with the wikistats.py application running:

(Screenshot: the Spark web interface)

Result: Using Parquet Columnar Format vs MySQL InnoDB table

Spark supports the Apache Parquet columnar format, so we can save the RDD as a Parquet file (it can be saved to a directory or to HDFS):

group_res.saveAsParquetFile("/ssd/wikistats_parquet_bydate/mydate=" + mydate)

Here we save the result of our pipeline (the aggregated data) as a Parquet file. I also partition it by day (“mydate=20080101”), and Spark can automatically discover partitions in this format.
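
Reading the partitioned result back in pyspark could look like the following sketch (it assumes the same SQLContext and output path as the script above):

# Partition discovery: each mydate=<day> directory becomes a value of the
# mydate column, so filters on it only touch the matching directories.
parquet_df = sqlContext.read.parquet("/ssd/wikistats_parquet_bydate")
parquet_df.registerTempTable("wikistats_parquet")
sqlContext.sql("SELECT mydate, count(*) FROM wikistats_parquet "
               "GROUP BY mydate ORDER BY mydate").show()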

Now that I have my results, I want to query them. Let’s say I want to find the top 10 most frequently visited wiki pages in January 2008. I can do this query with MySQL (I will need to filter out the main page and search pages):

mysql> SELECT lower(url) as lurl, sum(tot_visits) as max_visits , count(*) FROM wikistats_by_day_spark where lower(url) not like '%special%' and lower(url) not like '%page%' and lower(url) not like '%test%' and lower(url) not like '%wiki%' group by lower(url) order by max_visits desc limit 10;
+--------------------------------------------------------+------------+----------+
| lurl                                                   | max_visits | count(*) |
+--------------------------------------------------------+------------+----------+
| heath_ledger                                           |    4247338 |      131 |
| cloverfield                                            |    3846404 |      131 |
| barack_obama                                           |    2238406 |      153 |
| 1925_in_baseball#negro_league_baseball_final_standings |    1791341 |       11 |
| the_dark_knight_(film)                                 |    1417186 |       64 |
| martin_luther_king,_jr.                                |    1394934 |      136 |
| deaths_in_2008                                         |    1372510 |       67 |
| united_states                                          |    1357253 |      167 |
| scientology                                            |    1349654 |      108 |
| portal:current_events                                  |    1261538 |      125 |
+--------------------------------------------------------+------------+----------+
10 rows in set (1 hour 22 min 10.02 sec)

Please note, here we are using our already aggregated (summary-by-day) table, not the “raw” data.

As we can see, the query took 1 hour 22 mins. I have also saved the same results to Parquet (see the script), so now I can use it with Spark-SQL:

./bin/spark-sql --master local

This will use a local version of spark-sql, using 1 host only.

spark-sql> CREATE TEMPORARY TABLE wikistats_parquet
USING org.apache.spark.sql.parquet
OPTIONS (
  path "/ssd/wikistats_parquet_bydate"
);
Time taken: 3.466 seconds
spark-sql> select count(*) from wikistats_parquet;
select count(*) from wikistats_parquet;
227846039
Time taken: 82.185 seconds, Fetched 1 row(s)
spark-sql> SELECT lower(url) as lurl, sum(tot_visits) as max_visits , count(*) FROM wikistats_parquet where lower(url) not like '%special%' and lower(url) not like '%page%' and lower(url) not like '%test%' and lower(url) not like '%wiki%' group by lower(url) order by max_visits desc limit 10;
heath_ledger    4247335 42
cloverfield     3846400 42
barack_obama    2238402 53
1925_in_baseball#negro_league_baseball_final_standings  1791341 11
the_dark_knight_(film)  1417183 36
martin_luther_king,_jr. 1394934 46
deaths_in_2008  1372510 38
united_states   1357251 55
scientology     1349650 44
portal:current_events   1261305 44
Time taken: 1239.014 seconds, Fetched 10 row(s)

That took ~20 minutes, which is much faster.

Conclusion

Apache Spark provides a great and easy way to analyze and aggregate data. What I love about Spark vs other big data and analytical frameworks:

  • Open-source and actively developed
  • No dependency on other tools; the input and output data do not have to be in Hadoop
  • Standalone mode for quick start, easy to deploy
  • Massively parallel, easy to add nodes
  • Support for a variety of input and output formats; e.g., it can read from/write to MySQL (via the JDBC driver) and to the Parquet columnar format

However, there are a number of drawbacks:

  • It is still new so you can expect some bugs and undocumented behavior. Many of the errors are hard to explain.
  • It requires Java; Spark 1.5 only supports Java 7 and higher. That also means it will require additional memory, which is reasonable nowadays.
  • You will need to run jobs through “spark-submit”

I believe Apache Spark is a great tool and can complement MySQL for data analytics and BI purposes.

More reading

May
19
2015
--

DataHero Snags $6.1M To Democratize Data Visualization

DataHero, the SaaS company that wants to bring data visualization to non-technical end users, announced $6.1 million in Series A funding today. It also announced a new chief executive and a partnership with Hubspot. The round was led by existing investor Foundry Group. The company also announced it was bringing on veteran executive Ed Miller, who has been CEO at several startups with… Read More
