May 15, 2023

Proof of Concept: Horizontal Write Scaling for MySQL With Kubernetes Operator


Historically, MySQL has been great at scaling reads horizontally. In that case, the scaling comes from adding Replica nodes, whether you use standard asynchronous replication or synchronous replication.

However, those solutions do not offer the same level of scaling for write operations.

Why? Because the solutions still rely on writing to one single node that works as Primary. Also, in the case of multi-Primary, the writes are distributed by transaction. In both cases, when using virtually-synchronous replication, the process requires certification from each node and a local (per node) write; as such, the writes are NOT distributed across multiple nodes but duplicated on each of them.

The main reason behind this is that MySQL is a relational database management system (RDBMS), and any data written to it must respect the RDBMS rules. In short, any data that is written must be consistent with the data already present. To achieve that, the new data needs to be checked against the existing data through the defined relations and constraints. This check can touch very large datasets and be very expensive. Think about updating a table with millions of rows that refer to another table with another million rows.

An image may help:

[Figure: data model for ecommerce]

Every time I insert an order, I must be sure that all the related elements are in place and consistent.

This operation is quite expensive but our database can run it in a few milliseconds or less, thanks to several optimizations that allow the node to execute most of them in memory with no or little access to mass storage.

The key factor is that the whole data structure resides in the same location (node), facilitating the operations.

Once we have understood that, it also becomes clear why we cannot split relational data across multiple nodes and distribute writes by table. If I have one node that manages only the items, another the orders, and another the payments, my solution must be able to deal with distributed transactions, each of which needs to certify and verify data on other nodes.

This level of distribution will seriously affect the efficiency of the operation and increase the response time significantly. That is it. Nothing is impossible; however, performance will be so impacted that each operation may take seconds instead of milliseconds or fractions of a millisecond, unless we lift some of the rules, which breaks the relational model.

MySQL, like other RDBMSs, is designed to work respecting the model and cannot scale by fragmenting and distributing a schema, so what can be done to scale?

The alternative is to split a consistent set of data into fragments. What is a consistent set of data? It all depends on the kind of information we are dealing with. Keeping in mind the example above, where we have a shop online serving multiple customers, we need to identify which is the most effective way to split the data.

For instance, if we try to split the data by Product Type (Books, CD/DVD, etc.), we will have a huge duplication of data related to customers/orders/shipments and so on, and all this data is also quite dynamic given I will have customers constantly ordering things.

Why duplicate the data? Because if I do not duplicate that data, I will not know whether a customer has already bought that specific item, or I will have to ask again for the shipment address and so on. This also means that whenever a customer buys something or puts something on the wish list, I have to reconcile the data across all my nodes/clusters.

On the other hand, if I choose to split my data by country of customer’s residence, the only data I will have to duplicate and keep in sync is the one related to the products, of which the most dynamic one will be the number of items in stock. This, of course, is unless I can organize my products by country as well, which is a bit unusual nowadays but not impossible.

Another possible case is if I am a health organization and I manage several hospitals. As for the example above, it will be easier to split my data by hospital, given most of the data related to patients is bound to the hospital itself, as well as treatments and any other element related to hospital management. In contrast, it will make no sense to split by patient’s country of residence.

This technique of splitting the data into smaller pieces is called sharding and is currently the only way we have to scale RDBMS horizontally. 

In the MySQL open source ecosystem, we have only two consolidated ways to perform sharding — Vitess and ProxySQL. The first one is a complete solution that takes ownership of your database and manages almost any aspect of its operations in a sharded environment and includes a lot of specific features for DBAs to deal with daily operations like table modifications, backup, and more.

While this may look great, it also has some strings attached, including the complexity and proprietary environment. That makes Vitess a good fit for “complex” sharding scenarios where other solutions may not be enough.

ProxySQL does not have a sharding mechanism “per se,” but given the way it works and the features it has, it allows us to build simple sharding solutions.

It is important to note that most DBA operations will still have to be executed by the DBA, with increased complexity given the sharded environment.

There is a third option which is application-aware sharding.

In this solution, the application is aware of the need to split the data into smaller fragments and internally points the data to different “connectors” that are connected to multiple data sources.

In this case, the application is aware of a customer’s country and will redirect all the operations related to that customer to the data source responsible for the specific fragment.
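As a minimal sketch of what such application-aware routing could look like, the snippet below maps a customer's country to a data source. The country codes, data source names, and connection strings are all hypothetical placeholders chosen for illustration; a real application would plug in its own connection handling.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DataSource:
    name: str
    dsn: str  # hypothetical connection string, for illustration only


# Hypothetical sharding map: customer country code -> data source.
EU_SHARD = DataSource("eu", "mysql://eu-shard.example:3306/shop")
NA_SHARD = DataSource("na", "mysql://na-shard.example:3306/shop")

SHARD_BY_COUNTRY = {
    "IT": EU_SHARD,
    "FR": EU_SHARD,
    "US": NA_SHARD,
    "CA": NA_SHARD,
}


def datasource_for(country: str) -> DataSource:
    """Return the data source that owns all the data of a customer's country."""
    try:
        return SHARD_BY_COUNTRY[country.upper()]
    except KeyError:
        # Refuse to guess: an unknown key must not silently land on a shard.
        raise LookupError(f"no shard defined for country {country!r}") from None


print(datasource_for("it").name)  # eu
print(datasource_for("CA").name)  # na
```

The point of the design is that the sharding rule lives in one place in the application, so every operation for a given customer is sent to the same fragment.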

Normally this solution requires a full code redesign and could be quite difficult to achieve when it is injected after the initial code architecture definition.

On the other hand, if done at design, it is probably the best solution because it will allow the application to define the sharding rules and can also optimize the different data sources using different technologies for different uses.

One example could be using an RDBMS for most of the online transaction processing (OLTP) data sharded by country and having the products in a distributed memory cache built with a different technology. At the same time, all the data related to orders, payments, and customer history can be consolidated in a data warehouse used to generate reporting.

As said, the last one is probably the most powerful, scalable, and difficult to design, and unfortunately, it probably represents less than 5% of the solutions currently deployed.

Also, very few cases need a full system/solution to provide scalability with sharding.

In my experience, most of the needs for horizontal scaling fall into the simple scenario, where there is the need to achieve sharding and data separation, very often with a shared-nothing architecture. In shared-nothing, each shard can live in a totally separate logical schema instance/physical database server/data center/continent. There is no ongoing need to retain shared access (between shards) to the unpartitioned tables in other shards.

The POC

Why this POC?

Over the years, I have met a lot of customers talking about scaling their database solution and looking at very complex sharding solutions such as Vitess as the first and only way to go.

This without even considering whether their needs were really driving them there.

In my experience, and from talking with several colleagues, I am not alone in reaching this conclusion when analyzing the real needs: after discussing with all the parties impacted, only a very small percentage of customers were in real need of complex solutions. Most of the others were just trying to avoid a project that would implement a simple shared-nothing solution. Why? Because apparently, it is simpler to migrate data to a platform that does everything for you than to accept a bit of additional work and challenge at the beginning but keep a simple approach. Also, going for the latest shiny things always has its magic.

On top of that, with the rise of Kubernetes and MySQL Operators, a lot of confusion started to circulate, most of which was generated by a total lack of understanding that a database and a relational database are two separate things. That lack of understanding of the difference, and of the real problems attached to an RDBMS, has led some to talk about horizontal scaling for databases with a concerning superficiality and without clarifying whether they were talking about an RDBMS or not. As such, some clarification is long overdue, as is putting the KISS principle back as the main focus.

Given that, I thought that refreshing how ProxySQL could help in building a simple sharding solution might help to clarify the issues, reset the expectations, and show how we can do things in a simpler way. (See my old post, MySQL Sharding with ProxySQL.)

To do so, I built a simple POC that illustrates how you can use Percona Operator for MySQL (POM) and ProxySQL to build a sharded environment with a good level of automation for some standard operations like backup/restore, software upgrades, and resource scaling.

Why ProxySQL?

In the following example, we mimic a case where we need a simple sharding solution, which means we just need to redirect the data to different data containers, keeping the database maintenance operations on us. In this common case, we do not need to implement a full sharding system such as Vitess.  

As illustrated above, ProxySQL allows us to set up a common entry point for the application and then redirect the traffic on the basis of identified sharding keys. It will also allow us to redirect read/write traffic to the primary and read-only traffic to all secondaries.

The other interesting thing is that we can have ProxySQL as part of the application pod, or as an independent service. Best practices indicate that having ProxySQL closer to the application will be more efficient, especially if we decide to activate the caching feature.  

Why POM?

Percona Operator for MySQL has three main solutions: Percona Operator for Percona XtraDB Cluster, Percona Operator for MySQL Group Replication, and Percona Operator for Percona Server for MySQL. The first two are based on virtually-synchronous replication and allow the cluster to keep the data state consistent across all pods, guaranteeing that the service will always offer consistent data. In the K8s context, we can see POM as a single service with native horizontal scalability for reads, while for writes, we will adopt the mentioned sharding approach. 

The other important aspect of using a POM-based solution is the automation it comes with. Deploying POM, you will be able to set automation for backups, software updates, monitoring (using Percona Monitoring and Management (PMM)), and last but not least, the possibility to scale UP or DOWN just by changing the needed resources. 

The elements used

[Figure: kubernetes sharding]

In our POC, I will use a modified version of sysbench (https://github.com/Tusamarco/sysbench) that has an additional field, continent, which I will use as the sharding key. At the moment, and for the purpose of this simple POC, I will only have two shards.

As the diagram above illustrates, we have a simple deployment, but one good enough to illustrate the sharding approach.

We have:

  • The application node(s): it is really up to you if you want to test with one application node or more. Nothing will change, and the same goes for the ProxySQL nodes; just keep in mind that if you use more than one ProxySQL node, it is better to activate the internal cluster support or use Consul to synchronize them.
  • Shard one is based on POM with PXC; it has the following:
    • Load balancer for service entry point
      • Entry point for r/w
      • Entry point for read-only
    • Three Pods for HAProxy
      • HAProxy container
      • PMM agent container
    • Three Pods with data nodes (PXC)
      • PXC cluster node container
      • Log streaming
      • PMM container
    • Backup/restore service
  • Shard two is based on POM for Percona Server for MySQL and Group Replication (technical preview)
    • Load balancer for service entry point
      • Entry point for r/w
      • Entry point for read-only
    • Three Pods for MySQL Router (testing)
      • MySQL Router container
    • Three Pods with data nodes (PS with GR)
      • PS-GR cluster node container
      • Log streaming
      • PMM container
    • Backup/restore on scheduler

Now you may have noticed that the nodes are drawn in different sizes; this is not a drawing mistake. It indicates that I have allocated more resources (CPU and memory) to shard1 than to shard2. Why? Because I can, and because I am simulating a situation where shard2 gets less traffic, at least temporarily; as such, I do not want to give it the same resources as shard1. I will eventually increase them if I see the need.

The settings

Data layer

Let us start with the easy one, the data layer configuration. Configuring the environment correctly is the key, and to do so, I am using a tool that I wrote specifically to calculate the needed configuration in a K8s POM environment. You can find it here (https://github.com/Tusamarco/mysqloperatorcalculator). 

Once you have compiled it and run it, you can simply ask what “dimensions” are supported, or you can define a custom level of resources, but you will still need to indicate the expected load level. In any case, please refer to the README in the repository with all the instructions.

The full cr.yaml for PXC shard1 is here, while the one for PS-GR is here

For Shard1: I asked for resources to cover traffic of type 2 (Light OLTP), configuration type 5 (2XLarge), 1000 connections.

For Shard2: I asked for resources to cover traffic of type 2 (Light OLTP), configuration type 2 (Small), 100 connections.

Once you have the CRs defined, you can follow the official guidelines to set the environment up:

It is time now to see the ProxySQL settings.

ProxySQL and sharding rules

As mentioned before, we will test the load sharding by continent, and we know that ProxySQL will not provide additional help to automatically manage the sharded environment. 

Given that, one way to handle it is to create a DBA account per shard; another is to inject shard information in the commands while executing. I will use the less comfortable one, the different DBA accounts, just to prove that it works.

We will have two shards: the sharding key is the continent field, and the continents will be grouped as follows:

  • Shard one:
    • Asia
    • Africa
    • Antarctica
    • Europe
    • North America
  • Shard two:
    • Oceania
    • South America

The DBA users:

  • dba_g1
  • dba_g2

The application user:

  • app_test

The host groups will be:

  • Shard one
    • 100 Read and Write
    • 101 Read only
  • Shard two
    • 200 Read and Write
    • 201 Read only

Once that is defined, we need to identify which query rules will serve us and how. What we want is to:

  • Redirect all the incoming queries for Asia, Africa, Antarctica, Europe, and North America to shard1
  • Redirect all the incoming queries for Oceania and South America to shard2
  • Split the queries into R/W and read-only
  • Prevent the execution of any query that does not have a shard key
  • Back up data at regular intervals and store it in a safe place

ProxySQL and sharding rules

Given the above, we first define the rules for the DBA accounts.

We set the hostgroup (HG) for each DBA, and then, if the query matches a sharding rule, we redirect it to the proper shard. Otherwise, the HG will remain as set.

This allows us to execute queries like CREATE/DROP TABLE on our shard without a problem, while still allowing us to send data where needed.

For instance, the one below is the output of the queries that sysbench will run.

Prepare:

INSERT INTO windmills_test1 /*  continent=Asia */ (uuid,millid,kwatts_s,date,location,continent,active,strrecordtype) VALUES(UUID(), 79, 3949999,NOW(),'mr18n2L9K88eMlGn7CcctT9RwKSB1FebW397','Asia',0,'quq')

In this case, I have the application simply injecting a comment in the INSERT SQL declaring the shard key; given I am using the account dba_g1 to create/prepare the schemas, rules 31/32 will be used, and given I have set apply=1, ProxySQL will exit the query rules parsing and send the command to the relevant hostgroup.

Run:

SELECT id, millid, date,continent,active,kwatts_s FROM windmills_test1 WHERE id BETWEEN ? AND ? AND continent='South America'

SELECT SUM(kwatts_s) FROM windmills_test1 WHERE id BETWEEN ? AND ?  and active=1  AND continent='Asia'
SELECT id, millid, date,continent,active,kwatts_s  FROM windmills_test1 WHERE id BETWEEN ? AND ?  AND continent='Oceania' ORDER BY millid

SELECT DISTINCT millid,continent,active,kwatts_s   FROM windmills_test1 WHERE id BETWEEN ? AND ? AND active =1  AND continent='Oceania' ORDER BY millid

UPDATE windmills_test1 SET active=? WHERE id=?  AND continent='Asia'
UPDATE windmills_test1 SET strrecordtype=? WHERE id=?  AND continent='North America'

DELETE FROM windmills_test1 WHERE id=?  AND continent='Antarctica'

INSERT INTO windmills_test1 /* continent=Antarctica */ (id,uuid,millid,kwatts_s,date,location,continent,active,strrecordtype) VALUES (?, UUID(), ?, ?, NOW(), ?, ?, ?,?) ON DUPLICATE KEY UPDATE kwatts_s=kwatts_s+1

The above are executed during the tests.  In all of them, the sharding key is present, either in the WHERE clause OR as a comment. 

Of course, if I execute one of them without the sharding key, the firewall rule will stop the query execution, for example:

mysql> SELECT id, millid, date,continent,active,kwatts_s FROM windmills_test1 WHERE id BETWEEN ? AND ?;
ERROR 1148 (42000): It is impossible to redirect this command to a defined shard. Please be sure you Have the Continent definition in your query, or that you use a defined DBA account (dba_g{1/2})
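To make the routing decision described above easier to follow, here is a small Python model of it. It is only a sketch of the logic, not ProxySQL's implementation: the function and exception names are hypothetical, the patterns are modeled on the match_pattern values of the query rules, matching is assumed to be case-insensitive, and the read/write split is applied to the raw query text rather than to the query digest.

```python
import re

# Shard-key patterns, modeled on the match_pattern of the query rules.
SHARD1_RE = re.compile(
    r"\scontinent\s*(=|like)\s*'*(Asia|Africa|Antarctica|Europe|North America)'*",
    re.IGNORECASE,
)
SHARD2_RE = re.compile(
    r"\scontinent\s*(=|like)\s*'*(Oceania|South America)'*",
    re.IGNORECASE,
)

# Default writer hostgroup for each DBA account.
DBA_DEFAULT_HG = {"dba_g1": 100, "dba_g2": 200}

SELECT_RE = re.compile(r"^SELECT", re.IGNORECASE)
SELECT_FOR_UPDATE_RE = re.compile(r"^SELECT.*FOR UPDATE", re.IGNORECASE | re.DOTALL)


class ShardKeyMissing(Exception):
    """Raised when a non-DBA query carries no recognizable shard key."""


def route(user: str, query: str) -> int:
    """Return the hostgroup a query would be sent to in this simplified model."""
    if SHARD1_RE.search(query):
        writer_hg = 100
    elif SHARD2_RE.search(query):
        writer_hg = 200
    else:
        writer_hg = None

    if user in DBA_DEFAULT_HG:
        # DBA accounts: the shard key overrides the default hostgroup;
        # without a key, the account's own hostgroup stays in place.
        return writer_hg if writer_hg is not None else DBA_DEFAULT_HG[user]

    if writer_hg is None:
        # Firewall behaviour: no shard key, no execution.
        raise ShardKeyMissing("no continent definition found in the query")

    stmt = query.lstrip()
    if SELECT_RE.match(stmt) and not SELECT_FOR_UPDATE_RE.match(stmt):
        return writer_hg + 1  # read-only hostgroup (101 or 201)
    return writer_hg  # read/write hostgroup (100 or 200)


print(route("app_test", "SELECT id FROM windmills_test1 WHERE id=1 AND continent='Oceania'"))  # 201
print(route("app_test", "UPDATE windmills_test1 SET active=1 WHERE id=1 AND continent='Asia'"))  # 100
print(route("dba_g2", "CREATE TABLE windmills1 (id INT)"))  # 200
```

Calling route("app_test", ...) with a query that has no continent filter raises ShardKeyMissing, which mirrors the error returned by the firewall rule.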


Check here for the full command list.

Setting up the dataset

Once the rules are set, it is time to set up the schemas and the data using sysbench (https://github.com/Tusamarco/sysbench). Remember to use windmills_sharding tests.  

The first operation is to build the schema on SHARD2 without filling it with data. This is a DBA action; as such, we will execute it using the dba_g2 account:

sysbench ./src/lua/windmills_sharding/oltp_read.lua  --mysql-host=10.0.1.96  --mysql-port=6033 --mysql-user=dba_g2 --mysql-password=xxx --mysql-db=windmills_large --mysql_storage_engine=innodb --db-driver=mysql --tables=4 --table_size=0 --table_name=windmills --mysql-ignore-errors=all --threads=1  prepare

Setting table_size to 0 and pointing to the ProxySQL IP/port will do, and I will have the following:

mysql> select current_user(), @@hostname;
+----------------+-------------------+
| current_user() | @@hostname        |
+----------------+-------------------+
| dba_g2@%       | ps-mysql1-mysql-0 |
+----------------+-------------------+
1 row in set (0.01 sec)

mysql> use windmills_large;
Database changed
mysql> show tables;
+---------------------------+
| Tables_in_windmills_large |
+---------------------------+
| windmills1                |
| windmills2                |
| windmills3                |
| windmills4                |
+---------------------------+
4 rows in set (0.01 sec)

mysql> select count(*) from windmills1;
+----------+
| count(*) |
+----------+
|        0 |
+----------+
1 row in set (0.09 sec)

All set but empty.

Now let us do the same but with the other DBA user:

sysbench ./src/lua/windmills_sharding/oltp_read.lua  --mysql-host=10.0.1.96  --mysql-port=6033 --mysql-user=dba_g1 --mysql-password=xxx --mysql-db=windmills_large --mysql_storage_engine=innodb --db-driver=mysql --tables=4 --table_size=400 --table_name=windmills --mysql-ignore-errors=all --threads=1  prepare

If I now run the select above with user dba_g2:

mysql> select current_user(), @@hostname;select count(*) from windmills1;
+----------------+-------------------+
| current_user() | @@hostname        |
+----------------+-------------------+
| dba_g2@%       | ps-mysql1-mysql-0 |
+----------------+-------------------+
1 row in set (0.00 sec)

+----------+
| count(*) |
+----------+
|      113 |
+----------+
1 row in set (0.00 sec)

While if I reconnect and use dba_g1:

mysql> select current_user(), @@hostname;select count(*) from windmills1;
+----------------+--------------------+
| current_user() | @@hostname         |
+----------------+--------------------+
| dba_g1@%       | mt-cluster-1-pxc-0 |
+----------------+--------------------+
1 row in set (0.00 sec)

+----------+
| count(*) |
+----------+
|      287 |
+----------+
1 row in set (0.01 sec)

I can also check on ProxySQL to see which rules were utilized:

select active,hits,destination_hostgroup, mysql_query_rules.rule_id, match_digest, match_pattern, replace_pattern, cache_ttl, apply,flagIn,flagOUT FROM mysql_query_rules NATURAL JOIN stats.stats_mysql_query_rules ORDER BY mysql_query_rules.rule_id;

+------+-----------------------+---------+---------------------+----------------------------------------------------------------------------+-------+--------+---------+
| hits | destination_hostgroup | rule_id | match_digest        | match_pattern                                                              | apply | flagIN | flagOUT |
+------+-----------------------+---------+---------------------+----------------------------------------------------------------------------+-------+--------+---------+
| 3261 | 100                   | 20      | NULL                | NULL                                                                       | 0     | 0      | 500     |
| 51   | 200                   | 21      | NULL                | NULL                                                                       | 0     | 0      | 600     |
| 2320 | 100                   | 31      | NULL                | \scontinent\s*(=|like)\s*'*(Asia|Africa|Antarctica|Europe|North America)'* | 1     | 500    | 0       |
| 880  | 200                   | 32      | NULL                | \scontinent\s*(=|like)\s*'*(Oceania|South America)'*                       | 1     | 500    | 0       |
| 0    | 100                   | 34      | NULL                | \scontinent\s*(=|like)\s*'*(Asia|Africa|Antarctica|Europe|North America)'* | 1     | 600    | 0       |
| 0    | 200                   | 35      | NULL                | \scontinent\s*(=|like)\s*'*(Oceania|South America)'*                       | 1     | 600    | 0       |
| 2    | 100                   | 51      | NULL                | \scontinent\s*(=|like)\s*'*(Asia|Africa|Antarctica|Europe|North America)'* | 0     | 0      | 1001    |
| 0    | 200                   | 54      | NULL                | \scontinent\s*(=|like)\s*'*(Oceania|South America)'*                       | 0     | 0      | 1002    |
| 0    | 100                   | 60      | NULL                | NULL                                                                       | 0     | 50     | 1001    |
| 0    | 200                   | 62      | NULL                | NULL                                                                       | 0     | 60     | 1002    |
| 7    | NULL                  | 2000    | .                   | NULL                                                                       | 1     | 0      | NULL    |
| 0    | 100                   | 2040    | ^SELECT.*FOR UPDATE | NULL                                                                       | 1     | 1001   | NULL    |
| 2    | 101                   | 2041    | ^SELECT.*$          | NULL                                                                       | 1     | 1001   | NULL    |
| 0    | 200                   | 2050    | ^SELECT.*FOR UPDATE | NULL                                                                       | 1     | 1002   | NULL    |
| 0    | 201                   | 2051    | ^SELECT.*$          | NULL                                                                       | 1     | 1002   | NULL    |
+------+-----------------------+---------+---------------------+----------------------------------------------------------------------------+-------+--------+---------+

Running the application

Now that the data load test was successful, let us do the real load following the indications above, but using 80 tables and just a few more records, like 20000; nothing huge.

Once the data is loaded, we will have the two shards with different numbers of records. If all went well, shard2 should have roughly ¼ of the total and shard1 roughly ¾.

When the load is over, I have, as expected:

mysql> select current_user(), @@hostname;select count(*) as shard1 from windmills_large.windmills80;select /* continent=shard2 */ count(*) as shard2 from windmills_large.windmills80;
+----------------+--------------------+
| current_user() | @@hostname         |
+----------------+--------------------+
| dba_g1@%       | mt-cluster-1-pxc-0 |
+----------------+--------------------+
1 row in set (0.00 sec)

+--------+
| shard1 |
+--------+
|  14272 | ← Table windmills80 in SHARD1
+--------+
+--------+
| shard2 |
+--------+
|   5728 | ← Table windmills80 in SHARD2
+--------+

As you may have already noticed, I used a trick to query the other shard using the dba_g1 user: I just passed the shard2 definition in the query as a comment. That is all we need.
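As a quick sanity check on the row counts reported above, the sketch below compares the observed share of rows in shard2 with the share we would expect if the load generator picked the continent uniformly among the seven values. The uniform pick is an assumption made for this check, not something verified against the sysbench code; the helper names are hypothetical.

```python
from fractions import Fraction

SHARD1 = ("Asia", "Africa", "Antarctica", "Europe", "North America")
SHARD2 = ("Oceania", "South America")


def expected_share(shard, *all_shards) -> Fraction:
    """Share of rows a shard should get if every continent is equally likely."""
    total_continents = sum(len(s) for s in all_shards)
    return Fraction(len(shard), total_continents)


def observed_share(rows_in_shard: int, rows_total: int) -> float:
    return rows_in_shard / rows_total


expected_shard2 = expected_share(SHARD2, SHARD1, SHARD2)  # 2 continents out of 7

small_test = observed_share(113, 113 + 287)      # windmills1, 400-row test load
real_load = observed_share(5728, 5728 + 14272)   # windmills80, 20000-row load

print(f"expected shard2 share: {float(expected_shard2):.4f}")  # 0.2857
print(f"observed (400 rows):   {small_test:.4f}")              # 0.2825
print(f"observed (20000 rows): {real_load:.4f}")               # 0.2864
```

Under that assumption, both loads sit within about one percentage point of 2/7, slightly above the ¼ rule of thumb.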

Let us execute the run command for writes in sysbench and see what happens.

The first thing we can notice while doing writes is the query distribution:

+--------+-----------+----------------------------------------------------------------------------+----------+--------+----------+----------+--------+---------+-------------+---------+
| weight | hostgroup | srv_host                                                                   | srv_port | status | ConnUsed | ConnFree | ConnOK | ConnERR | MaxConnUsed | Queries |
+--------+-----------+----------------------------------------------------------------------------+----------+--------+----------+----------+--------+---------+-------------+---------+
| 10000  | 100       | ac966f7d46c04400fb92a3603f0e2634-193113472.eu-central-1.elb.amazonaws.com  | 3306     | ONLINE | 24       | 0        | 138    | 66      | 25          | 1309353 |
| 100    | 101       | a5c8836b7c05b41928ca84f2beb48aee-1936458168.eu-central-1.elb.amazonaws.com | 3306     | ONLINE | 0        | 0        | 0      | 0       | 0           |       0 |
| 10000  | 200       | a039ab70e9f564f5e879d5e1374d9ffa-1769267689.eu-central-1.elb.amazonaws.com | 3306     | ONLINE | 24       | 1        | 129    | 66      | 25          |  516407 |
| 10000  | 201       | a039ab70e9f564f5e879d5e1374d9ffa-1769267689.eu-central-1.elb.amazonaws.com | 6447     | ONLINE | 0        | 0        | 0      | 0       | 0           |       0 |
+--------+-----------+----------------------------------------------------------------------------+----------+--------+----------+----------+--------+---------+-------------+---------+

Here we can notice that the connection load is evenly distributed, while the query load is mainly going to shard1, as we expected, given we have an unbalanced sharding by design.

At the MySQL level, we had:

[Figures: Questions / MySQL / Com Type]

The final point is, what is the gain of using this sharding approach?

Well, we still need to consider the fact we are testing on a very small set of data. However, if we can already identify some benefits here, that will be an interesting result. 

Let’s see the write operations with 24 and 64 threads:

[Figures: MySQL writes / MySQL latency]

We get a gain of ~33% just using sharding, while for latency, we do not have a cost. On the contrary, also with a small load increase, we can see how the sharded solution performs better. Of course, we are still talking about a low number of rows and running threads but the gain is there. 

Backup

The backup and restore operation when using POM is completely managed by the operator (see instructions in the POM documentation https://docs.percona.com/percona-operator-for-mysql/pxc/backups.html and https://docs.percona.com/percona-operator-for-mysql/ps/backups.html). 

The interesting part is that we can have multiple kinds of backup solutions, like:

  • On-demand
  • Scheduled 
  • Full point-in-time recovery with log streaming

Automation will allow us to set a schedule as simple as this:

schedule:
  - name: "sat-night-backup"
    schedule: "0 0 * * 6"
    keep: 3
    storageName: s3-eu-west
  - name: "daily-backup"
    schedule: "0 3 * * *"
    keep: 7
    storageName: s3-eu-west
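For readers less familiar with cron syntax, the two schedule expressions above can be read with a minimal sketch like the following. It is a hypothetical helper written for this explanation, not part of the operator; it only understands "*" and plain integers in the five standard fields (minute, hour, day of month, month, day of week with 0 = Sunday and 6 = Saturday), so ranges, lists, steps, and 7 as Sunday are not handled.

```python
from datetime import datetime


def field_matches(field: str, value: int) -> bool:
    """A field matches if it is the wildcard or the exact integer value."""
    return field == "*" or int(field) == value


def cron_matches(expr: str, when: datetime) -> bool:
    """Check whether a simple five-field cron expression fires at `when`."""
    minute, hour, day_of_month, month, day_of_week = expr.split()
    # datetime.weekday(): Monday=0 ... Sunday=6; cron: Sunday=0 ... Saturday=6.
    cron_dow = (when.weekday() + 1) % 7
    return (
        field_matches(minute, when.minute)
        and field_matches(hour, when.hour)
        and field_matches(day_of_month, when.day)
        and field_matches(month, when.month)
        and field_matches(day_of_week, cron_dow)
    )


# "sat-night-backup": at 00:00 on Saturdays (2023-04-29 was a Saturday).
print(cron_matches("0 0 * * 6", datetime(2023, 4, 29, 0, 0)))  # True
print(cron_matches("0 0 * * 6", datetime(2023, 4, 30, 0, 0)))  # False

# "daily-backup": every day at 03:00.
print(cron_matches("0 3 * * *", datetime(2023, 5, 2, 3, 0)))   # True
```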

Or, if you want to run the on-demand:

kubectl apply -f backup.yaml

Where the backup.yaml file has very simple information:

apiVersion: ps.percona.com/v1alpha1
kind: PerconaServerMySQLBackup
metadata:
  name: ps-gr-sharding-test-2nd-of-may
#  finalizers:
#    - delete-backup
spec:
  clusterName: ps-mysql1
  storageName: s3-ondemand

Using both methods, we will soon have a good set of backups like:

POM (PXC)

cron-mt-cluster-1-s3-eu-west-20234293010-3vsve   mt-cluster-1   s3-eu-west    s3://mt-bucket-backup-tl/scheduled/mt-cluster-1-2023-04-29-03:00:10-full   Succeeded   3d9h        3d9h
cron-mt-cluster-1-s3-eu-west-20234303010-3vsve   mt-cluster-1   s3-eu-west    s3://mt-bucket-backup-tl/scheduled/mt-cluster-1-2023-04-30-03:00:10-full   Succeeded   2d9h        2d9h
cron-mt-cluster-1-s3-eu-west-2023513010-3vsve    mt-cluster-1   s3-eu-west    s3://mt-bucket-backup-tl/scheduled/mt-cluster-1-2023-05-01-03:00:10-full   Succeeded   33h         33h
cron-mt-cluster-1-s3-eu-west-2023523010-3vsve    mt-cluster-1   s3-eu-west    s3://mt-bucket-backup-tl/scheduled/mt-cluster-1-2023-05-02-03:00:10-full   Succeeded   9h          9h

POM (PS) *

NAME                             STORAGE       DESTINATION                                                                     STATE       COMPLETED   AGE
ps-gr-sharding-test              s3-ondemand   s3://mt-bucket-backup-tl/ondemand/ondemand/ps-mysql1-2023-05-01-15:10:04-full   Succeeded   21h         21h
ps-gr-sharding-test-2nd-of-may   s3-ondemand   s3://mt-bucket-backup-tl/ondemand/ondemand/ps-mysql1-2023-05-02-12:22:24-full   Succeeded   27m         27m

Note that as DBAs, we still need to validate the backups with a restore procedure. That part is not automated (yet).

*Note that Backup for POM PS is available only on demand, given the solution is still in technical preview.

When will this solution fit in?

As mentioned multiple times, this solution can cover simple cases of sharding; better if you have shared-nothing. 

It also requires work from the DBA side in case of DDL operations or resharding. 

You also need to be able to change some SQL code to make sure the sharding key/information is present in any SQL executed.

When will this solution not fit in?

Several things could prevent you from using this solution. The most common ones are:

  • You need to query multiple shards at the same time. This is not possible with ProxySQL.
  • You do not have a DBA to perform administrative work and need to rely on an automated system.
  • You need cross-shard distributed transactions.
  • You have no access to the SQL code.

Conclusions

We do not face a Hamletic dilemma about sharding or not sharding.

When using an RDBMS like MySQL, if you need horizontal scalability, you need to shard. 

The point is there is no magic wand or solution; moving to sharding is an expensive and impacting operation. If you choose it at the beginning, before doing any application development, the effort can be significantly less. 

Doing it sooner will also allow you to test proper solutions, where proper means a KISS solution. Always go for the less complex option; in two years, you will be super happy about your decision.

If, instead, you must convert a current solution, then prepare for bloodshed, or at least for a long journey. 

In any case, we need to keep in mind a few key points:

  • Do not believe most of the articles on the internet that promise you infinite scalability for your database. If there is no distinction in the article between a simple database and an RDBMS, run away. 
  • Do not go for the latest shiny things just because they shine. Test them and evaluate IF they make sense for you. Better to spend a quarter testing a few solutions now than to fight for years with something that you do not fully comprehend.
  • Using containers/operators/Kubernetes does not scale per se; you must find a mechanism to make the solution scale. There is absolutely NO difference from on-premises. What you may get is a good level of automation. However, that will come with a good level of complexity, and it is up to you to evaluate if it makes sense or not.

As said at the beginning, for MySQL, the choice is limited. Vitess is the full complete solution, with a lot of coding to provide you with a complete platform to deal with your scaling needs.

However, do not be so quick to exclude ProxySQL as a possible solution. Many people out there are already using it for sharding as well.

This small POC used a synthetic case, but it also shows that with just four rules, you can achieve a decent solution. A real scenario could be a bit more complex … or not. 

References

Vitess (https://vitess.io/docs/)

ProxySQL (https://proxysql.com/documentation/)

Firewalling with ProxySQL (https://www.tusacentral.com/joomla/index.php/mysql-blogs/197-proxysql-firewalling)


Oct
09
2020
--

Amazon Aurora Multi-Primary First Impression

For what reason should I use a real multi-primary setup?

To be clear, I do not mean a multi-writer solution where any node can become the active writer in case of need, as with Percona XtraDB Cluster (PXC) or Percona Server for MySQL using Group Replication. No, we are talking about a multi-primary setup where I can write at the same time on multiple nodes. I want to insist on this “why?”.

After having excluded the possible solutions mentioned above, both covering the famous 99.995% availability, which is 26.30 minutes of downtime in a year, what is left?

Disaster Recovery? Well, that is something I would love to have, but to be a real DR solution we need to put several kilometers (miles for imperial) in the middle.

And we know (see here and here) that aside from some misleading advertising, we cannot have a tightly coupled cluster solution across geographical regions.

So, what is left? I may need more HA, ok, that is a valid reason. Or I may need to scale the number of writes, which is a valid reason as well. This means, in the end, that I am looking at a multi-primary setup because I want it to:

  • Scale writes (more nodes, more writes)
    • Provide consistent reads (what I write on A must be visible on B)
  • Give me 0 (zero) downtime, or close to that (5 nines is a maximum downtime of 864 milliseconds per day!!)
  • Allow me to shift the writer pointer at any time from A to B and vice versa, consistently.
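
For readers who want to check the downtime figures quoted above, here is a minimal sketch of the arithmetic. It assumes an average year of 365.25 days; the function names are illustrative only and do not come from any tool used in the tests.

```python
# Convert an availability target into the downtime budget it allows.
SECONDS_PER_DAY = 24 * 60 * 60
MINUTES_PER_YEAR = 365.25 * 24 * 60  # assumption: average year length


def downtime_fraction(availability_percent: float) -> float:
    """Fraction of time the service is allowed to be down."""
    return 1.0 - availability_percent / 100.0


def downtime_minutes_per_year(availability_percent: float) -> float:
    return downtime_fraction(availability_percent) * MINUTES_PER_YEAR


def downtime_ms_per_day(availability_percent: float) -> float:
    return downtime_fraction(availability_percent) * SECONDS_PER_DAY * 1000


# 99.995% -> about 26.3 minutes per year
print(round(downtime_minutes_per_year(99.995), 2))
# 99.999% (5 nines) -> about 864 milliseconds per day
print(round(downtime_ms_per_day(99.999)))
```

Seen this way, a single failover of a few seconds already consumes several days' worth of a 5 nines budget.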

Now, keeping myself bound to the MySQL ecosystem, my natural choice would be MySQL NDB Cluster. But my (virtual) boss was at AWS re:Invent and someone mentioned to him that Aurora Multi-Primary does what I was looking for. This (long) article is my voyage in discovering if that is true or … not.

Given I am focused on the behavior first, and NOT interested in absolute numbers to shock the audience with millions of QPS, I will use low-level Aurora instances and run the tests from two EC2 instances in the same VPC/region as the nodes.

You can find the details about the tests on GitHub here.

Finally, I will test:

  • Connection speed
  • Stale read
  • Write single node for baseline
  • Write on both nodes:
    • Scaling splitting the load by schema
    • Scaling same schema

Test Results

Let us start to have some real fun. The first test is …

Connection Speed

The purpose of this test is to evaluate the time taken in opening a new connection and time taken to close it. The action of the open/close connection can be a very expensive operation, especially if applications do not use a connection pool mechanism.

Amazon Aurora Multi-Primary


As we can see, ProxySQL turns out to be the most efficient way to deal with opening connections, which was expected given the way it is designed to reuse open connections towards the backend.


The close-connection operation is different: here ProxySQL seems to take a little bit longer.

As a global observation, we can say that by using ProxySQL we have more consistent behavior. Of course, this test is a simplistic one, and we are not checking the scalability (from 1 to N connections) but it is good enough to give us the initial feeling. Specific connection tests will be the focus of the next blog on Aurora MM.

Stale Reads

Aurora multi-primary uses the same mechanism as default Aurora to update the buffer pool:


It uses the Page Cache update, just in both directions. This means that the Buffer Pool of Node2 is updated with the modifications performed on Node1 and vice versa.

To verify if an application would be really able to have consistent reads, I have run this test. This test is meant to measure if, and how many, stale reads we will have when writing on a node and reading from the other.
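
To make the idea concrete, here is a minimal sketch of how such a stale-read check could be structured. It is not the test code used for this article (that is in the GitHub repository mentioned earlier). The `write` and `read` callables are hypothetical stand-ins for statements executed on the writer node and on the reader node, and `LaggingPair` is a toy in-memory simulation, not a model of Aurora internals.

```python
from typing import Callable


def count_stale_reads(write: Callable[[int], None],
                      read: Callable[[], int],
                      iterations: int) -> int:
    """Write increasing values on one node and read right away on the other.

    A read counts as stale when it does not return the value just written.
    """
    stale = 0
    for value in range(1, iterations + 1):
        write(value)
        if read() != value:
            stale += 1
    return stale


class LaggingPair:
    """Toy stand-in for two nodes: the reader only sees a write
    after `lag` newer writes have been made (hypothetical model)."""

    def __init__(self, lag: int) -> None:
        self.lag = lag
        self.history = [0]  # value visible before any write

    def write(self, value: int) -> None:
        self.history.append(value)

    def read(self) -> int:
        # Index of the newest write already visible to the reader.
        visible = max(0, len(self.history) - 1 - self.lag)
        return self.history[visible]


in_sync = LaggingPair(lag=0)
behind = LaggingPair(lag=1)
print(count_stale_reads(in_sync.write, in_sync.read, 100))  # no stale reads
print(count_stale_reads(behind.write, behind.read, 100))    # every read is stale
```

In a real test, the two callables would run against different node endpoints, and the count would be reported as a percentage of the reads performed.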

Amazon Aurora multi-primary has two consistency models:

Aurora consistency model
As an interesting fact, the result was that with the default consistency model (INSTANCE_RAW), we got 100% stale reads.
Given that, I focused on identifying the cost of using the other consistency model (REGIONAL_RAW), which allows an application to have consistent reads.

The results indicate an increase of 44% in total execution time, and of 95% (22 times slower) in write execution.

It is interesting to note that the time taken is in some way predictable and consistent between the two consistency models.

The graph below shows in yellow how long the application must wait to see the correct data on the reader node. In blue is the amount of time the application waits to get back the same consistent read because it must wait for the commit on the writer.

lag time in nanoseconds

As you can see, the two are more or less aligned.

Given the performance cost imposed by using REGIONAL_RAW, all the other tests are done with the default INSTANCE_RAW, unless explicitly stated.

Writing Tests

All tests run in this section were done using sysbench-tpcc with the following settings:

sysbench ./tpcc.lua --mysql-host=<> --mysql-port=3306 --mysql-user=<> --mysql-password=<> --mysql-db=tpcc --time=300 --threads=32 --report-interval=1 --tables=10 --scale=15  --mysql_table_options=" CHARSET=utf8 COLLATE=utf8_bin"  --db-driver=mysql prepare

 sysbench /opt/tools/sysbench-tpcc/tpcc.lua --mysql-host=$mysqlhost --mysql-port=$port --mysql-user=<> --mysql-password=<> --mysql-db=tpcc --db-driver=mysql --tables=10 --scale=15 --time=$time  --rand-type=zipfian --rand-zipfian-exp=0 --report-interval=1 --mysql-ignore-errors=all --histogram  --report_csv=yes --stats_format=csv --db-ps-mode=disable --threads=$threads run

Write Single Node (Baseline)

Before starting the comparative analysis, I was looking to define what was the “limit” of traffic/load for this platform.

baseline reads/writes

From the graph above, we can see that this setup scales up to 128 threads and after that, the performance remains more or less steady.

Amazon claims that we can roughly double the performance when using both nodes in write mode and using a different schema on each to avoid conflicts.

aurora scalability

Once more, remember I am not interested in the absolute numbers here, but I am expecting the same behavior. Given that, our expectation is to see:

expected scalability

Write on Both Nodes, Different Schemas

AWS recommends this as the scaling solution:


And I diligently followed the advice. I used two EC2 nodes in the same subnet as the Aurora nodes, each writing to a different schema (tpcc & tpcc2).

Overview

Let us make it short and go straight to the point. Did we get the expected scalability?

Well, no:


We just had a 26% increase, quite far from the expected 100%. Let us see what happened in detail (if you are not interested, just skip to the next test).

Node 1

Schema read writes Aurora

Node 2


As you can see, Node1 was (more or less) keeping up with the expectations and staying close to the expected performance. But Node2 was just not keeping up, and performance there was just terrible.

The graphs below show what happened.

While Node1 was (again more or less) scaling up to the baseline expectations (128 threads), Node2 collapsed on its knees at 16 threads. Node2 was never able to scale up.

Reads

Node 1


Node1 is scaling the reads as expected, but also here and there we can see performance deterioration.

Node 2


Node2 is not scaling Reads at all.

Writes

Node 1


Same as for reads.

Node 2


Same as for reads.

Now someone may think I was making a mistake and I was writing on the same schema. I assure you I was not. Check the next test to see what happened if using the same schema.

Write on Both Nodes, Same Schema

Overview

Now, now, Marco, this is unfair. You know this will cause contention. Yes, I do! But nonetheless, I was curious to see what was going to happen and how the platform would deal with that level of contention.

My expectations were to have a lot of performance degradation and an increased number of locks. About conflicts I was not wrong; after the test, node2 reported:

+-------------+---------+-------------------------+
| table       | index   | PHYSICAL_CONFLICTS_HIST |
+-------------+---------+-------------------------+
| district9   | PRIMARY |                    3450 |
| district6   | PRIMARY |                    3361 |
| district2   | PRIMARY |                    3356 |
| district8   | PRIMARY |                    3271 |
| district4   | PRIMARY |                    3237 |
| district10  | PRIMARY |                    3237 |
| district7   | PRIMARY |                    3237 |
| district3   | PRIMARY |                    3217 |
| district5   | PRIMARY |                    3156 |
| district1   | PRIMARY |                    3072 |
| warehouse2  | PRIMARY |                    1867 |
| warehouse10 | PRIMARY |                    1850 |
| warehouse6  | PRIMARY |                    1808 |
| warehouse5  | PRIMARY |                    1781 |
| warehouse3  | PRIMARY |                    1773 |
| warehouse9  | PRIMARY |                    1769 |
| warehouse4  | PRIMARY |                    1745 |
| warehouse7  | PRIMARY |                    1736 |
| warehouse1  | PRIMARY |                    1735 |
| warehouse8  | PRIMARY |                    1635 |
+-------------+---------+-------------------------+

Which is obviously a strong indication something was not working right. In terms of performance gain, if we compare ONLY the result with the 128 Threads:


Even with the high level of conflict, we still have a 12% performance gain.

The problem is that, in general, the two nodes behave quite badly. If you check the graphs below, you can see that the level of conflict is such that it prevents the nodes not only from scaling but also from behaving consistently.

Node 1

Write on Both Nodes,  Same Schema

Node 2


Reads

In the following graphs, we can see how node1 had issues and actually crashed three times, during the tests with 32/64/512 threads. Node2 was always up, but its performance was very low.

Node 1


Node 2


Writes

Node 1


Node 2


Recovery From Crashed Node

About recovery time, reading the AWS documentation and listening to presentations, I often heard that Aurora Multi-Primary is a 0 downtime solution. Or other statements like: “in applications where you can’t afford even brief downtime for database write operations, a multi-master cluster can help to avoid an outage when a writer instance becomes unavailable. The multi-master cluster doesn’t use the failover mechanism, because it doesn’t need to promote another DB instance to have read/write capability”

To achieve this, the suggestion I found was to have applications point directly to the node endpoints and not use the cluster endpoint.

In this context, the solution pointing to the Nodes should be able to failover within a second or so, while the cluster endpoint:

Recovery From Crashed Node

Personally, I think that designing an architecture where the application is responsible for the connection to the database and for failover is some kind of relic from 2001. But if you feel this is the way, well, go for it.

What I did for testing was to use ProxySQL, as plain as possible with nothing else, relying only on the basic checks of its native monitor. I then compared the results with the tests using the cluster endpoint. In this way, I followed the advice of pointing directly at the nodes, but did it the modern way.

The results are below and they confirm (more or less) the data coming from Amazon.


A downtime of seven seconds is quite a long time nowadays, especially if I am targeting a 5 nines solution which, I want to remind you, allows 864 ms of downtime per day. Using ProxySQL gets closer to that, but it is still too long to be called zero downtime.
I also had fail-back issues when using the AWS cluster endpoint, given it was not able to move the connection to the joining node seamlessly.

Last but not least, when using the consistency level INSTANCE_RAW, I had some data issues as well as PK conflicts:
FATAL: mysql_drv_query() returned error 1062 (Duplicate entry '18828082' for key 'PRIMARY')

Conclusions

As stated at the beginning of this long blog, the reasonable expectations for going for a multi-primary solution were:

  • Scale writes (more nodes more writes)
  • Gives me zero downtime, or close to that (5 nines is a maximum downtime of 864 milliseconds per day!!)
  • Allow me to shift the writer pointer at any time from A to B and vice versa, consistently.

Honestly, I feel we have completely failed the scaling point. Probably if I use the largest Aurora I will get much better absolute numbers, and it will take me longer to encounter the same issues, but I will. In any case, if the multi-primary solution is designed to provide that scalability, it should do that with any version.

I did not have zero downtime, but I was able to failover pretty quickly with ProxySQL.

Finally, unless the consistency model is REGIONAL_RAW, shifting from one node to the other is prone to negative effects like stale reads. Given that, I consider this requirement not fully satisfied.

Given all the above, I think this solution could eventually be valid only for High Availability (close to being 5 nines), but given it comes with some limitations I do not feel comfortable preferring it over others just for that. In the end, default Aurora is already good enough as a highly available solution.

References

AWS re:Invent 2019: Amazon Aurora Multi-Master: Scaling out database write performance

Working with Aurora multi-master clusters

Improving enterprises ha and disaster recovery solutions reviewed

Robust ha solutions with proxysql

Limitations of multi-master clusters

Oct
22
2018
--

One Billion Tables in MySQL 8.0 with ZFS

one billion tables MySQL

The short version

I created more than one billion InnoDB tables in MySQL 8.0 (tables, not rows) just for fun. Here is the proof:

$ mysql -A
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 1425329
Server version: 8.0.12 MySQL Community Server - GPL
Copyright (c) 2000, 2018, Oracle and/or its affiliates. All rights reserved.
Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql> select count(*) from information_schema.tables;
+------------+
| count(*)   |
+------------+
| 1011570298 |
+------------+
1 row in set (6 hours 57 min 6.31 sec)

Yes, it took 6 hours and 57 minutes to count them all!

Why does anyone need one billion tables?

In my previous blog post, I created and tested MySQL 8.0 with 40 million tables (that was a real case study). The One Billion Tables project is not a real world scenario, however. I was challenged by Billion Tables Project (BTP) in PostgreSQL, and decided to repeat it with MySQL, creating 1 billion InnoDB tables.

As an aside: I think MySQL 8.0 is the first MySQL version where creating 1 billion InnoDB tables is even practically possible.

Challenges with one billion InnoDB tables

Disk space

The first and one of the most important challenges is disk space. InnoDB allocates data pages on disk when creating .ibd files. Without disk-level compression we need more than 25 TB of disk. The good news: we have ZFS, which provides transparent disk compression. Here’s how the disk utilization looks:

Actual data (apparent-size):

# du -sh --apparent-size /mysqldata/
26T     /mysqldata/

Compressed data:

# du -sh /mysqldata/
2.4T    /mysqldata/

Compression ratio:

# zfs get compression,compressratio
...
mysqldata/mysql/data             compressratio         7.14x                      -
mysqldata/mysql/data             compression           gzip                       inherited from mysqldata/mysql

(Looks like the compression ratio reported is not 100% correct, we expect ~10x compression ratio.)
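
As a quick sanity check on the expected ratio, the two `du` figures above can be divided directly. This is only a rough sketch: `du` reports rounded values for the whole /mysqldata mount, while `zfs get` reports per dataset, so the two numbers are not measuring exactly the same thing.

```python
# Figures taken from the du output above (rounded by du -h).
apparent_tb = 26.0  # du -sh --apparent-size /mysqldata/
on_disk_tb = 2.4    # du -sh /mysqldata/

ratio = apparent_tb / on_disk_tb
print(f"{ratio:.1f}x")  # 10.8x
```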

Too many tiny files

This is usually the big issue with databases that create a file per table. With MySQL 8.0 we can create a shared tablespace and “assign” a table to it. I created a tablespace per database, and created 1000 tables in each database.

The result:

mysql> select count(*) from information_schema.schemata;
+----------+
| count(*) |
+----------+
|  1011575 |
+----------+
1 row in set (1.31 sec)

Creating tables

Another big challenge is how to create tables fast enough so it will not take months. I have used three approaches:

  1. Disabled all possible consistency checks in MySQL, and decreased the InnoDB page size to 4K (these config options are NOT for production use).
  2. Created tables in parallel: as the mutex contention bug in MySQL 8.0 has been fixed, creating tables in parallel works fine.
  3. Used local NVMe cards on an AWS EC2 i3.8xlarge instance.

my.cnf config file (I repeat: do not use this in production):

[mysqld]
default-authentication-plugin = mysql_native_password
performance_schema=0
datadir=/mysqldata/mysql/data
socket=/mysqldata/mysql/data/mysql.sock
log-error = /mysqldata/mysql/log/error.log
skip-log-bin=1
innodb_log_group_home_dir = /mysqldata/mysql/log/
innodb_doublewrite = 0
innodb_checksum_algorithm=none
innodb_log_checksums=0
innodb_flush_log_at_trx_commit=0
innodb_log_file_size=2G
innodb_buffer_pool_size=100G
innodb_page_size=4k
innodb_flush_method=nosync
innodb_io_capacity_max=20000
innodb_io_capacity=5000
innodb_buffer_pool_instances=32
innodb_stats_persistent = 0
tablespace_definition_cache = 524288
schema_definition_cache = 524288
table_definition_cache = 524288
table_open_cache=524288
table_open_cache_instances=32
open-files-limit=1000000

ZFS pool:

# zpool status
  pool: mysqldata
 state: ONLINE
  scan: scrub repaired 0B in 1h49m with 0 errors on Sun Oct 14 02:13:17 2018
config:
        NAME        STATE     READ WRITE CKSUM
        mysqldata   ONLINE       0     0     0
          nvme0n1   ONLINE       0     0     0
          nvme1n1   ONLINE       0     0     0
          nvme2n1   ONLINE       0     0     0
          nvme3n1   ONLINE       0     0     0
errors: No known data errors

A simple “deploy” script to create tables in parallel (includes the sysbench table structure):

#!/bin/bash
function do_db {
        db_exist=$(mysql -A -s -Nbe "select 1 from information_schema.schemata where schema_name = '$db'")
        if [ "$db_exist" == "1" ]; then echo "Already exists: $db"; return 0; fi;
        tbspace="create database $db; use $db; CREATE TABLESPACE $db ADD DATAFILE '$db.ibd' engine=InnoDB";
        #echo "Tablespace $db.ibd created!"
        tables=""
        for i in {1..1000}
        do
                table="CREATE TABLE sbtest$i ( id int(10) unsigned NOT NULL AUTO_INCREMENT, k int(10) unsigned NOT NULL DEFAULT '0', c varchar(120) NOT NULL DEFAULT '', pad varchar(60) NOT NULL DEFAULT '', PRIMARY KEY (id), KEY k_1 (k) ) ENGINE=InnoDB DEFAULT CHARSET=latin1 tablespace $db;"
                tables="$tables; $table;"
        done
        echo "$tbspace;$tables" | mysql
}
c=0
echo "starting..."
c=$(mysql -A -s -Nbe "select max(cast(SUBSTRING_INDEX(schema_name, '_', -1) as unsigned)) from information_schema.schemata where schema_name like 'sbtest_%'")
for m in {1..100000}
do
        echo "m=$m"
        for i in {1..30}
        do
                let c=$c+1
                echo $c
                db="sbtest_$c"
                do_db &
        done
        wait
done

How fast did we create tables? Here are some stats:

# mysqladmin -i 10 -r ex|grep Com_create_table
...
| Com_create_table                                      | 6497 |
| Com_create_table                                      | 6449 |

So we created ~650 tables per second. The average, above, is per 10 seconds.
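
The same samples can be turned into a rough estimate of the total creation time. This is a back-of-the-envelope sketch that assumes the rate stays constant for the whole run, which a two-sample snapshot cannot guarantee.

```python
SAMPLE_INTERVAL_S = 10   # mysqladmin -i 10
samples = [6497, 6449]   # Com_create_table deltas shown above

# Tables created per second in each sample, then the average rate.
rates = [s / SAMPLE_INTERVAL_S for s in samples]
avg_rate = sum(rates) / len(rates)

target_tables = 1_000_000_000
days = target_tables / avg_rate / 86_400  # 86,400 seconds per day

print(f"{avg_rate:.1f} tables/sec, about {days:.0f} days for one billion tables")
```

At this rate the run lands at roughly 18 days, the same order of magnitude as the ~2 weeks reported in the conclusions.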

Counting the tables

It took > 6 hours to do “count(*) from information_schema.tables”! Here is why:

  1. MySQL 8.0 uses a new data dictionary (this is great as it avoids creating 1 billion frm files). Everything is stored in this file:
    # ls -lah /mysqldata/mysql/data/mysql.ibd
    -rw-r----- 1 mysql mysql 6.1T Oct 18 15:02 /mysqldata/mysql/data/mysql.ibd
  2. The information_schema.tables is actually a view:
mysql> show create table information_schema.tables\G
*************************** 1. row ***************************
                View: TABLES
         Create View: CREATE ALGORITHM=UNDEFINED DEFINER=`mysql.infoschema`@`localhost` SQL SECURITY DEFINER VIEW `information_schema`.`TABLES` AS select `cat`.`name` AS `TABLE_CATALOG`,`sch`.`name` AS `TABLE_SCHEMA`,`tbl`.`name` AS `TABLE_NAME`,`tbl`.`type` AS `TABLE_TYPE`,if((`tbl`.`type` = 'BASE TABLE'),`tbl`.`engine`,NULL) AS `ENGINE`,if((`tbl`.`type` = 'VIEW'),NULL,10) AS `VERSION`,`tbl`.`row_format` AS `ROW_FORMAT`,internal_table_rows(`sch`.`name`,`tbl`.`name`,if(isnull(`tbl`.`partition_type`),`tbl`.`engine`,''),`tbl`.`se_private_id`,(`tbl`.`hidden` <> 'Visible'),`ts`.`se_private_data`,coalesce(`stat`.`table_rows`,0),coalesce(cast(`stat`.`cached_time` as unsigned),0)) AS `TABLE_ROWS`,internal_avg_row_length(`sch`.`name`,`tbl`.`name`,if(isnull(`tbl`.`partition_type`),`tbl`.`engine`,''),`tbl`.`se_private_id`,(`tbl`.`hidden` <> 'Visible'),`ts`.`se_private_data`,coalesce(`stat`.`avg_row_length`,0),coalesce(cast(`stat`.`cached_time` as unsigned),0)) AS `AVG_ROW_LENGTH`,internal_data_length(`sch`.`name`,`tbl`.`name`,if(isnull(`tbl`.`partition_type`),`tbl`.`engine`,''),`tbl`.`se_private_id`,(`tbl`.`hidden` <> 'Visible'),`ts`.`se_private_data`,coalesce(`stat`.`data_length`,0),coalesce(cast(`stat`.`cached_time` as unsigned),0)) AS `DATA_LENGTH`,internal_max_data_length(`sch`.`name`,`tbl`.`name`,if(isnull(`tbl`.`partition_type`),`tbl`.`engine`,''),`tbl`.`se_private_id`,(`tbl`.`hidden` <> 'Visible'),`ts`.`se_private_data`,coalesce(`stat`.`max_data_length`,0),coalesce(cast(`stat`.`cached_time` as unsigned),0)) AS `MAX_DATA_LENGTH`,internal_index_length(`sch`.`name`,`tbl`.`name`,if(isnull(`tbl`.`partition_type`),`tbl`.`engine`,''),`tbl`.`se_private_id`,(`tbl`.`hidden` <> 'Visible'),`ts`.`se_private_data`,coalesce(`stat`.`index_length`,0),coalesce(cast(`stat`.`cached_time` as unsigned),0)) AS `INDEX_LENGTH`,internal_data_free(`sch`.`name`,`tbl`.`name`,if(isnull(`tbl`.`partition_type`),`tbl`.`engine`,''),`tbl`.`se_private_id`,(`tbl`.`hidden` <> 
'Visible'),`ts`.`se_private_data`,coalesce(`stat`.`data_free`,0),coalesce(cast(`stat`.`cached_time` as unsigned),0)) AS `DATA_FREE`,internal_auto_increment(`sch`.`name`,`tbl`.`name`,if(isnull(`tbl`.`partition_type`),`tbl`.`engine`,''),`tbl`.`se_private_id`,(`tbl`.`hidden` <> 'Visible'),`ts`.`se_private_data`,coalesce(`stat`.`auto_increment`,0),coalesce(cast(`stat`.`cached_time` as unsigned),0),`tbl`.`se_private_data`) AS `AUTO_INCREMENT`,`tbl`.`created` AS `CREATE_TIME`,internal_update_time(`sch`.`name`,`tbl`.`name`,if(isnull(`tbl`.`partition_type`),`tbl`.`engine`,''),`tbl`.`se_private_id`,(`tbl`.`hidden` <> 'Visible'),`ts`.`se_private_data`,coalesce(cast(`stat`.`update_time` as unsigned),0),coalesce(cast(`stat`.`cached_time` as unsigned),0)) AS `UPDATE_TIME`,internal_check_time(`sch`.`name`,`tbl`.`name`,if(isnull(`tbl`.`partition_type`),`tbl`.`engine`,''),`tbl`.`se_private_id`,(`tbl`.`hidden` <> 'Visible'),`ts`.`se_private_data`,coalesce(cast(`stat`.`check_time` as unsigned),0),coalesce(cast(`stat`.`cached_time` as unsigned),0)) AS `CHECK_TIME`,`col`.`name` AS `TABLE_COLLATION`,internal_checksum(`sch`.`name`,`tbl`.`name`,if(isnull(`tbl`.`partition_type`),`tbl`.`engine`,''),`tbl`.`se_private_id`,(`tbl`.`hidden` <> 'Visible'),`ts`.`se_private_data`,coalesce(`stat`.`checksum`,0),coalesce(cast(`stat`.`cached_time` as unsigned),0)) AS `CHECKSUM`,if((`tbl`.`type` = 'VIEW'),NULL,get_dd_create_options(`tbl`.`options`,if((ifnull(`tbl`.`partition_expression`,'NOT_PART_TBL') = 'NOT_PART_TBL'),0,1))) AS `CREATE_OPTIONS`,internal_get_comment_or_error(`sch`.`name`,`tbl`.`name`,`tbl`.`type`,`tbl`.`options`,`tbl`.`comment`) AS `TABLE_COMMENT` from (((((`mysql`.`tables` `tbl` join `mysql`.`schemata` `sch` on((`tbl`.`schema_id` = `sch`.`id`))) join `mysql`.`catalogs` `cat` on((`cat`.`id` = `sch`.`catalog_id`))) left join `mysql`.`collations` `col` on((`tbl`.`collation_id` = `col`.`id`))) left join `mysql`.`tablespaces` `ts` on((`tbl`.`tablespace_id` = `ts`.`id`))) left join 
`mysql`.`table_stats` `stat` on(((`tbl`.`name` = `stat`.`table_name`) and (`sch`.`name` = `stat`.`schema_name`)))) where (can_access_table(`sch`.`name`,`tbl`.`name`) and is_visible_dd_object(`tbl`.`hidden`))
character_set_client: utf8
collation_connection: utf8_general_ci

and the explain plan looks like this:

mysql> explain select count(*) from information_schema.tables \G
*************************** 1. row ***************************
           id: 1
  select_type: SIMPLE
        table: cat
   partitions: NULL
         type: index
possible_keys: PRIMARY
          key: name
      key_len: 194
          ref: NULL
         rows: 1
     filtered: 100.00
        Extra: Using index
*************************** 2. row ***************************
           id: 1
  select_type: SIMPLE
        table: tbl
   partitions: NULL
         type: ALL
possible_keys: schema_id
          key: NULL
      key_len: NULL
          ref: NULL
         rows: 1023387060
     filtered: 100.00
        Extra: Using where; Using join buffer (Block Nested Loop)
*************************** 3. row ***************************
           id: 1
  select_type: SIMPLE
        table: sch
   partitions: NULL
         type: eq_ref
possible_keys: PRIMARY,catalog_id
          key: PRIMARY
      key_len: 8
          ref: mysql.tbl.schema_id
         rows: 1
     filtered: 11.11
        Extra: Using where
*************************** 4. row ***************************
           id: 1
  select_type: SIMPLE
        table: stat
   partitions: NULL
         type: eq_ref
possible_keys: PRIMARY
          key: PRIMARY
      key_len: 388
          ref: mysql.sch.name,mysql.tbl.name
         rows: 1
     filtered: 100.00
        Extra: Using index
*************************** 5. row ***************************
           id: 1
  select_type: SIMPLE
        table: ts
   partitions: NULL
         type: eq_ref
possible_keys: PRIMARY
          key: PRIMARY
      key_len: 8
          ref: mysql.tbl.tablespace_id
         rows: 1
     filtered: 100.00
        Extra: Using index
*************************** 6. row ***************************
           id: 1
  select_type: SIMPLE
        table: col
   partitions: NULL
         type: eq_ref
possible_keys: PRIMARY
          key: PRIMARY
      key_len: 8
          ref: mysql.tbl.collation_id
         rows: 1
     filtered: 100.00
        Extra: Using index

Conclusions

  1. I have created more than 1 billion real InnoDB tables with indexes in MySQL 8.0, just for fun, and it worked. It took ~2 weeks to create.
  2. MySQL 8.0 is probably the first version where it is even practically possible to create a billion InnoDB tables.
  3. ZFS compression together with NVMe cards makes it reasonably cheap to do, for example, by using i3.4xlarge or i3.8xlarge instances on AWS.


Sep
28
2018
--

Scaling Percona Monitoring and Management (PMM)

PMM tested with 1000 nodes

Starting with PMM 1.13, PMM uses Prometheus 2 for metrics storage, which tends to be the heaviest consumer of CPU and RAM. With the Prometheus 2 performance improvements, PMM can scale to more than 1000 monitored nodes per instance in the default configuration. In this blog post we will look into PMM scaling and capacity planning: how to estimate the resources required, and what drives resource consumption.

PMM tested with 1000 nodes

We have now tested PMM with up to 1000 nodes, using a virtualized system with 128GB of memory, 24 virtual cores, and SSD storage. We found PMM scales pretty linearly with the available memory and CPU cores, and we believe that a higher number of nodes could be supported with more powerful hardware.

What drives resource usage in PMM?

Depending on your system configuration and workload, a single node can generate very different loads on the PMM server. The main factors that impact the performance of PMM are:

  1. Number of samples (data points) ingested into PMM per second
  2. Number of distinct time series they belong to (cardinality)
  3. Number of distinct query patterns your application uses
  4. Number and complexity of the queries you run on PMM, through the user interface or the API

These specifically can be impacted by:

  • Software version – modern database software versions expose more metrics
  • Software configuration – some metrics are only exposed in certain configurations
  • Workload – a large number of database objects and high concurrency will increase both the number of samples ingested and their cardinality
  • Exporter configuration – disabling collectors can reduce the amount of data collected
  • Scrape frequency – controlled by METRICS_RESOLUTION

All these factors together may impact resource requirements by a factor of ten or more, so do your own testing to be sure. However, the numbers in this article should serve as good general guidance and a starting point for your research.

On the system supporting 1000 instances we observed the following performance:

Performance PMM 1000 nodes load

As you can see, we have more than 2,000 scrapes/sec performed, providing almost two million samples/sec, and more than eight million active time series. These are the main numbers that define the load placed on Prometheus.

Capacity planning to scale PMM

Both CPU and memory are very important resources for PMM capacity planning. Memory is the more important as Prometheus 2 does not have good options for limiting memory consumption. If you do not have enough memory to handle your workload, then it will run out of memory and crash.

We recommend at least 2GB of memory for a production PMM Installation. A test installation with 1GB of memory is possible. However, it may not be able to monitor more than one or two nodes without running out of memory. With 2GB of memory you should be able to monitor at least five nodes without problem.

With powerful systems (8GB or more) you can have approximately eight systems per 1GB of memory, or about 15,000 samples ingested/sec per 1GB of memory.

To calculate the CPU usage resources required, allow for about 50 monitored systems per core (or 100K metrics/sec per CPU core).
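The two rules of thumb above can be turned into a quick sizing estimate. The following is a minimal sketch, not a PMM tool: the function name is made up for illustration, and the 8GB floor is an assumption based on the rules being stated only for systems with 8GB or more.

```python
import math

# Rules of thumb quoted in the text for systems with 8GB of memory or more.
SYSTEMS_PER_GB = 8      # monitored systems per 1GB of memory
SYSTEMS_PER_CORE = 50   # monitored systems per CPU core
RULE_FLOOR_GB = 8       # assumption: the rules are only stated for 8GB+


def estimate_pmm_resources(monitored_systems: int) -> dict:
    """Return a rough memory/CPU estimate for a PMM server (hypothetical helper)."""
    if monitored_systems < 0:
        raise ValueError("monitored_systems must be non-negative")
    # Round up: a fraction of a GB or of a core still has to be provisioned.
    memory_gb = max(RULE_FLOOR_GB, math.ceil(monitored_systems / SYSTEMS_PER_GB))
    cpu_cores = max(1, math.ceil(monitored_systems / SYSTEMS_PER_CORE))
    return {"memory_gb": memory_gb, "cpu_cores": cpu_cores}


if __name__ == "__main__":
    for n in (100, 500, 1000):
        print(n, estimate_pmm_resources(n))
```

For 1,000 monitored systems this gives 125GB of memory and 20 cores, which is close to the 128GB, 24-core system used in the test. Treat the result as a starting point only, since workload and configuration can change requirements by a large factor.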

One problem you’re likely to encounter if you’re running PMM with 100+ instances is the “Home Dashboard”. This becomes way too heavy with such a large number of servers. We plan to fix this issue in future releases of PMM, but for now you can work around it in two simple ways:

You can select the host, for example “pmm-server”, in your home dashboard and save it before adding a large number of hosts to the system.

set home dashboard for PMM

Or you can make some other dashboard of your choice and set it as the home dashboard.

Summary

  • More than 1,000 monitored systems are possible per single PMM server
  • Your specific workload and configuration may significantly change the resources required
  • If deploying with 8GB or more, plan 50 systems per core, and eight systems per 1GB of RAM

The post Scaling Percona Monitoring and Management (PMM) appeared first on Percona Database Performance Blog.

Aug
06
2018
--

Webinar Tues 8/14: Utilizing ProxySQL for Connection Pooling in PHP

ProxySQL for connection pooling

Please join Percona’s Architect, Tibi Köröcz, as he presents Utilizing ProxySQL for Connection Pooling in PHP on Tuesday August 14, 2018, at 8:00 am PDT (UTC-7) / 11:00 am EDT (UTC-4).

 

ProxySQL is a very powerful tool, with extended capabilities. This presentation will demonstrate how to use ProxySQL to gain functionality (seamless database backend switch) and correct problems (applications missing connection pooling).

The presentation will be a real-life study on how we use ProxySQL for connection pooling, database failover and load balancing the communication between our (third party) PHP-application and our master-master MySQL-cluster.
Also, we will show monitoring and statistics using Percona Monitoring and Management (PMM).

Register Now!

Tibor Köröcz

Architect

ProxySQL for Connection Pooling

Tibi joined Percona in 2015 as a Consultant. Before joining Percona, among many other things, he worked at the world’s largest car hire booking service as a Senior Database Engineer. He enjoys trying and working with the latest technologies and applications which can help or work with MySQL together. In his spare time he likes to spend time with his friends, travel around the world and play ultimate frisbee.

 

The post Webinar Tues 8/14: Utilizing ProxySQL for Connection Pooling in PHP appeared first on Percona Database Performance Blog.

Aug
02
2018
--

Amazon RDS Multi-AZ Deployments and Read Replicas

RDS Multi-AZ

Amazon RDS is a managed relational database service that makes it easier to set up, operate, and scale a relational database in the cloud. One of the common questions that we get is “What is Multi-AZ and how it’s different from Read Replica, do I need both?”. I have tried to answer this question in this blog post; the answer depends on your application needs. Are you looking for High Availability (HA), read scalability … or both?

Before we go into detail, let me explain two common terms used with Amazon AWS.

Region – an AWS region is a separate geographical area like US East (N. Virginia), Asia Pacific (Mumbai), EU (London) etc. Each AWS Region has multiple, isolated locations known as Availability Zones.

Availability Zone (AZ) – AZ is simply one or more data centers, each with redundant power, networking and connectivity, housed in separate facilities. Data centers are geographically isolated within the same region.

What is Multi-AZ?

Amazon RDS provides high availability and failover support for DB instances using Multi-AZ deployments.

In a Multi-AZ deployment, Amazon RDS automatically provisions and maintains a synchronous standby replica of the master DB in a different Availability Zone. The primary DB instance is synchronously replicated across Availability Zones to the standby replica to provide data redundancy, failover support and to minimize latency during system backups. In the event of planned database maintenance, DB instance failure, or an AZ failure of your primary DB instance, Amazon RDS automatically performs a failover to the standby so that database operations can resume quickly without administrative intervention.

You can check in the AWS management console if a database instance is configured as Multi-AZ. Select the RDS service, click on the DB instance and review the details section.

AWS management console showing that instance is Multi-AZ

This screenshot from the AWS management console (above) shows that the database is hosted as a Multi-AZ deployment and the standby replica is deployed in the us-east-1a AZ.
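The same check can also be scripted. The sketch below assumes the boto3 SDK and its RDS `describe_db_instances` call, whose response lists each instance with `MultiAZ`, `AvailabilityZone` and, for Multi-AZ instances, `SecondaryAvailabilityZone` fields. The helper names and the sample identifiers are made up for illustration.

```python
def multi_az_summary(response: dict) -> list:
    """Extract Multi-AZ details from a describe_db_instances response."""
    summary = []
    for db in response.get("DBInstances", []):
        summary.append({
            "id": db["DBInstanceIdentifier"],
            "multi_az": db.get("MultiAZ", False),
            "primary_az": db.get("AvailabilityZone"),
            # Only present when a standby replica exists.
            "standby_az": db.get("SecondaryAvailabilityZone"),
        })
    return summary


def fetch_multi_az_summary(region_name: str) -> list:
    """Query RDS in one region; requires boto3 and AWS credentials."""
    import boto3  # imported lazily so multi_az_summary works without it

    client = boto3.client("rds", region_name=region_name)
    # Note: this reads only the first page of results; large accounts
    # would need to follow the pagination marker.
    return multi_az_summary(client.describe_db_instances())


if __name__ == "__main__":
    # Hypothetical response fragment, used instead of a live AWS call.
    sample = {"DBInstances": [{
        "DBInstanceIdentifier": "example-db",
        "MultiAZ": True,
        "AvailabilityZone": "us-east-1b",
        "SecondaryAvailabilityZone": "us-east-1a",
    }]}
    print(multi_az_summary(sample))
```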

Benefits of Multi-AZ deployment:

  • Replication to a standby replica is synchronous which is highly durable.
  • When a problem is detected on the primary instance, it will automatically failover to the standby in the following conditions:
    • The primary DB instance fails
    • An Availability Zone outage
    • The DB instance server type is changed
    • The operating system of the DB instance is undergoing software patching.
    • A manual failover of the DB instance was initiated using Reboot with failover.
  • The endpoint of the DB instance remains the same after a failover, so the application can resume database operations without manual intervention.
  • If a failure occurs, your availability impact is limited to the time that the automatic failover takes to complete. This helps to achieve increased availability.
  • It reduces the impact of maintenance. RDS performs maintenance on the standby first, promotes the standby to primary, and then performs maintenance on the old primary, which is now a standby replica.
  • To prevent any negative impact of the backup process on performance, Amazon RDS creates a backup from the standby replica.

Amazon RDS does not fail over automatically in response to database operations such as long-running queries, deadlocks or database corruption errors. Also, Multi-AZ deployments are limited to a single region; cross-region Multi-AZ is not currently supported.

Can I use an RDS standby replica for read scaling?

Multi-AZ deployments are not a read scaling solution; you cannot use a standby replica to serve read traffic. Multi-AZ maintains a standby replica for HA/failover. It is available for use only when RDS promotes the standby instance to primary. To serve read-only traffic, you should use a Read Replica instead.

What is Read Replica?

Read replicas allow you to have a read-only copy of your database.

When you create a Read Replica, you first specify an existing DB instance as the source. Then Amazon RDS takes a snapshot of the source instance and creates a read-only instance from the snapshot. RDS then uses MySQL native asynchronous replication to keep the Read Replica up to date with changes on the source. The source DB must have automatic backups enabled before you can set up a Read Replica.

Benefits of Read Replica

  • Read Replica helps in decreasing load on the primary DB by serving read-only traffic.
  • A Read Replica can be manually promoted as a standalone database instance.
  • You can create Read Replicas within AZ, Cross-AZ or Cross-Region.
  • You can have up to five Read Replicas per master, each with its own DNS endpoint. Unlike a Multi-AZ standby replica, you can connect to each Read Replica and use them for read scaling.
  • You can have Read Replicas of Read Replicas.
  • Read Replicas can be Multi-AZ enabled.
  • You can use Read Replicas to take logical backups (mysqldump/mydumper) if you want to store the backups externally to RDS.
  • Read Replica helps to maintain a copy of databases in a different region for disaster recovery.
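Because each Read Replica has its own DNS endpoint, the application (or a proxy in front of it) has to decide where each statement goes. The following is a minimal sketch of that idea, not an RDS feature: the class name and endpoints are hypothetical, and the statement check is deliberately naive.

```python
import itertools


class ReadWriteRouter:
    """Send SELECTs round-robin to replicas and everything else to the primary."""

    def __init__(self, primary_endpoint, replica_endpoints):
        self.primary_endpoint = primary_endpoint
        # cycle() gives an endless round-robin iterator over the replicas.
        self._replicas = itertools.cycle(replica_endpoints) if replica_endpoints else None

    def endpoint_for(self, sql: str) -> str:
        words = sql.split()
        first_word = words[0].upper() if words else ""
        # Naive rule: only plain SELECT statements go to a replica.
        # SELECT ... FOR UPDATE, or reads that must see the latest write,
        # should be sent to the primary by the caller instead.
        if first_word == "SELECT" and self._replicas is not None:
            return next(self._replicas)
        return self.primary_endpoint


if __name__ == "__main__":
    # Hypothetical endpoint names, for illustration only.
    router = ReadWriteRouter(
        "mydb.example.us-east-1.rds.amazonaws.com",
        ["mydb-ro1.example.us-east-1.rds.amazonaws.com",
         "mydb-ro2.example.us-east-1.rds.amazonaws.com"],
    )
    print(router.endpoint_for("SELECT * FROM orders"))
    print(router.endpoint_for("UPDATE orders SET status = 'paid'"))
```

Since replication to Read Replicas is asynchronous, a replica may lag behind the primary, so reads routed this way can return slightly stale data.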

At AWS re:Invent 2017, AWS announced the preview of Amazon Aurora Multi-Master. This will allow users to create multiple Aurora writer nodes and help in scaling reads/writes across multiple AZs. You can sign up for the preview here.

Conclusion

While both Multi-AZ and Read Replica maintain a copy of the database, they are different in nature. Use Multi-AZ deployments for High Availability and Read Replicas for read scalability. You can further set up a cross-region Read Replica for disaster recovery.

The post Amazon RDS Multi-AZ Deployments and Read Replicas appeared first on Percona Database Performance Blog.

Jun
19
2018
--

Webinar Weds 20/6: Percona XtraDB Cluster 5.7 Tutorial Part 2

webinar Percona XtraDB Cluster

Including setting up Percona XtraDB Cluster with ProxySQL and PMM

Please join Percona’s Architect, Tibi Köröcz, as he presents Percona XtraDB Cluster 5.7 Tutorial Part 2 on Wednesday, June 20th, 2018, at 7:00 am PDT (UTC-7) / 10:00 am EDT (UTC-4).

 

Never used Percona XtraDB Cluster before? This is the webinar for you! In this 45-minute webinar, we will introduce you to a fully functional Percona XtraDB Cluster.

This webinar will show you how to install Percona XtraDB Cluster with ProxySQL, and monitor it with Percona Monitoring and Management (PMM).

We will also cover topics like bootstrap, IST, SST, certification, common-failure situations and online schema changes.

After this webinar, you will have enough knowledge to set up a working Percona XtraDB Cluster with ProxySQL, in order to meet your high availability requirements.

You can see part one of this series here: Percona XtraDB Cluster 5.7 Tutorial Part 1

Register Now!

Tibor Köröcz

Architect

ProxySQL for Connection Pooling

Tibi joined Percona in 2015 as a Consultant. Before joining Percona, among many other things, he worked at the world’s largest car hire booking service as a Senior Database Engineer. He enjoys trying and working with the latest technologies and applications which can help or work with MySQL together. In his spare time he likes to spend time with his friends, travel around the world and play ultimate frisbee.

 

The post Webinar Weds 20/6: Percona XtraDB Cluster 5.7 Tutorial Part 2 appeared first on Percona Database Performance Blog.

Apr
21
2014
--

Using Apache Hadoop and Impala together with MySQL for data analysis

Apache Hadoop is commonly used for data analysis. It is fast for data loads and scalable. In a previous post I showed how to integrate MySQL with Hadoop. In this post I will show how to export a table from MySQL to Hadoop, load the data into Cloudera Impala (columnar format) and run reports on top of that. For the examples below I will use the “ontime flight performance” data from my previous post (Increasing MySQL performance with parallel query execution). I’ve used Cloudera Manager v.4 to install Apache Hadoop and Impala. For this test I’ve (intentionally) used old hardware (servers from 2006) to show that Hadoop can utilize old hardware and still scale. The test cluster consists of 6 datanodes. Below are the specs:

Purpose                                   | Server specs
Namenode, Hive metastore, etc + Datanodes | 2x PowerEdge 2950, 2x L5335 CPU @ 2.00GHz, 8 cores, 16G RAM, RAID 10 with 8 SAS drives
Datanodes only                            | 4x PowerEdge SC1425, 2x Xeon CPU @ 3.00GHz, 2 cores, 8G RAM, single 4TB drive

As you can see, those are pretty old servers; the only thing I’ve changed is adding a 4TB drive to be able to store more data. Hadoop provides redundancy at the server level (it writes 3 copies of each block to different datanodes), so we do not need RAID on the datanodes (we do need redundancy for the namenodes, though).

Data export

There are a couple of ways to export data from MySQL to Hadoop. For the purpose of this test I have simply exported the ontime table into a text file with:

select * into outfile '/tmp/ontime.psv' 
FIELDS TERMINATED BY ','
from ontime;

(You can use “|” or any other symbol as a delimiter.) Alternatively, you can download the data directly from the www.transtats.bts.gov site using this simple script:

for y in {1988..2013}
do
        for i in {1..12}
        do
                u="http://www.transtats.bts.gov/Download/On_Time_On_Time_Performance_${y}_${i}.zip"
                wget $u -o ontime.log
                unzip On_Time_On_Time_Performance_${y}_${i}.zip
        done
done

Load into Hadoop HDFS

The first thing we need to do is load the data into HDFS as a set of files. Hive or Impala will work with the directory into which you have imported your data and will concatenate all files inside this directory. In our case it is easy to simply copy all our files into a directory inside HDFS:

$ hdfs dfs -mkdir /data/ontime/
$ hdfs -v dfs -copyFromLocal On_Time_On_Time_Performance_*.csv /data/ontime/

 Create external table in Impala

Now that we have all data files loaded, we can create an external table:

CREATE EXTERNAL TABLE ontime_csv (
YearD int ,
Quarter tinyint ,
MonthD tinyint ,
DayofMonth tinyint ,
DayOfWeek tinyint ,
FlightDate string ,
UniqueCarrier string ,
AirlineID int ,
Carrier string ,
TailNum string ,
FlightNum string ,
OriginAirportID int ,
OriginAirportSeqID int ,
OriginCityMarketID int ,
Origin string ,
OriginCityName string ,
OriginState string ,
OriginStateFips string ,
OriginStateName string ,
OriginWac int ,
DestAirportID int ,
DestAirportSeqID int ,
DestCityMarketID int ,
Dest string ,
...
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE 
LOCATION '/data/ontime';

Note the “EXTERNAL” keyword and LOCATION (LOCATION points to a directory inside HDFS, not a file). Impala will only create the meta information (it will not modify the data). We can query this table right away; however, Impala will need to scan all files (full scan) for queries.

Example:

[d30.local:21000] > select yeard, count(*) from ontime_csv  group by yeard;
Query: select yeard, count(*) from ontime_csv  group by yeard
+-------+----------+
| yeard | count(*) |
+-------+----------+
| 2010  | 6450117  |
| 2013  | 5349447  |
| 2009  | 6450285  |
| 2002  | 5271359  |
| 2004  | 7129270  |
| 1997  | 5411843  |
| 2012  | 6096762  |
| 2005  | 7140596  |
| 1999  | 5527884  |
| 2007  | 7455458  |
| 1994  | 5180048  |
| 2008  | 7009726  |
| 1988  | 5202096  |
| 2003  | 6488540  |
| 1996  | 5351983  |
| 1989  | 5041200  |
| 2011  | 6085281  |
| 1998  | 5384721  |
| 1991  | 5076925  |
| 2006  | 7141922  |
| 1993  | 5070501  |
| 2001  | 5967780  |
| 1995  | 5327435  |
| 1990  | 5270893  |
| 1992  | 5092157  |
| 2000  | 5683047  |
+-------+----------+
Returned 26 row(s) in 131.38s

(Note that “group by” will not sort the rows, unlike MySQL. To sort we will need to add “ORDER BY yeard”)

Explain plan:

Query: explain select yeard, count(*) from ontime_csv  group by yeard
+-----------------------------------------------------------+
| Explain String                                            |
+-----------------------------------------------------------+
| PLAN FRAGMENT 0                                           |
|   PARTITION: UNPARTITIONED                                |
|                                                           |
|   4:EXCHANGE                                              |
|                                                           |
| PLAN FRAGMENT 1                                           |
|   PARTITION: HASH_PARTITIONED: yeard                      |
|                                                           |
|   STREAM DATA SINK                                        |
|     EXCHANGE ID: 4                                        |
|     UNPARTITIONED                                         |
|                                                           |
|   3:AGGREGATE (merge finalize)                            |
|   |  output: SUM(COUNT(*))                                |
|   |  group by: yeard                                      |
|   |                                                       |
|   2:EXCHANGE                                              |
|                                                           |
| PLAN FRAGMENT 2                                           |
|   PARTITION: RANDOM                                       |
|                                                           |
|   STREAM DATA SINK                                        |
|     EXCHANGE ID: 2                                        |
|     HASH_PARTITIONED: yeard                               |
|                                                           |
|   1:AGGREGATE                                             |
|   |  output: COUNT(*)                                     |
|   |  group by: yeard                                      |
|   |                                                       |
|   0:SCAN HDFS                                             |
|      table=ontime.ontime_csv #partitions=1/1 size=45.68GB |
+-----------------------------------------------------------+
Returned 31 row(s) in 0.13s

As we can see it will scan 45G of data.

Impala with columnar format and compression

A great benefit of Impala is that it supports columnar format and compression. I’ve tried the new “parquet” format with the “snappy” compression codec. As our table is very wide (and de-normalized), it will help a lot to use a columnar format. To take advantage of the “parquet” format we will need to load data into it, which is easy to do when we already have a table inside Impala and files inside HDFS:

[d30.local:21000] > set PARQUET_COMPRESSION_CODEC=snappy;
[d30.local:21000] > create table ontime_parquet_snappy LIKE ontime_csv STORED AS PARQUET;
[d30.local:21000] > insert into ontime_parquet_snappy select * from ontime_csv;
Query: insert into ontime_parquet_snappy select * from ontime_csv
Inserted 152657276 rows in 729.76s

Then we can test our query against the new table:

Query: explain select yeard, count(*) from ontime_parquet_snappy  group by yeard
+---------------------------------------------------------------------+
| Explain String                                                      |
+---------------------------------------------------------------------+
| PLAN FRAGMENT 0                                                     |
|   PARTITION: UNPARTITIONED                                          |
|                                                                     |
|   4:EXCHANGE                                                        |
|                                                                     |
| PLAN FRAGMENT 1                                                     |
|   PARTITION: HASH_PARTITIONED: yeard                                |
|                                                                     |
|   STREAM DATA SINK                                                  |
|     EXCHANGE ID: 4                                                  |
|     UNPARTITIONED                                                   |
|                                                                     |
|   3:AGGREGATE (merge finalize)                                      |
|   |  output: SUM(COUNT(*))                                          |
|   |  group by: yeard                                                |
|   |                                                                 |
|   2:EXCHANGE                                                        |
|                                                                     |
| PLAN FRAGMENT 2                                                     |
|   PARTITION: RANDOM                                                 |
|                                                                     |
|   STREAM DATA SINK                                                  |
|     EXCHANGE ID: 2                                                  |
|     HASH_PARTITIONED: yeard                                         |
|                                                                     |
|   1:AGGREGATE                                                       |
|   |  output: COUNT(*)                                               |
|   |  group by: yeard                                                |
|   |                                                                 |
|   0:SCAN HDFS                                                       |
|      table=ontime.ontime_parquet_snappy #partitions=1/1 size=3.95GB |
+---------------------------------------------------------------------+
Returned 31 row(s) in 0.02s

As we can see, it will scan a much smaller amount of data: 3.95GB (with compression) compared to 45GB.

Results:

Query: select yeard, count(*) from ontime_parquet_snappy  group by yeard
+-------+----------+
| yeard | count(*) |
+-------+----------+
| 2010  | 6450117  |
| 2013  | 5349447  |
| 2009  | 6450285  |
...
Returned 26 row(s) in 4.17s

And the response time is much better as well.
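To put numbers on the improvement, the figures reported in the listings above can be compared directly. This is just arithmetic on the values already shown (table sizes from the explain plans, query times from the result footers, and the row count and duration of the insert):

```python
# Values copied from the Impala output shown above.
csv_size_gb = 45.68          # size of ontime_csv in the explain plan
parquet_size_gb = 3.95       # size of ontime_parquet_snappy in the explain plan
csv_query_s = 131.38         # group-by query on the text table
parquet_query_s = 4.17       # same query on the parquet table
rows_inserted = 152_657_276  # rows loaded into the parquet table
insert_s = 729.76            # duration of the insert ... select

size_reduction = csv_size_gb / parquet_size_gb
query_speedup = csv_query_s / parquet_query_s
insert_rows_per_s = rows_inserted / insert_s

print(f"size reduction:  {size_reduction:.1f}x")
print(f"query speedup:   {query_speedup:.1f}x")
print(f"load throughput: {insert_rows_per_s:,.0f} rows/sec")
```

That works out to a table roughly 11.6 times smaller and a query roughly 31.5 times faster, with the conversion itself loading on the order of 200,000 rows per second on this cluster.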

Impala complex query example

I’ve used the complex query from my previous post. I had to adapt it for use with Impala: it does not support the “sum(ArrDelayMinutes>30)” notation, but “sum(if(ArrDelayMinutes>30, 1, 0))” works fine.

select
   min(yeard), max(yeard), Carrier, count(*) as cnt,
   sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed,
   round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate
FROM ontime_parquet_snappy
WHERE
   DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI')
   and DestState not in ('AK', 'HI', 'PR', 'VI')
   and flightdate < '2010-01-01'
GROUP by carrier
HAVING cnt > 100000 and max(yeard) > 1990
ORDER by rate DESC
LIMIT 1000;
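The “sum(if(condition, 1, 0))” idiom simply counts the rows for which the condition holds; MySQL accepts the shorter form because a comparison there evaluates to 1 or 0. A toy Python equivalent of the flights_delayed and rate columns, using made-up delay values rather than the real ontime data:

```python
# Made-up arrival delays in minutes, purely for illustration.
arr_delay_minutes = [5, 42, 0, 31, 30, 120]

# Equivalent of sum(if(ArrDelayMinutes>30, 1, 0)): add 1 per matching row.
flights_delayed = sum(1 if delay > 30 else 0 for delay in arr_delay_minutes)

# Equivalent of round(sum(if(...))/count(*), 2).
rate = round(flights_delayed / len(arr_delay_minutes), 2)

print(flights_delayed, rate)
```

Note that a delay of exactly 30 minutes is not counted, because the comparison is strict.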

The query is intentionally designed so that it does not take advantage of indexes: most of the conditions will only filter out less than 30% of the data.

Impala results:

+------------+------------+---------+----------+-----------------+------+
| min(yeard) | max(yeard) | carrier | cnt      | flights_delayed | rate |
+------------+------------+---------+----------+-----------------+------+
| 2003       | 2009       | EV      | 1454777  | 237698          | 0.16 |
| 2003       | 2009       | FL      | 1082489  | 158748          | 0.15 |
| 2006       | 2009       | XE      | 1016010  | 152431          | 0.15 |
| 2003       | 2009       | B6      | 683874   | 103677          | 0.15 |
| 2006       | 2009       | YV      | 740608   | 110389          | 0.15 |
| 2003       | 2005       | DH      | 501056   | 69833           | 0.14 |
| 2001       | 2009       | MQ      | 3238137  | 448037          | 0.14 |
| 2004       | 2009       | OH      | 1195868  | 160071          | 0.13 |
| 2003       | 2006       | RU      | 1007248  | 126733          | 0.13 |
| 2003       | 2006       | TZ      | 136735   | 16496           | 0.12 |
| 1988       | 2009       | UA      | 9593284  | 1197053         | 0.12 |
| 1988       | 2009       | AA      | 10600509 | 1185343         | 0.11 |
| 1988       | 2001       | TW      | 2659963  | 280741          | 0.11 |
| 1988       | 2009       | CO      | 6029149  | 673863          | 0.11 |
| 2007       | 2009       | 9E      | 577244   | 59440           | 0.10 |
| 1988       | 2009       | US      | 10276941 | 991016          | 0.10 |
| 2003       | 2009       | OO      | 2654259  | 257069          | 0.10 |
| 1988       | 2009       | NW      | 7601727  | 725460          | 0.10 |
| 1988       | 2009       | DL      | 11869471 | 1156267         | 0.10 |
| 1988       | 2009       | AS      | 1506003  | 146920          | 0.10 |
| 1988       | 2005       | HP      | 2607603  | 235675          | 0.09 |
| 2005       | 2009       | F9      | 307569   | 28679           | 0.09 |
| 1988       | 1991       | PA      | 206841   | 19465           | 0.09 |
| 1988       | 2009       | WN      | 12722174 | 1107840         | 0.09 |
+------------+------------+---------+----------+-----------------+------+
Returned 24 row(s) in 15.28s

15.28 seconds is significantly faster than the original MySQL results (15 min 56.40 sec without parallel execution and 5 min 47 sec with parallel execution). However, this is not an “apples to apples” comparison:

  • MySQL will scan 45G of data and Impala with parquet will only scan 3.95G
  • MySQL will run on a single server, Hadoop + Impala will run in parallel on 6 servers.
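The size of the gap, and how much of it the server count alone might explain, can be estimated from the timings quoted above. The per-server figure below is a crude normalization that assumes perfectly linear scaling across the 6 servers, which real clusters do not achieve, so read it as an illustration of the caveat rather than a measurement:

```python
def to_seconds(minutes: float, seconds: float) -> float:
    """Convert a 'M min S sec' timing to seconds."""
    return minutes * 60 + seconds


# Timings quoted in the text.
mysql_serial_s = to_seconds(15, 56.40)   # MySQL without parallel execution
mysql_parallel_s = to_seconds(5, 47)     # MySQL with parallel execution
impala_s = 15.28                         # Impala on the parquet table
impala_servers = 6                       # servers in the Hadoop cluster

speedup_vs_serial = mysql_serial_s / impala_s
speedup_vs_parallel = mysql_parallel_s / impala_s

# Assumption: linear scaling, so divide the speedup by the server count.
per_server_vs_serial = speedup_vs_serial / impala_servers

print(f"vs serial MySQL:   {speedup_vs_serial:.1f}x")
print(f"vs parallel MySQL: {speedup_vs_parallel:.1f}x")
print(f"per server (crude, vs serial): {per_server_vs_serial:.1f}x")
```

Even after this rough adjustment Impala remains about an order of magnitude faster than serial MySQL, which is in line with the reduction in data scanned thanks to the compressed columnar format.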

Nevertheless, Hadoop + Impala shows impressive performance and the ability to scale out of the box, which can help a lot with the analysis of large data volumes.

Conclusion

Hadoop + Impala will give us an easy way to analyze large datasets using SQL with the ability to scale even on the old hardware.

In my next posts I will plan to explore:

As always, please share your thoughts in the comments.

The post Using Apache Hadoop and Impala together with MySQL for data analysis appeared first on MySQL Performance Blog.

Jan
26
2011
--

Modeling InnoDB Scalability on Multi-Core Servers

Mat Keep’s blog post on InnoDB-vs-MyISAM benchmarks that Oracle recently published prompted me to do some mathematical modeling of InnoDB’s scalability as the number of cores in the server increases. Vadim runs lots of benchmarks that measure what happens under increasing concurrency while holding the hardware constant, but not as many with varying numbers of cores, so I decided to use Mat Keep’s data for this. The modeling I performed is Universal Scalability Law modeling, which can predict both software and hardware scalability, depending on how it is used.

In brief, the benchmarks are sysbench’s read-only and read-write tests, and the server has two Intel SSDs, 64GB of memory, and 4 x 12-core AMD Opteron 6172 “Magny-Cours” 2.1GHz CPUs. It is a reasonably typical commodity machine except for the high core count, which is more than I can remember seeing in the wild. The database was MySQL 5.5.7-rc. I am not sure why they didn’t run the GA version of MySQL for this benchmark. Maybe they wrote the paper before 5.5 went GA.

The following are plots of the read-only and read-write scalability models that I generated, based on these benchmarks.

Read-Only Results

Read-Only Results

Read-Write Results

Read-Write Results

The model predicts that the server will continue to provide more throughput as the core count climbs into the mid-50s, although the bang for the buck isn’t very good at that point. Also, there appears to be some bottleneck that hits more sharply than the model predicts at high core counts. It would be great if the benchmark were re-run with the same core counts and with sysbench on another machine, instead of taking 12 cores away from MySQL and giving them to sysbench. That way we could test with 48 cores and see what happens. My gut feeling is that the results will not be as good as the model predicts at high numbers of cores. But as Neil Gunther says, this wouldn’t mean the model is broken; it would mean that there is potentially something to fix in the server at high core counts. Without the model, there wouldn’t even be a basis for discussion.
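For readers unfamiliar with the Universal Scalability Law, it expresses relative capacity at N cores as C(N) = N / (1 + σ(N−1) + κN(N−1)), where σ models contention and κ models coherency delay; capacity peaks at N = sqrt((1−σ)/κ). The sketch below evaluates the law with hypothetical coefficients chosen only so that the peak falls in the 50s. They are not the values fitted to the benchmark data.

```python
import math


def usl_capacity(n: float, sigma: float, kappa: float) -> float:
    """Relative capacity C(N) = N / (1 + sigma*(N-1) + kappa*N*(N-1))."""
    return n / (1.0 + sigma * (n - 1) + kappa * n * (n - 1))


def usl_peak(sigma: float, kappa: float) -> float:
    """Core count at which C(N) is maximal: sqrt((1 - sigma) / kappa)."""
    return math.sqrt((1.0 - sigma) / kappa)


# Hypothetical coefficients for illustration, NOT fitted to the benchmark.
SIGMA = 0.02    # contention (serialization) penalty
KAPPA = 0.0003  # coherency (crosstalk) penalty

if __name__ == "__main__":
    for cores in (1, 4, 8, 16, 24, 36, 48, 64):
        print(f"{cores:3d} cores -> {usl_capacity(cores, SIGMA, KAPPA):6.2f}x")
    print(f"predicted peak at about {usl_peak(SIGMA, KAPPA):.0f} cores")
```

With coefficients like these, going from 36 to 48 cores adds very little capacity, which is the diminishing return described above. In practice σ and κ are obtained by fitting the curve to measured throughput at several core counts.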

The biggest thing I want to point out here is the dramatic improvement over just a few years ago, when you could “upgrade” from 4 to 8 cores and see a reduction in throughput. Oracle (and Percona, and lots of others) have done great work in the last couple of years making InnoDB scale and perform better on modern hardware.

Nov
16
2010
--

Percona white paper: Forecasting MySQL Scalability

Ewen and I have just published Percona’s latest white paper, Forecasting MySQL Scalability with the Universal Scalability Law. This is essentially a streamlined walk-through of Dr. Neil J. Gunther’s book Guerrilla Capacity Planning, with examples to show how you can apply it to MySQL servers.

One thing alluded to in the paper is extracting the necessary metrics from network traffic. I had this idea after studying the data in Linux’s /proc/diskstats file. It turns out that two simple metrics can provide amazingly rich insight into system performance and scalability, in combination with Little’s Law and queueing theory. These are the busy time and the total time that requests were resident in the system. There are different terms for the latter, but in MySQL we’d call it query response time. After studying these for a few months, I’m so awed by how useful they are that I am going to make a blanket recommendation: if you create server software, you must expose these simple metrics. (I have filed a feature request for Percona Server to add these metrics for MySQL users.)
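As an illustration of how much those two counters reveal, the sketch below derives utilization, throughput, average response time and average concurrency from them over one sampling interval. It assumes a count of completed requests is also available for the interval, and the function name and sample numbers are made up; the relationships themselves are the utilization law and Little’s Law (N = X × R).

```python
def summarize_interval(interval_s: float, completions: int,
                       busy_time_s: float, total_resident_time_s: float) -> dict:
    """Derive basic performance metrics from counters sampled over an interval."""
    if interval_s <= 0:
        raise ValueError("interval_s must be positive")
    throughput = completions / interval_s                 # X: requests per second
    utilization = busy_time_s / interval_s                # fraction of time busy
    # Average time a request was resident in the system (response time R).
    avg_response_s = total_resident_time_s / completions if completions else 0.0
    # Little's Law: N = X * R, which reduces to resident time / interval.
    avg_concurrency = total_resident_time_s / interval_s
    return {
        "throughput": throughput,
        "utilization": utilization,
        "avg_response_s": avg_response_s,
        "avg_concurrency": avg_concurrency,
    }


if __name__ == "__main__":
    # Made-up sample: 10s interval, 2000 queries, busy for 8s,
    # 40s of total query residence time.
    print(summarize_interval(10.0, 2000, 8.0, 40.0))
```

In the made-up sample the server is 80% utilized and, on average, four queries are in flight at any moment, each taking 20ms.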

I’ll probably follow this up with another blog post or white paper at some point in the future, to show how to use the busy time and query response time to predict a system’s scalability.


Entry posted by Baron Schwartz