Feb 21, 2019

Parallel queries in PostgreSQL


Modern CPU models have a huge number of cores. For many years, applications have been sending queries to databases in parallel. Where reporting queries deal with many table rows, the ability for a query to use multiple CPUs helps deliver faster execution. Parallel queries in PostgreSQL allow us to utilize many CPUs to finish report queries faster. The feature was implemented in PostgreSQL 9.6: starting from that version, a report query is able to use many CPUs and finish faster.

The initial implementation of parallel query execution took three years. Parallel support requires code changes in many query execution stages. PostgreSQL 9.6 created an infrastructure for further code improvements, and later versions extended parallel execution support to other query types.

Limitations

  • Do not enable parallel execution if all CPU cores are already saturated. Parallel execution steals CPU time from other queries and increases their response time.
  • Most importantly, parallel processing significantly increases memory usage with high WORK_MEM values, as each hash join or sort operation takes a work_mem amount of memory.
  • Next, low-latency OLTP queries can’t be made any faster with parallel execution. In particular, queries that return a single row can perform badly when parallel execution is enabled.
  • A good reference workload for developers is the TPC-H benchmark: check whether you have similar queries to get the best out of parallel execution.
  • Parallel execution supports only SELECT queries without lock predicates.
  • Proper indexing might be a better alternative to a parallel sequential table scan.
  • There is no support for cursors or suspended queries.
  • Window functions and ordered-set aggregate functions are non-parallel.
  • There is no benefit for an IO-bound workload.
  • There are no parallel sort algorithms. However, queries with sorts still can be parallel in some aspects.
  • Replace CTE (WITH …) with a sub-select to support parallel execution (see the sketch after this list).
  • Foreign data wrappers do not currently support parallel execution (but they could!)
  • There is no support for FULL OUTER JOIN.
  • Clients setting max_rows disable parallel execution.
  • If a query uses a function that is not marked as PARALLEL SAFE, it will be executed in a single process.
  • SERIALIZABLE transaction isolation level disables parallel execution.
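
As an illustration of the CTE item above, here is a minimal sketch of such a rewrite, using the lineitem table from the TPC-H schema (the date filter is a made-up example):

-- Not parallel in PostgreSQL 9.6-11: the CTE acts as an optimization fence
with recent as (
    select * from lineitem where l_shipdate > date '1998-01-01'
)
select count(*) from recent;

-- Parallel-friendly equivalent: inline the CTE body as a sub-select
select count(*)
from (
    select * from lineitem where l_shipdate > date '1998-01-01'
) recent;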

Test environment

The PostgreSQL development team has worked to improve TPC-H benchmark queries’ response times. You can download the benchmark and adapt it to PostgreSQL using the instructions below. This is not an official way to use the TPC-H benchmark, so you shouldn’t use it to compare different databases or hardware.

  1. Download TPC-H_Tools_v2.17.3.zip (or a newer version) from the official TPC site.
  2. Rename makefile.suite to Makefile and modify it as described at https://github.com/tvondra/pg_tpch . Compile the code with the make command.
  3. Generate data: ./dbgen -s 10 generates a 23GB database, which is enough to see the difference in performance between parallel and non-parallel queries.
  4. Convert the tbl files to csv with for + sed.
  5. Clone the pg_tpch repository and copy the csv files to pg_tpch/dss/data.
  6. Generate queries with the qgen command.
  7. Load the data into the database with the ./tpch.sh command.

Parallel sequential scan

A parallel sequential scan might be faster not because of parallel reads, but because the CPU work is spread across many cores. Modern operating systems provide good caching for PostgreSQL data files, and read-ahead fetches more blocks from storage than the PG daemon actually requests. As a result, query performance is not limited by disk IO; it consumes CPU cycles for:

  • reading rows one by one from table data pages
  • comparing row values and WHERE conditions

Let’s try to execute a simple select query:

tpch=# explain analyze select l_quantity as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------
Seq Scan on lineitem (cost=0.00..1964772.00 rows=58856235 width=5) (actual time=0.014..16951.669 rows=58839715 loops=1)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 1146337
Planning Time: 0.203 ms
Execution Time: 19035.100 ms

Without aggregation, a sequential scan produces too many rows to be transferred between processes, so the query is executed by a single CPU core.

After adding SUM(), it’s clear that two workers help make the query faster:

explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
-> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
-> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 382112
Planning Time: 0.241 ms
Execution Time: 8555.131 ms

The more complex query is 2.2X faster compared to the plain, single-threaded select.

Parallel Aggregation

A “Parallel Seq Scan” node produces rows for partial aggregation. A “Partial Aggregate” node reduces these rows with SUM(). At the end, the SUM counter from each worker is collected by the “Gather” node.

The final result is calculated by the “Finalize Aggregate” node. If you have your own aggregation functions, do not forget to mark them as “parallel safe”.
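
For example, a custom aggregate could be declared parallel safe roughly like this (a minimal sketch; the aggregate name is hypothetical, and a COMBINEFUNC is needed so that workers’ partial states can be merged):

CREATE AGGREGATE my_sum(numeric) (
    SFUNC = numeric_add,        -- per-row transition function
    STYPE = numeric,            -- transition state type
    COMBINEFUNC = numeric_add,  -- merges partial states from workers
    INITCOND = '0',
    PARALLEL = SAFE
);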

Number of workers

We can increase the number of workers without a server restart:

alter system set max_parallel_workers_per_gather=4;
select * from pg_reload_conf();

Now, there are 4 workers in the explain output:

tpch=# explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1440213.58..1440213.59 rows=1 width=32) (actual time=5152.072..5152.072 rows=1 loops=1)
-> Gather (cost=1440213.15..1440213.56 rows=4 width=32) (actual time=5151.807..5153.900 rows=5 loops=1)
Workers Planned: 4
Workers Launched: 4
-> Partial Aggregate (cost=1439213.15..1439213.16 rows=1 width=32) (actual time=5147.238..5147.239 rows=1 loops=5)
-> Parallel Seq Scan on lineitem (cost=0.00..1402428.00 rows=14714059 width=5) (actual time=0.037..3601.882 rows=11767943 loops=5)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 229267
Planning Time: 0.218 ms
Execution Time: 5153.967 ms

What’s happening here? We have changed the number of workers from 2 to 4, but the query became only 1.66 times faster. Actually, the scaling is as good as it can get: we had two workers plus one leader (3 processes), and after the configuration change it becomes 4+1 (5 processes).

The biggest improvement from parallel execution that we can achieve is: 5/3 = 1.66(6)X faster.

How does it work?

Processes

Query execution always starts in the “leader” process. The leader executes all non-parallel activity plus its own share of the parallel processing. Other processes executing the same query are called “worker” processes. Parallel execution utilizes the Dynamic Background Workers infrastructure (added in 9.4). Since PostgreSQL uses processes rather than threads, a query creating three worker processes could be up to 4X faster than traditional execution: three workers plus the leader.

Communication

Workers communicate with the leader using a message queue (based on shared memory). Each process has two queues: one for errors and the second one for tuples.

How many workers to use?

Firstly, the max_parallel_workers_per_gather parameter limits the number of workers per Gather node. Secondly, the query executor takes workers from a pool whose size is limited by max_parallel_workers. Finally, the top-level limit is max_worker_processes: the total number of background processes.
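
A configuration sketch consistent with these limits (the values are illustrative, not recommendations):

ALTER SYSTEM SET max_worker_processes = 16;           -- all background processes; requires a restart
ALTER SYSTEM SET max_parallel_workers = 8;            -- pool of parallel workers
ALTER SYSTEM SET max_parallel_workers_per_gather = 4; -- workers per Gather node
SELECT pg_reload_conf();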

Failed worker allocation leads to single-process execution.

The query planner could consider decreasing the number of workers based on a table or index size. min_parallel_table_scan_size and min_parallel_index_scan_size control this behavior.

set min_parallel_table_scan_size='8MB';
8MB table   => 1 worker
24MB table  => 2 workers
72MB table  => 3 workers
x MB table  => floor(log3(x / min_parallel_table_scan_size)) + 1 workers

Each time the table is 3X bigger than min_parallel_(index|table)_scan_size, postgres adds a worker. The number of workers is not cost-based! A cost-based choice would create a circular dependency and complicate the implementation, so the planner uses simple rules instead.

In practice, these rules are not always acceptable in production, and you can override the number of workers for a specific table with ALTER TABLE … SET (parallel_workers = N).
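
For example (lineitem is the TPC-H table used above; the worker count is an arbitrary illustration):

ALTER TABLE lineitem SET (parallel_workers = 8);
-- revert to the size-based estimate:
ALTER TABLE lineitem RESET (parallel_workers);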

Why is parallel execution not used?

Besides the long list of parallel execution limitations, PostgreSQL checks costs:

parallel_setup_cost: avoids parallel execution of short queries. It models the time spent on memory setup, process startup, and initial communication.

parallel_tuple_cost: communication between the leader and workers can take a long time, proportional to the number of tuples sent by workers. This parameter models the communication cost.
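
Lowering both costs, as done in the partition-wise join example later in this post, makes the planner prefer parallel plans even for short queries:

set parallel_setup_cost = 1;     -- default is 1000
set parallel_tuple_cost = 0.01;  -- default is 0.1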

Nested loop joins

PostgreSQL 9.6+ could execute a “Nested loop” in parallel due to the simplicity of the operation.

explain (costs off) select c_custkey, count(o_orderkey)
                from    customer left outer join orders on
                                c_custkey = o_custkey and o_comment not like '%special%deposits%'
                group by c_custkey;
                                      QUERY PLAN
--------------------------------------------------------------------------------------
 Finalize GroupAggregate
   Group Key: customer.c_custkey
   ->  Gather Merge
         Workers Planned: 4
         ->  Partial GroupAggregate
               Group Key: customer.c_custkey
               ->  Nested Loop Left Join
                     ->  Parallel Index Only Scan using customer_pkey on customer
                     ->  Index Scan using idx_orders_custkey on orders
                           Index Cond: (customer.c_custkey = o_custkey)
                           Filter: ((o_comment)::text !~~ '%special%deposits%'::text)

Gather happens in the last stage, so “Nested Loop Left Join” is a parallel operation. “Parallel Index Only Scan” is available from version 10. It acts in a similar way to a parallel sequential scan. The c_custkey = o_custkey condition reads a single order for each customer row, so the inner index scan on orders is not parallel.

Hash Join

Until PostgreSQL 11, each worker builds its own hash table. As a result, 4+ workers weren’t able to improve performance. The new implementation uses a shared hash table, and each worker can utilize WORK_MEM to build it.
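
In PostgreSQL 11, the shared hash behavior is governed by a planner setting, shown here with its default value:

set enable_parallel_hash = on;  -- turn off to fall back to per-worker hash tables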

select
        l_shipmode,
        sum(case
                when o_orderpriority = '1-URGENT'
                        or o_orderpriority = '2-HIGH'
                        then 1
                else 0
        end) as high_line_count,
        sum(case
                when o_orderpriority <> '1-URGENT'
                        and o_orderpriority <> '2-HIGH'
                        then 1
                else 0
        end) as low_line_count
from
        orders,
        lineitem
where
        o_orderkey = l_orderkey
        and l_shipmode in ('MAIL', 'AIR')
        and l_commitdate < l_receiptdate
        and l_shipdate < l_commitdate
        and l_receiptdate >= date '1996-01-01'
        and l_receiptdate < date '1996-01-01' + interval '1' year
group by
        l_shipmode
order by
        l_shipmode
LIMIT 1;
                                                                                                                                    QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=1964755.66..1964961.44 rows=1 width=27) (actual time=7579.592..7922.997 rows=1 loops=1)
   ->  Finalize GroupAggregate  (cost=1964755.66..1966196.11 rows=7 width=27) (actual time=7579.590..7579.591 rows=1 loops=1)
         Group Key: lineitem.l_shipmode
         ->  Gather Merge  (cost=1964755.66..1966195.83 rows=28 width=27) (actual time=7559.593..7922.319 rows=6 loops=1)
               Workers Planned: 4
               Workers Launched: 4
               ->  Partial GroupAggregate  (cost=1963755.61..1965192.44 rows=7 width=27) (actual time=7548.103..7564.592 rows=2 loops=5)
                     Group Key: lineitem.l_shipmode
                     ->  Sort  (cost=1963755.61..1963935.20 rows=71838 width=27) (actual time=7530.280..7539.688 rows=62519 loops=5)
                           Sort Key: lineitem.l_shipmode
                           Sort Method: external merge  Disk: 2304kB
                           Worker 0:  Sort Method: external merge  Disk: 2064kB
                           Worker 1:  Sort Method: external merge  Disk: 2384kB
                           Worker 2:  Sort Method: external merge  Disk: 2264kB
                           Worker 3:  Sort Method: external merge  Disk: 2336kB
                           ->  Parallel Hash Join  (cost=382571.01..1957960.99 rows=71838 width=27) (actual time=7036.917..7499.692 rows=62519 loops=5)
                                 Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
                                 ->  Parallel Seq Scan on lineitem  (cost=0.00..1552386.40 rows=71838 width=19) (actual time=0.583..4901.063 rows=62519 loops=5)
                                       Filter: ((l_shipmode = ANY ('{MAIL,AIR}'::bpchar[])) AND (l_commitdate < l_receiptdate) AND (l_shipdate < l_commitdate) AND (l_receiptdate >= '1996-01-01'::date) AND (l_receiptdate < '1997-01-01 00:00:00'::timestamp without time zone))
                                       Rows Removed by Filter: 11934691
                                 ->  Parallel Hash  (cost=313722.45..313722.45 rows=3750045 width=20) (actual time=2011.518..2011.518 rows=3000000 loops=5)
                                       Buckets: 65536  Batches: 256  Memory Usage: 3840kB
                                       ->  Parallel Seq Scan on orders  (cost=0.00..313722.45 rows=3750045 width=20) (actual time=0.029..995.948 rows=3000000 loops=5)
 Planning Time: 0.977 ms
 Execution Time: 7923.770 ms

Query 12 from TPC-H is a good illustration of a parallel hash join: each worker helps to build the shared hash table.

Merge Join

Due to the nature of merge join, it’s not possible to make it parallel. Don’t worry if it’s the last stage of the query execution: you can still see parallel execution for queries with a merge join.

-- Query 2 from TPC-H
explain (costs off) select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment
from    part, supplier, partsupp, nation, region
where
        p_partkey = ps_partkey
        and s_suppkey = ps_suppkey
        and p_size = 36
        and p_type like '%BRASS'
        and s_nationkey = n_nationkey
        and n_regionkey = r_regionkey
        and r_name = 'AMERICA'
        and ps_supplycost = (
                select
                        min(ps_supplycost)
                from    partsupp, supplier, nation, region
                where
                        p_partkey = ps_partkey
                        and s_suppkey = ps_suppkey
                        and s_nationkey = n_nationkey
                        and n_regionkey = r_regionkey
                        and r_name = 'AMERICA'
        )
order by s_acctbal desc, n_name, s_name, p_partkey
LIMIT 100;
                                                QUERY PLAN
----------------------------------------------------------------------------------------------------------
 Limit
   ->  Sort
         Sort Key: supplier.s_acctbal DESC, nation.n_name, supplier.s_name, part.p_partkey
         ->  Merge Join
               Merge Cond: (part.p_partkey = partsupp.ps_partkey)
               Join Filter: (partsupp.ps_supplycost = (SubPlan 1))
               ->  Gather Merge
                     Workers Planned: 4
                     ->  Parallel Index Scan using part_pkey on part
                           Filter: (((p_type)::text ~~ '%BRASS'::text) AND (p_size = 36))
               ->  Materialize
                     ->  Sort
                           Sort Key: partsupp.ps_partkey
                           ->  Nested Loop
                                 ->  Nested Loop
                                       Join Filter: (nation.n_regionkey = region.r_regionkey)
                                       ->  Seq Scan on region
                                             Filter: (r_name = 'AMERICA'::bpchar)
                                       ->  Hash Join
                                             Hash Cond: (supplier.s_nationkey = nation.n_nationkey)
                                             ->  Seq Scan on supplier
                                             ->  Hash
                                                   ->  Seq Scan on nation
                                 ->  Index Scan using idx_partsupp_suppkey on partsupp
                                       Index Cond: (ps_suppkey = supplier.s_suppkey)
               SubPlan 1
                 ->  Aggregate
                       ->  Nested Loop
                             Join Filter: (nation_1.n_regionkey = region_1.r_regionkey)
                             ->  Seq Scan on region region_1
                                   Filter: (r_name = 'AMERICA'::bpchar)
                             ->  Nested Loop
                                   ->  Nested Loop
                                         ->  Index Scan using idx_partsupp_partkey on partsupp partsupp_1
                                               Index Cond: (part.p_partkey = ps_partkey)
                                         ->  Index Scan using supplier_pkey on supplier supplier_1
                                               Index Cond: (s_suppkey = partsupp_1.ps_suppkey)
                                   ->  Index Scan using nation_pkey on nation nation_1
                                         Index Cond: (n_nationkey = supplier_1.s_nationkey)

The “Merge Join” node is above “Gather Merge”, so the merge itself does not use parallel execution. But the “Parallel Index Scan” node still helps with the scan of part_pkey.

Partition-wise join

PostgreSQL 11 disables the partition-wise join feature by default, as it has a high planning cost. Joins between similarly partitioned tables can be done partition-by-partition. This allows postgres to use smaller hash tables, and each per-partition join can be executed in parallel.

tpch=# set enable_partitionwise_join=t;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                    QUERY PLAN
---------------------------------------------------
 Append
   ->  Hash Join
         Hash Cond: (t2.b = t1.a)
         ->  Seq Scan on prt2_p1 t2
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p1 t1
                     Filter: (b = 0)
   ->  Hash Join
         Hash Cond: (t2_1.b = t1_1.a)
         ->  Seq Scan on prt2_p2 t2_1
               Filter: ((b >= 0) AND (b <= 10000))
         ->  Hash
               ->  Seq Scan on prt1_p2 t1_1
                     Filter: (b = 0)
tpch=# set parallel_setup_cost = 1;
tpch=# set parallel_tuple_cost = 0.01;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
                        QUERY PLAN
-----------------------------------------------------------
 Gather
   Workers Planned: 4
   ->  Parallel Append
         ->  Parallel Hash Join
               Hash Cond: (t2_1.b = t1_1.a)
               ->  Parallel Seq Scan on prt2_p2 t2_1
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p2 t1_1
                           Filter: (b = 0)
         ->  Parallel Hash Join
               Hash Cond: (t2.b = t1.a)
               ->  Parallel Seq Scan on prt2_p1 t2
                     Filter: ((b >= 0) AND (b <= 10000))
               ->  Parallel Hash
                     ->  Parallel Seq Scan on prt1_p1 t1
                           Filter: (b = 0)

Note that a partition-wise join can use parallel execution only if the partitions are big enough.
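
The per-scan size thresholds discussed earlier apply to each partition scan as well, so for small test partitions you may need to lower them (an illustrative value):

set min_parallel_table_scan_size = '1MB';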

Parallel Append

Parallel Append runs a different child plan in each worker, instead of distributing blocks of one table between workers. Usually, you can see this with UNION ALL queries. The drawback is less parallelism, because every worker ends up working on a single child plan alone.

There are just two workers launched even with four workers enabled.

tpch=# explain (costs off) select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day union all select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '2000-12-01' - interval '105' day;
                                           QUERY PLAN
------------------------------------------------------------------------------------------------
 Gather
   Workers Planned: 2
   ->  Parallel Append
         ->  Aggregate
               ->  Seq Scan on lineitem
                     Filter: (l_shipdate <= '2000-08-18 00:00:00'::timestamp without time zone)
         ->  Aggregate
               ->  Seq Scan on lineitem lineitem_1
                     Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)

Most important variables

  • work_mem limits the memory usage of each process, not each query! work_mem * processes * joins can lead to significant memory usage.
  • max_parallel_workers_per_gather – how many workers the executor will use for the parallel execution of a planner node
  • max_worker_processes – adapt the total number of background workers to the number of CPU cores installed on the server
  • max_parallel_workers – the same, but for the pool of parallel workers

Summary

Starting from 9.6, parallel query execution can significantly improve the performance of complex queries that scan many rows or index records. In PostgreSQL 10, parallel execution was enabled by default. Do not forget to disable parallel execution on servers with a heavy OLTP workload. Sequential scans or index scans still consume a significant amount of resources. If you are not running a report against the whole dataset, you may improve query performance just by adding missing indexes or by using proper partitioning.


Image compiled from photos by Nathan Gonthier and Pavel Nekoranec on Unsplash

Jan 23, 2019

MySQL 8.0.14: A Road to Parallel Query Execution is Wide Open!


For a very long time – since multiple CPU cores became commonly available – I have dreamed about MySQL having the ability to execute queries in parallel. This feature was lacking from MySQL, and I wrote a lot of posts on how to emulate parallel queries in MySQL using different methods: from a simple parallel bash script to using Apache Spark to using ClickHouse together with MySQL. I have watched parallelism coming to PostgreSQL, to new databases like TiDB, to Amazon Aurora… And finally: MySQL 8.0.14 has a (for now limited) ability to perform parallel query execution. At the time of writing it is limited to select count(*) from table queries as well as check table queries.

MySQL 8.0.14 contains this in the release notes: “As of MySQL 8.0.14, InnoDB supports parallel clustered index reads, which can improve CHECK TABLE performance.” Actually, parallel clustered index reads also work for a simple count(*) (without a “where” condition). You can control the number of parallel threads with the innodb_parallel_read_threads parameter.

Here is a simple test (the machine has 32 CPU cores):

mysql> set local innodb_parallel_read_threads=1;
Query OK, 0 rows affected (0.00 sec)
mysql> select count(*) from ontime;
+-----------+
| count(*)  |
+-----------+
| 177920306 |
+-----------+
1 row in set (2 min 33.93 sec)
mysql> set local innodb_parallel_read_threads=DEFAULT; -- 4 is default
Query OK, 0 rows affected (0.00 sec)
mysql> select count(*) from ontime;
+-----------+
| count(*)  |
+-----------+
| 177920306 |
+-----------+
1 row in set (21.85 sec)
mysql> set local innodb_parallel_read_threads=32;
Query OK, 0 rows affected (0.00 sec)
mysql> select count(*) from ontime;
+-----------+
| count(*)  |
+-----------+
| 177920306 |
+-----------+
1 row in set (5.35 sec)

The following graph shows CPU utilization during the execution with 4 threads and 32 threads:

Unfortunately it only works for count(*) from table without a “where” condition.

Conclusion: although this feature is currently limited, it is a great start for MySQL and opens the road to real parallel query execution.


Photo by Vidar Nordli-Mathisen on Unsplash

Jan 17, 2019

Using Parallel Query with Amazon Aurora for MySQL


Parallel query execution is my favorite, non-existent, feature in MySQL. In all versions of MySQL – at least at the time of writing – when you run a single query it will run in one thread, effectively utilizing one CPU core only. Multiple queries run at the same time will use different threads and utilize more than one CPU core.

On multi-core machines – which make up the majority of hardware nowadays – and in the cloud, we have multiple cores available for use. But with faster disks (i.e. SSD) we can’t utilize the full potential of IOPS with just one thread.

AWS Aurora (based on MySQL 5.6) now has a version which supports parallelism for SELECT queries (utilizing the read capacity of the storage nodes underneath the Aurora cluster). In this article, we will look at how this can improve reporting/analytical query performance in MySQL. I will compare AWS Aurora with MySQL (Percona Server) 5.6 running on an EC2 instance of the same class.

In Short

Aurora Parallel Query response time (for queries which cannot use indexes) can be 5x-10x better compared to non-parallel, fully cached operations. This is a significant improvement for slow queries.

Test data and versions

For my test, I need to choose:

  1. Aurora instance type and comparison
  2. Dataset
  3. Queries

Aurora instance type and comparison

According to Jeff Barr’s excellent article (https://aws.amazon.com/blogs/aws/new-parallel-query-for-amazon-aurora/) the following instance classes will support parallel query (PQ):

“The instance class determines the number of parallel queries that can be active at a given time:

  • db.r*.large – 1 concurrent parallel query session
  • db.r*.xlarge – 2 concurrent parallel query sessions
  • db.r*.2xlarge – 4 concurrent parallel query sessions
  • db.r*.4xlarge – 8 concurrent parallel query sessions
  • db.r*.8xlarge – 16 concurrent parallel query sessions
  • db.r4.16xlarge – 16 concurrent parallel query sessions”

As I want to maximize the concurrency of parallel query sessions, I have chosen db.r4.8xlarge. For the EC2 instance I will use the same class: r4.8xlarge.

Aurora:

mysql> show global variables like '%version%';
+-------------------------+------------------------------+
| Variable_name           | Value                        |
+-------------------------+------------------------------+
| aurora_version          | 1.18.0                       |
| innodb_version          | 1.2.10                       |
| protocol_version        | 10                           |
| version                 | 5.6.10                       |
| version_comment         | MySQL Community Server (GPL) |
| version_compile_machine | x86_64                       |
| version_compile_os      | Linux                        |
+-------------------------+------------------------------+

MySQL on ec2

mysql> show global variables like '%version%';
+-------------------------+------------------------------------------------------+
| Variable_name           | Value                                                |
+-------------------------+------------------------------------------------------+
| innodb_version          | 5.6.41-84.1                                          |
| protocol_version        | 10                                                   |
| slave_type_conversions  |                                                      |
| tls_version             | TLSv1.1,TLSv1.2                                      |
| version                 | 5.6.41-84.1                                          |
| version_comment         | Percona Server (GPL), Release 84.1, Revision b308619 |
| version_compile_machine | x86_64                                               |
| version_compile_os      | debian-linux-gnu                                     |
| version_suffix          |                                                      |
+-------------------------+------------------------------------------------------+

Table

I’m using the “Airlines On-Time Performance” database from http://www.transtats.bts.gov/DL_SelectFields.asp?Table_ID=236&DB_Short_Name=On-Time  (You can find the scripts I used here: https://github.com/Percona-Lab/ontime-airline-performance).

mysql> show table status like 'ontime'\G
*************************** 1. row ***************************
          Name: ontime
        Engine: InnoDB
       Version: 10
    Row_format: Compact
          Rows: 173221661
Avg_row_length: 409
   Data_length: 70850183168
Max_data_length: 0
  Index_length: 0
     Data_free: 7340032
Auto_increment: NULL
   Create_time: 2018-09-26 02:03:28
   Update_time: NULL
    Check_time: NULL
     Collation: latin1_swedish_ci
      Checksum: NULL
Create_options:
       Comment:
1 row in set (0.00 sec)

The table is very wide, 84 columns.

Working with Aurora PQ (Parallel Query)

Documentation: https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/aurora-mysql-parallel-query.html

Aurora PQ works by doing a full table scan (parallel reads are done on the storage level). The InnoDB buffer pool is not used when Parallel Query is utilized.

For the purposes of the test I turned PQ on and off (normally, AWS Aurora uses its own heuristics to determine whether PQ will be helpful):

Turn on and force:

mysql> set session aurora_pq = 1;
Query OK, 0 rows affected (0.00 sec)
mysql> set aurora_pq_force = 1;
Query OK, 0 rows affected (0.00 sec)

Turn off:

mysql> set session aurora_pq = 0;
Query OK, 0 rows affected (0.00 sec)

The EXPLAIN plan in MySQL will also show the details about parallel query execution statistics.

Queries

Here, I use the “reporting” queries, running only one query at a time. The queries are similar to those I’ve used in older blog posts comparing MySQL and Apache Spark performance (https://www.percona.com/blog/2016/08/17/apache-spark-makes-slow-mysql-queries-10x-faster/ )

Here is a summary of the queries:

  1. Simple queries:
    • select count(*) from ontime where flightdate > '2017-01-01'
    • select avg(DepDelay/ArrDelay+1) from ontime
  2. Complex filter, single table:
select SQL_CALC_FOUND_ROWS
FlightDate, UniqueCarrier as carrier, FlightNum, Origin, Dest
FROM ontime
WHERE
  DestState not in ('AK', 'HI', 'PR', 'VI')
  and OriginState not in ('AK', 'HI', 'PR', 'VI')
  and flightdate > '2015-01-01'
   and ArrDelay < 15
and cancelled = 0
and Diverted = 0
and DivAirportLandings = 0
  ORDER by DepDelay DESC
LIMIT 10;

3. Complex filter, join “reference” table

select SQL_CALC_FOUND_ROWS
FlightDate, UniqueCarrier, TailNum, FlightNum, Origin, OriginCityName, Dest, DestCityName, DepDelay, ArrDelay
FROM ontime_ind o
JOIN carriers c on o.carrier = c.carrier_code
WHERE
  (carrier_name like 'United%' or carrier_name like 'Delta%')
  and ArrDelay > 30
  ORDER by DepDelay DESC
LIMIT 10\G

4. select one row only, no index

Query 1a: simple, count(*)

Let’s take a look at the most simple query: count(*). This variant of the “ontime” table has no secondary indexes.

select count(*) from ontime where flightdate > '2017-01-01';

Aurora, pq (parallel query) disabled:

I disabled the PQ first to compare:

mysql> select count(*) from ontime where flightdate > '2017-01-01';
+----------+
| count(*) |
+----------+
|  5660651 |
+----------+
1 row in set (8 min 25.49 sec)
mysql> select count(*) from ontime where flightdate > '2017-01-01';
+----------+
| count(*) |
+----------+
|  5660651 |
+----------+
1 row in set (2 min 48.81 sec)
mysql> select count(*) from ontime where flightdate > '2017-01-01';
+----------+
| count(*) |
+----------+
|  5660651 |
+----------+
1 row in set (2 min 48.25 sec)

Please note: the first run was a “cold run”; data was read from disk. The second and third runs used the cached data.

Now let’s enable and force Aurora PQ:
mysql> set session aurora_pq = 1;
Query OK, 0 rows affected (0.00 sec)
mysql> set aurora_pq_force = 1; 
Query OK, 0 rows affected (0.00 sec)
mysql> explain select count(*) from ontime where flightdate > '2017-01-01'\G
*************************** 1. row ***************************
          id: 1
 select_type: SIMPLE
       table: ontime
        type: ALL
possible_keys: NULL
         key: NULL
     key_len: NULL
         ref: NULL
        rows: 173706586
       Extra: Using where; Using parallel query (1 columns, 1 filters, 0 exprs; 0 extra)
1 row in set (0.00 sec)

(from the EXPLAIN plan, we can see that parallel query is used).

Results:

mysql> select count(*) from ontime where flightdate > '2017-01-01';                                                                                                                          
+----------+
| count(*) |
+----------+
|  5660651 |
+----------+
1 row in set (16.53 sec)
mysql> select count(*) from ontime where flightdate > '2017-01-01';
+----------+
| count(*) |
+----------+
|  5660651 |
+----------+
1 row in set (16.56 sec)
mysql> select count(*) from ontime where flightdate > '2017-01-01';
+----------+
| count(*) |
+----------+
|  5660651 |
+----------+
1 row in set (16.36 sec)
mysql> select count(*) from ontime where flightdate > '2017-01-01';
+----------+
| count(*) |
+----------+
|  5660651 |
+----------+
1 row in set (16.56 sec)
mysql> select count(*) from ontime where flightdate > '2017-01-01';
+----------+
| count(*) |
+----------+
|  5660651 |
+----------+
1 row in set (16.36 sec)

As we can see, the results are very stable. It does not use any cache (i.e. the InnoDB buffer pool) either. The result is also interesting: utilizing multiple threads (up to 16) and reading data from disk (probably via the disk cache) can be ~10x faster compared to reading from memory in a single thread.

Result: ~10x performance gain, no index used

Query 1b: simple, avg

set aurora_pq = 1; set aurora_pq_force=1;
select avg(DepDelay) from ontime;
+---------------+
| avg(DepDelay) |
+---------------+
|        8.2666 |
+---------------+
1 row in set (1 min 48.17 sec)
set aurora_pq = 0; set aurora_pq_force=0;  
select avg(DepDelay) from ontime;
+---------------+
| avg(DepDelay) |
+---------------+
|        8.2666 |
+---------------+
1 row in set (2 min 49.95 sec)

Here we can see that PQ gives us a ~2x performance increase.

Summary of simple query performance

Here is what we learned comparing Aurora PQ performance to native MySQL query execution:

  1. Select count(*), not using index: 10x performance increase with Aurora PQ.
  2. select avg(…), not using index: 2x performance increase with Aurora PQ.

Query 2: Complex filter, single table

The following query will always be slow in MySQL. This combination of the filters in the WHERE condition makes it extremely hard to prepare a good set of indexes to make this query faster.

select SQL_CALC_FOUND_ROWS
FlightDate, UniqueCarrier as carrier, FlightNum, Origin, Dest
FROM ontime
WHERE
  DestState not in ('AK', 'HI', 'PR', 'VI')
  and OriginState not in ('AK', 'HI', 'PR', 'VI')
  and flightdate > '2015-01-01'
  and ArrDelay < 15
and cancelled = 0
and Diverted = 0
and DivAirportLandings = '0'
ORDER by DepDelay DESC
LIMIT 10;

Let’s compare the query performance with and without PQ.

PQ disabled:

mysql> set aurora_pq_force = 0;
Query OK, 0 rows affected (0.00 sec)
mysql> set aurora_pq = 0;                                                                                                                                                                  
Query OK, 0 rows affected (0.00 sec)
mysql> explain select SQL_CALC_FOUND_ROWS FlightDate, UniqueCarrier as carrier, FlightNum, Origin, Dest FROM ontime WHERE    DestState not in ('AK', 'HI', 'PR', 'VI') and OriginState not in ('AK', 'HI', 'PR', 'VI') and flightdate > '2015-01-01'     and ArrDelay < 15 and cancelled = 0 and Diverted = 0 and DivAirportLandings = 0 ORDER by DepDelay DESC LIMIT 10\G
*************************** 1. row ***************************
          id: 1
 select_type: SIMPLE
       table: ontime
        type: ALL
possible_keys: NULL
         key: NULL
     key_len: NULL
         ref: NULL
        rows: 173706586
       Extra: Using where; Using filesort
1 row in set (0.00 sec)
mysql> select SQL_CALC_FOUND_ROWS FlightDate, UniqueCarrier as carrier, FlightNum, Origin, Dest FROM ontime WHERE    DestState not in ('AK', 'HI', 'PR', 'VI') and OriginState not in ('AK', 'HI', 'PR', 'VI') and flightdate > '2015-01-01'     and ArrDelay < 15 and cancelled = 0 and Diverted = 0 and DivAirportLandings = 0 ORDER by DepDelay DESC LIMIT 10;
+------------+---------+-----------+--------+------+
| FlightDate | carrier | FlightNum | Origin | Dest |
+------------+---------+-----------+--------+------+
| 2017-10-09 | OO      | 5028      | SBP    | SFO  |
| 2015-11-03 | VX      | 969       | SAN    | SFO  |
| 2015-05-29 | VX      | 720       | TUL    | AUS  |
| 2016-03-11 | UA      | 380       | SFO    | BOS  |
| 2016-06-13 | DL      | 2066      | JFK    | SAN  |
| 2016-11-14 | UA      | 1600      | EWR    | LAX  |
| 2016-11-09 | WN      | 2318      | BDL    | LAS  |
| 2016-11-09 | UA      | 1652      | IAD    | LAX  |
| 2016-11-13 | AA      | 23        | JFK    | LAX  |
| 2016-11-12 | UA      | 800       | EWR    | SFO  |
+------------+---------+-----------+--------+------+

10 rows in set (3 min 42.47 sec)

/* another run */

10 rows in set (3 min 46.90 sec)

This query is 100% cached. Here is the graph from PMM showing the number of read requests:

  1. Read requests: logical requests from the buffer pool
  2. Disk reads: physical requests from disk

Buffer pool requests:


Now let’s enable and force PQ:

PQ enabled:

mysql> set session aurora_pq = 1;
Query OK, 0 rows affected (0.00 sec)
mysql> set aurora_pq_force = 1;
Query OK, 0 rows affected (0.00 sec)
mysql> explain select SQL_CALC_FOUND_ROWS FlightDate, UniqueCarrier as carrier, FlightNum, Origin, Dest FROM ontime WHERE    DestState not in ('AK', 'HI', 'PR', 'VI') and OriginState not in ('AK', 'HI', 'PR', 'VI') and flightdate > '2015-01-01'     and ArrDelay < 15 and cancelled = 0 and Diverted = 0 and DivAirportLandings = 0 ORDER by DepDelay DESC LIMIT 10\G
*************************** 1. row ***************************
          id: 1
 select_type: SIMPLE
       table: ontime
        type: ALL
possible_keys: NULL
         key: NULL
     key_len: NULL
         ref: NULL
        rows: 173706586
       Extra: Using where; Using filesort; Using parallel query (12 columns, 4 filters, 3 exprs; 0 extra)
1 row in set (0.00 sec)
mysql> select SQL_CALC_FOUND_ROWS
   -> FlightDate, UniqueCarrier as carrier, FlightNum, Origin, Dest
   -> FROM ontime
   -> WHERE
   ->  DestState not in ('AK', 'HI', 'PR', 'VI')
   ->  and OriginState not in ('AK', 'HI', 'PR', 'VI')
   ->  and flightdate > '2015-01-01'
   ->   and ArrDelay < 15
   -> and cancelled = 0
   -> and Diverted = 0
   -> and DivAirportLandings = 0
   ->  ORDER by DepDelay DESC
   -> LIMIT 10;
+------------+---------+-----------+--------+------+
| FlightDate | carrier | FlightNum | Origin | Dest |
+------------+---------+-----------+--------+------+
| 2017-10-09 | OO      | 5028      | SBP    | SFO  |
| 2015-11-03 | VX      | 969       | SAN    | SFO  |
| 2015-05-29 | VX      | 720       | TUL    | AUS  |
| 2016-03-11 | UA      | 380       | SFO    | BOS  |
| 2016-06-13 | DL      | 2066      | JFK    | SAN  |
| 2016-11-14 | UA      | 1600      | EWR    | LAX  |
| 2016-11-09 | WN      | 2318      | BDL    | LAS  |
| 2016-11-09 | UA      | 1652      | IAD    | LAX  |
| 2016-11-13 | AA      | 23        | JFK    | LAX  |
| 2016-11-12 | UA      | 800       | EWR    | SFO  |
+------------+---------+-----------+--------+------+
10 rows in set (41.88 sec)
/* run 2 */
10 rows in set (28.49 sec)
/* run 3 */
10 rows in set (29.60 sec)

Now let’s compare the requests:

InnoDB Buffer Pool Requests

As we can see, Aurora PQ is almost NOT utilizing the buffer pool: there is only a minor number of read requests. Compare the maximum of 4K requests per second with PQ to the constant 600K requests per second in the previous graph.

Result: ~8x performance gain

Query 3: Complex filter, join “reference” table

In this example I join two tables: the main “ontime” table and a reference table. If both tables have no indexes, the join will simply be too slow in MySQL. To make it better, I have created indexes for both tables so the join can use them:

CREATE TABLE `carriers` (
 `carrier_code` varchar(8) NOT NULL DEFAULT '',
 `carrier_name` varchar(200) DEFAULT NULL,
 PRIMARY KEY (`carrier_code`),
 KEY `carrier_name` (`carrier_name`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1
mysql> show create table ontime_ind\G
...
 PRIMARY KEY (`id`),
 KEY `comb1` (`Carrier`,`Year`,`ArrDelayMinutes`),
 KEY `FlightDate` (`FlightDate`)
) ENGINE=InnoDB AUTO_INCREMENT=178116912 DEFAULT CHARSET=latin1

Query:

select SQL_CALC_FOUND_ROWS
FlightDate, UniqueCarrier, TailNum, FlightNum, Origin, OriginCityName, Dest, DestCityName, DepDelay, ArrDelay
FROM ontime_ind o
JOIN carriers c on o.carrier = c.carrier_code
WHERE
  (carrier_name like 'United%' or carrier_name like 'Delta%')
  and ArrDelay > 30
  ORDER by DepDelay DESC
LIMIT 10\G

PQ disabled, explain plan:

mysql> set aurora_pq_force = 0;
Query OK, 0 rows affected (0.00 sec)
mysql> set aurora_pq = 0;                                                                                                                                                                  
Query OK, 0 rows affected (0.00 sec)
mysql> explain
   -> select SQL_CALC_FOUND_ROWS
   -> FlightDate, UniqueCarrier, TailNum, FlightNum, Origin, OriginCityName, Dest, DestCityName, DepDelay, ArrDelay
   -> FROM ontime_ind o
   -> JOIN carriers c on o.carrier = c.carrier_code
   -> WHERE
   ->  (carrier_name like 'United%' or carrier_name like 'Delta%')
   ->  and ArrDelay > 30
   ->  ORDER by DepDelay DESC
   -> LIMIT 10\G
*************************** 1. row ***************************
          id: 1
 select_type: SIMPLE
       table: c
        type: range
possible_keys: PRIMARY,carrier_name
         key: carrier_name
     key_len: 203
         ref: NULL
        rows: 3
       Extra: Using where; Using index; Using temporary; Using filesort
*************************** 2. row ***************************
          id: 1
 select_type: SIMPLE
       table: o
        type: ref
possible_keys: comb1
         key: comb1
     key_len: 3
         ref: ontime.c.carrier_code
        rows: 2711597
       Extra: Using index condition; Using where
2 rows in set (0.01 sec)

As we can see, MySQL uses indexes for the join. Response times:

/* run 1 – cold run */

10 rows in set (29 min 17.39 sec)

/* run 2  – warm run */

10 rows in set (2 min 45.16 sec)

PQ enabled, explain plan:

mysql> explain
   -> select SQL_CALC_FOUND_ROWS
   -> FlightDate, UniqueCarrier, TailNum, FlightNum, Origin, OriginCityName, Dest, DestCityName, DepDelay, ArrDelay
   -> FROM ontime_ind o
   -> JOIN carriers c on o.carrier = c.carrier_code
   -> WHERE
   ->  (carrier_name like 'United%' or carrier_name like 'Delta%')
   ->  and ArrDelay > 30
   ->  ORDER by DepDelay DESC
   -> LIMIT 10\G
*************************** 1. row ***************************
          id: 1
 select_type: SIMPLE
       table: c
        type: ALL
possible_keys: PRIMARY,carrier_name
         key: NULL
     key_len: NULL
         ref: NULL
        rows: 1650
       Extra: Using where; Using temporary; Using filesort; Using parallel query (2 columns, 0 filters, 1 exprs; 0 extra)
*************************** 2. row ***************************
          id: 1
 select_type: SIMPLE
       table: o
        type: ALL
possible_keys: comb1
         key: NULL
     key_len: NULL
         ref: NULL
        rows: 173542245
       Extra: Using where; Using join buffer (Hash Join Outer table o); Using parallel query (11 columns, 1 filters, 1 exprs; 0 extra)
2 rows in set (0.00 sec)

As we can see, Aurora does not use any indexes and uses a parallel scan instead.

Response time:

mysql> select SQL_CALC_FOUND_ROWS
   -> FlightDate, UniqueCarrier, TailNum, FlightNum, Origin, OriginCityName, Dest, DestCityName, DepDelay, ArrDelay
   -> FROM ontime_ind o
   -> JOIN carriers c on o.carrier = c.carrier_code
   -> WHERE
   ->  (carrier_name like 'United%' or carrier_name like 'Delta%')
   ->  and ArrDelay > 30
   ->  ORDER by DepDelay DESC
   -> LIMIT 10\G
...
*************************** 4. row ***************************
   FlightDate: 2017-05-04
UniqueCarrier: UA
      TailNum: N68821
    FlightNum: 1205
       Origin: KOA
OriginCityName: Kona, HI
         Dest: LAX
 DestCityName: Los Angeles, CA
     DepDelay: 1457
     ArrDelay: 1459
*************************** 5. row ***************************
   FlightDate: 1991-03-12
UniqueCarrier: DL
      TailNum:
    FlightNum: 1118
       Origin: ATL
OriginCityName: Atlanta, GA
         Dest: STL
 DestCityName: St. Louis, MO
...
10 rows in set (28.78 sec)
mysql> select found_rows();
+--------------+
| found_rows() |
+--------------+
|      4180974 |
+--------------+
1 row in set (0.00 sec)

Result: ~5x performance gain

(this actually compares an indexed, cached read to a non-index PQ execution)

Summary

Aurora PQ can significantly improve the performance of reporting queries, as such queries may be extremely hard to optimize in MySQL even with indexes. Even compared to indexed, fully cached non-parallel operations, Aurora PQ response time can be 5x-10x better. Aurora PQ helps complex queries by performing parallel reads.

The following summarizes the query response times (the “No PQ” times are non-parallel, cached runs, using indexes where available):

  1. select count(*) from ontime where flightdate > '2017-01-01' – No PQ: 2 min 48.81 sec; PQ: 16.53 sec
  2. select avg(DepDelay) from ontime – No PQ: 2 min 49.95 sec; PQ: 1 min 48.17 sec
  3. Complex filter, single table (Query 2 above) – No PQ: 3 min 42.47 sec; PQ: 28.49 sec
  4. Complex filter with the carriers join (Query 3 above) – No PQ: 2 min 45.16 sec; PQ: 28.78 sec


Photo by Thomas Lipke on Unsplash

Jul 06, 2016

Pipelining versus Parallel Query Execution with MySQL 5.7 X Plugin


In this blog post, we’ll look at pipelining versus parallel query execution when using X Plugin for MySQL 5.7.

In my previous blog post, I showed how to use X Plugin for MySQL 5.7 for parallel query execution. The tricks I used to make it work:

  • Partitioning by hash
  • Open N connections to MySQL, where N = number of CPU cores

I had to do it manually (as well as sort the result at the end) as X Plugin only supports “pipelining” (which only saves the round-trip time) and does not “multiplex” connections to MySQL (MySQL does not use multiple CPU cores for a single query).

TL;DR version

In this (long) post I’m playing with MySQL 5.7 X Plugin / X Protocol and document store. Here is the summary:

  1. X Plugin does not “multiplex” connections/sessions to MySQL. Similar to the original protocol, one connection to X Plugin will result in one session open to MySQL
  2. An X Plugin query (if the library supports it) returns immediately and does not wait until the query is finished (async call). MySQL works like a queue.
  3. X Plugin does not have any additional server-level durability settings. Unless you check or wait for the acknowledgement (which is asynchronous) from the server, the data might or might not be written into MySQL (“fire and forget”).

At the same time, X Protocol can be helpful if:

  • We want to implement an asynchronous client (i.e., we do not want to block network communication, such as downloads or API calls, when the MySQL table is locked).
  • We want to use MySQL as a queue and save the round-trip time.

Benchmark results: “pipelining” versus “parallelizing” versus a single query

I’ve done a couple of tests comparing “pipelining” versus “parallelizing” versus a single query. Here are the results:

      1. Parallel queries with NodeJS:
        $ time node async_wikistats.js
        ...
        All done! Total: 17753
        ...
        real    0m30.668s
        user    0m0.256s
        sys     0m0.028s
      2. Pipeline with NodeJS:
        $ time node async_wikistats_pipeline.js
        ...
        All done! Total: 17753
        ...
        real 5m39.666s
        user 0m0.212s
        sys 0m0.024s

        In the pipeline with NodeJS, I’m reusing the same connection (and do not open a new one for each thread).

      3. Direct query – partitioned table:
        mysql> select sum(tot_visits) from wikistats.wikistats_by_day_spark_part where url like '%postgresql%';
        +-----------------+
        | sum(tot_visits) |
        +-----------------+
        | 17753           |
        +-----------------+
        1 row in set (5 min 31.44 sec)
      4. Direct query – non-partitioned table.
        mysql> select sum(tot_visits) from wikistats.wikistats_by_day_spark where url like '%postgresql%';
        +-----------------+
        | sum(tot_visits) |
        +-----------------+
        | 17753           |
        +-----------------+
        1 row in set (4 min 38.16 sec)

Advantages of pipelines with X Plugin

Although pipelining with X Plugin does not significantly improve query response time (it only reduces total latency by saving round trips), it might be helpful in some cases. For example, let’s say we are downloading something from the Internet and need to save the progress of the download as well as the metadata for the document. In this example, I use youtube-dl to search and download the metadata about YouTube videos, then save the metadata JSON into MySQL 5.7 Document Store. Here is the code:

var mysqlx = require('mysqlx');
// This is the same as running: $ youtube-dl -j -i ytsearch100:"mysql 5.7"
const spawn = require('child_process').spawn;
const yt = spawn('youtube-dl', ['-j', '-i', 'ytsearch100:"mysql 5.7"'], {maxBuffer: 1024 * 1024 * 128});
var mySession =
mysqlx.getSession({
    host: 'localhost',
    port: 33060,
    dbUser: 'root',
    dbPassword: '<your password>'
});
yt.stdout.on('data', (data) => {
        try {
                dataObj = JSON.parse(data);
                console.log(dataObj.fulltitle);
                mySession.then(session => {
                                                session.getSchema("yt").getCollection("youtube").add(  dataObj  )
                                                .execute(function (row) {
                                                }).catch(err => {
                                                        console.log(err);
                                                })
                                                .then( function (notices) { console.log("Wrote to MySQL: " + JSON.stringify(notices))  });
                                }).catch(function (err) {
                                              console.log(err);
                                              process.exit();
                                });
        } catch (e) {
                console.log(" --- Can't parse json" + e );
        }
});
yt.stderr.on('data', (data) => {
  console.log("Error receiving data");
});
yt.on('close', (code) => {
  console.log(`child process exited with code ${code}`);
  mySession.then(session => {session.close() } );
});

In the above example, I execute the youtube-dl binary (you need to have it installed first) to search for “MySQL 5.7” videos. Instead of downloading the videos, I only grab the video’s metadata in JSON format (the "-j" flag). Because it is JSON, I can save it into the MySQL document store. The table has the following structure:

CREATE TABLE `youtube` (
  `doc` json DEFAULT NULL,
  `_id` varchar(32) GENERATED ALWAYS AS (json_unquote(json_extract(`doc`,'$._id'))) STORED NOT NULL,
  UNIQUE KEY `_id` (`_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

Here is the execution example:

$ node yt.js
What's New in MySQL 5.7
Wrote to MySQL: {"_state":{"rows_affected":1,"doc_ids":["3f312c3b-b2f3-55e8-0ee9-b706eddf"]}}
MySQL 5.7: MySQL JSON data type example
Wrote to MySQL: {"_state":{"rows_affected":1,"doc_ids":["88223742-9875-59f1-f535-f1cfb936"]}}
MySQL Performance Tuning: Part 1. Configuration (Covers MySQL 5.7)
Wrote to MySQL: {"_state":{"rows_affected":1,"doc_ids":["c377e051-37e6-8a63-bec7-1b81c6d6"]}}
Dave Stokes — MySQL 5.7 - New Features and Things That Will Break — php[world] 2014
Wrote to MySQL: {"_state":{"rows_affected":1,"doc_ids":["96ae0dd8-9f7d-c08a-bbef-1a256b11"]}}
MySQL 5.7 & JSON: New Opportunities for Developers - Thomas Ulin - Forum PHP 2015
Wrote to MySQL: {"_state":{"rows_affected":1,"doc_ids":["ccb5c53e-561c-2ed5-6deb-1b325739"]}}
Cara Instal MySQL 5.7.10 NoInstaller pada Windows Manual Part3
Wrote to MySQL: {"_state":{"rows_affected":1,"doc_ids":["95efbd79-8d79-e7b6-a535-271640c8"]}}
MySQL 5.7 Install and Configuration on Ubuntu 14.04
Wrote to MySQL: {"_state":{"rows_affected":1,"doc_ids":["b8cfe132-aca4-1eba-c2ae-69e48db8"]}}

Now, here is what makes this example interesting: as NodeJS + X Plugin = asynchronous + pipelining, the program execution will not stop if the table is locked. I’ve opened two sessions:

  • session 1: $ node yt.js > test_lock_table.log
  • session 2:
    mysql> lock table youtube read; select sleep(10); unlock tables;
    Query OK, 0 rows affected (0.00 sec)
    +-----------+
    | sleep(10) |
    +-----------+
    |         0 |
    +-----------+
    1 row in set (10.01 sec)
    Query OK, 0 rows affected (0.00 sec)

Results:

...
Upgrade MySQL Server from 5.5 to 5.7
... => wrote to MySQL: {"_state":{"rows_affected":1,"doc_ids":["d4d62a8a-fbfa-05ab-2110-2fd5cf6d"]}}
OSC15 - Georgi Kodinov - Secure Deployment Changes Coming in MySQL 5.7
... => wrote to MySQL: {"_state":{"rows_affected":1,"doc_ids":["8ac1cdb9-1499-544c-da2a-5db1ccf5"]}}
MySQL 5.7: Create JSON string using mysql
FreeBSD 10.3 - Instalación de MySQL 5.7 desde Código Fuente - Source Code
Webinar replay: How To Upgrade to MySQL 5.7 - The Best Practices - part 1
How to install MySQL Server on Mac OS X Yosemite - ltamTube
Webinar replay: How To Upgrade to MySQL 5.7 - The Best Practices - part 4
COMO INSTALAR MYSQL VERSION 5.7.13
MySQL and JSON
MySQL 5.7: Merge JSON data using MySQL
... => wrote to MySQL: {"_state":{"rows_affected":1,"doc_ids":["a11ff369-6f23-11e9-187b-e3713e6e"]}}
... => wrote to MySQL: {"_state":{"rows_affected":1,"doc_ids":["06143a61-4add-79da-0e1d-c2b52cf6"]}}
... => wrote to MySQL: {"_state":{"rows_affected":1,"doc_ids":["1eb94ef4-db63-cb75-767e-e1555549"]}}
... => wrote to MySQL: {"_state":{"rows_affected":1,"doc_ids":["e25f15b5-8c19-9531-ed69-7b46807a"]}}
... => wrote to MySQL: {"_state":{"rows_affected":1,"doc_ids":["02b5a4c9-6a21-f263-90d5-cd761906"]}}
... => wrote to MySQL: {"_state":{"rows_affected":1,"doc_ids":["e0bef958-10af-b181-81cd-5debaaa0"]}}
... => wrote to MySQL: {"_state":{"rows_affected":1,"doc_ids":["f48fa635-fa63-7481-0668-addabbac"]}}
... => wrote to MySQL: {"_state":{"rows_affected":1,"doc_ids":["557fa5c5-3c8a-fe01-c17c-549c557e"]}}
MySQL 5.7 Install and Configuration on Ubuntu 14.04
... => wrote to MySQL: {"_state":{"rows_affected":1,"doc_ids":["456b11d8-ba03-0aec-8e06-9517c6e1"]}}
MySQL WorkBench 6.3 installation on Ubuntu 14.04
... => wrote to MySQL: {"_state":{"rows_affected":1,"doc_ids":["0b651987-9b23-b5e0-f8f7-49b8ba5c"]}}
Going through era of IoT with MySQL 5.7 - FOSSASIA 2016
... => wrote to MySQL: {"_state":{"rows_affected":1,"doc_ids":["e133746c-836c-a7e0-3893-292a7429"]}}
MySQL 5.7: MySQL JSON operator example
... => wrote to MySQL: {"_state":{"rows_affected":1,"doc_ids":["4d13830d-7b30-5b31-d068-c7305e0a"]}}

As we can see, the first two writes were immediate. Then I locked the table, and no MySQL queries went through. At the same time the download process (which is the slowest part here) proceeded and was not blocked (we can see the titles above that are not followed by a “… => wrote to MySQL:” line). When the table was unlocked, the pile of waiting queries succeeded.

This can be very helpful when running a “download” process where the network is the bottleneck. With traditional synchronous query execution, locking a table blocks the application (including its network communication). With NodeJS and the X Plugin, the download part proceeds, with MySQL acting as a queue.

Pipeline Durability

How “durable” is this pipeline, you might ask? In other words, what happens if I kill the connection? To test it out, I have (once again) locked the table (this time before starting the Node.js script), killed the connection and finally unlocked the table. Here are the results:

Session 1:
----------
mysql> truncate table youtube_new;
Query OK, 0 rows affected (0.25 sec)
mysql> lock table youtube_new read;
Query OK, 0 rows affected (0.00 sec)
mysql> select count(*) from youtube_new;
+----------+
| count(*) |
+----------+
|        0 |
+----------+
1 row in set (0.00 sec)
Session 2:
----------
(when table is locked)
$ node yt1.js
11 03  MyISAM
Switching to InnoDB from MyISAM
tablas InnoDB a MyISAM
MongoDB vs MyISAM (MariaDB/MySQL)
MySQL Tutorial 35 - Foreign Key Constraints for the InnoDB Storage Engine
phpmyadmin foreign keys myisam innodb
Convert or change database manual from Myisam to Innodb
... >100 other results omitted ...
^C
Session 1:
----------
mysql> select count(*) from youtube_new;
+----------+
| count(*) |
+----------+
|        0 |
+----------+
1 row in set (0.00 sec)
     Id: 4916
   User: root
   Host: localhost:33221
     db: NULL
Command: Query
   Time: 28
  State: Waiting for table metadata lock
   Info: PLUGIN: INSERT INTO `iot`.`youtube_new` (doc) VALUES ('{"upload_date":"20140319","protocol":"
mysql> unlock table;
Query OK, 0 rows affected (0.00 sec)
mysql> select count(*) from youtube_new;
+----------+
| count(*) |
+----------+
|        2 |
+----------+
1 row in set (0.00 sec)
mysql>  select json_unquote(doc->'$.title') from youtube_new;
+---------------------------------+
| json_unquote(doc->'$.title')    |
+---------------------------------+
| 11 03  MyISAM                   |
| Switching to InnoDB from MyISAM |
+---------------------------------+
2 rows in set (0.00 sec)

Please note: in the above, there isn’t a single acknowledgement from the MySQL server. When the code receives a response from MySQL, it prints “Wrote to MySQL: {"_state":{"rows_affected":1,"doc_ids":["…"]}}”. Also note that even after the connection was killed, the MySQL thread is still there, waiting on the table lock.

What is interesting here is that only two rows were inserted into the document store. Is there a “history length” here, or some other buffer that we can increase? I’ve asked Jan Kneschke, one of the authors of the X Protocol, and here are the answers:

  • Q: Is there any history length or any buffer and can we tune it?
    • A: There is no “history” or “buffer” at all, it is all at the connector level.
  • Q: Then why were two rows inserted in the end?
    • A: To answer this question I’ve captured a tcpdump on port 33060 (X Protocol); see below

This is very important information! Keep in mind that the asynchronous pipeline has no durability settings: if the application fails while there are pending writes, those writes may be lost, or may still be applied; unless you wait for the server’s acknowledgement, you cannot know which.
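
If your application does need durability here, one approach is to keep track of every pending insert and drain the pipeline before exiting. A minimal sketch of the idea, building on the hypothetical sketch above (the pending array and the addDoc() helper are illustrative, not part of the original script):

// collect the promise of every pipelined insert
const pending = [];

function addDoc(session, doc) {
  const p = session.getSchema('iot').getCollection('youtube')
                   .add(doc).execute();   // resolves on server acknowledgement
  pending.push(p);
  return p;
}

// on shutdown, wait until every insert is acknowledged (or failed)
// before closing the session, so nothing is silently left "in flight"
yt.on('close', async () => {
  await Promise.allSettled(pending);
  const session = await mySession;
  await session.close();
});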

To fully understand how the protocol works, I’ve captured a tcpdump (Jan Kneschke helped me analyze it):

tcpdump -i lo -s0 -w tests/node-js-pipelining.pcap "tcp port 33060"

This is what is happening:

  • When I hit CTRL+C, nodejs closes the connection. As the table is still locked, MySQL can’t write to it and will not send the result of the insert back.
  • When the table is unlocked, the server starts the first statement despite the fact that the connection has been closed. It then acknowledges the first insert and starts the second one.
  • However, at this point the script (client) has already closed the connection, so the final packet (“write done, here is the id”) cannot be delivered. The X Plugin then finds out that the client closed the connection and stops executing the pipeline.

Actually, this is very similar to how the original MySQL protocol works. If we kill the script/application, it doesn’t automatically kill the MySQL connection (unless you hit CTRL+C in the MySQL client, which sends the kill signal), and the connection waits for the table to get unlocked. When the table is unlocked, the server executes the pending insert from the file:

Session 1
---------
mysql> select * from t_sql;
Empty set (0.00 sec)
mysql> lock table t_sql read;
Query OK, 0 rows affected (0.00 sec)
Session 2:
----------
$ mysql iot < t.sql
$ kill -9 ...
[3]   Killed                  mysql iot < t.sql
Session 1:
----------
mysql> show processlist;
+------+------+-----------------+------+---------+---------+---------------------------------+-----------------------------------------------+
| Id   | User | Host            | db   | Command | Time    | State                           | Info                                          |
+------+------+-----------------+------+---------+---------+---------------------------------+-----------------------------------------------+
| 4913 | root | localhost       | iot  | Query   |      41 | Waiting for table metadata lock | insert into t_sql  values('{"test_field":0}') |
+------+------+-----------------+------+---------+---------+---------------------------------+-----------------------------------------------+
4 rows in set (0.00 sec)
mysql> unlock tables;
Query OK, 0 rows affected (0.00 sec)
mysql> select * from t_sql;
+-------------------+
| doc               |
+-------------------+
| {"test_field": 0} |
+-------------------+
1 row in set (0.00 sec)

Enforcing unique checks

If I restart my script, it finds the same videos again, so we will probably want to enforce the consistency of our data. By default, the plugin generates a unique key (_id) for each document, which prevents inserting duplicates.

Another way to enforce unique checks is to create a unique key on the YouTube video id. Here is the updated table structure:

CREATE TABLE `youtube` (
  `doc` json DEFAULT NULL,
  `youtube_id` varchar(11) GENERATED ALWAYS AS (json_unquote(json_extract(`doc`,'$.id'))) STORED NOT NULL,
  UNIQUE KEY `youtube_id` (`youtube_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

I’ve replaced the default “_id” column with one based on YouTube’s unique video ID. Now when I restart the script it shows:

MySQL 5.7: Merge JSON data using MySQL
{ [Error: Document contains a field value that is not unique but required to be]
  info:
   { severity: 0,
     code: 5116,
     msg: 'Document contains a field value that is not unique but required to be',
     sql_state: 'HY000' } }
... => wrote to MySQL: undefined

…as this document has already been loaded.
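
If you want the script to simply skip such duplicates and keep going, you can inspect the error code in the catch handler instead of exiting. A sketch of the insert step inside the JSON-parsing loop (the err.info.code field matches the error output above, but treat the exact error shape as connector-specific):

mySession
  .then(session => session.getSchema('iot').getCollection('youtube')
                          .add(doc).execute())
  .then(result => console.log('Wrote to MySQL: ' + JSON.stringify(result)))
  .catch((err) => {
    if (err.info && err.info.code === 5116) {
      // 5116 = "Document contains a field value that is not unique..."
      console.log('Skipping duplicate: ' + doc.fulltitle);
    } else {
      console.log(err);
      process.exit();
    }
  });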

Conclusion

Although X Plugin pipelining does not necessarily speed up individual queries significantly (it mainly saves round-trip time), it can be helpful for some applications. We might not want to block network communication (i.e., downloading or API calls) while a MySQL table is locked, for example. At the same time, unless you check or wait for the acknowledgement from the server, the data might or might not be written into MySQL.

Bonus: data analysis

Now we can see what we have downloaded. There are a number of interesting fields in the result:

"is_live": null,
	"license": "Standard YouTube License",
	"duration": 2965,
	"end_time": null,
	"playlist": ""mysql 5.7"",
	"protocol": "https",
	"uploader": "YUI Library",
	"_filename": "Douglas Crockford - The JSON Saga--C-JoyNuQJs.mp4",
	"age_limit": 0,
	"alt_title": null,
	"extractor": "youtube",
	"format_id": "18",
	"fulltitle": "Douglas Crockford: The JSON Saga",
	"n_entries": 571,
	"subtitles": {},
	"thumbnail": "https://i.ytimg.com/vi/-C-JoyNuQJs/hqdefault.jpg",
	"categories": ["Science & Technology"],
	"display_id": "-C-JoyNuQJs",
	"like_count": 251,
	"player_url": null,
	"resolution": "640x360",
	"start_time": null,
	"thumbnails": [{
		"id": "0",
		"url": "https://i.ytimg.com/vi/-C-JoyNuQJs/hqdefault.jpg"
	}],
	"view_count": 36538,
	"annotations": null,
	"description": "Yahoo! JavaScript architect Douglas Crockford tells the story of how JSON was discovered and how it became a major standard for describing data.",
	"format_note": "medium",
	"playlist_id": ""mysql 5.7"",
	"upload_date": "20110828",
	"uploader_id": "yuilibrary",
	"webpage_url": "https://www.youtube.com/watch?v=-C-JoyNuQJs",
	"uploader_url": "http://www.youtube.com/user/yuilibrary",
	"dislike_count": 5,
	"extractor_key": "Youtube",
	"average_rating": 4.921875,
	"playlist_index": 223,
	"playlist_title": null,
	"automatic_captions": {},
	"requested_subtitles": null,
	"webpage_url_basename": "-C-JoyNuQJs"

Now we can find the most popular videos. To do that, I’ve added one more generated (virtual) column on view_count, and created an index on it:

CREATE TABLE `youtube` (
  `doc` json DEFAULT NULL,
  `youtube_id` varchar(11) GENERATED ALWAYS AS (json_unquote(json_extract(`doc`,'$.id'))) STORED NOT NULL,
  `view_count` int(11) GENERATED ALWAYS AS (json_unquote(json_extract(`doc`,'$.view_count'))) VIRTUAL,
  UNIQUE KEY `youtube_id` (`youtube_id`),
  KEY `view_count` (`view_count`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

We can run queries like:

mysql> select json_unquote(doc->'$.title'),
    -> view_count,
    -> json_unquote(doc->'$.dislike_count') as dislikes
    -> from youtube
    -> order by view_count desc
    -> limit 10;
+----------------------------------------------------------------------------------------------------+------------+----------+
| json_unquote(doc->'$.title')                                                                       | view_count | dislikes |
+----------------------------------------------------------------------------------------------------+------------+----------+
| Beginners MYSQL Database Tutorial 1 # Download , Install MYSQL and first SQL query                 |     664153 | 106      |
| MySQL Tutorial                                                                                     |     533983 | 108      |
| PHP and MYSQL - Connecting to a Database and Adding Data                                           |     377006 | 50       |
| PHP MySQL Tutorial                                                                                 |     197984 | 41       |
| Installing MySQL (Windows 7)                                                                       |     196712 | 28       |
| Understanding PHP, MySQL, HTML and CSS and their Roles in Web Development - CodersCult Webinar 001 |     195464 | 24       |
| jQuery Ajax Tutorial #1 - Using AJAX & API's (jQuery Tutorial #7)                                  |     179198 | 25       |
| How To Root Lenovo A6000                                                                           |     165221 | 40       |
| MySQL Tutorial 1 - What is MySQL                                                                   |     165042 | 45       |
| How to Send Email in Blackboard Learn                                                              |     144948 | 28       |
+----------------------------------------------------------------------------------------------------+------------+----------+
10 rows in set (0.00 sec)

Or if we want to find out the most popular resolutions:

mysql> select count(*) as cnt,
    -> sum(view_count) as sum_views,
    -> json_unquote(doc->'$.resolution') as resolution
    -> from youtube
    -> group by resolution
    -> order by cnt desc, sum_views desc
    -> limit 10;
+-----+-----------+------------+
| cnt | sum_views | resolution |
+-----+-----------+------------+
| 273 |   3121447 | 1280x720   |
|  80 |   1195865 | 640x360    |
|  18 |     33958 | 1278x720   |
|  15 |     18560 | 1152x720   |
|  11 |     14800 | 960x720    |
|   5 |      6725 | 1276x720   |
|   4 |     18562 | 1280x682   |
|   4 |      1581 | 1280x616   |
|   4 |       348 | 1280x612   |
|   3 |      2024 | 1200x720   |
+-----+-----------+------------+
10 rows in set (0.02 sec)
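
As an aside, these analytical queries can also be issued asynchronously over the X Protocol, rather than through the mysql client. A sketch, again assuming the @mysql/xdevapi connector:

const mysqlx = require('@mysql/xdevapi');

mysqlx.getSession('mysqlx://root@localhost:33060')
  .then(async (session) => {
    const res = await session
      .sql("SELECT json_unquote(doc->'$.title') AS title, view_count " +
           "FROM iot.youtube ORDER BY view_count DESC LIMIT 10")
      .execute();
    res.fetchAll().forEach(row => console.log(row));  // each row is an array of column values
    await session.close();
  });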

Special thanks to Jan Kneschke and Morgan Tocker from Oracle for helping with the X Protocol internals.
