An Overview of Sharding in PostgreSQL and How it Relates to MongoDB’s

PostgreSQL LogoA couple of weeks ago I presented at Percona University São Paulo about the new features in PostgreSQL that allow the deployment of simple shards. I’ve tried to summarize the main points in this post, as well as providing an introductory overview of sharding itself. Please note I haven’t included any third-party extensions that provide sharding for PostgreSQL in my discussion below.

Partitioning in PostgreSQL

In a nutshell, until not long ago there wasn’t a dedicated, native feature in PostgreSQL for table partitioning. Not that that prevented people from doing it anyway: the PostgreSQL community is very creative. There’s a table inheritance feature in PostgreSQL that allows the creation of child tables with the same structure as a parent table. That, combined with the employment of proper constraints in each child table along with the right set of triggers in the parent table, has provided practical “table partitioning” in PostgreSQL for years (and still works). Here’s an example:

Using table inheritance

CREATE TABLE temperature (
  city_id INT NOT NULL,
  temp DECIMAL(5,2) NOT NULL

Figure 1a. Main (or parent) table

CREATE TABLE temperature_201901 (CHECK (timestamp >= DATE '2019-01-01' AND timestamp <= DATE '2019-01-31')) INHERITS (temperature);
CREATE TABLE temperature_201902 (CHECK (timestamp >= DATE '2019-02-01' AND timestamp <= DATE '2019-02-28')) INHERITS (temperature);
CREATE TABLE temperature_201903 (CHECK (timestamp >= DATE '2019-03-01' AND timestamp <= DATE '2019-03-31')) INHERITS (temperature);

Figure 1b. Child tables inherit the structure of the parent table and are limited by constraints

CREATE OR REPLACE FUNCTION temperature_insert_trigger()
    IF ( NEW.timestamp >= DATE '2019-01-01' AND NEW.timestamp <= DATE '2019-01-31' ) THEN INSERT INTO temperature_201901 VALUES (NEW.*);
    ELSIF ( NEW.timestamp >= DATE '2019-02-01' AND NEW.timestamp <= DATE '2019-02-28' ) THEN INSERT INTO temperature_201902 VALUES (NEW.*);
    ELSIF ( NEW.timestamp >= DATE '2019-03-01' AND NEW.timestamp <= DATE '2019-03-31' ) THEN INSERT INTO temperature_201903 VALUES (NEW.*);
    ELSE RAISE EXCEPTION 'Date out of range!';
    END IF;
LANGUAGE plpgsql;

Figure 1c. A function that controls in which child table a new entry should be added according to the timestamp field

CREATE TRIGGER insert_temperature_trigger
    BEFORE INSERT ON temperature
    FOR EACH ROW EXECUTE PROCEDURE temperature_insert_trigger();

Figure 1d. A trigger is added to the parent table that calls the function above when an INSERT is performed

The biggest drawbacks for such an implementation was related to the amount of manual work needed to maintain such an environment (even though a certain level of automation could be achieved through the use of 3rd party extensions such as pg_partman) and the lack of optimization/support for “distributed” queries. The PostgreSQL optimizer wasn’t advanced enough to have a good understanding of partitions at the time, though there were workarounds that could be used such as employing constraint exclusion.

Declarative partitioning

About 1.5 year ago, PostgreSQL 10 was released with a bunch of new features, among them native support for table partitioning through the new declarative partitioning feature. Here’s how we could partition the same temperature table using this new method:

CREATE TABLE temperature (
  city_id INT NOT NULL,
  temp DECIMAL(5,2) NOT NULL
) PARTITION BY RANGE (timestamp);

Figure 2a. Main table structure for a partitioned table

CREATE TABLE temperature_201901 PARTITION OF temperature FOR VALUES FROM ('2019-01-01') TO ('2019-02-01');
CREATE TABLE temperature_201902 PARTITION OF temperature FOR VALUES FROM ('2019-02-01') TO ('2019-03-01');
CREATE TABLE temperature_201903 PARTITION OF temperature FOR VALUES FROM ('2019-03-01') TO ('2019-04-01');

Figure 2b. Tables defined as partitions of the main table; with declarative partitioning, there was no need for triggers anymore.

It still missed the greater optimization and flexibility needed to consider it a complete partitioning solution. It wasn’t possible, for example, to perform an UPDATE that would result in moving a row from one partition to a different one, but the foundation had been laid. Fast forward another year and PostgreSQL 11 builds on top of this, delivering additional features like:

  • the possibility to define a default partition, to which any entry that wouldn’t fit a corresponding partition would be added to.
  • having indexes added to the main table “replicated” to the underlying partitions, which improved declarative partitioning usability.
  • support for Foreign Keys

These are just a few of the features that led to a more mature partitioning solution.

Sharding in PostgreSQL

By now you might be reasonably questioning my premise, and that partitioning is not sharding, at least not in the sense and context you would have expected this post to cover. In fact, PostgreSQL has implemented sharding on top of partitioning by allowing any given partition of a partitioned table to be hosted by a remote server. The basis for this is in PostgreSQL’s Foreign Data Wrapper (FDW) support, which has been a part of the core of PostgreSQL for a long time. While technically possible to implement, we just couldn’t make practical use of it for sharding using the table inheritance + triggers approach. Declarative partitioning allowed for much better integration of these pieces making sharding – partitioned tables hosted by remote servers – more of a reality in PostgreSQL.

CREATE TABLE temperature_201904 (
  city_id INT NOT NULL,
  temp DECIMAL(5,2) NOT NULL

Figure 3a. On the remote server we create a “partition” – nothing but a simple table

CREATE EXTENSION postgres_fdw;
GRANT USAGE ON FOREIGN DATA WRAPPER postgres_fdw to app_user;
    OPTIONS (dbname 'postgres', host 'shard02', port
CREATE USER MAPPING for app_user SERVER shard02
    OPTIONS (user 'fdw_user', password 'secret');

Figure 3b. On the local server the preparatory steps involve loading the postgres_fdw extension, allowing our local application user to use that extension, creating an entry to access the remote server, and finally mapping that user with a user in the remote server (fdw_user) that has local access to the table we’ll use as a remote partition.

CREATE FOREIGN TABLE temperature_201904 PARTITION OF temperature
    FOR VALUES FROM ('2019-04-01') TO ('2019-05-01')
    SERVER remoteserver01;

Figure 3c. Now it’s simply a matter of creating a proper partition of our main table in the local server that will be linked to the table of the same name in the remote server

You can read more about postgres_fdw in Foreign Data Wrappers in PostgreSQL and a closer look at postgres_fdw.

When does it make sense to partition a table?

There are a several principle reasons to partition a table:

  1. When a table grows so big that searching it becomes impractical even with the help of indexes (which will invariably become too big as well).
  2. When data management is such that the target data is often the most recently added and/or older data is constantly being purged/archived, or even not being searched anymore (at least not as often).
  3. If you are loading data from different sources and maintaining it as a data warehousing for reporting and analytics.
  4. For a less expensive archiving or purging of massive data that avoids exclusive locks on the entire table.

When should we resort to sharding?

Here are a couple of classic cases:

  1. To scale out (horizontally), when even after partitioning a table the amount of data is too great or too complex to be processed by a single server.
  2. Use cases where the data in a big table can be divided into two or more segments that would benefit the majority of the search patterns. A common example used to describe a scenario like this is that of a company whose customers are evenly spread across the United States and searches to a target table involves the customer ZIP code. A shard then could be used to host entries of customers located on the East coast and another for customers on the West coast.

Note though this is by no means an extensive list.

How should we shard the data?

With sharding (in this context) being “distributed” partitioning, the essence for a successful (performant) sharded environment lies in choosing the right shard key – and by “right” I mean one that will distribute your data across the shards in a way that will benefit most of your queries. In the example above, using the customer ZIP code as shard key makes sense if an application will more often be issuing queries that will hit one shard (East) or the other (West). However, if most queries would filter by, say, birth date, then all queries would need to be run through all shards to recover the full result set. This could easily backfire on performance with the shard approach, by not selecting the right shard key or simply by having such a heterogeneous workload that no shard key would be able to satisfy it.

It only ever makes sense to shard if the nature of the queries involving the target table(s) is such that distributed processing will be the norm and constitute an advantage far greater than any overhead caused by a minority of queries that rely on JOINs involving multiple shards. Due to the distributed nature of sharding such queries will necessarily perform worse if compared to having them all hosted on the same server.

Why not simply rely on replication or clustering?

Sharding should be considered in those situations where you can’t efficiently break down a big table through data normalization or use an alternative approach and maintaining it on a single server is too demanding. The table is then partitioned and the partitions distributed across different servers to spread the load across many servers. It doesn’t need to be one partition per shard, often a single shard will host a number of partitions.

Note how sharding differs from traditional “share all” database replication and clustering environments: you may use, for instance, a dedicated PostgreSQL server to host a single partition from a single table and nothing else. However, these data scaling technologies may well complement each other: a PostgreSQL database may host a shard with part of a big table as well as replicate smaller tables that are often used for some sort of consultation (read-only), such as a price list, through logical replication.

How does sharding in PostgreSQL relates to sharding in MongoDB®?

MongoDB® tackles the matter of managing big collections straight through sharding: there is no concept of local partitioning of collections in MongoDB. In fact, the whole MongoDB scaling strategy is based on sharding, which takes a central place in the database architecture. As such, the sharding process has been made as transparent to the application as possible: all a DBA has to do is to define the shard key.

Instead of connecting to a reference database server the application will connect to an auxiliary router server named mongos which will process the queries and request the necessary information to the respective shard. It knows which shard contains what because they maintain a copy of the metadata that maps chunks of data to shards, which they get from a config server, another important and independent component of a MongoDB sharded cluster. Together, they also play a role in maintaining good data distribution across the shards, actively splitting and migrating chunks of data between servers as needed.

In PostgreSQL the application will connect and query the main database server. There isn’t an intermediary router such as the mongos but PostgreSQL’s query planner will process the query and create an execution plan. When data requested from a partitioned table is found on a remote server PostgreSQL will request the data from it, as shown in the EXPLAIN output below:

   Remote SQL: UPDATE public.emp SET sal = $2 WHERE ctid = $1
   ->  Nested Loop  (cost=100.00..300.71 rows=669 width=118)
     	Output: emp.empno, emp.ename, emp.job, emp.mgr, emp.hiredate, (emp.sal * '1.1'::double precision), emp.comm, emp.deptno, emp.ctid, salgrade.ctid
     	Join Filter: ((emp.sal > (salgrade.losal)::double precision) AND (emp.sal < (salgrade.hisal)::double precision)) ->  Foreign Scan on public.emp  (cost=100.00..128.06 rows=602 width=112)
           	Output: emp.empno, emp.ename, emp.job, emp.mgr, emp.hiredate, emp.sal, emp.comm, emp.deptno, emp.ctid

Figure 4: excerpt of an EXPLAIN plan that involves processing a query in a remote server

Note in the above query the mention “Remote SQL”. A lot of optimizations have been made in the execution of remote queries in PostgreSQL 10 and 11, which contributed to mature and improve the sharding solution. Among them is support for having grouping and aggregation operations executed on the remote server itself (“push down”) rather than recovering all rows and having them processed locally.

What is missing in PostgreSQL implementation?

There is, however, still room for improvement. In terms of remote execution, reports from the community indicate not all queries are performing as they should. For example, in some cases the PostgreSQL planner is not performing a full push-down, resulting in shards transferring more data than required. Parallel scheduling of queries that touch multiple shards is not yet implemented: for now, the execution is taking place sequentially, one shard at a time, which takes longer to complete. When it comes to the maintenance of partitioned and sharded environments, changes in the structure of partitions are still complicated and not very practical. For example, when you add a new partition to a partitioned table with an appointed default partition you may need to detach the default partition first if it contains rows that would now fit in the new partition, manually move those to the new partition, and finally re-attach the default partition back in place.

But that is all part of a maturing technology. We’re looking forward to PostgreSQL 12 and what it will bring in the partitioning and sharding fronts.

Image based on photos by Leonardo Quatrocchi from Pexels



Zone Based Sharding in MongoDB

MongoDB shard zones

MongoDB shard zonesIn this blog post, we will discuss about how to use zone based sharding to deploy a sharded MongoDB cluster in a customized manner so that the queries and data will be redirected per geographical groupings. This feature of MongoDB is a part of its Data Center Awareness, that allows queries to be routed to particular MongoDB deployments considering physical locations or configurations of mongod instances.

Before moving on, let’s have an overview of this feature. You might already have some questions about zone based sharding. Was it recently introduced? If zone-based sharding is something we should use, then what about tag-aware sharding?

MongoDB supported tag-aware sharding from even the initial versions of MongoDB. This means tagging a range of shard keys values, associating that range with a shard, and redirecting operations to that specific tagged shard. This tag-aware sharding, since version 3.4, is referred to as ZONES. So, the only change is its name, and this is the reason sh.addShardTag(shard, tag) method is being used.

How it works

  1. With the help of a shard key, MongoDB allows you to create zones of sharded data – also known as shard zones.
  2. Each zone can be associated with one or more shards.
  3. Similarly, a shard can associate with any number of non-conflicting zones.
  4. MongoDB migrates chunks to the zone range in the selected shards.
  5. MongoDB routes read and write to a particular zone range that resides in particular shards.

Useful for what kind of deployments/applications?

  1. In cases where data needs to be routed to a particular shard due to some hardware configuration restrictions.
  2. Zones can be useful if there is the need to isolate specific data to a particular shard. For example, in the case of GDPR compliance that requires businesses to protect data and privacy for an individual within the EU.
  3. If an application is being used geographically and you want a query to route to the nearest shards for both reads and writes.

Let’s consider a Scenario

Consider the scenario of a school where students are experts in Biology, but most students are experts in Maths. So we have more data for the maths students compare to Biology students. In this example, deployment requires that Maths students data should route to the shard with the better configuration for a large amount of data. Both read and write will be served by specific shards.  All the Biology students will be served by another shard. To implement this, we will add a tag to deploy the zones to the shards.

For this scenario we have an environment with:

DB: “school”

Collection: “students”

Fields: “sId”, “subject”, “marks” and so on..

Indexed Fields: “subject” and “sId”

We enable sharding:


And create a shardkey: “subject” and “sId” 

sh.shardCollection("school.students", {subject: 1, sId: 1});

We have two shards in our test environment


{  "_id" : "shard0000",  "host" : "",  "state" : 1 }
{  "_id" : "shard0001",  "host" : "",  "state" : 1 }

Zone Deployment

1) Disable balancer

To prevent migration of the chunks across the cluster, disable the balancer for the “students” collection:

mongos> sh.disableBalancing("school.students")
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })

