The rise of cloud-native technologies is transforming how we manage databases. Since I stepped into the world of databases and cloud-native technologies, I have encountered several initiatives aimed at developing and optimizing database operations in the cloud, and Kubernetes plays a crucial role in this shift through Operators. While the core concepts and techniques of […]
Percona Everest: An Open Source Solution for MongoDB Sharding and Backups
Valkey/Redis Sharding Using the Native Clustering Feature
In this blog post, we are going to implement the concept of sharding in a Valkey setup. This is a built-in feature and can be implemented by enabling clustering in the Valkey configuration. Sharding, in general, helps distribute and scale application writes over multiple nodes, and it works the same way in Valkey. Here, it uses the […]
Merging Empty Chunks in MongoDB
I recently wrote about one of the problems we can encounter while working with sharded clusters, which is Finding Undetected Jumbo Chunks in MongoDB. Another issue that we might run into is dealing with empty chunk management.
Chunk Maintenance
As we know, there is an autoSplitter process that partitions chunks when they become too big. There is also a balancer process that takes care of moving chunks to ensure even distribution between all shards. So as data grows, chunks are partitioned and perhaps moved over to other shards and all is well.
But what happens when we delete data? It can be the case that some chunks are now empty. If we delete a lot of data, perhaps a significant number of the chunks will be empty. This can be a significant issue for sharded collections with a TTL index.
Potential Issues
One of the potential problems when dealing with a high percentage of empty chunks is uneven data distribution. The balancer will make sure the number of chunks on each shard is roughly the same, but it does not take into account whether the chunks are empty or not. So you might end up with a cluster that looks balanced, but in reality, a few shards have way more data than the rest.
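To make the imbalance concrete, here is a small JavaScript sketch with made-up numbers (the shard names and chunk sizes are hypothetical, not taken from a real cluster). Both shards hold the same number of chunks, which is all a chunk-count comparison sees, yet one of them holds most of the data:

```javascript
// Hypothetical per-shard chunk sizes in MB; a size of 0 marks an empty chunk.
const chunkSizesByShard = {
  shardA: [60, 58, 61, 59],
  shardB: [0, 0, 0, 62],
};

// Summarize what a chunk-count view sees (chunks) versus what is
// actually stored (dataMB) on each shard.
function summarizeShards(sizesByShard) {
  const summary = {};
  for (const [shard, sizes] of Object.entries(sizesByShard)) {
    summary[shard] = {
      chunks: sizes.length,
      emptyChunks: sizes.filter((size) => size === 0).length,
      dataMB: sizes.reduce((total, size) => total + size, 0),
    };
  }
  return summary;
}

const shardSummary = summarizeShards(chunkSizesByShard);
console.log(shardSummary);
```

In this toy example the chunk counts are identical, so the cluster looks balanced, while the data volume differs by almost a factor of four.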
To deal with this problem, the first step is to identify empty chunks.
Identifying Empty Chunks
To illustrate this, let’s consider a “clients” collection that is sharded by the “org_id” field. Let’s assume the collection currently has the following chunk ranges:
minKey --> 1
1 --> 5
5 --> 10
10 --> 15
15 --> 20
We can use the dataSize command to determine the size of a chunk. This command receives the chunk range as part of the arguments. For example, to check how many documents we have on the third chunk, we would run:
db.runCommand({ dataSize: "mydatabase.clients", keyPattern: { org_id: 1 }, min: { org_id: 5 }, max: { org_id: 10 } })
This returns a document like the following:
{
    "size" : 0,
    "numObjects" : 0,
    "millis" : 30,
    "ok" : 1,
    "operationTime" : Timestamp(1641829163, 2),
    "$clusterTime" : {
        "clusterTime" : Timestamp(1641829163, 3),
        "signature" : {
            "hash" : BinData(0,"LbBPsTEahzG/v7I6oe7iyvLr/pU="),
            "keyId" : NumberLong("7016744225173049401")
        }
    }
}
If the size is 0 we know we have an empty chunk, and we can consider merging it with either the chunk that comes right after it (with the range 10 --> 15) or the one just before it (with the range 1 --> 5).
Merging Chunks
Assuming we take the first option, here is the mergeChunks command that helps us get this done:
db.adminCommand( { mergeChunks: "mydatabase.clients", bounds: [ { org_id: 5 }, { org_id: 15 } ] } )
The new chunk ranges now would be as follows:
minKey --> 1
1 --> 5
5 --> 15
15 --> 20
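The effect of the merge on the chunk map can be modelled in plain JavaScript. This is only an in-memory illustration of the range arithmetic, not a call to MongoDB; the `mergeWithNext` helper and the `"minKey"` string stand-in are assumptions made for the example:

```javascript
// Chunk ranges from the example, as [min, max) pairs on org_id.
// "minKey" is a plain string stand-in for MongoDB's MinKey (illustration only).
const chunkRanges = [
  { min: "minKey", max: 1 },
  { min: 1, max: 5 },
  { min: 5, max: 10 },
  { min: 10, max: 15 },
  { min: 15, max: 20 },
];

// Return a new list in which chunk `index` is merged with the chunk after it.
function mergeWithNext(ranges, index) {
  if (index < 0 || index >= ranges.length - 1) {
    throw new RangeError("chunk has no following neighbour to merge with");
  }
  if (ranges[index].max !== ranges[index + 1].min) {
    throw new Error("chunks are not contiguous");
  }
  const merged = { min: ranges[index].min, max: ranges[index + 1].max };
  return [...ranges.slice(0, index), merged, ...ranges.slice(index + 2)];
}

// Merge the empty third chunk (5 --> 10) with the one right after it.
const rangesAfterMerge = mergeWithNext(chunkRanges, 2);
console.log(rangesAfterMerge);
```

The contiguity check mirrors the requirement that only adjacent ranges can be combined into a single chunk.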
One caveat is that the chunks we want to merge might not be on the same shard. If that is the case we need to move them together first, using the moveChunk command.
Putting it All Together
Following the above logic, we can iterate through all the chunks in shard key order and check their size. If we find an empty chunk, we merge it with the chunk just before it. If the chunks are not on the same shard, we move them together. The following script can be used to print all the commands required:
var mergeChunkInfo = function(ns){
    var chunks = db.getSiblingDB("config").chunks.find({"ns" : ns}).sort({min:1}).noCursorTimeout();
    // some counters for overall stats at the end
    var totalChunks = 0;
    var totalMerges = 0;
    var totalMoves = 0;
    var previousChunk = {};
    var previousChunkInfo = {};
    var ChunkJustChanged = false;

    chunks.forEach(
        function printChunkInfo(currentChunk) {
            var db1 = db.getSiblingDB(currentChunk.ns.split(".")[0]);
            var key = db.getSiblingDB("config").collections.findOne({_id:currentChunk.ns}).key;
            db1.getMongo().setReadPref("secondary");
            var currentChunkInfo = db1.runCommand({dataSize:currentChunk.ns, keyPattern:key, min:currentChunk.min, max:currentChunk.max, estimate:true });
            totalChunks++;

            // if the current chunk is empty and the chunk before it was not merged in the previous iteration (or was the first chunk) we have candidates for merging
            if(currentChunkInfo.size == 0 && !ChunkJustChanged) {
                // if the chunks are contiguous
                if(JSON.stringify(previousChunk.max) == JSON.stringify(currentChunk.min) ) {
                    // mergeChunks and moveChunk are admin commands, so the generated calls use db.adminCommand
                    // if they belong to the same shard, merge with the previous chunk
                    if(previousChunk.shard.toString() == currentChunk.shard.toString() ) {
                        print('db.adminCommand( { mergeChunks: "' + currentChunk.ns.toString() + '",' + ' bounds: [ ' + JSON.stringify(previousChunk.min) + ',' + JSON.stringify(currentChunk.max) + ' ] });');
                        // after a merge or move, we don't consider the current chunk for the next iteration. We skip to the next chunk.
                        ChunkJustChanged=true;
                        totalMerges++;
                    }
                    // if they are contiguous but on different shards, we need to have both chunks on the same shard before merging, so move the current one and don't merge for now
                    else {
                        print('db.adminCommand( { moveChunk: "' + currentChunk.ns.toString() + '",' + ' bounds: [ ' + JSON.stringify(currentChunk.min) + ',' + JSON.stringify(currentChunk.max) + ' ], to: "' + previousChunk.shard.toString() + '" });');
                        // after a merge or move, we don't consider the current chunk for the next iteration. We skip to the next chunk.
                        ChunkJustChanged=true;
                        totalMoves++;
                    }
                }
                else {
                    // chunks are not contiguous (this shouldn't happen unless this is the first iteration)
                    previousChunk=currentChunk;
                    previousChunkInfo=currentChunkInfo;
                    ChunkJustChanged=false;
                }
            }
            else {
                // if the current chunk is not empty or we already operated with the previous chunk let's continue with the next chunk pair
                previousChunk=currentChunk;
                previousChunkInfo=currentChunkInfo;
                ChunkJustChanged=false;
            }
        }
    )

    print("***********Summary Chunk Information***********");
    print("Total Chunks: "+totalChunks);
    print("Total Move Commands to Run: "+totalMoves);
    print("Total Merge Commands to Run: "+totalMerges);
}
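We can invoke it from the Mongo shell by passing the namespace of the sharded collection, for example the collection used earlier:
mergeChunkInfo("mydatabase.clients")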
The script will generate all the commands needed to merge pairs of chunks where at least one is empty. After running the generated commands, this should cut the number of empty chunks in half. Running the script multiple times will eventually get rid of all the empty chunks.
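The halving behaviour can be checked with a small in-memory model of one pass. This is a simplification written for illustration: it assumes all chunks live on the same shard (so every candidate is merged rather than moved) and represents each chunk only by its size; `runMergePass` is a hypothetical helper, not part of the script above.

```javascript
// Model one pass: an empty chunk is merged into the chunk before it, unless
// the previous iteration already produced a merge. Merging an empty chunk
// leaves the previous chunk's size unchanged, so the merge is modelled by
// simply not emitting the empty chunk.
function runMergePass(chunkSizes) {
  const result = [];
  let justChanged = false;
  for (const size of chunkSizes) {
    const hasPrevious = result.length > 0;
    if (size === 0 && !justChanged && hasPrevious) {
      justChanged = true; // merged into the previous chunk
    } else {
      result.push(size); // kept as-is; becomes the new "previous" chunk
      justChanged = false;
    }
  }
  return result;
}

const countEmpty = (sizes) => sizes.filter((size) => size === 0).length;

// Hypothetical chunk sizes in shard key order: four consecutive empty chunks.
let sizes = [10, 0, 0, 0, 0, 7];
const emptyPerPass = [countEmpty(sizes)];
for (let pass = 0; pass < 3; pass++) {
  sizes = runMergePass(sizes);
  emptyPerPass.push(countEmpty(sizes));
}
console.log(emptyPerPass);
```

Note that in this model an empty first chunk has no predecessor and is left alone, which matches the script's merge-with-previous rule.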
Final Notes
Most people are aware of the problems with jumbo chunks; now we have seen how empty chunks can also be problematic in certain scenarios.
It is a good idea to stop the balancer before attempting any operation that modifies chunks (like merging the empty chunks). This ensures that no conflicting operations happen at the same time. Don’t forget to re-enable the balancer afterward.
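One way to make sure the balancer is always turned back on is to wrap the maintenance work in a small helper. The sketch below assumes it is given an object exposing `stopBalancer()` and `startBalancer()`, which is what the shell's `sh` helper provides; `withBalancerStopped` itself is a hypothetical name introduced here.

```javascript
// Stop the balancer, run the maintenance work, and restart the balancer
// even if the work throws. `sh` is expected to expose stopBalancer() and
// startBalancer(), as the shell's `sh` helper does.
function withBalancerStopped(sh, work) {
  sh.stopBalancer();
  try {
    return work();
  } finally {
    sh.startBalancer();
  }
}
```

In the shell this could be called as `withBalancerStopped(sh, function () { /* run the generated merge commands */ })`.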
Configuring a MongoDB Sharded Cluster with PMM2 – Part 2
As a DBA, it is important to monitor a database, both to troubleshoot problems and to understand the health of an instance. Percona Monitoring and Management (PMM v2) is open source and does a great job of monitoring databases such as MongoDB, MySQL, and PostgreSQL.
In this blog post, we will see how to configure a sharded cluster in PMM2. This is part two of an earlier post, Configuring PMM Monitoring for MongoDB Cluster, which covered PMM v1. The steps to add the sharded cluster to PMM2 are listed below:
Prepare DB for Monitoring
Before configuring PMM2, we need to create a monitoring user on the database side. If you need QAN (Query Analytics), you will also need to enable the profiler and grant the user some additional custom permissions, such as the “explainRole” role. Enabling the profiler adds some extra load to the database, so it is better to test beforehand to assess that load.
Add PMM Users to the DB
// Change role name / user / password as required
db.getSiblingDB("admin").createRole({
    role: "explainRole",
    privileges: [{
        resource: { db: "", collection: "" },
        actions: [ "listIndexes", "listCollections", "dbStats", "dbHash", "collStats", "find" ]
    }],
    roles: []
})

db.getSiblingDB("admin").createUser({
    user: "pmm_mongodb",
    pwd: "password",
    roles: [
        { role: "explainRole", db: "admin" },
        { role: "clusterMonitor", db: "admin" },
        { role: "read", db: "local" }
    ]
})
Enabling Profiler
This is optional. Run the instance with the profiler or add profiling at the database level to monitor queries in QAN (not applicable for mongos).
To start at the instance level (enables profiling for all databases):
mongod <other options> --profile 2 --slowms 200 --rateLimit 100
or in mongod.conf:
operationProfiling:
  mode: all
  slowOpThresholdMs: 200
  # (Below variable is available only with Percona Server for MongoDB.)
  rateLimit: 100
To enable profiling at the DB level:
use dbname
db.setProfilingLevel(2)
Add MongoDB Instance to the pmm-client
Use the same --cluster value for all members of the same cluster, and provide a --service-name to identify each member:
sudo pmm-admin add mongodb \
  --username=pmm_mongodb --password=password \
  --query-source=profiler \
  --cluster=mycluster \
  --service-name=myc_mongoc2 \
  --host= --port=37061
Check the Inventory Service
Then check whether the service was added successfully or not:
$ sudo pmm-admin list
Service type  Service name  Address and port  Service ID
MongoDB       myc_mongoc2                     /service_id/02e261a1-e8e0-4eb4-8043-8616424500de

Agent type              Status     Metrics Mode  Agent ID                                        Service ID
pmm_agent               Connected                /agent_id/281b4046-4f4b-4897-bd2e-b771d3e97922
node_exporter           Running    push          /agent_id/5e9b17a8-ecb9-47c3-8477-ce322047c4d9
mongodb_exporter        Running    push          /agent_id/0067dd85-9a0a-47dd-976e-ae779deb982b  /service_id/5c92f132-3005-45ab-84df-7541c286c34a
mongodb_profiler_agent  Running                  /agent_id/18d3d87a-9bb9-48c1-8e3e-d8bae3f043bb  /service_id/02e261a1-e8e0-4eb4-8043-8616424500de
From My Test
I deployed the sharded cluster on localhost for testing, as follows:
Members list:
1 mongos (37050), 3 shards each consisting of a 3-member replica set (37051-37059), and 3 config server members (37060-37062)
Listing one mongod instance from the ps command:
balaguru@vinodh-UbuntuPC:~/mongodb/testshard$ ps -ef | grep mongod -w | head -1
balaguru 41883 2846 1 13:01 ? 00:04:04 mongod --replSet configRepl --dbpath /home/balaguru/mongodb/testshard/data/configRepl/rs1/db --logpath /home/balaguru/mongodb/testshard/data/configRepl/rs1/mongod.log --port 37060 --fork --configsvr --wiredTigerCacheSizeGB 1 --profile 2 --slowms 200 --rateLimit 100 --logappend
Adding mongodb services to pmm-admin:
balaguru@vinodh-UbuntuPC:~/mongodb/testshard$ pmm-admin add mongodb --username=pmm_mongodb --password=password \
  --query-source=profiler --cluster=mycluster --service-name=myc_s11 --host= --port=37051
MongoDB Service added.
Service ID : /service_id/cc6b3fed-ee16-494e-93f0-0d2e8f60a136
Service name: myc_s11

balaguru@vinodh-UbuntuPC:~/mongodb/testshard$ pmm-admin add mongodb --username=pmm_mongodb --password=password \
  --query-source=profiler --cluster=mycluster --service-name=myc_s12 --host= --port=37052
MongoDB Service added.
Service ID : /service_id/235494d8-aaee-4ca0-bd3a-bf2259e87ecc
Service name: myc_s12

balaguru@vinodh-UbuntuPC:~/mongodb/testshard$ pmm-admin add mongodb --username=pmm_mongodb --password=password \
  --query-source=profiler --cluster=mycluster --service-name=myc_s13 --host= --port=37053
MongoDB Service added.
Service ID : /service_id/55261675-41e7-40f1-95c9-08cac25c4f64
Service name: myc_s13

balaguru@vinodh-UbuntuPC:~/mongodb/testshard$ pmm-admin add mongodb --username=pmm_mongodb --password=password \
  --query-source=profiler --cluster=mycluster --service-name=myc_s21 --host= --port=37054
MongoDB Service added.
Service ID : /service_id/5c92f132-3005-45ab-84df-7541c286c34a
Service name: myc_s21

balaguru@vinodh-UbuntuPC:~/mongodb/testshard$ pmm-admin add mongodb --username=pmm_mongodb --password=password \
  --query-source=profiler --cluster=mycluster --service-name=myc_s22 --host= --port=37055
MongoDB Service added.
Service ID : /service_id/4de07a5b-5a47-4126-8824-80570bd72cef
Service name: myc_s22

balaguru@vinodh-UbuntuPC:~/mongodb/testshard$ pmm-admin add mongodb --username=pmm_mongodb --password=password \
  --query-source=profiler --cluster=mycluster --service-name=myc_s23 --host= --port=37056
MongoDB Service added.
Service ID : /service_id/7bdaaa72-6e00-4f46-a2a9-5205d5f3fff5
Service name: myc_s23

balaguru@vinodh-UbuntuPC:~/mongodb/testshard$ pmm-admin add mongodb --username=pmm_mongodb --password=password \
  --query-source=profiler --cluster=mycluster --service-name=myc_s31 --host= --port=37057
MongoDB Service added.
Service ID : /service_id/2028e075-bc65-4aae-bcdd-ec616b36e81b
Service name: myc_s31

balaguru@vinodh-UbuntuPC:~/mongodb/testshard$ pmm-admin add mongodb --username=pmm_mongodb --password=password \
  --query-source=profiler --cluster=mycluster --service-name=myc_s32 --host= --port=37058
MongoDB Service added.
Service ID : /service_id/7659231c-f48f-4a65-b651-585ac1f058cd
Service name: myc_s32

balaguru@vinodh-UbuntuPC:~/mongodb/testshard$ pmm-admin add mongodb --username=pmm_mongodb --password=password \
  --query-source=profiler --cluster=mycluster --service-name=myc_s33 --host= --port=37059
MongoDB Service added.
Service ID : /service_id/2c224eaf-c0f1-482b-b23c-8ea4b914c8e5
Service name: myc_s33

balaguru@vinodh-UbuntuPC:~/mongodb/testshard$ pmm-admin add mongodb --username=pmm_mongodb --password=password \
  --query-source=profiler --cluster=mycluster --service-name=myc_mongoc1 --host= --port=37060
MongoDB Service added.
Service ID : /service_id/09e95cc5-40b7-4a53-9e35-2937ca23395f
Service name: myc_mongoc1

balaguru@vinodh-UbuntuPC:~/mongodb/testshard$ pmm-admin add mongodb --username=pmm_mongodb --password=password \
  --query-source=profiler --cluster=mycluster --service-name=myc_mongoc2 --host= --port=37061
MongoDB Service added.
Service ID : /service_id/02e261a1-e8e0-4eb4-8043-8616424500de
Service name: myc_mongoc2

balaguru@vinodh-UbuntuPC:~/mongodb/testshard$ pmm-admin add mongodb --username=pmm_mongodb --password=password \
  --query-source=profiler --cluster=mycluster --service-name=myc_mongoc3 --host= --port=37062
MongoDB Service added.
Service ID : /service_id/421449d9-8ada-46dd-9c8a-84c0847a8742
Service name: myc_mongoc3
Listing the services added:
balaguru@vinodh-UbuntuPC:~/mongodb/testshard$ pmm-admin list
Service type  Service name  Address and port  Service ID
MongoDB       myc_mongoc2                     /service_id/02e261a1-e8e0-4eb4-8043-8616424500de
MongoDB       myc_mongoc1                     /service_id/09e95cc5-40b7-4a53-9e35-2937ca23395f
MongoDB       myc_s31                         /service_id/2028e075-bc65-4aae-bcdd-ec616b36e81b
MongoDB       myc_s12                         /service_id/235494d8-aaee-4ca0-bd3a-bf2259e87ecc
MongoDB       myc_s33                         /service_id/2c224eaf-c0f1-482b-b23c-8ea4b914c8e5
MongoDB       myc_mongos                      /service_id/3f4f56be-6259-4579-88b7-bb4d0c29204b
MongoDB       myc_mongoc3                     /service_id/421449d9-8ada-46dd-9c8a-84c0847a8742
MongoDB       myc_s22                         /service_id/4de07a5b-5a47-4126-8824-80570bd72cef
MongoDB       myc_s13                         /service_id/55261675-41e7-40f1-95c9-08cac25c4f64
MongoDB       myc_s21                         /service_id/5c92f132-3005-45ab-84df-7541c286c34a
MongoDB       myc_s32                         /service_id/7659231c-f48f-4a65-b651-585ac1f058cd
MongoDB       myc_s23                         /service_id/7bdaaa72-6e00-4f46-a2a9-5205d5f3fff5
MongoDB       myc_s11                         /service_id/cc6b3fed-ee16-494e-93f0-0d2e8f60a136

Agent type              Status     Metrics Mode  Agent ID                                        Service ID
pmm_agent               Connected                /agent_id/281b4046-4f4b-4897-bd2e-b771d3e97922
node_exporter           Running    push          /agent_id/5e9b17a8-ecb9-47c3-8477-ce322047c4d9
mongodb_exporter        Running    push          /agent_id/0067dd85-9a0a-47dd-976e-ae779deb982b  /service_id/5c92f132-3005-45ab-84df-7541c286c34a
mongodb_exporter        Running    push          /agent_id/071ec1ae-ff35-4fa1-a4c9-4d5bca705131  /service_id/09e95cc5-40b7-4a53-9e35-2937ca23395f
mongodb_exporter        Running    push          /agent_id/5e045290-36c2-410b-86e9-b4945cd7ecfb  /service_id/3f4f56be-6259-4579-88b7-bb4d0c29204b
mongodb_exporter        Running    push          /agent_id/6331b519-da6e-47c0-be7e-92f2ac142fa5  /service_id/2c224eaf-c0f1-482b-b23c-8ea4b914c8e5
mongodb_exporter        Running    push          /agent_id/6ce78e1c-be6a-4ffd-844b-8afdc0ee5700  /service_id/235494d8-aaee-4ca0-bd3a-bf2259e87ecc
mongodb_exporter        Running    push          /agent_id/6ed1bcc2-3561-4c65-95e1-11b3cc051194  /service_id/cc6b3fed-ee16-494e-93f0-0d2e8f60a136
mongodb_exporter        Running    push          /agent_id/7721bd24-7408-431d-abcb-3239459df75a  /service_id/7659231c-f48f-4a65-b651-585ac1f058cd
mongodb_exporter        Running    push          /agent_id/999c0152-656e-4941-a1fb-003df2dbfbf6  /service_id/2028e075-bc65-4aae-bcdd-ec616b36e81b
mongodb_exporter        Running    push          /agent_id/9e63f2d9-7e75-45ee-927d-b1406d4797e0  /service_id/55261675-41e7-40f1-95c9-08cac25c4f64
mongodb_exporter        Running    push          /agent_id/ca3ab511-29eb-4c68-b037-23ab13fa92ff  /service_id/4de07a5b-5a47-4126-8824-80570bd72cef
mongodb_exporter        Running    push          /agent_id/cd1066eb-f917-4d7e-b284-8d8a8bc7c652  /service_id/7bdaaa72-6e00-4f46-a2a9-5205d5f3fff5
mongodb_exporter        Running    push          /agent_id/e2ef230a-d84b-428c-921b-b6da7c3180f3  /service_id/421449d9-8ada-46dd-9c8a-84c0847a8742
mongodb_exporter        Running    push          /agent_id/e3f7ba25-6592-4cb4-aae6-7431b3b6a6da  /service_id/02e261a1-e8e0-4eb4-8043-8616424500de
mongodb_profiler_agent  Running                  /agent_id/18d3d87a-9bb9-48c1-8e3e-d8bae3f043bb  /service_id/02e261a1-e8e0-4eb4-8043-8616424500de
mongodb_profiler_agent  Running                  /agent_id/1cf5ee8a-b5b5-4133-896c-fafccc164f54  /service_id/5c92f132-3005-45ab-84df-7541c286c34a
mongodb_profiler_agent  Running                  /agent_id/4b13cc24-fbd2-47cc-955d-c2a65624d2be  /service_id/55261675-41e7-40f1-95c9-08cac25c4f64
mongodb_profiler_agent  Running                  /agent_id/4de795cf-f047-49e6-a3bc-dc2ab1b2bc86  /service_id/cc6b3fed-ee16-494e-93f0-0d2e8f60a136
mongodb_profiler_agent  Running                  /agent_id/89ae83c7-e62c-48f6-9e8c-597ce978c8ce  /service_id/4de07a5b-5a47-4126-8824-80570bd72cef
mongodb_profiler_agent  Running                  /agent_id/98343388-a246-4767-8838-ded8f8de5191  /service_id/235494d8-aaee-4ca0-bd3a-bf2259e87ecc
mongodb_profiler_agent  Running                  /agent_id/a5df9e6b-037e-486a-bc95-afe20095cf98  /service_id/7bdaaa72-6e00-4f46-a2a9-5205d5f3fff5
mongodb_profiler_agent  Running                  /agent_id/a6bda9b4-989a-427b-ae64-5deffc2b9ba2  /service_id/7659231c-f48f-4a65-b651-585ac1f058cd
mongodb_profiler_agent  Running                  /agent_id/c59c40ca-63ee-4497-b297-403faa9d4ec0  /service_id/2c224eaf-c0f1-482b-b23c-8ea4b914c8e5
mongodb_profiler_agent  Running                  /agent_id/c7f84a08-4823-455b-93a3-168eee19329b  /service_id/3f4f56be-6259-4579-88b7-bb4d0c29204b
mongodb_profiler_agent  Running                  /agent_id/e85d0757-7542-4b38-bfed-81ded8bf309c  /service_id/421449d9-8ada-46dd-9c8a-84c0847a8742
mongodb_profiler_agent  Running                  /agent_id/ed81849a-6fc9-46f3-a5dc-e6c288409009  /service_id/09e95cc5-40b7-4a53-9e35-2937ca23395f
mongodb_profiler_agent  Running                  /agent_id/f9d26161-4827-4bed-a85f-cbe3ce9478ab  /service_id/2028e075-bc65-4aae-bcdd-ec616b36e81b
vmagent                 Running    push          /agent_id/a662e1f6-31d3-4514-8f83-ea31e0165d61
PMM Dashboards
From PMM Dashboards, you can then view the replSet summary as well as the sharded cluster summary.
Cluster Summary
This dashboard gives information about the sharded/unsharded databases, shards, chunks, cursor details, etc.
ReplSet Summary:
This dashboard tells about the replication information like replica lag, operations, heartbeat, ping time, etc.
MongoDB Instance Overview:
This is the general dashboard for a MongoDB instance, which provides generic information about connections, memory usage, latency, etc.
WiredTiger Details:
This is the dashboard you will likely rely on most when analyzing problems, as it shows the WiredTiger storage engine metrics. The main ones to monitor are WT cache utilization, evictions of modified or unmodified pages, read/write ticket utilization, index/object scans, etc.
If you enable profiling, you can see the queries run against the database in Query Analytics (QAN), and you can filter them easily. You can also get the explain plan to check whether a query uses a COLLSCAN (full collection scan) or an IXSCAN (index scan), and check the counts, load, etc.
As said, Percona Monitoring and Management 2 is very easy to configure for database monitoring, and it is recommended. It is better to configure monitoring now rather than late. PMM2 is maintained by Percona and is totally free. If you find bugs or have questions, you can raise them with Percona.
Horizontal Scaling in MySQL – Sharding Followup
In a previous post, A Horizontal Scalability Mindset for MySQL, I discussed the concerns around growing individual MySQL instances too large and some basic strategies:
- Optimizing/minimizing size with proper data types
- Removing unused/duplicate indexes
- Keeping your Primary Keys small
- Pruning data
Finally, if those methods have been exhausted, I touched on horizontal sharding as the approach to keep individual instances at a reasonable size. When discussing my thoughts across our internal teams, there was lots of feedback that we needed to dive into the sharding topic in more detail. This post aims to give more theory and considerations around sharding along with a lightweight ProxySQL sample implementation.
What is Sharding?
Sharding is a word that is frequently used but is often misunderstood. Classic vertical scaling (often just called “scaling”) is when you increase the resources on a single server to handle more data and/or load. Horizontal scaling (aka sharding) is when you actually split your data into smaller, independent buckets and keep adding new buckets as needed.
A sharded environment has two or more groups of MySQL servers that are isolated and independent from each other. While specific implementation details are dependent on your environment, here are some general guidelines:
- A shard can be a single server or a replication hierarchy
- Each shard has identical schemas – you can run the same query pattern against any shard and expect to get a result set (of course varying the WHERE clause of the identifier)
- A given identifier belongs to exactly one shard at a time – you should NOT have a Customer ID spanning shards (unless only temporarily during migration between shards for example – this is an edge case that requires special handling)
With this understanding of sharding, let’s look at some of the key considerations and challenges for actual implementation.
Sharding Considerations and Challenges
While sharding often becomes the only way to manage exploding data, most experts and architects advise that it is delayed as long as possible. If it is such a great way to handle data, then why is that? The primary reason is complexity. While vertical scaling is trivial (just add more hardware and make no other changes), sharding requires significant thought and planning. The main challenges fall into a few primary buckets:
- How should I split my data?
- What if my data spans shards?
- How do I find the data I need?
Each bucket has its own set of challenges, but each can also be an overall blocker when trying to implement sharding.
How Do I Split My Data?
Picking the shard key is the MOST IMPORTANT part of implementing sharding. The main concept is to pick an identifier that is shared across records, such as CustomerID, ClientID, or UserID. Generally, you’ll want to pick the smallest unit possible (generally at the UserID level), but this also depends on what level of aggregation is needed (more on this later). The risk here is that the key isn’t selective enough and you end up not splitting the data evenly. However, if you are too selective, then you risk breaking up units that logically should remain together (think sharding on ForumPostID vs UserID).
What if My Data Spans Shards?
This is often a deciding factor that prevents an organization from sharding. Splitting the data and retrieving individual records, while not trivial, is relatively straightforward. In many workloads, this is enough. Locate my small subset of data and fetch it. When the use case dictates aggregate queries, however, this workflow breaks down. Take this common query to find the newest users across the platform:
SELECT user_id, date_created FROM users ORDER BY date_created DESC LIMIT 10;
When all the users are stored in a single server, this query is trivial. However, when the data is split across multiple shards, we now need a way to aggregate this data. Implementing this is outside the scope of this post, but the general approach looks something like this:
- Issue the query on every shard
- Collect the results from each shard in a central aggregation service
- Combine/filter the individual results into a single result set
If your use case relies heavily on aggregate queries, implementing this aggregation layer is critical and often very complicated. This is also the use case that discourages so many from sharding, even when it is necessary.
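The three steps above can be sketched in a few lines of JavaScript. This is only the combine/filter step of a hypothetical aggregation service: the sample rows are invented, and it assumes each shard has already returned its own result for the query, with `date_created` as ISO 8601 strings in a single format so that string comparison orders them correctly.

```javascript
// Each inner array stands for the rows one shard returned for the
// "ORDER BY date_created DESC LIMIT n" query (hypothetical sample data).
const shardResults = [
  [
    { user_id: 12, date_created: "2021-06-03T10:00:00Z" },
    { user_id: 9, date_created: "2021-06-01T08:30:00Z" },
  ],
  [
    { user_id: 31, date_created: "2021-06-04T12:15:00Z" },
    { user_id: 20, date_created: "2021-05-30T23:59:00Z" },
  ],
  [{ user_id: 47, date_created: "2021-06-02T07:45:00Z" }],
];

// Combine the per-shard rows, re-sort them globally, and apply the limit again.
function newestUsers(perShardRows, limit) {
  return perShardRows
    .flat()
    .sort((a, b) => {
      if (a.date_created < b.date_created) return 1;
      if (a.date_created > b.date_created) return -1;
      return 0;
    })
    .slice(0, limit);
}

const newestThree = newestUsers(shardResults, 3);
console.log(newestThree.map((row) => row.user_id));
```

Each shard only needs to return its own top `limit` rows, since no row outside a shard's local top `limit` can appear in the global result.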
In an ideal scenario, the active OLTP workload can be contained within individual shards. Reporting and data warehousing require the implementation of distributed queries, which can be managed through different systems or via custom aggregation using multi-source MySQL asynchronous replication.
How Do I Find My Data?
Now, assuming that you have chosen a sharding key, you need to be able to retrieve records when needed. This is the next major hurdle that needs to be overcome. There are two distinct components to this challenge:
- Determining which shard holds the data
- How to connect to that shard from my application
Finding the ShardID
Determining which shard holds the data can be as simple or complex as you want. However, with each approach, there are tradeoffs. The simple approach, using a hash/modulus to determine the shard, looks something like this:
shardID = identifier % numShards
return shardID
The most basic example would be sharding by userID across 2 shards. UserIDs that are even would be on shard 0 and odd userIDs would be on shard 1. While incredibly simplistic, what do I do when 2 shards aren’t enough? The goal of this architecture is to just add more shards when individual shards get too big. So now, when I add new shards, the number of shards increases so the shardID may change for many of the records.
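A quick way to see how disruptive this is: count how many identifiers land on a different shard when going from 2 to 3 shards. The sketch below uses twelve consecutive user IDs as a stand-in for real data; `shardFor` and `countRelocated` are names made up for this example.

```javascript
// Plain modulus sharding, as in the pseudocode above.
function shardFor(identifier, numShards) {
  return identifier % numShards;
}

// Count identifiers whose shard changes when the shard count changes.
function countRelocated(identifiers, oldNumShards, newNumShards) {
  return identifiers.filter(
    (id) => shardFor(id, oldNumShards) !== shardFor(id, newNumShards)
  ).length;
}

// Hypothetical user IDs 1..12.
const userIds = Array.from({ length: 12 }, (_, i) => i + 1);
const relocated = countRelocated(userIds, 2, 3);
console.log(`${relocated} of ${userIds.length} users change shard`);
```

For these IDs, two thirds of the users would have to be migrated just to add a single shard.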
A more flexible approach would be to use a directory service. In this approach, there is a very simple datastore that maps an identifier to a shard. To determine which shard holds the data, you simply query the datastore with the identifier and it returns the shardID. This gives incredible flexibility for moving data between shards, but also adds complexity to the system. Naturally, this service itself would need to be highly available, but doesn’t have to be in MySQL. In addition to complexity, you have potentially introduced additional latency to the system with an extra lookup.
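As a rough illustration of the directory idea, here is an in-memory stand-in. A real directory would be a highly available datastore; the `ShardDirectory` class and its method names are assumptions made for this sketch.

```javascript
// Minimal in-memory directory: maps an identifier to the shard that owns it.
class ShardDirectory {
  constructor() {
    this.shardByIdentifier = new Map();
  }

  // Record (or change) which shard owns an identifier.
  assign(identifier, shardID) {
    this.shardByIdentifier.set(identifier, shardID);
  }

  // Return the shard that owns an identifier, failing loudly if unknown.
  lookup(identifier) {
    if (!this.shardByIdentifier.has(identifier)) {
      throw new Error(`no shard assigned for identifier ${identifier}`);
    }
    return this.shardByIdentifier.get(identifier);
  }
}

const directory = new ShardDirectory();
directory.assign(5, 2);
directory.assign(6, 0);

// Moving a user to another shard is just a directory update
// (after the data itself has been migrated).
directory.assign(5, 1);
console.log(directory.lookup(5));
```

Because the mapping is explicit, adding a shard does not force existing identifiers to move, unlike the modulus approach.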
Connecting to the Shard
Once we have the proper shardID, we need to connect to the database cluster in question. Again, we have options and the implementation is dependent on the application. We can look at one of two main approaches:
- Use a different connection string for each shard
- Leverage a proxy layer that routes intelligently
Using a different connection string is generally self-explanatory: I have a collection of shardIDs and their corresponding connection strings. When I know the shardID, I then fetch the connection string, connect to the cluster, and away we go.
But we know developers don’t want to (and shouldn’t) mess with connection strings and credentials. So a proxy layer that can route the query based on the query itself or some metadata in the query is ideal. Being a layer 7 service, ProxySQL is able to inspect the queries themselves and make decisions based on defined rules. While definitely not the only way, let’s look at a sample ProxySQL implementation that uses query comments to route to the correct shard.
Sample ProxySQL Implementation
For our example, here is the basic architecture:
- Sharding users by UserID
- Directory service that does simple routing based on modulus
- ProxySQL is already used for HA
- Data is split across 3 shards
A typical user query may look something like this:
SELECT * FROM users WHERE user_id = 5
With UserID being the shard key and simple modulus sharding strategy, we can identify the shardID easily for this query:
Identifier % numShards = shardID
5 % 3 = 2
With the shardID defined, we can now inject a comment into the original query that can be used by ProxySQL to properly route the query to the correct shard:
SELECT /* shard=shard_2 */ * FROM users WHERE user_id = 5
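On the application side, injecting the hint could look like the following sketch. The `addShardHint` function is hypothetical and deliberately narrow: it only handles statements that begin with SELECT and computes the shard with the same modulus rule as above.

```javascript
// Insert a "/* shard=shard_N */" comment right after the leading SELECT.
function addShardHint(sql, userId, numShards) {
  if (!Number.isInteger(userId) || userId < 0) {
    throw new TypeError("userId must be a non-negative integer");
  }
  if (!/^SELECT\b/i.test(sql)) {
    throw new Error("this sketch only handles statements that start with SELECT");
  }
  const shardID = userId % numShards;
  return sql.replace(/^SELECT\b/i, `SELECT /* shard=shard_${shardID} */`);
}

const hintedQuery = addShardHint("SELECT * FROM users WHERE user_id = 5", 5, 3);
console.log(hintedQuery);
```

Only the computed shard number is interpolated into the comment, so the hint itself never contains user-supplied text.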
ProxySQL uses the concept of hostgroups to define routing internally. In many HA cases, this is simply a reader and writer hostgroup. For the sharding architecture, however, we can expand on that and define hostgroups as actual shards:
INSERT INTO mysql_servers (hostgroup_id, hostname, comment) VALUES
  (1,'','Shard 0'),
  (2,'','Shard 1'),
  (3,'','Shard 2');
With the hostgroups in place, we just need to add a ProxySQL rule to choose the hostgroup based on the injected comment:
INSERT INTO mysql_query_rules (rule_id,active,match_pattern,destination_hostgroup,apply) VALUES
  (5,1,'\S*\s*\/\*\s*shard=shard_0\s*\*\/\S*\s*',1,1),
  (6,1,'\S*\s*\/\*\s*shard=shard_1\s*\*\/\S*\s*',2,1),
  (7,1,'\S*\s*\/\*\s*shard=shard_2\s*\*\/\S*\s*',3,1);
Now, after we issue a few queries, you can see in the ProxySQL stats that we are matching the shardID and routing accordingly:
*************************** 1. row ***************************
active: 1
hits: 3
rule_id: 6
match_digest: NULL
match_pattern: \S*\s*\/\*\s*shard=shard_1\s*\*\/\S*\s*
replace_pattern: NULL
cache_ttl: NULL
apply: 1
flagIN: 0
flagOUT: NULL
*************************** 2. row ***************************
active: 1
hits: 5
rule_id: 7
match_digest: NULL
match_pattern: \S*\s*\/\*\s*shard=shard_2\s*\*\/\S*\s*
replace_pattern: NULL
cache_ttl: NULL
apply: 1
flagIN: 0
flagOUT: NULL
Obviously, this is a massive simplification and you’d want rules that are more generic and more complex hostgroups. For example, your deployment would need to account for queries where no shard hint is provided or the case where an invalid shardID is supplied. However, it displays the potential of writing a custom routing layer within the very lightweight tool ProxySQL. This approach, while it requires some additional thought and logic, could be a viable first step towards splitting your data horizontally.
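To reason about the two edge cases mentioned above (no hint, or an invalid shardID) before encoding them as rules, the routing decision can be modelled in JavaScript. This is not ProxySQL configuration: the shard-to-hostgroup map mirrors the example above, while the fallback hostgroup `0` and the `resolveHostgroup` function are assumptions for illustration.

```javascript
// Shard ID -> hostgroup, mirroring the example (shard 0 is hostgroup 1, etc.).
const HOSTGROUP_BY_SHARD = new Map([[0, 1], [1, 2], [2, 3]]);
// Hypothetical hostgroup for queries that cannot be routed by hint.
const FALLBACK_HOSTGROUP = 0;

// Decide where a query would go, and say why.
function resolveHostgroup(sql) {
  const match = /\/\*\s*shard=shard_(\d+)\s*\*\//.exec(sql);
  if (match === null) {
    return { hostgroup: FALLBACK_HOSTGROUP, reason: "no shard hint" };
  }
  const shardID = Number(match[1]);
  if (!HOSTGROUP_BY_SHARD.has(shardID)) {
    return { hostgroup: FALLBACK_HOSTGROUP, reason: "unknown shard" };
  }
  return { hostgroup: HOSTGROUP_BY_SHARD.get(shardID), reason: "matched" };
}

console.log(resolveHostgroup("SELECT /* shard=shard_2 */ * FROM users WHERE user_id = 5"));
console.log(resolveHostgroup("SELECT * FROM users WHERE user_id = 5"));
console.log(resolveHostgroup("SELECT /* shard=shard_9 */ * FROM users WHERE user_id = 9"));
```

Whether unroutable queries should fall back to a default hostgroup or be rejected outright is a design decision that depends on the application.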
Again, this is just a sample proof-of-concept implementation. Each use case is different. The main takeaway from this post is that manual sharding is possible, but requires significant planning and implementation work.
Finding Undetected Jumbo Chunks in MongoDB
I recently came across an interesting case of performance issues during balancing in a MongoDB cluster. Digging through the logs, it became clear the problem was related to chunk moves taking a long time.
As we know, the default maximum chunk size is 64 MB, so these migrations are supposed to be very fast on most hardware in use nowadays.
In this case, there were several chunks way above that limit being moved around. How can this happen? Shouldn’t those chunks have been marked as Jumbo?
Recap on Chunk Moves
For starters, let’s review what we know about chunk moves.
MongoDB does not move a chunk if the number of documents in it is greater than 1.3 times the result of dividing the configured chunk size (default 64 MB) by the average document size.
If we have a collection with an average document size of 2 KB, a chunk with more than 42,598 documents (65,536 KB / 2 KB × 1.3 ≈ 42,598) won’t be balanced. This might be an issue for collections with non-uniform document sizes, as the estimation won’t be very good in that case.
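The arithmetic behind that limit can be sketched as a small helper. This is only an illustration of the rule as described here, with 64 MB taken as 65,536 KB; the function name and its parameters are made up for the example and are not part of MongoDB.

```javascript
// Hypothetical helper (not a MongoDB API): estimate the document count above
// which the balancer refuses to move a chunk, following the rule described
// in the text: 1.3 x (configured chunk size / average document size).
function maxMovableDocs(chunkSizeMB, avgDocSizeBytes) {
  const chunkSizeBytes = chunkSizeMB * 1024 * 1024;
  return Math.floor(1.3 * chunkSizeBytes / avgDocSizeBytes);
}

// Default 64 MB chunk size with 2 KB documents.
console.log(maxMovableDocs(64, 2048)); // 42598
```

Because the calculation relies on the average document size, the result is only as good as that average: collections with very uneven document sizes can be well above or below the real chunk size at this document count.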
Also, if a chunk grows past the maximum size, it is flagged as Jumbo. This means the balancer will not try to move it. You can check out the following article for more information about dealing with jumbo chunks.
Finally, we must keep in mind that chunk moves take a metadata lock at a certain part of the process, so write operations are blocked momentarily near the end of a chunk move.
The autoSplitter in Action
Before MongoDB 4.2, the autoSplitter process, which is responsible for ensuring chunks do not exceed the maximum size, ran on the mongos router. The router process keeps some statistics about the operations it performs, and these statistics are consulted to make chunk-splitting decisions.
The problem is that in a production environment, there are usually many of these processes deployed. An individual mongos router only has a partial idea of what’s happening with a particular chunk of data.
If multiple mongos routers change data on the same chunk, it could grow beyond the maximum size and still go unnoticed.
There are many reported issues related to this topic (for example, see SERVER-44088, SERVER-13806, and SERVER-16715).
Frequently restarting mongos processes could also lead to the same problem. This causes the statistics used to make chunk-splitting decisions to be reset.
Because of these issues, the autoSplitter process was changed to run on the primary member of each shard’s replica set. This happened in MongoDB version 4.2 (see SERVER-9287 for more details).
The process should better prevent jumbo chunks now, as each shard has a more accurate view of what data it contains (and how it is being changed).
The MongoDB version, in this case, was 3.6, and the autoSplitter was not doing as good a job as expected.
Finding Undetected Jumbo Chunks in MongoDB
For starters, I came across a script to print a summary of the chunks statistics here. We can build upon it to also display information about Jumbo chunks and generate the commands required to manually split any chunks exceeding the maximum size.
var allChunkInfo = function(ns){
    var chunks = db.getSiblingDB("config").chunks.find({"ns" : ns}).sort({min:1}).noCursorTimeout(); // this will return all chunks for the ns ordered by min
    var totalChunks = 0;
    var totalJumbo = 0;
    var totalSize = 0;
    var totalEmpty = 0;

    chunks.forEach(
        function printChunkInfo(chunk) {
            var db1 = db.getSiblingDB(chunk.ns.split(".")[0]); // get the database we will be running the command against later
            var key = db.getSiblingDB("config").collections.findOne({_id:chunk.ns}).key; // will need this for the dataSize call

            // dataSize returns the info we need on the data, but using the estimate option to use counts is less intensive
            var dataSizeResult = db1.runCommand({datasize:chunk.ns, keyPattern:key, min:chunk.min, max:chunk.max, estimate:false});

            if (dataSizeResult.size > 67108864) {
                totalJumbo++;
                print('sh.splitFind("' + chunk.ns.toString() + '", ' + JSON.stringify(chunk.min) + ')' + ' // ' + chunk.shard + ' ' + Math.round(dataSizeResult.size/1024/1024) + ' MB ' + dataSizeResult.numObjects);
            }
            totalSize += dataSizeResult.size;
            totalChunks++;

            if (dataSizeResult.size == 0) { totalEmpty++; } // count empty chunks for summary
        }
    );

    print("***********Summary Chunk Information***********");
    print("Total Chunks: " + totalChunks);
    print("Total Jumbo Chunks: " + totalJumbo);
    print("Average Chunk Size (Mbytes): " + (totalSize/totalChunks/1024/1024));
    print("Empty Chunks: " + totalEmpty);
    print("Average Chunk Size (non-empty): " + (totalSize/(totalChunks-totalEmpty)/1024/1024));
};
The script has to be called from a mongos router as follows:
mongos> allChunkInfo("db.test_col")
And it will print any commands to perform chunk splits as required:
sh.splitFind("db.test_col", {"_id":"jhxT2neuI5fB4o4KBIASK1"}) // shard-1 222 MB 7970
sh.splitFind("db.test_col", {"_id":"zrAESqSZjnpnMI23oh5JZD"}) // shard-2 226 MB 7988
sh.splitFind("db.test_col", {"_id":"SgkCkfSDrY789e9nD4crk9"}) // shard-1 218 MB 7986
sh.splitFind("db.test_col", {"_id":"X5MKEH4j32OhmAhY7LGPMm"}) // shard-1 238 MB 8338
...
***********Summary Chunk Information***********
Total Chunks: 5047
Total Jumbo Chunks: 120
Average Chunk Size (Mbytes): 19.29779934868946
Empty Chunks: 1107
Average Chunk Size (non-empty): 24.719795257064895
You can see the script prints the commands to split the jumbo chunks, along with the shard where each one lives, the size of the chunk, and the number of documents on it. At the end, summary information is also displayed.
The estimate: true option in the dataSize command causes it to use the average document size of the collection for the estimation. This is not very accurate if document sizes vary widely, but it runs faster and is less resource-intensive. Consider setting that option if you know your document size is more or less uniform.
All that is left is to run the commands generated by the script, and the jumbo chunks should be gone. The balancer will then move chunks during the next balancing window as required to even out the data. Consider scheduling the balancing window outside the peak workload hours to reduce the impact.
Even though MongoDB 3.6 has been EOL for some time now, many people are still running it, mostly due to outdated drivers on the application side that are not compatible with newer releases. Nowadays, the autoSplitter should be doing a better job of keeping chunk size at bay.
If you are still running a MongoDB version older than 4.2, it would be a good idea to review your chunk stats from time to time to avoid some nasty surprises.
Percona Distribution for MongoDB is a freely available MongoDB database alternative, giving you a single solution that combines the best and most important enterprise components from the open source community, designed and tested to work together.
Resharding in MongoDB 5.0
MongoDB 5.0 has been released with a bunch of new features. An important one is the capability to completely redefine the shard key of a collection in a sharded cluster. Resharding was a feature frequently requested by customers. In MongoDB 4.4, only refining the shard key (adding suffix fields to it) was available; from now on, you can change your shard key to use entirely different fields.
When you use sharding in MongoDB and you fail to choose a good shard key for a collection, you can face issues like:
- Unbalanced distribution of the data
- Having jumbo chunks that cannot be split and cannot be migrated
These issues can make your cluster very slow, with some of the shards extremely overloaded. The cluster can also become unmanageable, and crashes could happen at some point.
The Problem of Changing a Shard Key
I had a customer running a 5-shard cluster with more than 50 TB of data. A few of their collections had sub-optimal shard keys. One of the shards held more than 75% of the data and workload, and there were thousands of jumbo chunks around. Some of the jumbo chunks reached 200 GB in size, which is really a lot. The cluster was terribly slow, chunk migrations were attempted constantly and failed very frequently, and the application was unresponsive most of the time due to timeouts. Manual splits and migrations of jumbo chunks were impossible. The only solution was to create a different shard key for those collections, but this wasn’t possible with that MongoDB version. The only suggested way to change a shard key was to dump the data, drop and recreate the collection with the new shard key, and reload the data. This requires you to stop your application for an impressive amount of time if the dataset is large. That was the case.
In order to avoid stopping the application, we deployed a new empty cluster with a few more shards as well. We created all the empty collections in the new cluster with the optimal shard key. Then we moved the data out of the old cluster with custom scripts, one piece at a time, using boundaries on indexed timestamp fields. The solution required the customer to deploy some application changes: the application was instructed to read from and write to a different cluster depending on whether the affected data had already been migrated to the new cluster. The entire migration process took weeks to complete. In the end, it worked, and the customer now has a balanced cluster with no jumbo chunks.
Anyway, the solution came at some cost in terms of new machines, development effort, and time.
Fortunately, MongoDB 5.0 introduced the reshardCollection command. From now on changing a shard key should be less expensive than in the past. That’s awesome.
In this article, we’ll take a look at how resharding in MongoDB 5.0 works.
The way resharding works is simple. When you issue the reshardCollection command, a new implicit empty collection is created by MongoDB with the new shard key defined, and the data will be moved from the existing collection chunk by chunk. There is a two-second lock required by the balancer to determine the new data distribution, but during the copy phase, the application can continue to read and write the collection with no issues.
Due to this copy phase, the larger the collection is, the more time the resharding takes.
Prerequisites for Resharding
There are some requirements you need to validate before starting the resharding, and some could be expensive, in certain cases.
- Disk space: be sure to have at least 1.2x the size of the collection you’re going to reshard. If the collection is 1TB, you need at least 1.2TB free space in your disk.
- I/O capacity should be below 50%
- CPU load should be below 80%
The resources available must be evaluated on each shard.
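As a rough illustration, the three thresholds above could be turned into a pre-flight check like the following. This is a sketch under assumptions: the function name and the shape of the shard objects are invented for the example, the numbers would come from your own monitoring, and the 1.2x disk figure is applied to every shard, which is the conservative reading of the requirement.

```javascript
// Hypothetical pre-flight check; the thresholds come from the list above.
// The `shards` objects are an assumed shape, filled in from monitoring data.
function checkReshardingPrereqs(collectionSizeBytes, shards) {
  const requiredFreeBytes = 1.2 * collectionSizeBytes;
  return shards.map(function (shard) {
    const problems = [];
    if (shard.freeDiskBytes < requiredFreeBytes) problems.push("disk space");
    if (shard.ioUtilizationPct >= 50) problems.push("I/O capacity");
    if (shard.cpuLoadPct >= 80) problems.push("CPU load");
    return { shard: shard.name, ok: problems.length === 0, problems: problems };
  });
}

const TB = 1024 * 1024 * 1024 * 1024;

// Example: a 1 TB collection on two shards with made-up metrics.
const report = checkReshardingPrereqs(1 * TB, [
  { name: "sh0-rs", freeDiskBytes: 2 * TB, ioUtilizationPct: 30, cpuLoadPct: 45 },
  { name: "sh1-rs", freeDiskBytes: 1 * TB, ioUtilizationPct: 65, cpuLoadPct: 40 }
]);
console.log(JSON.stringify(report, null, 2));
```

In this made-up example, the second shard fails on both disk space and I/O capacity, so the resharding should not be started until those are addressed.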
If you fail to provide these resources the database will run out of space or the resharding could take longer than expected.
Another important requirement is that you will need to deploy changes to your application queries. To let the database work best during the resharding process, the queries must use filters on both the current shard key and the new shard key. Only at the end of the resharding process may you drop the filters on the old shard key from your queries.
This requirement is very important and in some cases it could be a little expensive. Temporarily changing the application code is sometimes a hard task.
There are also other limitations you need to consider:
- Resharding is not permitted while an index build is running
- Some queries will return an error if you don’t include both the current and new shard keys: deleteOne(), findAndModify(), updateOne() … check the manual for the full list
- You can reshard only one collection at a time
- You cannot use addShard(), removeShard(), dropDatabase(), db.createCollection() while resharding is running
- The new shard key cannot have a uniqueness constraint
- Resharding a collection with a uniqueness constraint is not supported
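To make the query requirement more concrete, here is a minimal sketch of a check you could run against the filters your application builds before starting a resharding. The helper is hypothetical (it is not part of any driver), the field names age and name are used only for illustration, and it only looks at top-level fields, so filters wrapped in $and or using dotted paths would need extra handling.

```javascript
// Hypothetical helper (not part of any driver): list the shard key fields,
// current and new, that are missing from the top level of a query filter.
function missingShardKeyFields(filter, currentKey, newKey) {
  const required = Object.keys(currentKey).concat(Object.keys(newKey));
  return required.filter(function (field) {
    return !Object.prototype.hasOwnProperty.call(filter, field);
  });
}

// Illustrative keys: currently sharded on age, resharding on name.
const currentKey = { age: 1 };
const newKey = { name: 1 };

console.log(missingShardKeyFields({ name: "abc" }, currentKey, newKey));          // [ 'age' ]
console.log(missingShardKeyFields({ age: 42, name: "abc" }, currentKey, newKey)); // []
```

A filter that returns an empty list carries both keys and is a candidate for operations such as updateOne() while the resharding is in progress.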
Let’s Test Resharding in MongoDB
To test the new feature, I created a sharded cluster using AWS instances (t2.large, with 2 CPUs, 8 GB RAM, and an EBS volume). I deployed a two-shard cluster using just a single member for each replica set.
Let’s create a sharded collection and insert some sample data.
At the beginning, we create the shard key on the age field. To create the random data, we use some functions and a JavaScript loop to insert one million documents.
[direct: mongos] testdb> sh.shardCollection("testdb.testcoll", { age: 1} )
{
  collectionsharded: 'testdb.testcoll',
  ok: 1,
  '$clusterTime': {
    clusterTime: Timestamp({ t: 1628588848, i: 25 }),
    signature: {
      hash: Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0),
      keyId: Long("0")
    }
  },
  operationTime: Timestamp({ t: 1628588848, i: 21 })
}

[direct: mongos] testdb> function getRandomInt(min, max) {
...   return Math.floor(Math.random() * (max - min + 1)) + min;
... }
[Function: getRandomInt]

[direct: mongos] testdb> var day = 1000 * 60 * 60 * 24;

[direct: mongos] testdb> function getRandomDate() {
...   return new Date( - (Math.floor(Math.random() * day)))
... }
[Function: getRandomDate]

[direct: mongos] testdb> function getRandomString() {
...   return (Math.random()+1).toString(36).substring(2)
... }
[Function: getRandomString]

[direct: mongos] testdb> for (var i=1; i<=1000000; i++) {
...   db.testcoll.insert(
.....     {
.......       uid: i,
.......       name: getRandomString(),
.......       date_created: getRandomDate(),
.......       age: getRandomInt(1,100),
.......       address: getRandomString(),
.......       city: getRandomString(),
.......       country: getRandomString()
.......     }
.....   )
... }
Now we have our initial collection. Let’s take a look at the distribution of the chunks and the sharding status of the cluster.
{
  database: {
    _id: 'testdb',
    primary: 'sh1-rs',
    partitioned: true,
    version: {
      uuid: UUID("efda5b69-1ffc-42c3-abed-ad20e08e321d"),
      timestamp: Timestamp({ t: 1628527701, i: 21 }),
      lastMod: 1
    }
  },
  collections: {
    'testdb.testcoll': {
      shardKey: { age: 1 },
      unique: false,
      balancing: true,
      chunkMetadata: [
        { shard: 'sh0-rs', nChunks: 2 },
        { shard: 'sh1-rs', nChunks: 3 }
      ],
      chunks: [
        { min: { age: MinKey() }, max: { age: 1 }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 5, i: 0 }) },
        { min: { age: 1 }, max: { age: 42 }, 'on shard': 'sh0-rs', 'last modified': Timestamp({ t: 4, i: 0 }) },
        { min: { age: 42 }, max: { age: 72 }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 5, i: 2 }) },
        { min: { age: 72 }, max: { age: 100 }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 5, i: 3 }) },
        { min: { age: 100 }, max: { age: MaxKey() }, 'on shard': 'sh0-rs', 'last modified': Timestamp({ t: 5, i: 1 }) }
      ],
      tags: []
    }
  }
}
With the current shard key on the age field we’ve got 5 chunks in total, 2 on sh0-rs and 3 on sh1-rs. The total size of the collection is around 200MB (uncompressed).
It’s time to test the resharding. Let’s try to change the shard key to { name: 1 }. The following is the right syntax where we provide the namespace to reshard and the new shard key:
[direct: mongos] testdb> db.adminCommand({
...   reshardCollection: "testdb.testcoll",
...   key: { name: 1 }
... })
From another connection we can monitor the status of the resharding:
[direct: mongos] testdb> db.getSiblingDB("admin").aggregate([ { $currentOp: { allUsers: true, localOps: false } }, { $match: { type: "op", "originatingCommand.reshardCollection": "testdb.testcoll" } }])
[
  {
    shard: 'sh0-rs',
    type: 'op',
    desc: 'ReshardingRecipientService f86967fe-36a9-4836-8972-1061a61230df',
    op: 'command',
    ns: 'testdb.testcoll',
    originatingCommand: {
      reshardCollection: 'testdb.testcoll',
      key: { name: 1 },
      unique: false,
      collation: { locale: 'simple' }
    },
    totalOperationTimeElapsedSecs: Long("145"),
    remainingOperationTimeEstimatedSecs: Long("173"),
    approxDocumentsToCopy: Long("624135"),
    documentsCopied: Long("569868"),
    approxBytesToCopy: Long("95279851"),
    bytesCopied: Long("86996201"),
    totalCopyTimeElapsedSecs: Long("145"),
    oplogEntriesFetched: Long("0"),
    oplogEntriesApplied: Long("0"),
    totalApplyTimeElapsedSecs: Long("0"),
    recipientState: 'cloning',
    opStatus: 'running'
  },
  {
    shard: 'sh0-rs',
    type: 'op',
    desc: 'ReshardingDonorService f86967fe-36a9-4836-8972-1061a61230df',
    op: 'command',
    ns: 'testdb.testcoll',
    originatingCommand: {
      reshardCollection: 'testdb.testcoll',
      key: { name: 1 },
      unique: false,
      collation: { locale: 'simple' }
    },
    totalOperationTimeElapsedSecs: Long("145"),
    remainingOperationTimeEstimatedSecs: Long("173"),
    countWritesDuringCriticalSection: Long("0"),
    totalCriticalSectionTimeElapsedSecs: Long("0"),
    donorState: 'donating-initial-data',
    opStatus: 'running'
  },
  {
    shard: 'sh1-rs',
    type: 'op',
    desc: 'ReshardingDonorService f86967fe-36a9-4836-8972-1061a61230df',
    op: 'command',
    ns: 'testdb.testcoll',
    originatingCommand: {
      reshardCollection: 'testdb.testcoll',
      key: { name: 1 },
      unique: false,
      collation: { locale: 'simple' }
    },
    totalOperationTimeElapsedSecs: Long("145"),
    remainingOperationTimeEstimatedSecs: Long("276"),
    countWritesDuringCriticalSection: Long("0"),
    totalCriticalSectionTimeElapsedSecs: Long("0"),
    donorState: 'donating-initial-data',
    opStatus: 'running'
  },
  {
    shard: 'sh1-rs',
    type: 'op',
    desc: 'ReshardingRecipientService f86967fe-36a9-4836-8972-1061a61230df',
    op: 'command',
    ns: 'testdb.testcoll',
    originatingCommand: {
      reshardCollection: 'testdb.testcoll',
      key: { name: 1 },
      unique: false,
      collation: { locale: 'simple' }
    },
    totalOperationTimeElapsedSecs: Long("145"),
    remainingOperationTimeEstimatedSecs: Long("276"),
    approxDocumentsToCopy: Long("624135"),
    documentsCopied: Long("430132"),
    approxBytesToCopy: Long("95279851"),
    bytesCopied: Long("65663031"),
    totalCopyTimeElapsedSecs: Long("145"),
    oplogEntriesFetched: Long("0"),
    oplogEntriesApplied: Long("0"),
    totalApplyTimeElapsedSecs: Long("0"),
    recipientState: 'cloning',
    opStatus: 'running'
  }
]
With the totalOperationTimeElapsedSecs you can see the time elapsed and with the remainingOperationTimeEstimatedSecs the estimated time for completing the operation.
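If you poll that output regularly, a short summary is easier to read than the full documents. The following is a minimal sketch that condenses documents shaped like the $currentOp output into one line per recipient. The function name is made up, and plain numbers are used in the sample data: in the shell these fields are Long values and may need converting to plain numbers first.

```javascript
// Sketch: summarize copy progress from documents shaped like the $currentOp
// output. Only recipient entries carry the documentsCopied counters, so donor
// entries are skipped.
function reshardingProgress(ops) {
  return ops
    .filter(function (op) { return op.recipientState !== undefined; })
    .map(function (op) {
      const pct = 100 * op.documentsCopied / op.approxDocumentsToCopy;
      return {
        shard: op.shard,
        state: op.recipientState,
        percentCopied: Math.round(pct * 10) / 10,
        remainingSecs: op.remainingOperationTimeEstimatedSecs
      };
    });
}

// Sample data: a subset of the fields from the test run, as plain numbers.
const sampleOps = [
  { shard: "sh0-rs", recipientState: "cloning", approxDocumentsToCopy: 624135,
    documentsCopied: 569868, remainingOperationTimeEstimatedSecs: 173 },
  { shard: "sh0-rs", donorState: "donating-initial-data",
    remainingOperationTimeEstimatedSecs: 173 },
  { shard: "sh1-rs", recipientState: "cloning", approxDocumentsToCopy: 624135,
    documentsCopied: 430132, remainingOperationTimeEstimatedSecs: 276 }
];

console.log(reshardingProgress(sampleOps));
```

Since approxDocumentsToCopy is itself an approximation, the percentage should be read as a rough indicator rather than an exact figure.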
While resharding is running we are allowed to write into the collection. For example, I tried the following and it worked:
[direct: mongos] testdb> db.testcoll.insert({ name: "new doc" })
{
  acknowledged: true,
  insertedIds: { '0': ObjectId("61125fa53916a70b7fdd8f6c") }
}
The full resharding of the collection took around 5 minutes. Let’s now see the new status of the sharding.
{
  database: {
    _id: 'testdb',
    primary: 'sh1-rs',
    partitioned: true,
    version: {
      uuid: UUID("efda5b69-1ffc-42c3-abed-ad20e08e321d"),
      timestamp: Timestamp({ t: 1628527701, i: 21 }),
      lastMod: 1
    }
  },
  collections: {
    'testdb.testcoll': {
      shardKey: { name: 1 },
      unique: false,
      balancing: true,
      chunkMetadata: [
        { shard: 'sh0-rs', nChunks: 3 },
        { shard: 'sh1-rs', nChunks: 3 }
      ],
      chunks: [
        { min: { name: MinKey() }, max: { name: '7gbk73hf42g' }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 6, i: 0 }) },
        { min: { name: '7gbk73hf42g' }, max: { name: 'cv1oqoetetf' }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 5, i: 1 }) },
        { min: { name: 'cv1oqoetetf' }, max: { name: 'kuk1p3z8kc' }, 'on shard': 'sh0-rs', 'last modified': Timestamp({ t: 2, i: 2 }) },
        { min: { name: 'kuk1p3z8kc' }, max: { name: 'ppq4ubqwywh' }, 'on shard': 'sh0-rs', 'last modified': Timestamp({ t: 2, i: 3 }) },
        { min: { name: 'ppq4ubqwywh' }, max: { name: 'zsods6qhqp' }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 1, i: 3 }) },
        { min: { name: 'zsods6qhqp' }, max: { name: MaxKey() }, 'on shard': 'sh0-rs', 'last modified': Timestamp({ t: 6, i: 1 }) }
      ],
      tags: []
    }
  }
}
We can see the shard key has been changed, and the chunks are now evenly distributed.
[direct: mongos] testdb> db.testcoll.getShardDistribution()
Shard sh1-rs at sh1-rs/
{
  data: '92.72MiB',
  docs: 636938,
  chunks: 3,
  'estimated data per chunk': '30.9MiB',
  'estimated docs per chunk': 212312
}
---
Shard sh0-rs at sh0-rs/
{
  data: '82.96MiB',
  docs: 569869,
  chunks: 3,
  'estimated data per chunk': '27.65MiB',
  'estimated docs per chunk': 189956
}
---
Totals
{
  data: '175.69MiB',
  docs: 1206807,
  chunks: 6,
  'Shard sh1-rs': [ '52.77 % data', '52.77 % docs in cluster', '152B avg obj size on shard' ],
  'Shard sh0-rs': [ '47.22 % data', '47.22 % docs in cluster', '152B avg obj size on shard' ]
}
Good. It worked.
Out of curiosity, during the resharding, the temporary collection being copied is visible. Just run sh.status():
  collections: {
    'testdb.system.resharding.defda80d-ea5f-4e52-9a7f-9b3e2ed8ddf1': {
      shardKey: { name: 1 },
      unique: false,
      balancing: true,
      chunkMetadata: [
        { shard: 'sh0-rs', nChunks: 3 },
        { shard: 'sh1-rs', nChunks: 3 }
      ],
      chunks: [
        { min: { name: MinKey() }, max: { name: '9bnfw8n23l' }, 'on shard': 'sh0-rs', 'last modified': Timestamp({ t: 2, i: 0 }) },
        { min: { name: '9bnfw8n23l' }, max: { name: 'etd3ho9vfwg' }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 2, i: 1 }) },
        { min: { name: 'etd3ho9vfwg' }, max: { name: 'kmht0br01l' }, 'on shard': 'sh0-rs', 'last modified': Timestamp({ t: 1, i: 2 }) },
        { min: { name: 'kmht0br01l' }, max: { name: 'sljcb1s70g' }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 1, i: 3 }) },
        { min: { name: 'sljcb1s70g' }, max: { name: 'zzi3ijuw2f' }, 'on shard': 'sh0-rs', 'last modified': Timestamp({ t: 1, i: 4 }) },
        { min: { name: 'zzi3ijuw2f' }, max: { name: MaxKey() }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 1, i: 5 }) }
      ],
      tags: []
    },
    'testdb.testcoll': {
      shardKey: { age: 1 },
      unique: false,
      balancing: true,
      chunkMetadata: [
        { shard: 'sh0-rs', nChunks: 2 },
        { shard: 'sh1-rs', nChunks: 3 }
      ],
      chunks: [
        { min: { age: MinKey() }, max: { age: 1 }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 5, i: 0 }) },
        { min: { age: 1 }, max: { age: 42 }, 'on shard': 'sh0-rs', 'last modified': Timestamp({ t: 4, i: 0 }) },
        { min: { age: 42 }, max: { age: 72 }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 5, i: 2 }) },
        { min: { age: 72 }, max: { age: 100 }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 5, i: 3 }) },
        { min: { age: 100 }, max: { age: MaxKey() }, 'on shard': 'sh0-rs', 'last modified': Timestamp({ t: 5, i: 1 }) }
      ],
      tags: []
    }
  }
}
testdb.system.resharding.defda80d-ea5f-4e52-9a7f-9b3e2ed8ddf1 is the newly created collection.
Some Graphs from Percona Monitoring and Management
I deployed Percona Monitoring and Management (PMM) to monitor the cluster and to evaluate the impact of the resharding operation. As said, I used a small test environment with no workload at all, so what we can see in the following graphs is just the impact of the resharding.
The cluster has one config server and two shards. Each replica set is just a single member running on a dedicated virtual host in AWS EC2.
Let’s see the CPU usage
The node in the middle is the config server. The data-bearing nodes had more impact in terms of CPU usage, quite relevant. The config server didn’t have any impact.
Let’s see the Disk utilization in terms of IOPS and latency:
The same as the CPU, the config server didn’t have any impact. Instead, the shards increased the IOPS as expected since a new collection has been created. The disk latency had spikes to more than 20 ms at the beginning and at the end of the process.
The latency of read operations in MongoDB increased.
The WiredTiger storage engine increased the number of transactions processed and the cache activity as expected.
Other metrics increased but they are not included here. This behavior was expected and it’s normal for a resharding operation.
Resharding is quite expensive, even for a small collection, in particular in terms of CPU utilization. That said, more tests are needed with larger collections, in the GB/TB range.
Remember that the resharding time depends on the size of the collection. Be aware of that, and expect degraded performance on your servers for some time when you plan a resharding.
Resharding in MongoDB 5.0 is an amazing new feature, but it comes with some additional costs and limitations. Indeed, resharding requires resources in terms of disk space, I/O, and CPU. Be aware of that and plan your resharding operations carefully.
As a best practice, I suggest spending as much time as possible the first time you decide to shard a collection. If you choose the optimal shard key from the beginning, then you won’t need resharding. A good schema design is always the best approach when working with MongoDB.
I suggest continuously monitoring the status of the chunk distribution in your collections. As soon as you notice there’s something wrong with the shard key and the distribution is uneven and it’s getting worse, then use reshardCollection as soon as you can, while the collection is not too large.
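One simple way to put a number on "uneven" is to track each shard's share of the documents over time. The helper below is a hypothetical sketch, not a MongoDB API: it takes per-shard document counts (for example, copied from the getShardDistribution() output) and reports each shard's share plus the gap between the fullest and the emptiest shard. What gap should trigger action is left to you; no threshold is implied here.

```javascript
// Hypothetical helper: compute each shard's share of the documents and the
// spread (in percentage points) between the largest and smallest share.
function distributionSkew(docsPerShard) {
  const names = Object.keys(docsPerShard);
  const total = names.reduce(function (sum, n) { return sum + docsPerShard[n]; }, 0);
  const shares = {};
  names.forEach(function (n) { shares[n] = 100 * docsPerShard[n] / total; });
  const values = names.map(function (n) { return shares[n]; });
  return {
    shares: shares,
    spreadPct: Math.max.apply(null, values) - Math.min.apply(null, values)
  };
}

// Document counts taken from the test cluster after resharding.
const skew = distributionSkew({ "sh0-rs": 569869, "sh1-rs": 636938 });
console.log(skew);
```

Recording the spread at regular intervals makes it easier to see whether the distribution is stable or steadily getting worse.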
Another suggestion is to do resharding only during a maintenance window because the performance impact for your cluster could be significant in the case of very large collections.
Sharding With Zones Based on Compound Shard-Keys on MongoDB 4.4
This article was written with the main purpose of showing you how to determine zones on a shard when using compound shard keys.
Defining Zones in Shards means pre-defining where certain chunks will be stored and amongst which set of particular shards they will be balanced according to the shard key definition.
MongoDB 4.4 brings the possibility to shard a collection and determine zones by compound keys, including mixing a hash key with non-hashed keys. Hashed keys may be placed in the prefix of the index (shard key) or not. Depending on the method chosen, different settings must be in compliance with the balancer and this article will also show you a couple of examples.
Defining the Index Prior to Sharding
Even though it is not always required to create indexes before running the shardCollection command, I create them here to better illustrate the procedure.
- The collection colltest1 will be sharded based on a key containing the hashed key in the prefix
mongos> db.getSiblingDB("dbtest").colltest1.createIndex({_id:"hashed","location":1},{unique:false})
{
    "raw" : {
        "shard02/localhost:40003" : {
            "createdCollectionAutomatically" : true,
            "numIndexesBefore" : 1,
            "numIndexesAfter" : 2,
            "commitQuorum" : "votingMembers",
            "ok" : 1
        }
    },
    "ok" : 1,
    "operationTime" : Timestamp(1624357456, 7),
    "$clusterTime" : { ... }
}
- The collection colltest2 will be sharded based on a key containing the non-hashed key in the prefix
mongos> db.getSiblingDB("dbtest").colltest2.createIndex({"location":1,_id:"hashed"},{unique:false})
{
    "raw" : {
        "shard02/localhost:40003" : {
            "createdCollectionAutomatically" : true,
            "numIndexesBefore" : 1,
            "numIndexesAfter" : 2,
            "commitQuorum" : "votingMembers",
            "ok" : 1
        }
    },
    "ok" : 1,
    "operationTime" : Timestamp(1624357534, 2),
    "$clusterTime" : { ... }
}
It is important to highlight a couple of important points about creating the indexes for the shard keys:
- If the collection is empty or new, the command used to shard the collection will automatically create the indexes if they are not present yet. This will be described in the upcoming sections.
- If the shard key prefix will not be a hashed value, the indexes shall be created with the option {unique:false}
Creating the Initial Chunks Based on Hashed Key as a Prefix
There are, basically, a couple of requirements to ensure that the collection will be in compliance with the balancer and the initial chunk distribution will be optimally performed.
- For each non-hashed value, a range of the hashed key must be defined with upper and lower boundaries.
- The collection must be sharded with the option presplitHashedZones: true if there is a hashed field.
The below example shows how to shard the collection colltest1.
Defining the Zones and Assigning to the Shards (To make things brief, only one zone “LATAM” is used in this example.)
mongos> sh.addShardToZone("shard01", "LATAM")
{
    "ok" : 1,
    "operationTime" : Timestamp(1624359227, 38),
    "$clusterTime" : { ... }
}
mongos> sh.addShardToZone("shard02", "LATAM")
{
    "ok" : 1,
    "operationTime" : Timestamp(1624359231, 1),
    "$clusterTime" : {... }
}
If you check the output of the sh.status(), you will notice that tags were created and assigned to the shards according to the zones definition:
mongos> sh.status()
--- Sharding Status ---
  sharding version: {
    "_id" : 1,
    "minCompatibleVersion" : 5,
    "currentVersion" : 6,
    "clusterId" : ObjectId("60d1be52be9f36000b01a9f0")
  }
  shards:
    { "_id" : "shard01", "host" : "shard01/localhost:40002", "state" : 1, "tags" : [ "LATAM" ] }
    { "_id" : "shard02", "host" : "shard02/localhost:40003", "state" : 1, "tags" : [ "LATAM" ] }
<<the rest of status output was intentionally truncated to avoid unnecessary verbosity>>
Creating the Zone Ranges
mongos> sh.updateZoneKeyRange("dbtest.colltest1",{ "_id" : MinKey, "location" : MinKey },{ _id:MaxKey,"location" : MaxKey },"LATAM");
{
    "ok" : 1,
    "operationTime" : Timestamp(1624361452, 1),
    "$clusterTime" : { ... }
}
Enabling Sharding
mongos> sh.enableSharding("dbtest")
{
    "ok" : 1,
    "operationTime" : Timestamp(1624361047, 6),
    "$clusterTime" : { ... }
}
mongos> sh.shardCollection("dbtest.colltest1",{_id:"hashed","location":1},false,{ presplitHashedZones: true })
{
    "collectionsharded" : "dbtest.colltest1",
    "collectionUUID" : UUID("30b2efff-2e2e-4c72-9a82-9593f227002f"),
    "ok" : 1,
    "operationTime" : Timestamp(1624361528, 31),
    "$clusterTime" : { ... }
}
In this example, the namespace dbtest.colltest1 will be evenly distributed according to the zone LATAM which will reach the shards shard01 and shard02. Looking at the sh.status() again, you will see that the initial chunks were created following the range defined above.
Creating the Initial Chunks Based on Non-Hashed Key as a Prefix
This example section will be a little bit more complicated to make the shard key compliant with the balancer for the initial chunk distribution.
- You need to specify MinKey for each field in the shard key, which defines the lower boundary of each zone range.
- Every zone must comprise a range, so at least one of the fields in the shard key must have an upper-boundary value larger than its MinKey
- Every combination of values for the fields of the shard key will fall within the range of one of the defined zones.
- At the moment of sharding the collection, presplitHashedZones must be set to true.
- Not required, but likely desirable: One of the zones should define its upper boundary as MaxKey so it acts as a catchall for out-of-range values.
Defining the Zones and Assigning Them to the Shards
The zones were defined differently for this example:
mongos> sh.status()
--- Sharding Status ---
  sharding version: {
    "_id" : 1,
    "minCompatibleVersion" : 5,
    "currentVersion" : 6,
    "clusterId" : ObjectId("60d1be52be9f36000b01a9f0")
  }
  shards:
    { "_id" : "shard01", "host" : "shard01/localhost:40002", "state" : 1, "tags" : [ "EU" ] }
    { "_id" : "shard02", "host" : "shard02/localhost:40003", "state" : 1, "tags" : [ "LATAM" ] }
    { "_id" : "shard03", "host" : "shard03/localhost:40004", "state" : 1, "tags" : [ "AMER" ] }
    { "_id" : "shard04", "host" : "shard04/localhost:40005", "state" : 1, "tags" : [ "APAC" ] }
Creating the Zone Ranges
mongos> sh.updateZoneKeyRange("dbtest2.colltest2",{ "location": "DC01", "_id" : MinKey },{ "location": "DC02", "_id" : MinKey },"LATAM");
{
    "ok" : 1,
    "operationTime" : Timestamp(1624364375, 1),
    "$clusterTime" : { ... }
}
mongos> sh.updateZoneKeyRange("dbtest2.colltest2",{ "location": "DC02", "_id" : MinKey },{ "location": MaxKey, "_id" : MinKey },"EU");
{
    "ok" : 1,
    "operationTime" : Timestamp(1624364430, 1),
    "$clusterTime" : { ... }
}
Enabling Sharding
mongos> sh.enableSharding("dbtest2")
{
    "ok" : 1,
    "operationTime" : Timestamp(1624366057, 5),
    "$clusterTime" : { ... }
}
mongos> sh.shardCollection("dbtest2.colltest2",{"location":1,_id:"hashed"},false,{ presplitHashedZones: true })
{
    "collectionsharded" : "dbtest2.colltest2",
    "collectionUUID" : UUID("b8a64972-97df-4791-8019-9526d2f8d405"),
    "ok" : 1,
    "operationTime" : Timestamp(1624366085, 37),
    "$clusterTime" : { ... }
}
mongos>
The above example shows how to use a non-hashed field as the prefix of a shard key to ensure that certain values of that field will reach certain zones (defined as tags on shards), while the boundaries applied on the hashed field ensure even distribution. In this case, all the docs of the namespace dbtest2.colltest2 between the minimum _id of location DC01 and the minimum _id of location DC02 will be placed in the zone LATAM (shard02). The docs from the minimum _id of location DC02 onward will be placed in the zone EU (shard01).
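To see how those two ranges map location values to zones, here is a small plain-JavaScript model of the lookup. It is a sketch under assumptions, not how MongoDB implements routing: the MIN and MAX objects are stand-ins for MinKey and MaxKey, only the location prefix is modeled (both ranges use MinKey for _id, so the prefix alone decides the zone), and the comparison is a simple string comparison.

```javascript
// Stand-ins for MongoDB's MinKey/MaxKey in this plain-JavaScript sketch.
const MIN = { sentinel: "MinKey" };
const MAX = { sentinel: "MaxKey" };

// Compare two bounds, treating MIN as lowest and MAX as highest.
function compareBound(a, b) {
  if (a === b) return 0;
  if (a === MIN || b === MAX) return -1;
  if (a === MAX || b === MIN) return 1;
  return a < b ? -1 : (a > b ? 1 : 0);
}

// Return the zone whose range [min, max) contains the location, or null.
function zoneForLocation(location, ranges) {
  for (const r of ranges) {
    if (compareBound(r.min, location) <= 0 && compareBound(location, r.max) < 0) {
      return r.zone;
    }
  }
  return null;
}

// The two zone ranges from the example, reduced to their location prefix.
const zoneRanges = [
  { min: "DC01", max: "DC02", zone: "LATAM" },
  { min: "DC02", max: MAX, zone: "EU" }
];

console.log(zoneForLocation("DC01", zoneRanges)); // LATAM
console.log(zoneForLocation("DC02", zoneRanges)); // EU
console.log(zoneForLocation("DC00", zoneRanges)); // null
```

The last call shows a value that sorts before DC01 and therefore falls outside every zone range in this model, which is why covering the lower end with MinKey matters when you define the ranges.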
It is very important to highlight that if the collection is not in compliance with the balancer, chunk migration will never happen and all the chunks will stay on the primary shard. It is possible to detect that situation right after enabling sharding on the collection by looking at the output of the command sh.balancerCollectionStatus:
mongos> sh.balancerCollectionStatus("dbtest.colltest1")
{
  "balancerCompliant" : true,
  "ok" : 1,
  "operationTime" : Timestamp(1623691376, 1),
  "$clusterTime" : { ... }
}
If balancerCompliant is true, it means that the balancer will be able to split and migrate the chunks.
Defining the shard key is the most important step in deploying a healthy sharded cluster. A shard key field that contains only a few distinct values would compromise the shard distribution and, as a consequence, the performance. Hence, this is a great improvement in MongoDB 4.4: it makes it possible to have low-cardinality fields defining the zone boundaries while still relying on a hashed distribution based on a very selective key.
Webinar June 29: Unlocking the Mystery of MongoDB Shard Key Selection
Do You Know How to Choose the Right MongoDB Shard Key for Your Business?
In our upcoming panel, Percona MongoDB experts Mike Grayson, Kimberly Wilkins, and Vinicius Grippa will discuss the complex issue of MongoDB shard key selection and offer advice on the measures to take if things go wrong.
Selecting the right shard key is one of the most important MongoDB design decisions you make, as it impacts performance and data management. Choosing the wrong shard key can have a disastrous effect on both.
MongoDB 5.0 is due to be released this summer, and it is likely to include another big change around sharding. Following last year’s 4.4 release that included refinable shard keys, we expect to see a new feature that allows for fully changeable shard keys for the first time.
But, even with refinable and changeable options, shard key selection will continue to be a crucial MongoDB task.
Join us for the panel, where Mike, Kimberly, and Vinicius will highlight some of the perils and pitfalls to avoid, as well as offering shard key best practices such as:
* Factors to consider when selecting your shard key
* Hidden “gotchas” around shard selection and architecture
* Examples of the worst shard keys our experts have seen
* What happens when you have a busted shard key, and how you can mitigate the impact
* What’s happening “under MongoDB’s hood” with all the changes?
* The future of shard keys for MongoDB
Please join Percona Technical Experts Mike Grayson, Kimberly Wilkins, and Vinicius Grippa on June 29, 2021, at 1 pm EDT for their webinar Unlocking the Mystery of MongoDB Shard Key Selection.
If you can’t attend, sign up anyway, and we’ll send you the slides and recording afterward.
Storing Kubernetes Operator for Percona Server for MongoDB Secrets in GitHub
More and more companies are adopting GitOps as the way of implementing Continuous Deployment. Its elegant approach built upon a well-known tool wins the hearts of engineers. But even if your git repository is private, it’s strongly discouraged to store keys and passwords in unencrypted form.
This blog post will show how easy it is to use GitOps and keep Kubernetes secrets for Percona Kubernetes Operator for Percona Server for MongoDB securely in the repository with Sealed Secrets or Vault Secrets Operator.
Sealed Secrets
Prerequisites
- Kubernetes cluster up and running
- GitHub repository (optional)
- The files and keys that I used are stored in a public blog-data repository
Install Sealed Secrets Controller
Sealed Secrets relies on asymmetric cryptography (which is also used in TLS), where the private key (which in our case is stored in Kubernetes) can decrypt a message encrypted with the public key (which can safely be stored in a public git repository). To make this task easier, Sealed Secrets provides the kubeseal tool, which helps with the encryption of the secrets.
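The public/private key idea can be illustrated with a short Node.js sketch. This is only a conceptual demonstration using the built-in crypto module, not how kubeseal itself encodes a sealed secret; the key pair is generated locally and the seal and unseal helpers are hypothetical names chosen for this example.

```javascript
const crypto = require("crypto");

// In Sealed Secrets the controller owns the private key inside the cluster.
// Here a throwaway RSA key pair is generated locally for illustration.
const { publicKey, privateKey } = crypto.generateKeyPairSync("rsa", {
  modulusLength: 2048,
});

// Anyone holding the public key can encrypt a value.
function seal(plaintext) {
  return crypto
    .publicEncrypt(publicKey, Buffer.from(plaintext, "utf8"))
    .toString("base64");
}

// Only the holder of the private key can recover the original value.
function unseal(sealedBase64) {
  return crypto
    .privateDecrypt(privateKey, Buffer.from(sealedBase64, "base64"))
    .toString("utf8");
}

const sealed = seal("my-database-password");
console.log(sealed === "my-database-password"); // false
console.log(unseal(sealed)); // my-database-password
```

The sealed value can be committed to a repository, because without the private key it cannot be turned back into the password.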
Install the Sealed Secrets controller into your Kubernetes cluster:
kubectl apply -f
It will install the controller into the kube-system namespace and provide the Custom Resource Definition. All resources in Kubernetes with kind: SealedSecret will be handled by this Operator.
Download the kubeseal binary:
wget -O kubeseal
sudo install -m 755 kubeseal /usr/local/bin/kubeseal
Encrypt the Keys
In this example, I intend to store important secrets of the Percona Kubernetes Operator for Percona Server for MongoDB in git along with my manifests that are used to deploy the database.
First, I will seal the secret file with system users, which is used by the MongoDB Operator to manage the database. Normally it is stored in deploy/secrets.yaml.
kubeseal --format yaml < secrets.yaml > blog-data/sealed-secrets/mongod-secrets.yaml
This command creates a file with encrypted contents; you can see it in the blog-data/sealed-secrets repository. It is safe to store it publicly, as it can only be decrypted with the private key.
Running kubectl apply -f blog-data/sealed-secrets/mongod-secrets.yaml does the following:
- A sealedsecrets custom resource (CR) is created. You can see it by executing kubectl get sealedsecrets.
- The Sealed Secrets Operator receives the event that a new sealedsecrets CR is there and decrypts it with the private key.
- Once decrypted, a regular Secret object is created, which can be used as usual.
$ kubectl get sealedsecrets
NAME               AGE
my-secure-secret   20m
$ kubectl get secrets my-secure-secret
NAME               TYPE     DATA   AGE
my-secure-secret   Opaque   10     20m
Next, I will also seal the keys for my S3 bucket that I plan to use to store backups of my MongoDB database:
kubeseal --format yaml < backup-s3.yaml > blog-data/sealed-secrets/s3-secrets.yaml
kubectl apply -f blog-data/sealed-secrets/s3-secrets.yaml
Vault Secrets Operator
Sealed Secrets is the simplest approach, but it is possible to achieve the same result with HashiCorp Vault and Vault Secrets Operator. It is a more advanced, mature, and feature-rich approach.
Prerequisites
- Kubernetes cluster up and running
- GitHub repository (optional)
- The files and keys that I used are stored in a public blog-data repository
- HashiCorp Vault up and running
Vault Secrets Operator also relies on a Custom Resource, but all the keys are stored in HashiCorp Vault.
Create a policy on the Vault for the Operator:
cat <<EOF | vault policy write vault-secrets-operator -
path "kvv2/data/*" {
  capabilities = ["read"]
}
EOF
The policy might look a bit different, depending on where your secrets are.
Create and fetch the token for the policy:
$ vault token create -period=24h -policy=vault-secrets-operator
Key      Value
---      -----
token    s.0yJZfCsjFq75GiVyKiZgYVOm
...
Write down the token, as you will need it in the next step.
Create the Kubernetes Secret so that the Operator can authenticate with the Vault:
export VAULT_TOKEN=s.0yJZfCsjFq75GiVyKiZgYVOm
export VAULT_TOKEN_LEASE_DURATION=86400
cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: Secret
metadata:
  name: vault-secrets-operator
type: Opaque
data:
  VAULT_TOKEN: $(echo -n "$VAULT_TOKEN" | base64)
  VAULT_TOKEN_LEASE_DURATION: $(echo -n "$VAULT_TOKEN_LEASE_DURATION" | base64)
EOF
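The values under data in a Kubernetes Secret must be base64-encoded, which is what the echo -n ... | base64 calls above take care of. As a rough sketch of the same encoding step in JavaScript (toSecretData is a hypothetical helper name, not part of any Kubernetes client library):

```javascript
// Encodes every value of a plain object as base64, the way the
// data section of a Kubernetes Secret expects it.
function toSecretData(values) {
  const data = {};
  for (const [key, value] of Object.entries(values)) {
    data[key] = Buffer.from(String(value), "utf8").toString("base64");
  }
  return data;
}

const secretData = toSecretData({ VAULT_TOKEN_LEASE_DURATION: 86400 });
console.log(secretData.VAULT_TOKEN_LEASE_DURATION); // ODY0MDA=
```

Keep in mind that base64 is an encoding, not encryption: anyone who can read the Secret can decode the token, so this manifest should not be committed to git with a real value in it.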
Deploy Vault Secrets Operator
It is recommended to deploy the Operator with Helm, but first we need to create the values.yaml file to configure the Operator.
environmentVars:
  - name: VAULT_TOKEN
    valueFrom:
      secretKeyRef:
        name: vault-secrets-operator
        key: VAULT_TOKEN
  - name: VAULT_TOKEN_LEASE_DURATION
    valueFrom:
      secretKeyRef:
        name: vault-secrets-operator
        key: VAULT_TOKEN_LEASE_DURATION
vault:
  address: "http://vault.vault.svc:8200"
The environment variables point to the Secret that was created in the previous step to authenticate with Vault. We also need to provide the Vault address for the Operator to retrieve the secrets.
Now we can deploy the Vault Secrets Operator:
helm repo add ricoberger
helm repo update
helm upgrade --install vault-secrets-operator ricoberger/vault-secrets-operator -f blog-data/sealed-secrets/values.yaml
Give me the Secret
I have a key created in my HashiCorp Vault:
It is time to create the secret out of it. First, we will create the Custom Resource object of kind: VaultSecret
$ cat blog-data/sealed-secrets/vs.yaml
apiVersion:
kind: VaultSecret
metadata:
  name: my-secure-secret
spec:
  path: kvv2/mongod-secret
  type: Opaque
$ kubectl apply -f blog-data/sealed-secrets/vs.yaml
The Operator will connect to HashiCorp Vault and create a regular Secret object automatically:
$ kubectl get vaultsecret
NAME               SUCCEEDED   REASON    MESSAGE              LAST TRANSITION   AGE
my-secure-secret   True        Created   Secret was created   47m               47m
$ kubectl get secret my-secure-secret
NAME               TYPE     DATA   AGE
my-secure-secret   Opaque   7      47m
Deploy MongoDB Cluster
Now that the secrets are in place, it is time to deploy the Operator and the DB cluster:
kubectl apply -f blog-data/sealed-secrets/bundle.yaml
kubectl apply -f blog-data/sealed-secrets/cr.yaml
The cluster will be up in a minute or two and will use the secrets we deployed.
By the way, my cr.yaml deploys a MongoDB cluster with two shards. Support for multiple shards was added in version 1.7.0 of the Operator. I encourage you to try it out. Learn more about it here: Percona Server for MongoDB Sharding.