Aug 24, 2021

Resharding in MongoDB 5.0

MongoDB 5.0 has been released with a bunch of new features. An important one is the ability to completely redefine the shard key of a collection in a sharded cluster. Resharding was a feature frequently requested by customers. In MongoDB 4.4 it was only possible to refine a shard key by adding suffix fields to it; from now on you can change the shard key to use entirely different fields.

When you use sharding in MongoDB and choose a poor shard key for a collection, you can face issues like:

  • Unbalanced distribution of the data
  • Having jumbo chunks that cannot be split and cannot be migrated

These issues can make your cluster very slow and leave some of the shards extremely overloaded. The cluster can also become unmanageable, and crashes could happen at some point.

The Problem of Changing a Shard Key

I had a customer running a 5-shard cluster with more than 50TB of data. A few collections had sub-optimal shard keys. One of the shards held more than 75% of the data and workload, and there were thousands of jumbo chunks, some of which had reached 200GB in size. The cluster was terribly slow, chunk migrations were attempted constantly and failed very frequently, and the application was unresponsive most of the time due to timeouts. Manual splits and migrations of jumbo chunks were impossible. The only solution was to create a different shard key for those collections, but this wasn’t possible with that MongoDB version. The only suggested way to change a shard key was to dump the data, drop and recreate the collection with the new key, and reload the data. This requires you to stop your application for a very long time if the dataset is large. That was the case.

In order to avoid stopping the application, we deployed a new empty cluster with a few more shards. We created all the empty collections in the new cluster with the optimal shard key. Then we moved the data out of the old cluster with custom scripts, one piece at a time, using boundaries on indexed timestamp fields. The solution required the customer to deploy some application changes: the application was instructed to read from and write to a different cluster depending on whether the affected data had already been migrated into the new cluster. The entire migration process took weeks to complete. In the end, it worked and the customer now has a balanced cluster with no jumbo chunks.

Anyway, the solution came at some cost in terms of new machines, development effort, and time.

Fortunately, MongoDB 5.0 introduced the reshardCollection command. From now on changing a shard key should be less expensive than in the past. That’s awesome.

In this article, we’ll take a look at how resharding in MongoDB 5.0 works.

Internals

The way resharding works is simple. When you issue the reshardCollection command, MongoDB implicitly creates a new empty collection with the new shard key defined, and the data is moved from the existing collection chunk by chunk. There is a brief period, about two seconds, during which writes to the collection are blocked, but during the copy phase the application can continue to read and write the collection with no issues.

Due to this copy phase, the larger the collection is, the more time the resharding takes.

Prerequisites for Resharding

There are some requirements you need to validate before starting the resharding, and some could be expensive, in certain cases.

  • Disk space: be sure to have free space of at least 1.2x the size of the collection you’re going to reshard. If the collection is 1TB, you need at least 1.2TB of free disk space.
  • I/O utilization should be below 50%
  • CPU load should be below 80%

The resources available must be evaluated on each shard.

If you fail to provide these resources, the database could run out of disk space or the resharding could take longer than expected.
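The three thresholds above can be turned into a small pre-flight check. The following is only a sketch: the shape of the shard objects (name, freeDiskBytes, ioPercent, cpuPercent) is hypothetical, and the numbers would have to come from your own monitoring, for example PMM.

```javascript
// Thresholds taken from the prerequisites listed above.
const DISK_FACTOR = 1.2;    // free space must be at least 1.2x the collection size
const MAX_IO_PERCENT = 50;  // I/O utilization should be below 50%
const MAX_CPU_PERCENT = 80; // CPU load should be below 80%

// Returns a list of problems found on one shard; an empty list means the
// shard passes all three checks. The `shard` object shape is hypothetical.
function checkShard(shard, collectionBytes) {
  const problems = [];
  const requiredBytes = collectionBytes * DISK_FACTOR;
  if (shard.freeDiskBytes < requiredBytes) {
    problems.push(`${shard.name}: not enough free disk space`);
  }
  if (shard.ioPercent >= MAX_IO_PERCENT) {
    problems.push(`${shard.name}: I/O utilization is ${shard.ioPercent}%`);
  }
  if (shard.cpuPercent >= MAX_CPU_PERCENT) {
    problems.push(`${shard.name}: CPU load is ${shard.cpuPercent}%`);
  }
  return problems;
}

// The resources must be evaluated on each shard.
function checkCluster(shards, collectionBytes) {
  return shards.flatMap((shard) => checkShard(shard, collectionBytes));
}

// Example with made-up numbers: a 1TB collection on two shards.
const TB = 1e12;
const shards = [
  { name: "sh0-rs", freeDiskBytes: 1.5 * TB, ioPercent: 30, cpuPercent: 40 },
  { name: "sh1-rs", freeDiskBytes: 1.0 * TB, ioPercent: 65, cpuPercent: 40 },
];
console.log(checkCluster(shards, 1 * TB));
```

In this example sh0-rs passes, while sh1-rs is reported twice: once for disk space and once for I/O.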

Another important requirement is that you will need to deploy changes to your application queries. To let the database work at its best during the resharding process, the queries must use filters on both the current shard key and the new shard key. Only at the end of the resharding process can you drop the filters on the old shard key from your queries.

This requirement is very important and in some cases it could be a little expensive. Temporarily changing the application code is sometimes a hard task.
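To find the queries that need changes, a simple helper can report which shard key fields are missing from a filter. This is a minimal sketch and not part of MongoDB: the function name is made up, and it only looks at top-level fields of the filter, so operators like $or are not inspected. The keys are the ones used later in this article.

```javascript
// Returns the shard key fields (current and new) that are missing from a
// query filter. Only top-level fields of the filter are considered.
function missingShardKeyFields(filter, currentKey, newKey) {
  const required = new Set([...Object.keys(currentKey), ...Object.keys(newKey)]);
  return [...required].filter((field) => !(field in filter));
}

const currentKey = { age: 1 };
const newKey = { name: 1 };

// Filter on the new key only: the current key field is missing.
console.log(missingShardKeyFields({ name: "abc" }, currentKey, newKey)); // [ 'age' ]

// Filter on both keys: nothing is missing.
console.log(missingShardKeyFields({ name: "abc", age: 42 }, currentKey, newKey)); // []
```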

There are also other limitations you need to consider:

  • Resharding is not permitted while an index build is running
  • Some operations will return an error if you don’t include both the current and new shard keys: deleteOne(), findAndModify(), updateOne() … check the manual for the full list
  • You can reshard only one collection at a time
  • You cannot use addShard(), removeShard(), dropDatabase(), db.createCollection() while resharding is running
  • The new shard key cannot have a uniqueness constraint
  • Resharding a collection with a uniqueness constraint is not supported

Let’s Test Resharding in MongoDB

To test the new feature, I created a sharded cluster using AWS instances, t2.large with 2 CPUs, 8 GB RAM, and an EBS volume. I deployed a two-shard cluster using just a single member for each replica set.

Let’s create a sharded collection and insert some sample data.

At the beginning we create the shard key on the age field. To create the random data, we use some functions and a JavaScript loop to insert one million documents.

[direct: mongos] testdb> sh.shardCollection("testdb.testcoll", { age: 1} )
{
  collectionsharded: 'testdb.testcoll',
  ok: 1,
  '$clusterTime': {
    clusterTime: Timestamp({ t: 1628588848, i: 25 }),
    signature: {
      hash: Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0),
      keyId: Long("0")
    }
  },
  operationTime: Timestamp({ t: 1628588848, i: 21 })
}

[direct: mongos] testdb> function getRandomInt(min, max) {
...     return Math.floor(Math.random() * (max - min + 1)) + min;
... }
[Function: getRandomInt]

[direct: mongos] testdb> var day = 1000 * 60 * 60 * 24;

[direct: mongos] testdb> function getRandomDate() {
... return new Date(Date.now() - (Math.floor(Math.random() * day)))
... }
[Function: getRandomDate]

[direct: mongos] testdb> function getRandomString() {
... return (Math.random()+1).toString(36).substring(2)
... }
[Function: getRandomString]

[direct: mongos] testdb> for (var i=1; i<=1000000; i++) {
... db.testcoll.insert(
..... {
....... uid: i,
....... name: getRandomString(),
....... date_created: getRandomDate(),
....... age: getRandomInt(1,100),
....... address: getRandomString(),
....... city: getRandomString(),
....... country: getRandomString()
....... }
..... )
... }
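The loop above sends one insert per document, so it needs one round trip for each of them. A possible alternative, which is not what I used for this test, is to build the documents in batches and send each batch with insertMany(). The sketch below reuses the same random helpers; makeDocument and documentBatches are names introduced here for illustration, and the demonstration runs without a database.

```javascript
const DAY_MS = 1000 * 60 * 60 * 24;

function getRandomInt(min, max) {
  return Math.floor(Math.random() * (max - min + 1)) + min;
}

function getRandomDate() {
  return new Date(Date.now() - Math.floor(Math.random() * DAY_MS));
}

function getRandomString() {
  return (Math.random() + 1).toString(36).substring(2);
}

// Builds one document with the same fields as the loop above.
function makeDocument(uid) {
  return {
    uid: uid,
    name: getRandomString(),
    date_created: getRandomDate(),
    age: getRandomInt(1, 100),
    address: getRandomString(),
    city: getRandomString(),
    country: getRandomString(),
  };
}

// Yields arrays of at most `batchSize` documents, with uid from 1 to `total`.
function* documentBatches(total, batchSize) {
  let batch = [];
  for (let uid = 1; uid <= total; uid++) {
    batch.push(makeDocument(uid));
    if (batch.length === batchSize) {
      yield batch;
      batch = [];
    }
  }
  if (batch.length > 0) yield batch;
}

// Small demonstration that only counts the documents.
let inserted = 0;
for (const batch of documentBatches(2500, 1000)) {
  // In mongosh, this is where the batch would be sent:
  // db.testcoll.insertMany(batch, { ordered: false });
  inserted += batch.length;
}
console.log(inserted); // 2500
```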

Now we have our initial collection. Let’s take a look at the distribution of the chunks and the sharding status of the cluster, as reported by sh.status().

{
  database: {
    _id: 'testdb',
    primary: 'sh1-rs',
    partitioned: true,
    version: {
      uuid: UUID("efda5b69-1ffc-42c3-abed-ad20e08e321d"),
      timestamp: Timestamp({ t: 1628527701, i: 21 }),
      lastMod: 1
    }
  },
  collections: {
    'testdb.testcoll': {
      shardKey: { age: 1 },
      unique: false,
      balancing: true,
      chunkMetadata: [
        { shard: 'sh0-rs', nChunks: 2 },
        { shard: 'sh1-rs', nChunks: 3 }
      ],
      chunks: [
        { min: { age: MinKey() }, max: { age: 1 }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 5, i: 0 }) },
        { min: { age: 1 }, max: { age: 42 }, 'on shard': 'sh0-rs', 'last modified': Timestamp({ t: 4, i: 0 }) },
        { min: { age: 42 }, max: { age: 72 }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 5, i: 2 }) },
        { min: { age: 72 }, max: { age: 100 }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 5, i: 3 }) },
        { min: { age: 100 }, max: { age: MaxKey() }, 'on shard': 'sh0-rs', 'last modified': Timestamp({ t: 5, i: 1 }) }
      ],
      tags: []
    }
  }
}

With the current shard key on the age field we’ve got 5 chunks in total, 2 on sh0-rs and 3 on sh1-rs. The total size of the collection is around 200MB (uncompressed).

It’s time to test the resharding. Let’s change the shard key to { name: 1 }. The syntax is the following: we provide the namespace to reshard and the new shard key:

[direct: mongos] testdb> db.adminCommand({
... reshardCollection: "testdb.testcoll",
... key: { name: 1 }
... })

From another connection we can monitor the status of the resharding:

[direct: mongos] testdb> db.getSiblingDB("admin").aggregate([ 
{ $currentOp: { allUsers: true, localOps: false } }, 
{ $match: { type: "op", "originatingCommand.reshardCollection": "testdb.testcoll" } }])
[
  {
    shard: 'sh0-rs',
    type: 'op',
    desc: 'ReshardingRecipientService f86967fe-36a9-4836-8972-1061a61230df',
    op: 'command',
    ns: 'testdb.testcoll',
    originatingCommand: {
      reshardCollection: 'testdb.testcoll',
      key: { name: 1 },
      unique: false,
      collation: { locale: 'simple' }
    },
    totalOperationTimeElapsedSecs: Long("145"),
    remainingOperationTimeEstimatedSecs: Long("173"),
    approxDocumentsToCopy: Long("624135"),
    documentsCopied: Long("569868"),
    approxBytesToCopy: Long("95279851"),
    bytesCopied: Long("86996201"),
    totalCopyTimeElapsedSecs: Long("145"),
    oplogEntriesFetched: Long("0"),
    oplogEntriesApplied: Long("0"),
    totalApplyTimeElapsedSecs: Long("0"),
    recipientState: 'cloning',
    opStatus: 'running'
  },
  {
    shard: 'sh0-rs',
    type: 'op',
    desc: 'ReshardingDonorService f86967fe-36a9-4836-8972-1061a61230df',
    op: 'command',
    ns: 'testdb.testcoll',
    originatingCommand: {
      reshardCollection: 'testdb.testcoll',
      key: { name: 1 },
      unique: false,
      collation: { locale: 'simple' }
    },
    totalOperationTimeElapsedSecs: Long("145"),
    remainingOperationTimeEstimatedSecs: Long("173"),
    countWritesDuringCriticalSection: Long("0"),
    totalCriticalSectionTimeElapsedSecs: Long("0"),
    donorState: 'donating-initial-data',
    opStatus: 'running'
  }, 
  {
    shard: 'sh1-rs',
    type: 'op',
    desc: 'ReshardingDonorService f86967fe-36a9-4836-8972-1061a61230df',
    op: 'command',
    ns: 'testdb.testcoll',
    originatingCommand: {
      reshardCollection: 'testdb.testcoll',
      key: { name: 1 },
      unique: false,
      collation: { locale: 'simple' }
    },
    totalOperationTimeElapsedSecs: Long("145"),
    remainingOperationTimeEstimatedSecs: Long("276"),
    countWritesDuringCriticalSection: Long("0"),
    totalCriticalSectionTimeElapsedSecs: Long("0"),
    donorState: 'donating-initial-data', 
    opStatus: 'running'
  },
  {
    shard: 'sh1-rs',
    type: 'op',
    desc: 'ReshardingRecipientService f86967fe-36a9-4836-8972-1061a61230df',
    op: 'command',
    ns: 'testdb.testcoll',
    originatingCommand: {
      reshardCollection: 'testdb.testcoll',
      key: { name: 1 },
      unique: false,
      collation: { locale: 'simple' }
    },
    totalOperationTimeElapsedSecs: Long("145"),
    remainingOperationTimeEstimatedSecs: Long("276"),
    approxDocumentsToCopy: Long("624135"),
    documentsCopied: Long("430132"),
    approxBytesToCopy: Long("95279851"),
    bytesCopied: Long("65663031"),
    totalCopyTimeElapsedSecs: Long("145"),
    oplogEntriesFetched: Long("0"),
    oplogEntriesApplied: Long("0"),
    totalApplyTimeElapsedSecs: Long("0"),
    recipientState: 'cloning',
    opStatus: 'running'
  }
]

The totalOperationTimeElapsedSecs field shows the time elapsed so far, and remainingOperationTimeEstimatedSecs shows the estimated time left to complete the operation.
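The recipient entries also carry bytesCopied and approxBytesToCopy, so a rough percentage of the copy phase can be derived from them. The following is a minimal sketch: copyProgress is a helper name introduced here, the sample entries are a trimmed copy of the output above with plain numbers, and since approxBytesToCopy is an approximation the percentage is only indicative. In mongosh the values are Long objects, which the small toNumber helper converts through their string form.

```javascript
// Converts a plain number or a Long-like object (anything whose toString()
// returns a decimal string) to a JavaScript number.
function toNumber(value) {
  return typeof value === "number" ? value : Number(value.toString());
}

// Summarizes the recipient entries of a $currentOp result.
// Donor entries have no recipientState and are skipped.
function copyProgress(ops) {
  return ops
    .filter((op) => op.recipientState !== undefined)
    .map((op) => {
      const copied = toNumber(op.bytesCopied);
      const total = toNumber(op.approxBytesToCopy);
      return {
        shard: op.shard,
        state: op.recipientState,
        // Percentage rounded to one decimal place.
        percentCopied: total > 0 ? Math.round((copied / total) * 1000) / 10 : 0,
        remainingSecs: toNumber(op.remainingOperationTimeEstimatedSecs),
      };
    });
}

// Trimmed copy of the entries shown above.
const ops = [
  { shard: "sh0-rs", recipientState: "cloning", approxBytesToCopy: 95279851,
    bytesCopied: 86996201, remainingOperationTimeEstimatedSecs: 173 },
  { shard: "sh0-rs", donorState: "donating-initial-data",
    remainingOperationTimeEstimatedSecs: 173 },
  { shard: "sh1-rs", recipientState: "cloning", approxBytesToCopy: 95279851,
    bytesCopied: 65663031, remainingOperationTimeEstimatedSecs: 276 },
];
console.log(copyProgress(ops));
```

With the numbers above, sh0-rs had copied roughly 91% of its data and sh1-rs roughly 69% at the time of the snapshot.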

While resharding is running we are allowed to write into the collection. For example, I tried the following and it worked:

[direct: mongos] testdb> db.testcoll.insert({ name: "new doc" })
{
  acknowledged: true,
  insertedIds: { '0': ObjectId("61125fa53916a70b7fdd8f6c") }
}

The full resharding of the collection took around 5 minutes. Let’s now look at the new sharding status.

{
  database: {
    _id: 'testdb',
    primary: 'sh1-rs',
    partitioned: true,
    version: {
      uuid: UUID("efda5b69-1ffc-42c3-abed-ad20e08e321d"),
      timestamp: Timestamp({ t: 1628527701, i: 21 }),
      lastMod: 1
    }
  },
  collections: {
    'testdb.testcoll': {
      shardKey: { name: 1 },
      unique: false,
      balancing: true,
      chunkMetadata: [
        { shard: 'sh0-rs', nChunks: 3 },
        { shard: 'sh1-rs', nChunks: 3 }
      ],
      chunks: [
        { min: { name: MinKey() }, max: { name: '7gbk73hf42g' }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 6, i: 0 }) },
        { min: { name: '7gbk73hf42g' }, max: { name: 'cv1oqoetetf' }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 5, i: 1 }) },
        { min: { name: 'cv1oqoetetf' }, max: { name: 'kuk1p3z8kc' }, 'on shard': 'sh0-rs', 'last modified': Timestamp({ t: 2, i: 2 }) },
        { min: { name: 'kuk1p3z8kc' }, max: { name: 'ppq4ubqwywh' }, 'on shard': 'sh0-rs', 'last modified': Timestamp({ t: 2, i: 3 }) },
        { min: { name: 'ppq4ubqwywh' }, max: { name: 'zsods6qhqp' }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 1, i: 3 }) },
        { min: { name: 'zsods6qhqp' }, max: { name: MaxKey() }, 'on shard': 'sh0-rs', 'last modified': Timestamp({ t: 6, i: 1 }) }
      ],
      tags: []
    }
  }
}

We can see the shard key has been changed, and the chunks are now evenly distributed.

[direct: mongos] testdb> db.testcoll.getShardDistribution()
Shard sh1-rs at sh1-rs/172.30.0.108:27017
{
  data: '92.72MiB',
  docs: 636938,
  chunks: 3,
  'estimated data per chunk': '30.9MiB',
  'estimated docs per chunk': 212312
}
---
Shard sh0-rs at sh0-rs/172.30.0.221:27017
{
  data: '82.96MiB',
  docs: 569869,
  chunks: 3,
  'estimated data per chunk': '27.65MiB',
  'estimated docs per chunk': 189956
}
---
Totals
{
  data: '175.69MiB',
  docs: 1206807,
  chunks: 6,
  'Shard sh1-rs': [
    '52.77 % data',
    '52.77 % docs in cluster',
    '152B avg obj size on shard'
  ],
  'Shard sh0-rs': [
    '47.22 % data',
    '47.22 % docs in cluster',  
    '152B avg obj size on shard'
  ]
}

Good. It worked.
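The percentages reported by getShardDistribution() are simply each shard’s share of the total. A small sketch like the following, fed with the document counts shown above, reproduces them and also gives the largest deviation from a perfectly even split; shardShares and maxDeviationFromEven are helper names introduced here for illustration.

```javascript
// Computes each shard's share of the documents, as a percentage.
function shardShares(docsPerShard) {
  const total = Object.values(docsPerShard).reduce((sum, n) => sum + n, 0);
  const shares = {};
  for (const [shard, docs] of Object.entries(docsPerShard)) {
    shares[shard] = total > 0 ? (docs / total) * 100 : 0;
  }
  return { total, shares };
}

// Largest distance, in percentage points, between a shard's share and the
// share it would have if the data were spread perfectly evenly.
function maxDeviationFromEven(shares) {
  const values = Object.values(shares);
  if (values.length === 0) return 0;
  const even = 100 / values.length;
  return Math.max(...values.map((v) => Math.abs(v - even)));
}

// Document counts taken from the getShardDistribution() output above.
const { total, shares } = shardShares({ "sh1-rs": 636938, "sh0-rs": 569869 });
console.log(total); // 1206807
console.log(shares);
console.log(maxDeviationFromEven(shares).toFixed(2));
```

Here the deviation is below 3 percentage points, which confirms the distribution is close to even.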

As a curiosity, the temporary collection being populated is visible while the resharding is running. Just run sh.status():

    collections: {
      'testdb.system.resharding.defda80d-ea5f-4e52-9a7f-9b3e2ed8ddf1': {
        shardKey: { name: 1 },
        unique: false,
        balancing: true,
        chunkMetadata: [
          { shard: 'sh0-rs', nChunks: 3 },
          { shard: 'sh1-rs', nChunks: 3 }
        ],
        chunks: [
          { min: { name: MinKey() }, max: { name: '9bnfw8n23l' }, 'on shard': 'sh0-rs', 'last modified': Timestamp({ t: 2, i: 0 }) },
          { min: { name: '9bnfw8n23l' }, max: { name: 'etd3ho9vfwg' }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 2, i: 1 }) },
          { min: { name: 'etd3ho9vfwg' }, max: { name: 'kmht0br01l' }, 'on shard': 'sh0-rs', 'last modified': Timestamp({ t: 1, i: 2 }) },
          { min: { name: 'kmht0br01l' }, max: { name: 'sljcb1s70g' }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 1, i: 3 }) },
          { min: { name: 'sljcb1s70g' }, max: { name: 'zzi3ijuw2f' }, 'on shard': 'sh0-rs', 'last modified': Timestamp({ t: 1, i: 4 }) },
          { min: { name: 'zzi3ijuw2f' }, max: { name: MaxKey() }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 1, i: 5 }) }
        ],
        tags: []
      },
      'testdb.testcoll': {
        shardKey: { age: 1 },
        unique: false,
        balancing: true,
        chunkMetadata: [
          { shard: 'sh0-rs', nChunks: 2 },
          { shard: 'sh1-rs', nChunks: 3 }
        ],
        chunks: [
          { min: { age: MinKey() }, max: { age: 1 }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 5, i: 0 }) },
          { min: { age: 1 }, max: { age: 42 }, 'on shard': 'sh0-rs', 'last modified': Timestamp({ t: 4, i: 0 }) },
          { min: { age: 42 }, max: { age: 72 }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 5, i: 2 }) },
          { min: { age: 72 }, max: { age: 100 }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 5, i: 3 }) },
          { min: { age: 100 }, max: { age: MaxKey() }, 'on shard': 'sh0-rs', 'last modified': Timestamp({ t: 5, i: 1 }) }
        ],
        tags: []
      }
    }
  }

testdb.system.resharding.defda80d-ea5f-4e52-9a7f-9b3e2ed8ddf1 is the newly created collection.

Some Graphs from Percona Monitoring and Management

I deployed Percona Monitoring and Management (PMM) to monitor the cluster and to evaluate the impact of the resharding operation. As mentioned, I used a small test environment with no workload at all, so what we see in the following graphs is just the impact of the resharding.

The cluster has one config server and two shards. Each replica set is just a single member running on a dedicated virtual host in AWS EC2.

Let’s see the CPU usage

[Graph: CPU usage of the cluster nodes in Percona Monitoring and Management]

The node in the middle is the config server. The data-bearing nodes saw a significant increase in CPU usage, while the config server wasn’t affected at all.

Let’s see the Disk utilization in terms of IOPS and latency:

[Graph: disk utilization, IOPS and latency]

As with the CPU, the config server wasn’t affected. The shards, instead, showed increased IOPS, as expected, since a new collection was being created. The disk latency spiked to more than 20 ms at the beginning and at the end of the process.

The latency of read operations in MongoDB increased as well.

The WiredTiger storage engine increased the number of transactions processed and the cache activity as expected.

Other metrics increased but they are not included here. This behavior was expected and it’s normal for a resharding operation.

Resharding is quite expensive, even for a small collection, in particular in terms of CPU utilization. Anyway, more tests are needed with larger collections, in the GB/TB range.

Remember that the resharding time depends on the size of the collection. Be aware of that, and expect degraded performance on your servers for some time when you plan a resharding.

Conclusions

Resharding in MongoDB 5.0 is an amazing new feature, but it comes with some additional costs and limitations. Indeed, resharding requires resources in terms of disk space, I/O, and CPU. Be aware of that and plan your resharding operations carefully.

As a best practice, I suggest taking all the time you need to choose the shard key the first time you shard a collection. If you choose the optimal shard key from the beginning, then you won’t need resharding. A good schema design is always the best approach when working with MongoDB.

I suggest continuously monitoring the chunk distribution of your collections. As soon as you notice that something is wrong with the shard key and the distribution is uneven and getting worse, use reshardCollection as soon as you can, while the collection is not too large.

Another suggestion is to do resharding only during a maintenance window because the performance impact for your cluster could be significant in the case of very large collections.

Further reading:

https://docs.mongodb.com/manual/core/sharding-reshard-a-collection/

https://docs.mongodb.com/v5.0/core/sharding-choose-a-shard-key/

https://docs.mongodb.com/manual/reference/command/refineCollectionShardKey/

https://www.percona.com/blog/mongodb-5-0-is-coming-in-hot-what-do-database-experts-across-the-community-think/