Before proceeding further make sure the balancer is not running. It is not a mandatory process, but it is always a good practice to make sure no migration of chunks takes place while configuring zones

mongos> sh.isBalancerRunning()

2) Add shard to the zone

A zone can be associated with a particular shard in the form of a tag, using the sh.addShardTag(), so a tag will be added to each shard. Here we are considering two zones so the tags “MATHS” and “BIOLOGY” need to be added.

mongos> sh.addShardTag( "shard0000" , "MATHS");
{ "ok" : 1 }
mongos> sh.addShardTag( "shard0001" , "BIOLOGY");
{ "ok" : 1 }

We can see zones are assigned in the form of tags as required against each shard.

mongos> sh.status()
        {  "_id" : "shard0000",  "host" : "",  "state" : 1,  "tags" : [ "MATHS" ] }
        {  "_id" : "shard0001",  "host" : "",  "state" : 1,  "tags" : [ "BIOLOGY" ] }

3) Define ranges for each zone

Each zone covers one or more ranges of shard key values. Note: each range a zone covers is always inclusive of its lower boundary and exclusive of its upper boundary.

mongos> sh.addTagRange(
	{ "subject" : "maths", "sId" : MinKey},
	{ "subject" : "maths", "sId" : MaxKey},
{ "ok" : 1 }
mongos> sh.addTagRange(
	{ "subject" : "biology", "sId" : MinKey},
	{ "subject" : "biology", "sId" : MaxKey},
{ "ok" : 1 }

4) Enable balancer

Now enable the balancer so the chunks will migrate across the shards as per the requirement and all the read and write queries will be routed to the particular shards.

mongos> sh.enableBalancing("school.students")
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })
mongos> sh.isBalancerRunning()

