Oct 05, 2021

Configuring a MongoDB Sharded Cluster with PMM2 – Part 2

As a DBA, it is important to monitor a database to help troubleshoot issues and understand the health of an instance. Percona Monitoring and Management (PMM v2) is open source and does a great job of monitoring databases like MongoDB, MySQL, PostgreSQL, etc.

In this blog post, we will see how to configure a sharded cluster in PMM2. This is a follow-up to the previous post, Configuring PMM Monitoring for MongoDB Cluster, which covered the same topic for PMM v1. The steps to configure a sharded cluster in PMM2 are listed below:

Prepare DB for Monitoring

Before configuring with PMM2, we need to create a USER for monitoring on the database side. If you want to enable QAN (Query Analytics), you will also need to enable the profiler and grant some extra custom permissions, such as "explainRole", to the user. Enabling the profiler adds some extra load to the database, so it is better to run tests beforehand if you want to assess the extra load.

  1. Add PMM Users to the DB

// Change role name / user / password as required

db.getSiblingDB("admin").createRole({
    role: "explainRole",
    privileges: [{
        resource: {
            db: "",
            collection: ""
            },
        actions: [
            "listIndexes",
            "listCollections",
            "dbStats",
            "dbHash",
            "collStats",
            "find"
            ]
        }],
    roles:[]
})


db.getSiblingDB("admin").createUser({
   user: "pmm_mongodb",
   pwd: "password",
   roles: [
      { role: "explainRole", db: "admin" },
      { role: "clusterMonitor", db: "admin" },
      { role: "read", db: "local" }
   ]
})

  2. Enable the Profiler

This step is optional. Run the instance with the profiler enabled, or enable profiling at the database level, to monitor queries in QAN (not applicable for mongos).

To start at the instance level (enables profiling for all databases):

mongod <other options> --profile 2 --slowms 200 --rateLimit 100

or in mongod.conf:

operationProfiling:
  mode: all
  slowOpThresholdMs: 200
# (Below variable is available only with Percona Server for MongoDB.)
  rateLimit: 100

To enable profiling at the DB level:

use dbname
db.setProfilingLevel(2)
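
You can verify the setting afterward with db.getProfilingStatus():

db.getProfilingStatus()   // e.g. { "was" : 2, "slowms" : 200 }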

  3. Add MongoDB Instance to the pmm-client

Here, use the same --cluster option value for all members of the same cluster, and provide a service-name to identify each one:

sudo pmm-admin add mongodb \
--username=pmm_mongodb --password=password \
--query-source=profiler \
--cluster=mycluster \
--service-name=myc_mongoc2 \
--host=127.0.0.1 --port=37061

  4. Check the Inventory Service

Then check whether the service was added successfully or not:

$ sudo pmm-admin list
Service type        Service name                   Address and port        Service ID
MongoDB             myc_mongoc2                    127.0.0.1:37061         /service_id/02e261a1-e8e0-4eb4-8043-8616424500de

Agent type                    Status           Metrics Mode        Agent ID                                              Service ID
pmm_agent                     Connected                            /agent_id/281b4046-4f4b-4897-bd2e-b771d3e97922         
node_exporter                 Running          push                /agent_id/5e9b17a8-ecb9-47c3-8477-ce322047c4d9         
mongodb_exporter              Running          push                /agent_id/0067dd85-9a0a-47dd-976e-ae779deb982b        /service_id/5c92f132-3005-45ab-84df-7541c286c34a
mongodb_profiler_agent        Running                              /agent_id/18d3d87a-9bb9-48c1-8e3e-d8bae3f043bb        /service_id/02e261a1-e8e0-4eb4-8043-8616424500de

From My Test

I used localhost to deploy the sharded cluster for testing purposes, as below:

Members list:

  • 1 mongos (37050)
  • 3 shards, each a 3-member replica set (37051-37059)
  • 3 config servers (37060-37062)

Listing one mongod instance from the ps command:

balaguru@vinodh-UbuntuPC:~/mongodb/testshard$ ps -ef | grep mongod -w | head -1
balaguru   41883    2846  1 13:01 ?        00:04:04 mongod --replSet configRepl --dbpath /home/balaguru/mongodb/testshard/data/configRepl/rs1/db --logpath /home/balaguru/mongodb/testshard/data/configRepl/rs1/mongod.log --port 37060 --fork --configsvr --wiredTigerCacheSizeGB 1 --profile 2 --slowms 200 --rateLimit 100 --logappend

Adding mongodb services to pmm-admin:

balaguru@vinodh-UbuntuPC:~/mongodb/testshard$ pmm-admin add mongodb --username=pmm_mongodb --password=password \
--query-source=profiler --cluster=mycluster --service-name=myc_s11 --host=127.0.0.1 --port=37051
MongoDB Service added.
Service ID  : /service_id/cc6b3fed-ee16-494e-93f0-0d2e8f60a136
Service name: myc_s11

balaguru@vinodh-UbuntuPC:~/mongodb/testshard$ pmm-admin add mongodb --username=pmm_mongodb --password=password \
--query-source=profiler --cluster=mycluster --service-name=myc_s12 --host=127.0.0.1 --port=37052
MongoDB Service added.
Service ID  : /service_id/235494d8-aaee-4ca0-bd3a-bf2259e87ecc
Service name: myc_s12

balaguru@vinodh-UbuntuPC:~/mongodb/testshard$ pmm-admin add mongodb --username=pmm_mongodb --password=password \
--query-source=profiler --cluster=mycluster --service-name=myc_s13 --host=127.0.0.1 --port=37053
MongoDB Service added.
Service ID  : /service_id/55261675-41e7-40f1-95c9-08cac25c4f64
Service name: myc_s13

balaguru@vinodh-UbuntuPC:~/mongodb/testshard$ pmm-admin add mongodb --username=pmm_mongodb --password=password \
--query-source=profiler --cluster=mycluster --service-name=myc_s21 --host=127.0.0.1 --port=37054
MongoDB Service added.
Service ID  : /service_id/5c92f132-3005-45ab-84df-7541c286c34a
Service name: myc_s21

balaguru@vinodh-UbuntuPC:~/mongodb/testshard$ pmm-admin add mongodb --username=pmm_mongodb --password=password \
--query-source=profiler --cluster=mycluster --service-name=myc_s22 --host=127.0.0.1 --port=37055
MongoDB Service added.
Service ID  : /service_id/4de07a5b-5a47-4126-8824-80570bd72cef
Service name: myc_s22

balaguru@vinodh-UbuntuPC:~/mongodb/testshard$ pmm-admin add mongodb --username=pmm_mongodb --password=password \
--query-source=profiler --cluster=mycluster --service-name=myc_s23 --host=127.0.0.1 --port=37056
MongoDB Service added.
Service ID  : /service_id/7bdaaa72-6e00-4f46-a2a9-5205d5f3fff5
Service name: myc_s23

balaguru@vinodh-UbuntuPC:~/mongodb/testshard$ pmm-admin add mongodb --username=pmm_mongodb --password=password \
--query-source=profiler --cluster=mycluster --service-name=myc_s31 --host=127.0.0.1 --port=37057
MongoDB Service added.
Service ID  : /service_id/2028e075-bc65-4aae-bcdd-ec616b36e81b
Service name: myc_s31

balaguru@vinodh-UbuntuPC:~/mongodb/testshard$ pmm-admin add mongodb --username=pmm_mongodb --password=password \
--query-source=profiler --cluster=mycluster --service-name=myc_s32 --host=127.0.0.1 --port=37058
MongoDB Service added.
Service ID  : /service_id/7659231c-f48f-4a65-b651-585ac1f058cd
Service name: myc_s32

balaguru@vinodh-UbuntuPC:~/mongodb/testshard$ pmm-admin add mongodb --username=pmm_mongodb --password=password \
--query-source=profiler --cluster=mycluster --service-name=myc_s33 --host=127.0.0.1 --port=37059
MongoDB Service added.
Service ID  : /service_id/2c224eaf-c0f1-482b-b23c-8ea4b914c8e5
Service name: myc_s33

balaguru@vinodh-UbuntuPC:~/mongodb/testshard$ pmm-admin add mongodb --username=pmm_mongodb --password=password \
--query-source=profiler --cluster=mycluster --service-name=myc_mongoc1 --host=127.0.0.1 --port=37060
MongoDB Service added.
Service ID  : /service_id/09e95cc5-40b7-4a53-9e35-2937ca23395f
Service name: myc_mongoc1

balaguru@vinodh-UbuntuPC:~/mongodb/testshard$ pmm-admin add mongodb --username=pmm_mongodb --password=password \
--query-source=profiler --cluster=mycluster --service-name=myc_mongoc2 --host=127.0.0.1 --port=37061
MongoDB Service added.
Service ID  : /service_id/02e261a1-e8e0-4eb4-8043-8616424500de
Service name: myc_mongoc2

balaguru@vinodh-UbuntuPC:~/mongodb/testshard$ pmm-admin add mongodb --username=pmm_mongodb --password=password \
--query-source=profiler --cluster=mycluster --service-name=myc_mongoc3 --host=127.0.0.1 --port=37062
MongoDB Service added.
Service ID  : /service_id/421449d9-8ada-46dd-9c8a-84c0847a8742
Service name: myc_mongoc3

Listing the services added:

balaguru@vinodh-UbuntuPC:~/mongodb/testshard$ pmm-admin list
Service type        Service name                   Address and port        Service ID
MongoDB             myc_mongoc2                    127.0.0.1:37061         /service_id/02e261a1-e8e0-4eb4-8043-8616424500de
MongoDB             myc_mongoc1                    127.0.0.1:37060         /service_id/09e95cc5-40b7-4a53-9e35-2937ca23395f
MongoDB             myc_s31                        127.0.0.1:37057         /service_id/2028e075-bc65-4aae-bcdd-ec616b36e81b
MongoDB             myc_s12                        127.0.0.1:37052         /service_id/235494d8-aaee-4ca0-bd3a-bf2259e87ecc
MongoDB             myc_s33                        127.0.0.1:37059         /service_id/2c224eaf-c0f1-482b-b23c-8ea4b914c8e5
MongoDB             myc_mongos                     127.0.0.1:37050         /service_id/3f4f56be-6259-4579-88b7-bb4d0c29204b
MongoDB             myc_mongoc3                    127.0.0.1:37062         /service_id/421449d9-8ada-46dd-9c8a-84c0847a8742
MongoDB             myc_s22                        127.0.0.1:37055         /service_id/4de07a5b-5a47-4126-8824-80570bd72cef
MongoDB             myc_s13                        127.0.0.1:37053         /service_id/55261675-41e7-40f1-95c9-08cac25c4f64
MongoDB             myc_s21                        127.0.0.1:37054         /service_id/5c92f132-3005-45ab-84df-7541c286c34a
MongoDB             myc_s32                        127.0.0.1:37058         /service_id/7659231c-f48f-4a65-b651-585ac1f058cd
MongoDB             myc_s23                        127.0.0.1:37056         /service_id/7bdaaa72-6e00-4f46-a2a9-5205d5f3fff5
MongoDB             myc_s11                        127.0.0.1:37051         /service_id/cc6b3fed-ee16-494e-93f0-0d2e8f60a136

Agent type                    Status           Metrics Mode        Agent ID                                              Service ID
pmm_agent                     Connected                            /agent_id/281b4046-4f4b-4897-bd2e-b771d3e97922         
node_exporter                 Running          push                /agent_id/5e9b17a8-ecb9-47c3-8477-ce322047c4d9         
mongodb_exporter              Running          push                /agent_id/0067dd85-9a0a-47dd-976e-ae779deb982b        /service_id/5c92f132-3005-45ab-84df-7541c286c34a 
mongodb_exporter              Running          push                /agent_id/071ec1ae-ff35-4fa1-a4c9-4d5bca705131        /service_id/09e95cc5-40b7-4a53-9e35-2937ca23395f 
mongodb_exporter              Running          push                /agent_id/5e045290-36c2-410b-86e9-b4945cd7ecfb        /service_id/3f4f56be-6259-4579-88b7-bb4d0c29204b 
mongodb_exporter              Running          push                /agent_id/6331b519-da6e-47c0-be7e-92f2ac142fa5        /service_id/2c224eaf-c0f1-482b-b23c-8ea4b914c8e5 
mongodb_exporter              Running          push                /agent_id/6ce78e1c-be6a-4ffd-844b-8afdc0ee5700        /service_id/235494d8-aaee-4ca0-bd3a-bf2259e87ecc 
mongodb_exporter              Running          push                /agent_id/6ed1bcc2-3561-4c65-95e1-11b3cc051194        /service_id/cc6b3fed-ee16-494e-93f0-0d2e8f60a136 
mongodb_exporter              Running          push                /agent_id/7721bd24-7408-431d-abcb-3239459df75a        /service_id/7659231c-f48f-4a65-b651-585ac1f058cd 
mongodb_exporter              Running          push                /agent_id/999c0152-656e-4941-a1fb-003df2dbfbf6        /service_id/2028e075-bc65-4aae-bcdd-ec616b36e81b 
mongodb_exporter              Running          push                /agent_id/9e63f2d9-7e75-45ee-927d-b1406d4797e0        /service_id/55261675-41e7-40f1-95c9-08cac25c4f64 
mongodb_exporter              Running          push                /agent_id/ca3ab511-29eb-4c68-b037-23ab13fa92ff        /service_id/4de07a5b-5a47-4126-8824-80570bd72cef 
mongodb_exporter              Running          push                /agent_id/cd1066eb-f917-4d7e-b284-8d8a8bc7c652        /service_id/7bdaaa72-6e00-4f46-a2a9-5205d5f3fff5 
mongodb_exporter              Running          push                /agent_id/e2ef230a-d84b-428c-921b-b6da7c3180f3        /service_id/421449d9-8ada-46dd-9c8a-84c0847a8742 
mongodb_exporter              Running          push                /agent_id/e3f7ba25-6592-4cb4-aae6-7431b3b6a6da        /service_id/02e261a1-e8e0-4eb4-8043-8616424500de 
mongodb_profiler_agent        Running                              /agent_id/18d3d87a-9bb9-48c1-8e3e-d8bae3f043bb        /service_id/02e261a1-e8e0-4eb4-8043-8616424500de 
mongodb_profiler_agent        Running                              /agent_id/1cf5ee8a-b5b5-4133-896c-fafccc164f54        /service_id/5c92f132-3005-45ab-84df-7541c286c34a 
mongodb_profiler_agent        Running                              /agent_id/4b13cc24-fbd2-47cc-955d-c2a65624d2be        /service_id/55261675-41e7-40f1-95c9-08cac25c4f64 
mongodb_profiler_agent        Running                              /agent_id/4de795cf-f047-49e6-a3bc-dc2ab1b2bc86        /service_id/cc6b3fed-ee16-494e-93f0-0d2e8f60a136 
mongodb_profiler_agent        Running                              /agent_id/89ae83c7-e62c-48f6-9e8c-597ce978c8ce        /service_id/4de07a5b-5a47-4126-8824-80570bd72cef 
mongodb_profiler_agent        Running                              /agent_id/98343388-a246-4767-8838-ded8f8de5191        /service_id/235494d8-aaee-4ca0-bd3a-bf2259e87ecc 
mongodb_profiler_agent        Running                              /agent_id/a5df9e6b-037e-486a-bc95-afe20095cf98        /service_id/7bdaaa72-6e00-4f46-a2a9-5205d5f3fff5 
mongodb_profiler_agent        Running                              /agent_id/a6bda9b4-989a-427b-ae64-5deffc2b9ba2        /service_id/7659231c-f48f-4a65-b651-585ac1f058cd 
mongodb_profiler_agent        Running                              /agent_id/c59c40ca-63ee-4497-b297-403faa9d4ec0        /service_id/2c224eaf-c0f1-482b-b23c-8ea4b914c8e5 
mongodb_profiler_agent        Running                              /agent_id/c7f84a08-4823-455b-93a3-168eee19329b        /service_id/3f4f56be-6259-4579-88b7-bb4d0c29204b 
mongodb_profiler_agent        Running                              /agent_id/e85d0757-7542-4b38-bfed-81ded8bf309c        /service_id/421449d9-8ada-46dd-9c8a-84c0847a8742 
mongodb_profiler_agent        Running                              /agent_id/ed81849a-6fc9-46f3-a5dc-e6c288409009        /service_id/09e95cc5-40b7-4a53-9e35-2937ca23395f 
mongodb_profiler_agent        Running                              /agent_id/f9d26161-4827-4bed-a85f-cbe3ce9478ab        /service_id/2028e075-bc65-4aae-bcdd-ec616b36e81b 
vmagent                       Running          push                /agent_id/a662e1f6-31d3-4514-8f83-ea31e0165d61

PMM Dashboards

From PMM Dashboards, you can then view the replSet summary as well as the sharded cluster summary.

Cluster Summary

This dashboard gives information about the sharded/unsharded databases, shards, chunks, cursor details, etc.


ReplSet Summary

This dashboard shows replication information such as replica lag, operations, heartbeat, ping time, etc.

MongoDB Instance Overview

This is the general dashboard for a MongoDB instance, providing generic information about connections, memory usage, latency, etc.

WiredTiger Details

This is the main dashboard you'll need for analyzing problems, as it shows the WiredTiger information. The main metrics to monitor here are the WT cache utilization, evictions of modified or unmodified pages, write/read ticket utilization, index/object scans, etc.

QAN

If you enabled profiling, you can see the queries run against the database here. You can filter them easily, as shown in the screenshot below. You can also get the explain plan to check whether a query uses a COLLSCAN (full collection scan) or an IXSCAN (index scan), and review the counts, load, etc.

Conclusion

As mentioned, Percona Monitoring and Management 2 is very easy to configure for monitoring your databases, and doing so is recommended; it is better to configure monitoring now rather than late. PMM2 is maintained by Percona and is totally free. You can raise any bugs at https://jira.percona.com/, and if you have questions, you can leave them at https://forums.percona.com.


Sep 28, 2021

Horizontal Scaling in MySQL – Sharding Followup

In a previous post, A Horizontal Scalability Mindset for MySQL, I discussed the concerns around growing individual MySQL instances too large and some basic strategies:

  • Optimizing/minimizing size with proper data types
  • Removing unused/duplicate indexes
  • Keeping your Primary Keys small
  • Pruning data

Finally, if those methods have been exhausted, I touched on horizontal sharding as the approach to keep individual instances at a reasonable size. When discussing my thoughts across our internal teams, there was lots of feedback that we needed to dive into the sharding topic in more detail. This post aims to give more theory and considerations around sharding along with a lightweight ProxySQL sample implementation.

What is Sharding?

Sharding is a word that is frequently used but is often misunderstood. Classic vertical scaling (often just called “scaling”) is when you increase the resources on a single server to handle more data and/or load. Horizontal scaling (aka sharding) is when you actually split your data into smaller, independent buckets and keep adding new buckets as needed.

A sharded environment has two or more groups of MySQL servers that are isolated and independent from each other. While specific implementation details are dependent on your environment, here are some general guidelines:

  • A shard can be a single server or a replication hierarchy
  • Each shard has identical schemas – you can run the same query pattern against any shard and expect to get a result set (of course varying the WHERE clause of the identifier)
  • A given identifier belongs to exactly one shard at a time – you should NOT have a Customer ID spanning shards (unless only temporarily during migration between shards for example – this is an edge case that requires special handling)

With this understanding of sharding, let’s look at some of the key considerations and challenges for actual implementation.

Sharding Considerations and Challenges

While sharding often becomes the only way to manage exploding data, most experts and architects advise delaying it as long as possible. If it is such a great way to handle data, then why is that? The primary reason is complexity. While vertical scaling is trivial (just add more hardware and make no other changes), sharding requires significant thought and planning. The main challenges fall into a few primary buckets:

  • How should I split my data?
  • What if my data spans shards?
  • How do I find the data I need?

Each bucket has its own set of challenges, but each can also be an overall blocker when trying to implement sharding.

How Do I Split My Data?

Picking the shard key is the MOST IMPORTANT part of implementing sharding. The main concept is to pick an identifier that is shared across records, such as CustomerID, ClientID, or UserID. Generally, you'll want to pick the smallest unit possible (often the UserID level), but this also depends on what level of aggregation is needed (more on this later). The risk here is that the key isn't selective enough and you end up not splitting the data evenly. However, if you are too selective, you risk breaking up units that logically should remain together (think sharding on ForumPostID vs. UserID).

What if My Data Spans Shards?

This is often a deciding factor that prevents an organization from sharding. Splitting the data and retrieving individual records, while not trivial, is relatively straightforward. In many workloads, this is enough. Locate my small subset of data and fetch it. When the use case dictates aggregate queries, however, this workflow breaks down. Take this common query to find the newest users across the platform:

SELECT user_id, date_created FROM users ORDER BY date_created DESC LIMIT 10;

When all the users are stored in a single server, this query is trivial. However, when the data is split across multiple shards, we now need a way to aggregate this data. Implementing this is outside the scope of this post, but the general approach looks something like this:

  1. Issue the query on every shard
  2. Collect the results from each shard in a central aggregation service
  3. Combine/filter the individual results into a single result set
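
As a rough illustration of that flow (a sketch only, not production code), the aggregation service could look like this in JavaScript. Here queryShard() is a hypothetical helper that runs the SQL against a single shard and resolves to its rows:

// Scatter-gather: fan the query out to all shards, then merge the partial results
async function newestUsersAcrossShards(shards, limit = 10) {
  // 1. Issue the query on every shard in parallel
  const perShard = await Promise.all(shards.map(shard =>
    queryShard(shard, "SELECT user_id, date_created FROM users ORDER BY date_created DESC LIMIT " + limit)));
  // 2. + 3. Collect the partial results and combine/filter them into a single result set
  return perShard.flat()
    .sort((a, b) => b.date_created - a.date_created)
    .slice(0, limit);
}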

If your use case relies heavily on aggregate queries, implementing this aggregation layer is critical and often very complicated. This is also the use case that discourages so many from sharding, even when it is necessary.

In an ideal scenario, the active OLTP workload can be contained within individual shards. Reporting and data warehousing require distributed queries, which can be managed through different systems or via custom aggregation using multi-source MySQL asynchronous replication.

How Do I Find My Data?

Now, assuming that you have chosen a sharding key, you need to be able to retrieve records when needed. This is the next major hurdle that needs to be overcome. There are two distinct components to this challenge:

  • Determining which shard holds the data
  • How to connect to that shard from my application

Finding the ShardID

Determining which shard holds the data can be as simple or complex as you want, but each approach comes with tradeoffs. The simplest approach uses a hash/modulus to determine the shard, looking something like this:

function getShardId(identifier, numShards) {
  return identifier % numShards;
}

The most basic example would be sharding by userID across 2 shards: even userIDs land on shard 0 and odd userIDs on shard 1. While incredibly simplistic, what do I do when 2 shards aren't enough? The goal of this architecture is to just add more shards when individual shards get too big. But when I add new shards, numShards increases, so the shardID changes for many of the records.
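
A quick worked example of why the naive modulus breaks when the shard count changes:

// The same identifier maps to a different shard once numShards changes
const userId = 5;
userId % 2;  // 1 -> shard 1 while there are two shards
userId % 3;  // 2 -> shard 2 after a third shard is added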

A more flexible approach would be to use a directory service. In this approach, there is a very simple datastore that maps an identifier to a shard. To determine which shard holds the data, you simply query the datastore with the identifier and it returns the shardID. This gives incredible flexibility for moving data between shards, but also adds complexity to the system. Naturally, this service itself would need to be highly available, but doesn’t have to be in MySQL. In addition to complexity, you have potentially introduced additional latency to the system with an extra lookup.
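
A minimal sketch of such a lookup, with a plain in-memory Map standing in for whatever highly available datastore actually backs the directory (the mappings below are made up):

// identifier -> shardID; in reality this would live in a replicated datastore
const directory = new Map([[1001, 0], [1002, 2], [1003, 1]]);

function lookupShard(identifier) {
  const shardId = directory.get(identifier);
  if (shardId === undefined) throw new Error("unmapped identifier: " + identifier);
  return shardId;
}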

Connecting to the Shard

Once we have the proper shardID, we need to connect to the database cluster in question. Again, we have options, and the implementation depends on the application. Let's look at two main approaches:

  • Use a different connection string for each shard
  • Leverage a proxy layer that routes intelligently

Using a different connection string is generally self-explanatory: I have a collection of shardIDs and their corresponding connection strings. When I know the shardID, I then fetch the connection string, connect to the cluster, and away we go.
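
In code, that collection can be as simple as a map from shardID to connection string (the values below are placeholders, reusing the lookupShard() sketch from above):

// Hypothetical per-shard connection strings, keyed by shardID
const shardDsn = {
  0: "mysql://app@10.0.0.1:3306/mydb",
  1: "mysql://app@10.0.0.2:3306/mydb",
  2: "mysql://app@10.0.0.3:3306/mydb",
};
const dsn = shardDsn[lookupShard(userId)]; // then connect and query as usual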

But we know developers don’t want to (and shouldn’t) mess with connection strings and credentials. So a proxy layer that can route the query based on the query itself or some metadata in the query is ideal. Being a layer 7 service, ProxySQL is able to inspect the queries themselves and make decisions based on defined rules. While definitely not the only way, let’s look at a sample ProxySQL implementation that uses query comments to route to the correct shard.

Sample ProxySQL Implementation

For our example, here is the basic architecture:

  • Sharding users by UserID
  • Directory service that does simple routing based on modulus
  • ProxySQL is already used for HA
  • Data is split across 3 shards

A typical user query may look something like this:

SELECT * 
FROM users
WHERE user_id = 5

With UserID being the shard key and a simple modulus sharding strategy, we can identify the shardID easily for this query:

Identifier % numShards = shardID
5 % 3 = 2

With the shardID defined, we can now inject a comment into the original query that can be used by ProxySQL to properly route the query to the correct shard:

SELECT /* shard=shard_2 */ 
*
FROM users
WHERE user_id = 5

ProxySQL uses the concept of hostgroups to define routing internally. In many HA cases, this is simply a reader and a writer hostgroup. For the sharding architecture, however, we can expand that to define hostgroups as actual shards:

INSERT INTO mysql_servers (hostgroup_id, hostname, comment) 
VALUES 
(1,'10.0.0.1','Shard 0'),
(2,'10.0.0.2','Shard 1'),
(3,'10.0.0.3','Shard 2');
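
One note, assuming the standard ProxySQL admin interface: changes to mysql_servers only take effect after being loaded to runtime, and should be saved to survive restarts:

LOAD MYSQL SERVERS TO RUNTIME;
SAVE MYSQL SERVERS TO DISK;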

With the hostgroups in place, we just need to add a ProxySQL rule to choose the hostgroup based on the injected comment:

INSERT INTO mysql_query_rules (rule_id,active,match_pattern,destination_hostgroup,apply)
VALUES
(5,1,'\S*\s*\/\*\s*shard=shard_0\s*\*\/\S*\s*',1,1),
(6,1,'\S*\s*\/\*\s*shard=shard_1\s*\*\/\S*\s*',2,1),
(7,1,'\S*\s*\/\*\s*shard=shard_2\s*\*\/\S*\s*',3,1);
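
The same applies to the query rules before they start matching:

LOAD MYSQL QUERY RULES TO RUNTIME;
SAVE MYSQL QUERY RULES TO DISK;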

Now, after we issue a few queries, you can see in the ProxySQL stats that we are matching the shardID and routing accordingly:

*************************** 1. row ***************************
active: 1
hits: 3
rule_id: 6
match_digest: NULL
match_pattern: \S*\s*\/\*\s*shard=shard_1\s*\*\/\S*\s*
replace_pattern: NULL
cache_ttl: NULL
apply: 1
flagIN: 0
flagOUT: NULL
*************************** 2. row ***************************
active: 1
hits: 5
rule_id: 7
match_digest: NULL
match_pattern: \S*\s*\/\*\s*shard=shard_2\s*\*\/\S*\s*
replace_pattern: NULL
cache_ttl: NULL
apply: 1
flagIN: 0
flagOUT: NULL

Obviously, this is a massive simplification: you'd want more generic rules and more complex hostgroup definitions. For example, your deployment would need to account for queries where no shard hint is provided, or the case where an invalid shardID is supplied. However, it shows the potential of writing a custom routing layer within the very lightweight tool ProxySQL. This approach, while it requires some additional thought and logic, could be a viable first step towards splitting your data horizontally.

Again, this is just a sample proof-of-concept implementation. Each use case is different. The main takeaway from this post is that manual sharding is possible, but requires significant planning and implementation work.


Sep 10, 2021

Finding Undetected Jumbo Chunks in MongoDB

I recently came across an interesting case of performance issues during balancing in a MongoDB cluster. Digging through the logs, it became clear the problem was related to chunk moves taking a long time.

As we know, the default maximum chunk size is 64 MB, so these migrations are supposed to be very fast on most modern hardware.

In this case, there were several chunks way above that limit being moved around. How can this happen? Shouldn’t those chunks have been marked as Jumbo?

Recap on Chunk Moves

For starters, let’s review what we know about chunk moves.

MongoDB does not move a chunk if the number of documents in it is greater than 1.3 times the result of dividing the configured chunk size (default 64 MB) by the average document size.

If we have a collection with an average document size of 2 KB, a chunk with more than 42,598 documents (65,536 / 2 × 1.3) won't be balanced. This might be an issue for collections with non-uniform document sizes, as the estimation won't be very good in that case.
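
Spelling out that arithmetic for the 2 KB example:

// Max documents for a chunk to remain eligible for migration
var chunkSizeKB = 64 * 1024;          // 64 MB expressed in KB
var avgDocSizeKB = 2;
1.3 * chunkSizeKB / avgDocSizeKB;     // 42598.4 -> larger chunks won't be moved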

Also, if a chunk grows past the maximum size, it is flagged as Jumbo. This means the balancer will not try to move it. You can check out the following article for more information about dealing with jumbo chunks.
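
For reference, the chunks the balancer has already flagged can be listed from a mongos:

// List chunks already marked as jumbo
db.getSiblingDB("config").chunks.find({ jumbo: true })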

Finally, we must keep in mind that chunk moves take a metadata lock at a certain part of the process, so write operations are blocked momentarily near the end of a chunk move.

The autoSplitter in Action

Before MongoDB 4.2, the autoSplitter process, responsible for ensuring chunks do not exceed the maximum size, ran on the mongos router. The router process keeps some statistics about the operations it performs, and these statistics are consulted to make chunk-splitting decisions.

The problem is that in a production environment, there are usually many of these processes deployed. An individual mongos router only has a partial idea of what’s happening with a particular chunk of data.

If multiple mongos routers change data on the same chunk, it could grow beyond the maximum size and still go unnoticed.

There are many reported issues related to this topic (for example, see SERVER-44088, SERVER-13806, and SERVER-16715).

Frequently restarting mongos processes could also lead to the same problem, as restarts reset the statistics used to make chunk-splitting decisions.

Because of these issues, the autoSplitter process was changed to run on the primary member of each shard’s replica set. This happened in MongoDB version 4.2 (see SERVER-9287 for more details).

The process should better prevent jumbo chunks now, as each shard has a more accurate view of what data it contains (and how it is being changed).

The MongoDB version, in this case, was 3.6, and the autoSplitter was not doing as good a job as expected.

Finding Undetected Jumbo Chunks in MongoDB

For starters, I came across a script to print a summary of chunk statistics here. We can build upon it to also display information about jumbo chunks and to generate the commands required to manually split any chunks exceeding the maximum size.

var allChunkInfo = function(ns){
    var chunks = db.getSiblingDB("config").chunks.find({"ns" : ns}).sort({min:1}).noCursorTimeout(); //this will return all chunks for the ns ordered by min
    var totalChunks = 0;
    var totalJumbo = 0;
    var totalSize = 0;
    var totalEmpty = 0;
 
    chunks.forEach( 
        function printChunkInfo(chunk) { 
          var db1 = db.getSiblingDB(chunk.ns.split(".")[0]) // get the database we will be running the command against later
          var key = db.getSiblingDB("config").collections.findOne({_id:chunk.ns}).key; // will need this for the dataSize call
         
          // dataSize returns the info we need on the data; passing estimate:true instead
          // would use document counts, which is less resource-intensive but less accurate
          var dataSizeResult = db1.runCommand({datasize:chunk.ns, keyPattern:key, min:chunk.min, max:chunk.max, estimate:false});

          if(dataSizeResult.size > 67108864) { // 64 MB in bytes
            totalJumbo++;
            print('sh.splitFind("' + chunk.ns.toString() + '", ' + JSON.stringify(chunk.min) + ')' + ' // '+  chunk.shard + '    ' + Math.round(dataSizeResult.size/1024/1024) + ' MB    ' + dataSizeResult.numObjects );
          }
          totalSize += dataSizeResult.size;
          totalChunks++;
          
          if (dataSizeResult.size == 0) { totalEmpty++ }; //count empty chunks for summary
          }
    )
    print("***********Summary Chunk Information***********");
    print("Total Chunks: "+totalChunks);
    print("Total Jumbo Chunks: "+totalJumbo);
    print("Average Chunk Size (Mbytes): "+(totalSize/totalChunks/1024/1024));
    print("Empty Chunks: "+totalEmpty);
    print("Average Chunk Size (non-empty): "+(totalSize/(totalChunks-totalEmpty)/1024/1024));
}

The script has to be called from a mongos router as follows:

mongos> allChunkInfo("db.test_col")

And it will print any commands to perform chunk splits as required:

sh.splitFind("db.test_col", {"_id":"jhxT2neuI5fB4o4KBIASK1"}) // shard-1    222 MB    7970
sh.splitFind("db.test_col", {"_id":"zrAESqSZjnpnMI23oh5JZD"}) // shard-2    226 MB    7988
sh.splitFind("db.test_col", {"_id":"SgkCkfSDrY789e9nD4crk9"}) // shard-1    218 MB    7986
sh.splitFind("db.test_col", {"_id":"X5MKEH4j32OhmAhY7LGPMm"}) // shard-1    238 MB    8338
...
***********Summary Chunk Information***********
Total Chunks: 5047
Total Jumbo Chunks: 120
Average Chunk Size (Mbytes): 19.29779934868946
Empty Chunks: 1107
Average Chunk Size (non-empty): 24.719795257064895

You can see the script prints the commands to split the jumbo chunks, along with the shard where each one lives, the size of the chunk, and the number of documents on it. At the end, summary information is also displayed.

The estimate: true option in the dataSize command causes it to use the average document size of the collection for the estimation. This is not very accurate if document sizes vary a lot, but it runs faster and is less resource-intensive. Consider setting that option if you know your document sizes are more or less uniform.

All that is left is to run the commands generated by the script, and the jumbo chunks should be gone. The balancer will then move chunks during the next balancing window as required to even out the data. Consider scheduling the balancing window outside the peak workload hours to reduce the impact.
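
For example, the balancing window can be restricted to off-peak hours like this (the times are illustrative, evaluated against the time zone of the config server):

use config
db.settings.updateOne(
   { _id: "balancer" },
   { $set: { activeWindow: { start: "01:00", stop: "05:00" } } },
   { upsert: true }
)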

Conclusion

Even though MongoDB 3.6 has been EOL for some time now, many people are still running it, mostly due to outdated drivers on the application side that are not compatible with newer releases. Nowadays, the autoSplitter should be doing a better job of keeping chunk size at bay.

If you are still running a MongoDB version older than 4.2, it would be a good idea to review your chunk stats from time to time to avoid some nasty surprises.


Aug 24, 2021

Resharding in MongoDB 5.0

MongoDB 5.0 has been released with a bunch of new features. An important one is the capability to completely redefine the shard key of a collection in a sharded cluster. Resharding was a feature frequently requested by customers. MongoDB 4.4 only allowed refining the shard key; from now on, you can change your shard key to use different fields entirely.

When you use sharding in MongoDB and choose a poor shard key for a collection, you can face issues like:

  • Unbalanced distribution of the data
  • Having jumbo chunks that cannot be split and cannot be migrated

These issues can make your cluster very slow, leaving some of the shards extremely overloaded. The cluster can also become really unmanageable, and crashes could happen at some point.

The Problem of Changing a Shard Key

I had a customer running a 5-shard cluster with more than 50 TB of data. They had a few collections with sub-optimal shard keys: one of the shards held more than 75% of the data and workload, and there were thousands of jumbo chunks around. Some of the jumbo chunks reached 200 GB in size; really a lot. The cluster was terribly slow, chunk migration attempts were frequent and failed very often, and the application was unresponsive most of the time due to timeouts. Manual splits and migrations of jumbo chunks were impossible. The only solution was to create a different shard key for those collections, but this wasn't possible with that MongoDB version. The only suggested way to change a shard key was to drop and recreate the collection. Yes, and remember also to dump and reload the data. This requires you to stop your application for an impressive amount of time if the dataset is large. That was the case.

In order to avoid stopping the application, we deployed a new empty cluster with a few more shards as well. We created all the empty collections in the new cluster with the optimal shard key. Then we moved the data out of the old cluster with custom scripts, one piece at a time, using boundaries on indexed timestamp fields. The solution required the customer to deploy some application changes. The application was instructed to read/write from a different cluster depending on whether the affected data had already been migrated into the new cluster or not. The entire migration process took weeks to complete. In the end, it worked and the customer now has a balanced cluster with no jumbo chunks.

Anyway, the solution came at some cost in terms of new machines, development effort, and time.

Fortunately, MongoDB 5.0 introduced the reshardCollection command. From now on, changing a shard key should be less expensive than in the past. That's awesome.

In this article, we’ll take a look at how resharding in MongoDB 5.0 works.

Internals

The way resharding works is simple. When you issue the reshardCollection command, a new implicit empty collection is created by MongoDB with the new shard key defined, and the data will be moved from the existing collection chunk by chunk. There is a two-second lock required by the balancer to determine the new data distribution, but during the copy phase, the application can continue to read and write the collection with no issues.

Due to this copy phase, the larger the collection is, the more time the resharding takes.

Prerequisites for Resharding

There are some requirements you need to validate before starting the resharding, and some could be expensive, in certain cases.

  • Disk space: be sure to have at least 1.2x the size of the collection you're going to reshard. If the collection is 1 TB, you need at least 1.2 TB of free disk space.
  • I/O capacity should be below 50%
  • CPU load should be below 80%

The resources available must be evaluated on each shard.

If you fail to provide these resources, the database may run out of space or the resharding could take longer than expected.

Another important requirement is that you will need to deploy changes to your application queries. To let the database work best during the resharding process, the queries must use filters on both the current shard key and the new shard key. Only at the end of the resharding process may you drop all the filters on the old shard key from your queries.

This requirement is very important and in some cases it could be a little expensive. Temporarily changing the application code is sometimes a hard task.
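
For example, an update that today filters only on the current shard key would need to include the new one as well while the resharding runs (a sketch based on this article's test collection; the values are illustrative):

// Before resharding: the current shard key (age) in the filter is enough
db.testcoll.updateOne({ age: 35, uid: 1234 }, { $set: { city: "test" } })

// During resharding: include both the current (age) and the new (name) shard key
db.testcoll.updateOne({ age: 35, name: "7gbk73hf42g" }, { $set: { city: "test" } })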

There are also other limitations you need to consider:

  • Resharding is not permitted while an index build is running
  • Some queries will return an error if you didn't include both the current and new shard keys: deleteOne(), findAndModify(), updateOne() … check the manual for the full list
  • You can reshard only one collection at a time
  • You cannot use addShard(), removeShard(), dropDatabase(), db.createCollection() while resharding is running
  • The new shard key cannot have a uniqueness constraint
  • Resharding a collection with a uniqueness constraint is not supported

Let’s Test Resharding in MongoDB

To test the new feature, I created a sharded cluster using AWS instances: t2.large with 2 CPUs, 8 GB RAM, and an EBS volume. I deployed a two-shard cluster using just a single member for each replica set.

Let’s create a sharded collection and insert some sample data.

At the beginning, we create the shard key on the age field. To create the random data, we use some functions and a JavaScript loop to insert one million documents.

[direct: mongos] testdb> sh.shardCollection("testdb.testcoll", { age: 1} )
{
  collectionsharded: 'testdb.testcoll',
  ok: 1,
  '$clusterTime': {
    clusterTime: Timestamp({ t: 1628588848, i: 25 }),
    signature: {
      hash: Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0),
      keyId: Long("0")
    }
  },
  operationTime: Timestamp({ t: 1628588848, i: 21 })
}

[direct: mongos] testdb> function getRandomInt(min, max) {
...     return Math.floor(Math.random() * (max - min + 1)) + min;
... }
[Function: getRandomInt]

[direct: mongos] testdb> var day = 1000 * 60 * 60 * 24;

[direct: mongos] testdb> function getRandomDate() {
... return new Date(Date.now() - (Math.floor(Math.random() * day)))
... }
[Function: getRandomDate]

[direct: mongos] testdb> function getRandomString() {
... return (Math.random()+1).toString(36).substring(2)
... }
[Function: getRandomString]

[direct: mongos] testdb> for (var i=1; i<=1000000; i++) {
... db.testcoll.insert(
..... {
....... uid: i,
....... name: getRandomString(),
....... date_created: getRandomDate(),
....... age: getRandomInt(1,100),
....... address: getRandomString(),
....... city: getRandomString(),
....... country: getRandomString()
....... }
..... )
... }

Now we have our initial collection. Let’s take a look at the distribution of the chunks and the sharding status of the cluster.

{
  database: {
    _id: 'testdb',
    primary: 'sh1-rs',
    partitioned: true,
    version: {
    uuid: UUID("efda5b69-1ffc-42c3-abed-ad20e08e321d"),
    timestamp: Timestamp({ t: 1628527701, i: 21 }),
    lastMod: 1
  }
},
collections: {
  'testdb.testcoll': {
    shardKey: { age: 1 },
    unique: false,
    balancing: true,
    chunkMetadata: [
      { shard: 'sh0-rs', nChunks: 2 },
      { shard: 'sh1-rs', nChunks: 3 }
    ],
    chunks: [
      { min: { age: MinKey() }, max: { age: 1 }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 5, i: 0 }) },
      { min: { age: 1 }, max: { age: 42 }, 'on shard': 'sh0-rs', 'last modified': Timestamp({ t: 4, i: 0 }) },
      { min: { age: 42 }, max: { age: 72 }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 5, i: 2 }) },
      { min: { age: 72 }, max: { age: 100 }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 5, i: 3 }) },
      { min: { age: 100 }, max: { age: MaxKey() }, 'on shard': 'sh0-rs', 'last modified': Timestamp({ t: 5, i: 1 }) }
    ],
    tags: []
    }
  }
}

With the current shard key on the age field, we've got 5 chunks in total: 2 on sh0-rs and 3 on sh1-rs. The total size of the collection is around 200 MB (uncompressed).

It's time to test the resharding. Let's try to change the shard key to { name: 1 }. The following is the syntax, where we provide the namespace to reshard and the new shard key:

[direct: mongos] testdb> db.adminCommand({
... reshardCollection: "testdb.testcoll",
... key: { name: 1 }
... })

From another connection we can monitor the status of the resharding:

[direct: mongos] testdb> db.getSiblingDB("admin").aggregate([ 
{ $currentOp: { allUsers: true, localOps: false } }, 
{ $match: { type: "op", "originatingCommand.reshardCollection": "testdb.testcoll" } }])
[
  {
    shard: 'sh0-rs',
    type: 'op',
    desc: 'ReshardingRecipientService f86967fe-36a9-4836-8972-1061a61230df',
    op: 'command',
    ns: 'testdb.testcoll',
    originatingCommand: {
      reshardCollection: 'testdb.testcoll',
      key: { name: 1 },
      unique: false,
      collation: { locale: 'simple' }
    },
    totalOperationTimeElapsedSecs: Long("145"),
    remainingOperationTimeEstimatedSecs: Long("173"),
    approxDocumentsToCopy: Long("624135"),
    documentsCopied: Long("569868"),
    approxBytesToCopy: Long("95279851"),
    bytesCopied: Long("86996201"),
    totalCopyTimeElapsedSecs: Long("145"),
    oplogEntriesFetched: Long("0"),
    oplogEntriesApplied: Long("0"),
    totalApplyTimeElapsedSecs: Long("0"),
    recipientState: 'cloning',
    opStatus: 'running'
  },
  {
    shard: 'sh0-rs',
    type: 'op',
    desc: 'ReshardingDonorService f86967fe-36a9-4836-8972-1061a61230df',
    op: 'command',
    ns: 'testdb.testcoll',
    originatingCommand: {
      reshardCollection: 'testdb.testcoll',
      key: { name: 1 },
      unique: false,
      collation: { locale: 'simple' }
    },
    totalOperationTimeElapsedSecs: Long("145"),
    remainingOperationTimeEstimatedSecs: Long("173"),
    countWritesDuringCriticalSection: Long("0"),
    totalCriticalSectionTimeElapsedSecs: Long("0"),
    donorState: 'donating-initial-data',
    opStatus: 'running'
  }, 
  {
    shard: 'sh1-rs',
    type: 'op',
    desc: 'ReshardingDonorService f86967fe-36a9-4836-8972-1061a61230df',
    op: 'command',
    ns: 'testdb.testcoll',
    originatingCommand: {
      reshardCollection: 'testdb.testcoll',
      key: { name: 1 },
      unique: false,
      collation: { locale: 'simple' }
    },
    totalOperationTimeElapsedSecs: Long("145"),
    remainingOperationTimeEstimatedSecs: Long("276"),
    countWritesDuringCriticalSection: Long("0"),
    totalCriticalSectionTimeElapsedSecs: Long("0"),
    donorState: 'donating-initial-data', 
    opStatus: 'running'
  },
  {
    shard: 'sh1-rs',
    type: 'op',
    desc: 'ReshardingRecipientService f86967fe-36a9-4836-8972-1061a61230df',
    op: 'command',
    ns: 'testdb.testcoll',
    originatingCommand: {
      reshardCollection: 'testdb.testcoll',
      key: { name: 1 },
      unique: false,
      collation: { locale: 'simple' }
    },
    totalOperationTimeElapsedSecs: Long("145"),
    remainingOperationTimeEstimatedSecs: Long("276"),
    approxDocumentsToCopy: Long("624135"),
    documentsCopied: Long("430132"),
    approxBytesToCopy: Long("95279851"),
    bytesCopied: Long("65663031"),
    totalCopyTimeElapsedSecs: Long("145"),
    oplogEntriesFetched: Long("0"),
    oplogEntriesApplied: Long("0"),
    totalApplyTimeElapsedSecs: Long("0"),
    recipientState: 'cloning',
    opStatus: 'running'
  }
]

With totalOperationTimeElapsedSecs you can see the time elapsed, and with remainingOperationTimeEstimatedSecs the estimated time to complete the operation.
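
Those counters also give a rough copy progress; plugging in the numbers from the first document above:

// bytesCopied / approxBytesToCopy from the currentOp output
86996201 / 95279851 * 100   // ~91% of the initial copy completed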

While resharding is running we are allowed to write into the collection. For example, I tried the following and it worked:

[direct: mongos] testdb> db.testcoll.insert({ name: "new doc" })
{
  acknowledged: true,
  insertedIds: { '0': ObjectId("61125fa53916a70b7fdd8f6c") }
}

The full resharding of the collection took around 5 minutes. Let’s now see the new status of the sharding.

{
  database: {
    _id: 'testdb',
    primary: 'sh1-rs',
    partitioned: true,
    version: {
      uuid: UUID("efda5b69-1ffc-42c3-abed-ad20e08e321d"),
      timestamp: Timestamp({ t: 1628527701, i: 21 }),
      lastMod: 1
    }
  },
  collections: {
    'testdb.testcoll': {
    shardKey: { name: 1 },
    unique: false,
    balancing: true,
    chunkMetadata: [
      { shard: 'sh0-rs', nChunks: 3 },
      { shard: 'sh1-rs', nChunks: 3 }
    ],
    chunks: [
      { min: { name: MinKey() }, max: { name: '7gbk73hf42g' }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 6, i: 0 }) },
      { min: { name: '7gbk73hf42g' }, max: { name: 'cv1oqoetetf' }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 5, i: 1 }) },
      { min: { name: 'cv1oqoetetf' }, max: { name: 'kuk1p3z8kc' }, 'on shard': 'sh0-rs', 'last modified': Timestamp({ t: 2, i: 2 }) },
      { min: { name: 'kuk1p3z8kc' }, max: { name: 'ppq4ubqwywh' }, 'on shard': 'sh0-rs', 'last modified': Timestamp({ t: 2, i: 3 }) },
      { min: { name: 'ppq4ubqwywh' }, max: { name: 'zsods6qhqp' }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 1, i: 3 }) },
      { min: { name: 'zsods6qhqp' }, max: { name: MaxKey() }, 'on shard': 'sh0-rs', 'last modified': Timestamp({ t: 6, i: 1 }) }
    ],
    tags: []
    }
  }
}

We can see the shard key has been changed, and the chunks are now evenly distributed.

[direct: mongos] testdb> db.testcoll.getShardDistribution()
Shard sh1-rs at sh1-rs/172.30.0.108:27017
{
  data: '92.72MiB',
  docs: 636938,
  chunks: 3,
  'estimated data per chunk': '30.9MiB',
  'estimated docs per chunk': 212312
}
---
Shard sh0-rs at sh0-rs/172.30.0.221:27017
{
  data: '82.96MiB',
  docs: 569869,
  chunks: 3,
  'estimated data per chunk': '27.65MiB',
  'estimated docs per chunk': 189956
}
---
Totals
{
  data: '175.69MiB',
  docs: 1206807,
  chunks: 6,
  'Shard sh1-rs': [
    '52.77 % data',
    '52.77 % docs in cluster',
    '152B avg obj size on shard'
  ],
  'Shard sh0-rs': [
    '47.22 % data',
    '47.22 % docs in cluster',  
    '152B avg obj size on shard'
  ]
}

Good. It worked.

Out of curiosity, during the resharding, the temporary collection being copied is visible. Just run sh.status():

    collections: {
      'testdb.system.resharding.defda80d-ea5f-4e52-9a7f-9b3e2ed8ddf1': {
        shardKey: { name: 1 },
        unique: false,
        balancing: true,
        chunkMetadata: [
          { shard: 'sh0-rs', nChunks: 3 },
          { shard: 'sh1-rs', nChunks: 3 }
        ],
        chunks: [
          { min: { name: MinKey() }, max: { name: '9bnfw8n23l' }, 'on shard': 'sh0-rs', 'last modified': Timestamp({ t: 2, i: 0 }) },
          { min: { name: '9bnfw8n23l' }, max: { name: 'etd3ho9vfwg' }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 2, i: 1 }) },
          { min: { name: 'etd3ho9vfwg' }, max: { name: 'kmht0br01l' }, 'on shard': 'sh0-rs', 'last modified': Timestamp({ t: 1, i: 2 }) },
          { min: { name: 'kmht0br01l' }, max: { name: 'sljcb1s70g' }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 1, i: 3 }) },
          { min: { name: 'sljcb1s70g' }, max: { name: 'zzi3ijuw2f' }, 'on shard': 'sh0-rs', 'last modified': Timestamp({ t: 1, i: 4 }) },
          { min: { name: 'zzi3ijuw2f' }, max: { name: MaxKey() }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 1, i: 5 }) }
        ],
        tags: []
      },
      'testdb.testcoll': {
        shardKey: { age: 1 },
        unique: false,
        balancing: true,
        chunkMetadata: [
          { shard: 'sh0-rs', nChunks: 2 },
          { shard: 'sh1-rs', nChunks: 3 }
        ],
        chunks: [
          { min: { age: MinKey() }, max: { age: 1 }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 5, i: 0 }) },
          { min: { age: 1 }, max: { age: 42 }, 'on shard': 'sh0-rs', 'last modified': Timestamp({ t: 4, i: 0 }) },
          { min: { age: 42 }, max: { age: 72 }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 5, i: 2 }) },
          { min: { age: 72 }, max: { age: 100 }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 5, i: 3 }) },
          { min: { age: 100 }, max: { age: MaxKey() }, 'on shard': 'sh0-rs', 'last modified': Timestamp({ t: 5, i: 1 }) }
        ],
        tags: []
      }
    }
  }

testdb.system.resharding.defda80d-ea5f-4e52-9a7f-9b3e2ed8ddf1 is the newly created collection.

Some Graphs from Percona Monitoring and Management

I deployed Percona Monitoring and Management (PMM) to monitor the cluster and to evaluate the impact of the resharding operation. As said, I used a little test environment with no workload at all, so what we can see in the following graphs is just the impact of the resharding.

The cluster has one config server and two shards. Each replica set is just a single member running on a dedicated virtual host in AWS EC2.

Let's see the CPU usage:

The node in the middle is the config server, which didn't show any impact. The data-bearing nodes, in contrast, saw quite a relevant increase in CPU usage.

Let’s see the Disk utilization in terms of IOPS and latency:


As with the CPU, the config server didn't show any impact. The shards, instead, increased their IOPS as expected, since a new collection was being created. Disk latency spiked to more than 20 ms at the beginning and at the end of the process.

MongoDB increased the latency of the read operations.

The WiredTiger storage engine increased the number of transactions processed and the cache activity as expected.

Other metrics increased as well, but they are not included here. This behavior was expected and is normal for a resharding operation.

The resharding is quite expensive, even for a small collection, particularly in terms of CPU utilization. Anyway, more tests are needed with larger collections, in the GB/TB magnitude.

Remember that the resharding time depends on the size of the collection. Be aware of that, and expect decreased performance on your servers for some time when you plan a resharding.

Conclusions

Resharding in MongoDB 5.0 is an amazing new feature but it comes with some additional costs and limitations. Indeed, resharding requires resources in terms of disk space, I/O, and CPU. Be aware of that and plan carefully your resharding operations.

As a best practice, I suggest spending as much time as possible the first time you decide to shard a collection. If you choose the optimal shard key from the beginning, then you won’t need resharding. A good schema design is always the best approach when working with MongoDB.

I suggest continuously monitoring the status of the chunk distribution in your collections. As soon as you notice something wrong with the shard key and that the distribution is uneven and getting worse, use reshardCollection as soon as you can, while the collection is not too large.

Another suggestion is to do resharding only during a maintenance window because the performance impact for your cluster could be significant in the case of very large collections.

Further readings:

https://docs.mongodb.com/manual/core/sharding-reshard-a-collection/

https://docs.mongodb.com/v5.0/core/sharding-choose-a-shard-key/

https://docs.mongodb.com/manual/reference/command/refineCollectionShardKey/

https://www.percona.com/blog/mongodb-5-0-is-coming-in-hot-what-do-database-experts-across-the-community-think/


Jul 06, 2021

Sharding With Zones Based on Compound Shard-Keys on MongoDB 4.4

This article was written with the main purpose of showing you how to determine zones on a shard when using compound shard keys.

Defining zones in shards means pre-defining where certain chunks will be stored and amongst which set of particular shards they will be balanced, according to the shard key definition.

MongoDB 4.4 brings the possibility to shard a collection and determine zones by compound keys, including mixing a hashed key with non-hashed keys. Hashed keys may be placed in the prefix of the index (shard key) or not. Depending on the method chosen, different settings must be in compliance with the balancer, and this article will also show a couple of examples of each.

Defining the Index Prior to Sharding

Even though it is not always required to create indexes in advance of running the "shardCollection" command, I am defining them here to better illustrate the procedure.

  • The collection colltest1 will be sharded based on a key containing the hashed field in the prefix
mongos> db.getSiblingDB("dbtest").colltest1.createIndex({_id:"hashed","location":1},{unique:false})
{
	"raw" : {
		"shard02/localhost:40003" : {
			"createdCollectionAutomatically" : true,
			"numIndexesBefore" : 1,
			"numIndexesAfter" : 2,
			"commitQuorum" : "votingMembers",
			"ok" : 1
		}
	},
	"ok" : 1,
	"operationTime" : Timestamp(1624357456, 7),
	"$clusterTime" : { ... }
}

  • The collection colltest2 will be sharded based on a key containing the non-hashed field in the prefix
mongos> db.getSiblingDB("dbtest").colltest2.createIndex({"location":1,_id:"hashed"},{unique:false})
{
	"raw" : {
		"shard02/localhost:40003" : {
			"createdCollectionAutomatically" : true,
			"numIndexesBefore" : 1,
			"numIndexesAfter" : 2,
			"commitQuorum" : "votingMembers",
			"ok" : 1
		}
	},
	"ok" : 1,
	"operationTime" : Timestamp(1624357534, 2),
	"$clusterTime" : { ... }
}

It is important to highlight a couple of points about creating the indexes for the shard keys:

  • If the collection is empty or new, the command used to shard the collection will automatically create the indexes if they are not yet present. This will be described in the upcoming sections.
  • If the shard key prefix will not be a hashed value, the indexes shall be created with the option {unique:false}

Creating the Initial Chunks Based on Hashed Key as a Prefix

There are, basically, a couple of requirements to ensure that the collection will be in compliance with the balancer and the initial chunk distribution will be optimally performed. 

  • For each non-hashed value, a range of the hashed key must be defined with upper and lower boundaries. 
  • The collection must be sharded with the option presplitHashedZones: true if there is a hashed field.

The below example shows how to shard the collection colltest1.

Defining the Zones and Assigning Them to the Shards (to keep things brief, only one zone, "LATAM", is used in this example)

mongos> sh.addShardToZone("shard01", "LATAM")
{
	"ok" : 1,
	"operationTime" : Timestamp(1624359227, 38),
	"$clusterTime" : { ... }
}
mongos> sh.addShardToZone("shard02", "LATAM")
{
	"ok" : 1,
	"operationTime" : Timestamp(1624359231, 1),
	"$clusterTime" : {... }
}

If you check the output of the sh.status(), you will notice that tags were created and assigned to the shards according to the zones definition:

mongos> sh.status()
--- Sharding Status --- 
  sharding version: {
  	"_id" : 1,
  	"minCompatibleVersion" : 5,
  	"currentVersion" : 6,
  	"clusterId" : ObjectId("60d1be52be9f36000b01a9f0")
  }
  shards:
        {  "_id" : "shard01",  "host" : "shard01/localhost:40002",  "state" : 1,  "tags" : [ "LATAM" ] }
        {  "_id" : "shard02",  "host" : "shard02/localhost:40003",  "state" : 1,  "tags" : [ "LATAM" ] }        
<<the rest of status output was intentionally truncated to avoid unnecessary verbosity>>

Creating the Zone Ranges

mongos> sh.updateZoneKeyRange("dbtest.colltest1",{ "_id" : MinKey, "location" : MinKey },{ _id:MaxKey,"location" : MaxKey },"LATAM");
{
	"ok" : 1,
	"operationTime" : Timestamp(1624361452, 1),
	"$clusterTime" : { ... }
}

Enabling Sharding

mongos> sh.enableSharding("dbtest")
{
	"ok" : 1,
	"operationTime" : Timestamp(1624361047, 6),
	"$clusterTime" : { ... }
}
mongos> sh.shardCollection("dbtest.colltest1",{_id:"hashed","location":1},false,{ presplitHashedZones: true })
{
	"collectionsharded" : "dbtest.colltest1",
	"collectionUUID" : UUID("30b2efff-2e2e-4c72-9a82-9593f227002f"),
	"ok" : 1,
	"operationTime" : Timestamp(1624361528, 31),
	"$clusterTime" : { ... }
}

In this example, the namespace dbtest.colltest1 will be evenly distributed according to the zone LATAM, which spans the shards shard01 and shard02. Looking at sh.status() again, you will see that the initial chunks were created following the range defined above.
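You can also verify the chunk ranges directly in the config database; this is a quick check from mongos (projection trimmed for brevity, and valid for MongoDB 4.4, where chunk documents still carry the ns field):

mongos> db.getSiblingDB("config").chunks.find({ns: "dbtest.colltest1"}, {min: 1, max: 1, shard: 1, _id: 0})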

Creating the Initial Chunks Based on Non-Hashed Key as a Prefix

This example is a little more involved, as more conditions must be met to make the shard key compliant with the balancer for the initial chunk distribution:

  • You need to specify MinKey for each field in the shard key, which defines the lower boundary of each zone range.
  • Every zone must comprise a range, so at least one of the fields in the shard key must have an upper-boundary value larger than its MinKey.
  • Every combination of values for the fields of the shard key must fall within the range of one of the defined zones.
  • At the moment of sharding the collection, presplitHashedZones must be set to true.
  • Not required, but likely desirable: one of the zones should define its upper boundary as MaxKey so it acts as a catch-all for out-of-range values.

Defining the Zones and Assigning Them to the Shards

The zones were defined differently for this example:

mongos> sh.status()
--- Sharding Status --- 
  sharding version: {
  	"_id" : 1,
  	"minCompatibleVersion" : 5,
  	"currentVersion" : 6,
  	"clusterId" : ObjectId("60d1be52be9f36000b01a9f0")
  }
  shards:
        {  "_id" : "shard01",  "host" : "shard01/localhost:40002",  "state" : 1,  "tags" : [ "EU" ] }
        {  "_id" : "shard02",  "host" : "shard02/localhost:40003",  "state" : 1,  "tags" : [ "LATAM" ] }
        {  "_id" : "shard03",  "host" : "shard03/localhost:40004",  "state" : 1,  "tags" : [ "AMER" ] }
        {  "_id" : "shard04",  "host" : "shard04/localhost:40005",  "state" : 1,  "tags" : [ "APAC" ] }

Creating the Zone Ranges

mongos> sh.updateZoneKeyRange("dbtest2.colltest2",{ "location": "DC01", "_id" : MinKey },{ "location": "DC02", "_id" : MinKey },"LATAM");
{
	"ok" : 1,
	"operationTime" : Timestamp(1624364375, 1),
	"$clusterTime" : { ... }
}
mongos> sh.updateZoneKeyRange("dbtest2.colltest2",{ "location": "DC02", "_id" : MinKey },{ "location": MaxKey, "_id" : MinKey },"EU");
{
	"ok" : 1,
	"operationTime" : Timestamp(1624364430, 1),
	"$clusterTime" : { ... }
}

Enabling Sharding

mongos> sh.enableSharding("dbtest2")
{
	"ok" : 1,
	"operationTime" : Timestamp(1624366057, 5),
	"$clusterTime" : { ... }
}
mongos> sh.shardCollection("dbtest2.colltest2",{"location":1,_id:"hashed"},false,{ presplitHashedZones: true })
{
	"collectionsharded" : "dbtest2.colltest2",
	"collectionUUID" : UUID("b8a64972-97df-4791-8019-9526d2f8d405"),
	"ok" : 1,
	"operationTime" : Timestamp(1624366085, 37),
	"$clusterTime" : { ... }
}
mongos>

The above example shows how to use a non-hashed field in the prefix of a shard key to ensure that certain values of that field land in certain zones (defined as tags on the shards), while the boundaries applied to the hashed field ensure even distribution. In this case, all documents of the namespace dbtest2.colltest2 whose location falls from DC01 up to (but not including) DC02 are placed in the zone LATAM (shard02), and documents whose location is DC02 or greater are placed in the zone EU (shard01).

It is very important to highlight that if the collection is not in compliance with the balancer, the migration of the chunks will never happen: all the chunks will stay on the primary shard. It is possible to predict that situation right after sharding the collection by looking at the output of the command sh.balancerCollectionStatus():

mongos> sh.balancerCollectionStatus("dbtest.colltest1")
{
	"balancerCompliant" : true,
	"ok" : 1,
	"operationTime" : Timestamp(1623691376, 1),
	"$clusterTime" : { ... }
}

If balancerCompliant is true, it means that the balancer will be able to split and migrate the chunks.
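For contrast, a collection whose zones cannot be honored reports something along these lines — the output below is illustrative, and the exact fields may vary between versions:

mongos> sh.balancerCollectionStatus("dbtest2.colltest2")
{
	"balancerCompliant" : false,
	"firstComplianceViolation" : "zoneViolation",
	"ok" : 1,
	...
}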

Conclusion

Defining the shard key is the most important step in deploying a healthy sharded cluster. Having a field in the shard key that contains only a few distinct values compromises the chunk distribution and, as a consequence, performance. Hence, this is a great improvement arriving with MongoDB 4.4: it makes it possible to have low-cardinality keys defining the zone boundaries while still ensuring that the distribution within each zone relies on a hashed, highly selective key.

May
27
2021
--

Webinar June 29: Unlocking the Mystery of MongoDB Shard Key Selection

Do You Know How to Choose the Right MongoDB Shard Key for Your Business?

In our upcoming panel, Percona MongoDB experts Mike Grayson, Kimberly Wilkins, and Vinicius Grippa will discuss the complex issue of MongoDB shard key selection and offer advice on the measures to take if things go wrong.

Selecting the right shard key is one of the most important MongoDB design decisions you make, as it impacts performance and data management. Choosing the wrong shard key can have a disastrous effect on both.

MongoDB 5.0 is due to be released this summer, and it is likely to include another big change around sharding. Following last year’s 4.4 release that included refinable shard keys, we expect to see a new feature that allows for fully changeable shard keys for the first time.

But, even with refinable and changeable options, shard key selection will continue to be a crucial MongoDB task.

Join us for the panel, where Mike, Kimberly, and Vinicius will highlight some of the perils and pitfalls to avoid, as well as offer shard key best practices such as:

* Factors to consider when selecting your shard key

* Hidden “gotcha’s” around shard selection and architecture

* Examples of the worst shard keys our experts have seen

* What happens when you have a busted shard key, and how you can mitigate the impact

* What’s happening “under MongoDB’s hood” with all the changes?

* The future of shard keys for MongoDB

Please join Percona Technical Experts Mike Grayson, Kimberly Wilkins, and Vinicius Grippa on June 29, 2021, at 1 pm EDT for their webinar Unlocking the Mystery of MongoDB Shard Key Selection.

Register for Webinar

If you can’t attend, sign up anyway, and we’ll send you the slides and recording afterward.

Mar
22
2021
--

Storing Kubernetes Operator for Percona Server for MongoDB Secrets in Github

More and more companies are adopting GitOps as the way of implementing Continuous Deployment. Its elegant approach, built upon a well-known tool, wins the hearts of engineers. But even if your git repository is private, it's strongly discouraged to store keys and passwords in unencrypted form.

This blog post will show how easy it is to use GitOps and keep Kubernetes secrets for Percona Kubernetes Operator for Percona Server for MongoDB securely in the repository with Sealed Secrets or Vault Secrets Operator.

Sealed Secrets

Prerequisites:

  • Kubernetes cluster up and running
  • Github repository (optional)

Install Sealed Secrets Controller

Sealed Secrets rely on asymmetric cryptography (which is also used in TLS), where the private key (which in our case is stored in Kubernetes) can decrypt a message encrypted with the public key (which can safely be stored in a public git repository). To make this task easier, Sealed Secrets provides the kubeseal tool, which helps with the encryption of the secrets.

Install the Sealed Secrets controller into your Kubernetes cluster:

kubectl apply -f https://github.com/bitnami-labs/sealed-secrets/releases/download/v0.15.0/controller.yaml

It will install the controller into the kube-system namespace and provide the Custom Resource Definition sealedsecrets.bitnami.com. All resources in Kubernetes with kind: SealedSecret will be handled by this Operator.

Download the kubeseal binary:

wget https://github.com/bitnami-labs/sealed-secrets/releases/download/v0.15.0/kubeseal-linux-amd64 -O kubeseal
sudo install -m 755 kubeseal /usr/local/bin/kubeseal

Encrypt the Keys

In this example, I intend to store important secrets of the Percona Kubernetes Operator for Percona Server for MongoDB in git along with my manifests that are used to deploy the database.

First, I will seal the secret file with system users, which is used by the MongoDB Operator to manage the database. Normally it is stored in deploy/secrets.yaml.

kubeseal --format yaml < secrets.yaml  > blog-data/sealed-secrets/mongod-secrets.yaml

This command creates a file with encrypted contents; you can see it in the blog-data/sealed-secrets repository here. It is safe to store it publicly, as it can only be decrypted with the private key.
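For reference, the sealed file is itself a regular Kubernetes manifest of kind SealedSecret; its structure looks roughly like this (the encrypted payloads are truncated, and the key names simply mirror the ones from deploy/secrets.yaml):

apiVersion: bitnami.com/v1alpha1
kind: SealedSecret
metadata:
  name: my-secure-secret
  namespace: default
spec:
  encryptedData:
    MONGODB_USER_ADMIN_USER: AgBy3i4OJSWK...
    MONGODB_USER_ADMIN_PASSWORD: AgBy8kLp2xWQ...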

Executing kubectl apply -f blog-data/sealed-secrets/mongod-secrets.yaml does the following:

  1. A sealedsecrets custom resource (CR) is created. You can see it by executing kubectl get sealedsecrets.
  2. The Sealed Secrets Operator receives the event that a new sealedsecrets CR is there and decrypts it with the private key.
  3. Once decrypted, a regular Secret object is created which can be used as usual.

$ kubectl get sealedsecrets
NAME               AGE
my-secure-secret   20m

$ kubectl get secrets my-secure-secret
NAME               TYPE     DATA   AGE
my-secure-secret   Opaque   10     20m

Next, I will also seal the keys for my S3 bucket that I plan to use to store backups of my MongoDB database:

kubeseal --format yaml < backup-s3.yaml  > blog-data/sealed-secrets/s3-secrets.yaml
kubectl apply -f blog-data/sealed-secrets/s3-secrets.yaml

Vault Secrets Operator

Sealed Secrets is the simplest approach, but it is possible to achieve the same result with HashiCorp Vault and Vault Secrets Operator. It is a more advanced, mature, and feature-rich approach.

Prerequisites:

  • Kubernetes cluster up and running
  • HashiCorp Vault up and running

Vault Secrets Operator also relies on a Custom Resource, but all the keys are stored in HashiCorp Vault.

Preparation

Create a policy on the Vault for the Operator:

cat <<EOF | vault policy write vault-secrets-operator -
path "kvv2/data/*" {
  capabilities = ["read"]
}
EOF

The policy might look a bit different, depending on where your secrets are stored.
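If the keys are not in Vault yet, a kv-v2 secrets engine matching the policy above can be enabled and populated like this (the path and the key/value pairs are just examples):

vault secrets enable -path=kvv2 kv-v2
vault kv put kvv2/mongod-secret \
    MONGODB_USER_ADMIN_USER=userAdmin \
    MONGODB_USER_ADMIN_PASSWORD=userAdmin123456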

Create and fetch the token for the policy:

$ vault token create -period=24h -policy=vault-secrets-operator

Key                  Value                                                                                                                                                                                        
---                  -----                                                                                               
token                s.0yJZfCsjFq75GiVyKiZgYVOm
...

Write down the token, as you will need it in the next step.

Create the Kubernetes Secret so that the Operator can authenticate with the Vault:

export VAULT_TOKEN=s.0yJZfCsjFq75GiVyKiZgYVOm
export VAULT_TOKEN_LEASE_DURATION=86400

cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: Secret
metadata:
  name: vault-secrets-operator
type: Opaque
data:
  VAULT_TOKEN: $(echo -n "$VAULT_TOKEN" | base64)
  VAULT_TOKEN_LEASE_DURATION: $(echo -n "$VAULT_TOKEN_LEASE_DURATION" | base64)
EOF

Deploy Vault Secrets Operator

It is recommended to deploy the Operator with Helm, but first we need to create a values.yaml file to configure it.

environmentVars:
  - name: VAULT_TOKEN
    valueFrom:
      secretKeyRef:
        name: vault-secrets-operator
        key: VAULT_TOKEN
  - name: VAULT_TOKEN_LEASE_DURATION
    valueFrom:
      secretKeyRef:
        name: vault-secrets-operator
        key: VAULT_TOKEN_LEASE_DURATION
vault:
  address: "http://vault.vault.svc:8200"

The environment variables point to the Secret that was created in the previous step to authenticate with Vault. We also need to provide the Vault address so the Operator can retrieve the secrets.

Now we can deploy the Vault Secrets Operator:

helm repo add ricoberger https://ricoberger.github.io/helm-charts
helm repo update

helm upgrade --install vault-secrets-operator ricoberger/vault-secrets-operator -f blog-data/sealed-secrets/values.yaml

Give me the Secret

I have a key created in my HashiCorp Vault:

$ vault kv get kvv2/mongod-secret
…
Key                                 Value
---                                 -----                                                                                                                                                                         
MONGODB_BACKUP_PASSWORD             <>
MONGODB_CLUSTER_ADMIN_PASSWORD      <>
MONGODB_CLUSTER_ADMIN_USER          <>
MONGODB_CLUSTER_MONITOR_PASSWORD    <>
MONGODB_CLUSTER_MONITOR_USER        <>                                                                                                                                                               
MONGODB_USER_ADMIN_PASSWORD         <>
MONGODB_USER_ADMIN_USER             <>

It is time to create the secret out of it. First, we will create a Custom Resource object of kind: VaultSecret.

$ cat blog-data/sealed-secrets/vs.yaml
apiVersion: ricoberger.de/v1alpha1
kind: VaultSecret
metadata:
  name: my-secure-secret
spec:
  path: kvv2/mongod-secret
  type: Opaque

$ kubectl apply -f blog-data/sealed-secrets/vs.yaml

The Operator will connect to HashiCorp Vault and create a regular Secret object automatically:

$ kubectl get vaultsecret
NAME               SUCCEEDED   REASON    MESSAGE              LAST TRANSITION   AGE
my-secure-secret   True        Created   Secret was created   47m               47m

$ kubectl get secret  my-secure-secret
NAME               TYPE     DATA   AGE
my-secure-secret   Opaque   7      47m

Deploy MongoDB Cluster

Now that the secrets are in place, it is time to deploy the Operator and the DB cluster:

kubectl apply -f blog-data/sealed-secrets/bundle.yaml
kubectl apply -f blog-data/sealed-secrets/cr.yaml

The cluster will be up in a minute or two and will use the secrets we deployed.

By the way, my cr.yaml deploys a MongoDB cluster with two shards. Support for multiple shards was added in version 1.7.0 of the Operator – I encourage you to try it out. Learn more about it here: Percona Server for MongoDB Sharding.

Mar
10
2021
--

A Peek at Percona Kubernetes Operator for Percona Server for MongoDB New Features

The latest 1.7.0 release of Percona Kubernetes Operator for Percona Server for MongoDB came out just recently and enables users to:

  • deploy sharded clusters with multiple shards
  • run custom sidecar containers alongside the database containers
  • automatically clean up Persistent Volume Claims once a cluster is deleted

Today we will look into these new features, the use cases, and highlight some architectural and technical decisions we made when implementing them.

Sharding

The 1.6.0 release of our Operator introduced single shard support, which we highlighted in this blog post and explained why it makes sense. But horizontal scaling is not possible without support for multiple shards.

Adding a Shard

A new shard is just a new ReplicaSet which can be added under spec.replsets in cr.yaml:

spec:
  ...
  replsets:
  - name: rs0
    size: 3
  ....
  - name: rs1
    size: 3
  ...

Read more on how to configure sharding.

In the Kubernetes world, a MongoDB ReplicaSet is a StatefulSet with the number of pods specified in the spec.replsets.[].size variable.

Once pods are up and running, the Operator does the following:

  • Initiates ReplicaSet by connecting to newly created pods running mongod
  • Connects to mongos and adds a shard with sh.addShard() command

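The command the Operator issues at that point is equivalent to the following (the replica set name and hostname here are illustrative):

mongos> sh.addShard("rs1/my-cluster-name-rs1-0.my-cluster-name-rs1.default.svc.cluster.local:27017")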

Then the output of db.adminCommand({ listShards:1 }) will look like this:

        "shards" : [
                {
                        "_id" : "replicaset-1",
                        "host" : "replicaset-1/percona-cluster-replicaset-1-0.percona-cluster-replicaset-1.default.svc.cluster.local:27017,percona-cluster-replicaset-1-1.percona-cluster-replicaset-1.default.svc.cluster.local:27017,percona-cluster-replicaset-1-2.percona-cluster-replicaset-1.default.svc.cluster.local:27017",
                        "state" : 1
                },
                {
                        "_id" : "replicaset-2",
                        "host" : "replicaset-2/percona-cluster-replicaset-2-0.percona-cluster-replicaset-2.default.svc.cluster.local:27017,percona-cluster-replicaset-2-1.percona-cluster-replicaset-2.default.svc.cluster.local:27017,percona-cluster-replicaset-2-2.percona-cluster-replicaset-2.default.svc.cluster.local:27017",
                        "state" : 1
                }
        ],


Deleting a Shard

Percona Operators are built to simplify the deployment and management of databases on Kubernetes. Our goal is to provide resilient infrastructure, but the Operator does not manage the data itself. Deleting a shard requires moving the data to another shard before removal, and there are a couple of caveats:

  • Sometimes data is not moved automatically by MongoDB – unsharded collections or jumbo chunks
  • We hit the storage problem – what if another shard does not have enough disk space to hold the data?


There are a few choices:

  1. Do not touch the data. The user needs to move the data manually and then the operator removes the empty shard.
  2. The operator decides where to move the data and deals with storage issues by upscaling if necessary.
    • Upscaling the storage can be tricky, as it requires certain capabilities from the Container Storage Interface (CSI) and the underlying storage infrastructure.

For now, we decided to pick option #1 and won't touch the data, but in future releases we would like to work with the community to introduce fully automated shard removal.

When the user wants to remove the shard now, we first check if there are any non-system databases present on the ReplicaSet. If there are none, the shard can be removed:

func (r *ReconcilePerconaServerMongoDB) checkIfPossibleToRemove(cr *api.PerconaServerMongoDB, usersSecret *corev1.Secret, rsName string) error {
  // System databases are ignored when checking whether the shard still holds user data.
  systemDBs := map[string]struct{}{
    "local":  {},
    "admin":  {},
    "config": {},
  }
  // ... (excerpt: the rest of the function returns an error if any
  // non-system database is still present on the ReplicaSet)


Custom Sidecars

The sidecar container pattern allows users to extend the application without changing the main container image. They leverage the fact that all containers in the pod share storage and network resources.

Percona Operators have built-in support for Percona Monitoring and Management to gain monitoring insights for the databases on Kubernetes, but sometimes users may want to expose metrics to other monitoring systems. Let's see how mongodb_exporter can expose metrics when running as a sidecar along with ReplicaSet containers.

1. Create the monitoring user that the exporter will use to connect to MongoDB. Connect to mongod in the container and create the user:

> db.getSiblingDB("admin").createUser({
    user: "mongodb_exporter",
    pwd: "mysupErpassword!123",
    roles: [
      { role: "clusterMonitor", db: "admin" },
      { role: "read", db: "local" }
    ]
  })

2. Create the Kubernetes Secret with this username and password. Encode both with base64:

$ echo -n mongodb_exporter | base64
bW9uZ29kYl9leHBvcnRlcg==
$ echo -n 'mysupErpassword!123' | base64
bXlzdXBFcnBhc3N3b3JkITEyMw==

Put these into the secret and apply:

$ cat mongoexp_secret.yaml
apiVersion: v1
kind: Secret
metadata:
  name: mongoexp-secret
data:
  username: bW9uZ29kYl9leHBvcnRlcg==
  password: bXlzdXBFcnBhc3N3b3JkITEyMw==

$ kubectl apply -f mongoexp_secret.yaml

3. Add a sidecar for mongodb_exporter into cr.yaml and apply:

replsets:
- name: rs0
  ...
  sidecars:
  - image: bitnami/mongodb-exporter:latest
    name: mongodb-exporter
    env:
    - name: EXPORTER_USER
      valueFrom:
        secretKeyRef:
          name: mongoexp-secret
          key: username
    - name: EXPORTER_PASS
      valueFrom:
        secretKeyRef:
          name: mongoexp-secret
          key: password
    - name: POD_IP
      valueFrom:
        fieldRef:
          fieldPath: status.podIP
    - name: MONGODB_URI
      value: "mongodb://$(EXPORTER_USER):$(EXPORTER_PASS)@$(POD_IP):27017"
    args: ["--web.listen-address=$(POD_IP):9216"

$ kubectl apply -f deploy/cr.yaml

All it takes now is to configure the monitoring system to fetch the metrics for each mongod Pod. For example, prometheus-operator will start fetching metrics once annotations are added to ReplicaSet pods:

replsets:
- name: rs0
  ...
  annotations:
    prometheus.io/scrape: 'true'
    prometheus.io/port: '9216'

PVCs Clean Up

Running CI/CD pipelines that deploy MongoDB clusters on Kubernetes is a common thing. Once these clusters are terminated, the Persistent Volume Claims (PVCs) are not. We have now added automation that removes PVCs after cluster deletion. We rely on Kubernetes Finalizers – asynchronous pre-delete hooks. In our case, we hook the finalizer to the Custom Resource (CR) object that is created for the MongoDB cluster.


A user can enable the finalizer through cr.yaml in the metadata section:

metadata:
  name: my-cluster-name
  finalizers:
     - delete-psmdb-pvc

Conclusion

Percona is committed to providing production-grade database deployments on Kubernetes. Our Percona Kubernetes Operator for Percona Server for MongoDB is a feature-rich tool to deploy and manage your MongoDB clusters with ease. Our Operator is free and open source. Try it out by following the documentation here, or help us make it better by contributing your code and ideas to our GitHub repository.

May
24
2019
--

An Overview of Sharding in PostgreSQL and How it Relates to MongoDB’s

A couple of weeks ago I presented at Percona University São Paulo about the new features in PostgreSQL that allow the deployment of simple shards. I've tried to summarize the main points in this post, as well as providing an introductory overview of sharding itself. Please note I haven't included any third-party extensions that provide sharding for PostgreSQL in my discussion below.

Partitioning in PostgreSQL

In a nutshell, until not long ago there wasn’t a dedicated, native feature in PostgreSQL for table partitioning. Not that that prevented people from doing it anyway: the PostgreSQL community is very creative. There’s a table inheritance feature in PostgreSQL that allows the creation of child tables with the same structure as a parent table. That, combined with the employment of proper constraints in each child table along with the right set of triggers in the parent table, has provided practical “table partitioning” in PostgreSQL for years (and still works). Here’s an example:

Using table inheritance

CREATE TABLE temperature (
  id BIGSERIAL PRIMARY KEY NOT NULL,
  city_id INT NOT NULL,
  timestamp TIMESTAMP NOT NULL,
  temp DECIMAL(5,2) NOT NULL
);

Figure 1a. Main (or parent) table

CREATE TABLE temperature_201901 (CHECK (timestamp >= DATE '2019-01-01' AND timestamp <= DATE '2019-01-31')) INHERITS (temperature);
CREATE TABLE temperature_201902 (CHECK (timestamp >= DATE '2019-02-01' AND timestamp <= DATE '2019-02-28')) INHERITS (temperature);
CREATE TABLE temperature_201903 (CHECK (timestamp >= DATE '2019-03-01' AND timestamp <= DATE '2019-03-31')) INHERITS (temperature);

Figure 1b. Child tables inherit the structure of the parent table and are limited by constraints

CREATE OR REPLACE FUNCTION temperature_insert_trigger()
RETURNS TRIGGER AS $$
BEGIN
    IF ( NEW.timestamp >= DATE '2019-01-01' AND NEW.timestamp <= DATE '2019-01-31' ) THEN INSERT INTO temperature_201901 VALUES (NEW.*);
    ELSIF ( NEW.timestamp >= DATE '2019-02-01' AND NEW.timestamp <= DATE '2019-02-28' ) THEN INSERT INTO temperature_201902 VALUES (NEW.*);
    ELSIF ( NEW.timestamp >= DATE '2019-03-01' AND NEW.timestamp <= DATE '2019-03-31' ) THEN INSERT INTO temperature_201903 VALUES (NEW.*);
    ELSE RAISE EXCEPTION 'Date out of range!';
    END IF;
    RETURN NULL;
END;
$$
LANGUAGE plpgsql;

Figure 1c. A function that controls in which child table a new entry should be added according to the timestamp field

CREATE TRIGGER insert_temperature_trigger
    BEFORE INSERT ON temperature
    FOR EACH ROW EXECUTE PROCEDURE temperature_insert_trigger();

Figure 1d. A trigger is added to the parent table that calls the function above when an INSERT is performed

The biggest drawbacks of such an implementation were the amount of manual work needed to maintain such an environment (even though a certain level of automation could be achieved through the use of third-party extensions such as pg_partman) and the lack of optimization/support for "distributed" queries. The PostgreSQL optimizer wasn't advanced enough to have a good understanding of partitions at the time, though there were workarounds that could be used, such as employing constraint exclusion.
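As an illustration of that workaround, with constraint_exclusion enabled the planner can skip child tables whose CHECK constraints rule them out (a sketch against the tables defined above):

SET constraint_exclusion = partition; -- the default; 'on' also works for inheritance
EXPLAIN SELECT * FROM temperature
 WHERE timestamp BETWEEN DATE '2019-02-01' AND DATE '2019-02-10';
-- The plan should touch only temperature and temperature_201902,
-- skipping the January and March child tables.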

Declarative partitioning

About a year and a half ago, PostgreSQL 10 was released with a bunch of new features, among them native support for table partitioning through the new declarative partitioning feature. Here's how we could partition the same temperature table using this new method:

CREATE TABLE temperature (
  id BIGSERIAL NOT NULL,
  city_id INT NOT NULL,
  timestamp TIMESTAMP NOT NULL,
  temp DECIMAL(5,2) NOT NULL
) PARTITION BY RANGE (timestamp);

Figure 2a. Main table structure for a partitioned table

CREATE TABLE temperature_201901 PARTITION OF temperature FOR VALUES FROM ('2019-01-01') TO ('2019-02-01');
CREATE TABLE temperature_201902 PARTITION OF temperature FOR VALUES FROM ('2019-02-01') TO ('2019-03-01');
CREATE TABLE temperature_201903 PARTITION OF temperature FOR VALUES FROM ('2019-03-01') TO ('2019-04-01');

Figure 2b. Tables defined as partitions of the main table; with declarative partitioning, there was no need for triggers anymore.

It still lacked the optimization and flexibility needed to consider it a complete partitioning solution. It wasn't possible, for example, to perform an UPDATE that would result in moving a row from one partition to a different one, but the foundation had been laid. Fast forward another year, and PostgreSQL 11 built on top of this, delivering additional features like:

  • the possibility to define a default partition, to which any entry that wouldn't fit a corresponding partition is added.
  • having indexes added to the main table “replicated” to the underlying partitions, which improved declarative partitioning usability.
  • support for Foreign Keys

These are just a few of the features that led to a more mature partitioning solution.
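For instance, a catch-all default partition and a cascading parent-level index can be declared in PostgreSQL 11 like this (building on the temperature table above):

-- Rows that match no explicit partition land here instead of raising an error
CREATE TABLE temperature_default PARTITION OF temperature DEFAULT;

-- An index created on the parent is automatically created on every partition
CREATE INDEX ON temperature (city_id);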

Sharding in PostgreSQL

By now you might reasonably be questioning my premise, arguing that partitioning is not sharding, at least not in the sense and context you would have expected this post to cover. In fact, PostgreSQL has implemented sharding on top of partitioning by allowing any given partition of a partitioned table to be hosted by a remote server. The basis for this is PostgreSQL's Foreign Data Wrapper (FDW) support, which has been a part of the core of PostgreSQL for a long time. While technically possible to implement before, we just couldn't make practical use of it for sharding using the table inheritance + triggers approach. Declarative partitioning allowed for much better integration of these pieces, making sharding – partitioned tables hosted by remote servers – more of a reality in PostgreSQL.

CREATE TABLE temperature_201904 (
  id BIGSERIAL NOT NULL,
  city_id INT NOT NULL,
  timestamp TIMESTAMP NOT NULL,
  temp DECIMAL(5,2) NOT NULL
);

Figure 3a. On the remote server we create a “partition” – nothing but a simple table

CREATE EXTENSION postgres_fdw;
GRANT USAGE ON FOREIGN DATA WRAPPER postgres_fdw to app_user;
CREATE SERVER shard02 FOREIGN DATA WRAPPER postgres_fdw
    OPTIONS (dbname 'postgres', host 'shard02', port '5432');
CREATE USER MAPPING for app_user SERVER shard02
    OPTIONS (user 'fdw_user', password 'secret');

Figure 3b. On the local server the preparatory steps involve loading the postgres_fdw extension, allowing our local application user to use that extension, creating an entry to access the remote server, and finally mapping that user with a user in the remote server (fdw_user) that has local access to the table we’ll use as a remote partition.

CREATE FOREIGN TABLE temperature_201904 PARTITION OF temperature
    FOR VALUES FROM ('2019-04-01') TO ('2019-05-01')
    SERVER shard02;

Figure 3c. Now it’s simply a matter of creating a proper partition of our main table in the local server that will be linked to the table of the same name in the remote server

You can read more about postgres_fdw in Foreign Data Wrappers in PostgreSQL and a closer look at postgres_fdw.

When does it make sense to partition a table?

There are several principal reasons to partition a table:

  1. When a table grows so big that searching it becomes impractical even with the help of indexes (which will invariably become too big as well).
  2. When data management is such that the target data is often the most recently added and/or older data is constantly being purged/archived, or even not being searched anymore (at least not as often).
  3. If you are loading data from different sources and maintaining it as a data warehouse for reporting and analytics.
  4. For a less expensive archiving or purging of massive data that avoids exclusive locks on the entire table.

When should we resort to sharding?

Here are a couple of classic cases:

  1. To scale out (horizontally), when even after partitioning a table the amount of data is too great or too complex to be processed by a single server.
  2. Use cases where the data in a big table can be divided into two or more segments that would benefit the majority of the search patterns. A common example used to describe a scenario like this is that of a company whose customers are evenly spread across the United States and searches to a target table involves the customer ZIP code. A shard then could be used to host entries of customers located on the East coast and another for customers on the West coast.

Note though this is by no means an extensive list.

How should we shard the data?

With sharding (in this context) being “distributed” partitioning, the essence for a successful (performant) sharded environment lies in choosing the right shard key – and by “right” I mean one that will distribute your data across the shards in a way that will benefit most of your queries. In the example above, using the customer ZIP code as shard key makes sense if an application will more often be issuing queries that will hit one shard (East) or the other (West). However, if most queries would filter by, say, birth date, then all queries would need to be run through all shards to recover the full result set. This could easily backfire on performance with the shard approach, by not selecting the right shard key or simply by having such a heterogeneous workload that no shard key would be able to satisfy it.

It only ever makes sense to shard if the nature of the queries involving the target table(s) is such that distributed processing will be the norm and constitute an advantage far greater than any overhead caused by a minority of queries that rely on JOINs involving multiple shards. Due to the distributed nature of sharding such queries will necessarily perform worse if compared to having them all hosted on the same server.

Why not simply rely on replication or clustering?

Sharding should be considered in those situations where you can’t efficiently break down a big table through data normalization or use an alternative approach and maintaining it on a single server is too demanding. The table is then partitioned and the partitions distributed across different servers to spread the load across many servers. It doesn’t need to be one partition per shard, often a single shard will host a number of partitions.

Note how sharding differs from traditional “share all” database replication and clustering environments: you may use, for instance, a dedicated PostgreSQL server to host a single partition from a single table and nothing else. However, these data scaling technologies may well complement each other: a PostgreSQL database may host a shard with part of a big table as well as replicate smaller tables that are often used for some sort of consultation (read-only), such as a price list, through logical replication.

How does sharding in PostgreSQL relate to sharding in MongoDB®?

MongoDB® tackles the matter of managing big collections straight through sharding: there is no concept of local partitioning of collections in MongoDB. In fact, the whole MongoDB scaling strategy is based on sharding, which takes a central place in the database architecture. As such, the sharding process has been made as transparent to the application as possible: all a DBA has to do is to define the shard key.

Instead of connecting to a reference database server, the application connects to an auxiliary router server named mongos, which processes the queries and requests the necessary information from the respective shards. A mongos knows which shard contains what because it maintains a copy of the metadata that maps chunks of data to shards, which it gets from a config server, another important and independent component of a MongoDB sharded cluster. Together, these components also play a role in maintaining good data distribution across the shards, actively splitting and migrating chunks of data between servers as needed.

In PostgreSQL, the application connects to and queries the main database server. There isn't an intermediary router such as mongos; instead, PostgreSQL's query planner processes the query and creates an execution plan. When data requested from a partitioned table is found on a remote server, PostgreSQL will request the data from it, as shown in the EXPLAIN output below:

…
   Remote SQL: UPDATE public.emp SET sal = $2 WHERE ctid = $1
   ->  Nested Loop  (cost=100.00..300.71 rows=669 width=118)
     	Output: emp.empno, emp.ename, emp.job, emp.mgr, emp.hiredate, (emp.sal * '1.1'::double precision), emp.comm, emp.deptno, emp.ctid, salgrade.ctid
     	Join Filter: ((emp.sal > (salgrade.losal)::double precision) AND (emp.sal < (salgrade.hisal)::double precision))
     	->  Foreign Scan on public.emp  (cost=100.00..128.06 rows=602 width=112)
           	Output: emp.empno, emp.ename, emp.job, emp.mgr, emp.hiredate, emp.sal, emp.comm, emp.deptno, emp.ctid
…

Figure 4: excerpt of an EXPLAIN plan that involves processing a query in a remote server

Note in the above plan the mention of "Remote SQL". A lot of optimizations have been made in the execution of remote queries in PostgreSQL 10 and 11, which contributed to maturing and improving the sharding solution. Among them is support for having grouping and aggregation operations executed on the remote server itself ("push down") rather than retrieving all rows and having them processed locally.
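You can check whether the push-down happens with EXPLAIN VERBOSE; when it does, the aggregation is absorbed into the Foreign Scan node itself (a sketch; the exact plan shape depends on the version and table statistics):

EXPLAIN (VERBOSE) SELECT city_id, avg(temp)
  FROM temperature_201904 GROUP BY city_id;
-- Look for a Foreign Scan whose "Relations" line reads
-- "Aggregate on (public.temperature_201904)" rather than a local
-- aggregate node sitting above a plain Foreign Scan.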

What is missing in PostgreSQL implementation?

There is, however, still room for improvement. In terms of remote execution, reports from the community indicate that not all queries are performing as they should. For example, in some cases the PostgreSQL planner is not performing a full push-down, resulting in shards transferring more data than required. Parallel scheduling of queries that touch multiple shards is not yet implemented either: for now, the execution takes place sequentially, one shard at a time, which takes longer to complete. When it comes to the maintenance of partitioned and sharded environments, changes in the structure of partitions are still complicated and not very practical. For example, when you add a new partition to a partitioned table with an appointed default partition, you may need to detach the default partition first if it contains rows that would now fit in the new partition, manually move those rows to the new partition, and finally re-attach the default partition back in place.

But that is all part of a maturing technology. We’re looking forward to PostgreSQL 12 and what it will bring in the partitioning and sharding fronts.


Image based on photos by Leonardo Quatrocchi from Pexels

Jul
18
2018
--

Webinar Wed 7/19: MongoDB Sharding

Please join Percona's Senior Support Engineer, Adamo Tonete, as he presents MongoDB Sharding 101 on July 19th, 2018, at 12:30 PM PDT (UTC-7) / 3:30 PM EDT (UTC-4).

This tutorial is a continuation of our advanced topics for the DBA. In it, we will share best practices and tips on how to perform the most common activities.

In this tutorial, we are going to cover MongoDB sharding.

Register Now

The post Webinar Wed 7/19: MongoDB Sharding appeared first on Percona Database Performance Blog.