Let’s check how documents get routed as per the tags:

We have inserted 6 documents, 4 documents with “subject”:”maths” and 2 documents with “subject”:”biology”

mongos> db.students.find({"subject":"maths"}).count()
mongos> db.students.find({"subject":"biology"}).count()

Checking the shard distribution for the students collection:

mongos> db.students.getShardDistribution()
Shard shard0000 at
data : 236B docs : 4 chunks : 4
estimated data per chunk : 59B
estimated docs per chunk : 1
Shard shard0001 at
data : 122B docs : 2 chunks : 1
estimated data per chunk : 122B
estimated docs per chunk : 2

So in this test case, all the queries for the students collection have routed as per the tag used, with four documents inserted into shard0000 and two documents inserted to shard0001.

Any queries related to MATHS will route to shard0000 and queries related to BIOLOGY will route to shard0001, hence the load will be distributed as per the configuration of the shard, keeping the database performance optimized.

Sharding MongoDB using zones is a great feature provided by MongoDB. With the help of zones, data can be isolated to the specific shards. Or if we have any kind of hardware or configuration restrictions to the shards, it is a possible solution for routing the operations.

The post Zone Based Sharding in MongoDB appeared first on Percona Database Performance Blog.


Percona Live Europe: Tutorials Day

Percona Live Tutorials

Percona Live Europe TutorialsWelcome to the first day of the Percona Live Open Source Database Conference Europe 2017: Tutorials day! Technically the first day of the conference, this day focused on provided hands-on tutorials for people interested in learning directly how to use open source tools and technologies.

Today attendees went to training sessions taught by open source database experts and got first-hand experience configuring, working with, and experimenting with various open source technologies and software.

The first full day (which includes opening keynote speakers and breakout sessions) starts Tuesday 9/26 at 9:15 am.

Some of the tutorial topics covered today were:

Percona Live Europe TutorialsMonitoring MySQL Performance with Percona Monitoring and Management (PMM)

Michael Coburn, Percona

This was a hands-on tutorial covering how to set up monitoring for MySQL database servers using the Percona Monitoring and Management (PMM) platform. PMM is an open-source collection of tools for managing and monitoring MySQL and MongoDB performance. It provides thorough time-based analysis for database servers to ensure that they work as efficiently as possible.

We learned about:

  • The best practices on MySQL monitoring
  • Metrics and time series
  • Data collection, management and visualization tools
  • Monitoring deployment
  • How to use graphs to spot performance issues
  • Query analytics
  • Alerts
  • Trending and capacity planning
  • How to monitor HA

Percona Live Europe TutorialsHands-on ProxySQL

Rene Cannao, ProxySQL

ProxySQL is an open source proxy for MySQL that can provide HA and high performance with no changes in the application, using several built-in features and integration with clustering software. Those were only a few of the features we learned about in this hands-on tutorial.

Percona Live Europe TutorialsMongoDB: Sharded Cluster Tutorial

Jason Terpko, ObjectRocket
Antonios Giannopoulos, ObjectRocket

This tutorial guided us through the many considerations when deploying a sharded cluster. It covered the services that make up a sharded cluster, configuration recommendations for these services, shard key selection, use cases, and how data is managed within a sharded cluster. Maintaining a sharded cluster also has its challenges. We reviewed these challenges and how you can prevent them with proper design or ways to resolve them if they exist today.

Percona Live Europe TutorialsInnoDB Architecture and Performance Optimization

Peter Zaitsev, Percona

InnoDB is the most commonly used storage engine for MySQL and Percona Server for MySQL. It is the focus of most of the storage engine development by the MySQL and Percona Server for MySQL development teams.

In this tutorial, we looked at the InnoDB architecture, including new feature developments for InnoDB in MySQL 5.7 and Percona Server for MySQL 5.7. Peter explained how to use InnoDB in a database environment to get the best application performance and provide specific advice on server configuration, schema design, application architecture and hardware choices.

Peter updated this tutorial from previous versions to cover new MySQL 5.7 and Percona Server for MySQL 5.7 InnoDB features.

Join us tomorrow for the first full day of the Percona Live Open Source Database Conference Europe 2017!

Powered by WordPress | Theme: Aeros 2.0 by