Sep
10
2021
--

Finding Undetected Jumbo Chunks in MongoDB

Jumbo Chunks in MongoDB

I recently came across an interesting case of performance issues during balancing in a MongoDB cluster. Digging through the logs, it became clear the problem was related to chunk moves taking a long time.

As we know, the default maximum chunk size is 64 MB, so these migrations are supposed to be very fast on most of the hardware in use nowadays.
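
If you want to double-check the chunk size configured in your cluster, it is stored in the config database; a minimal check from a mongos connection (if no document is returned, the 64 MB default is in effect):

// Configured chunk size in MB; no document means the default of 64 MB applies
db.getSiblingDB("config").settings.findOne({ _id: "chunksize" })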

In this case, there were several chunks way above that limit being moved around. How can this happen? Shouldn’t those chunks have been marked as Jumbo?

Recap on Chunk Moves

For starters, let’s review what we know about chunk moves.

MongoDB does not move a chunk if the number of documents in it is greater than 1.3 times the result of dividing the configured chunk size (default 64 MB) by the average document size.

If we have a collection with an average document size of 2 KB, a chunk with more than approximately 42598 documents (65536 KB / 2 KB * 1.3) won’t be balanced. This might be an issue for collections with non-uniform document sizes, as the estimate won’t be very good in that case.
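
As a rough sketch of that arithmetic in the mongo shell, using the avgObjSize reported by the stats() command (the database and collection names are simply the ones used later in this post):

// Estimate the maximum number of documents a chunk may hold and still be eligible for migration
var stats = db.getSiblingDB("db").test_col.stats();   // avgObjSize is in bytes
var maxChunkSizeBytes = 64 * 1024 * 1024;             // default 64 MB
var maxDocs = 1.3 * (maxChunkSizeBytes / stats.avgObjSize);
print("Chunks with more than " + Math.floor(maxDocs) + " documents won't be moved");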

Also, if a chunk grows past the maximum size, it is flagged as Jumbo. This means the balancer will not try to move it. You can check out the following article for more information about dealing with jumbo chunks.
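
If you want to see which chunks have already been flagged, the information is kept in the config database; a quick check from a mongos connection:

// List the chunks the balancer has already flagged as jumbo
db.getSiblingDB("config").chunks.find({ jumbo: true })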

Finally, we must keep in mind that chunk moves take a metadata lock at a certain part of the process, so write operations are blocked momentarily near the end of a chunk move.

The autoSplitter in Action

Before MongoDB 4.2, the autoSplitter process, responsible for ensuring chunks do not exceed the maximum size, ran on the mongos router. The router process keeps some statistics about the operations it performs, and these statistics are consulted to make chunk-splitting decisions.

The problem is that in a production environment, there are usually many of these processes deployed. An individual mongos router only has a partial idea of what’s happening with a particular chunk of data.

If multiple mongos routers change data on the same chunk, it could grow beyond the maximum size and still go unnoticed.

There are many reported issues related to this topic (for example, see SERVER-44088, SERVER-13806, and SERVER-16715).

Frequently restarting mongos processes could also lead to the same problem. This causes the statistics used to make chunk-splitting decisions to be reset.

Because of these issues, the autoSplitter process was changed to run on the primary member of each shard’s replica set. This happened in MongoDB version 4.2 (see SERVER-9287 for more details).

The process should better prevent jumbo chunks now, as each shard has a more accurate view of what data it contains (and how it is being changed).

The MongoDB version, in this case, was 3.6, and the autoSplitter was not doing as good a job as expected.

Finding Undetected Jumbo Chunks in MongoDB

For starters, I came across a script to print a summary of chunk statistics here. We can build upon it to also display information about jumbo chunks and generate the commands required to manually split any chunks exceeding the maximum size.

var allChunkInfo = function(ns){
    var chunks = db.getSiblingDB("config").chunks.find({"ns" : ns}).sort({min:1}).noCursorTimeout(); //this will return all chunks for the ns ordered by min
    var totalChunks = 0;
    var totalJumbo = 0;
    var totalSize = 0;
    var totalEmpty = 0;
 
    chunks.forEach( 
        function printChunkInfo(chunk) { 
          var db1 = db.getSiblingDB(chunk.ns.split(".")[0]) // get the database we will be running the command against later
          var key = db.getSiblingDB("config").collections.findOne({_id:chunk.ns}).key; // will need this for the dataSize call
         
          // dataSize returns the exact size of the data in the chunk; setting estimate:true instead
          // uses the average document size, which is faster but less accurate
          var dataSizeResult = db1.runCommand({datasize:chunk.ns, keyPattern:key, min:chunk.min, max:chunk.max, estimate:false});
         
          if(dataSizeResult.size > 67108864) {
            totalJumbo++;
            print('sh.splitFind("' + chunk.ns.toString() + '", ' + JSON.stringify(chunk.min) + ')' + ' // '+  chunk.shard + '    ' + Math.round(dataSizeResult.size/1024/1024) + ' MB    ' + dataSizeResult.numObjects );
          }
          totalSize += dataSizeResult.size;
          totalChunks++;
          
          if (dataSizeResult.size == 0) { totalEmpty++ }; //count empty chunks for summary
          }
    )
    print("***********Summary Chunk Information***********");
    print("Total Chunks: "+totalChunks);
    print("Total Jumbo Chunks: "+totalJumbo);
    print("Average Chunk Size (Mbytes): "+(totalSize/totalChunks/1024/1024));
    print("Empty Chunks: "+totalEmpty);
    print("Average Chunk Size (non-empty): "+(totalSize/(totalChunks-totalEmpty)/1024/1024));
}

The script has to be called from a mongos router as follows:

mongos> allChunkInfo("db.test_col")

And it will print any commands to perform chunk splits as required:

sh.splitFind("db.test_col", {"_id":"jhxT2neuI5fB4o4KBIASK1"}) // shard-1    222 MB    7970
sh.splitFind("db.test_col", {"_id":"zrAESqSZjnpnMI23oh5JZD"}) // shard-2    226 MB    7988
sh.splitFind("db.test_col", {"_id":"SgkCkfSDrY789e9nD4crk9"}) // shard-1    218 MB    7986
sh.splitFind("db.test_col", {"_id":"X5MKEH4j32OhmAhY7LGPMm"}) // shard-1    238 MB    8338
...
***********Summary Chunk Information***********
Total Chunks: 5047
Total Jumbo Chunks: 120
Average Chunk Size (Mbytes): 19.29779934868946
Empty Chunks: 1107
Average Chunk Size (non-empty): 24.719795257064895

You can see the script prints the commands to split the jumbo chunks, along with the shard where each one lives, the size of the chunk, and the number of documents in it. At the end, summary information is also displayed.

The estimate: true option in the dataSize command causes it to use the average document size of the collection for the estimation. This is not very accurate if document sizes vary widely, but it runs faster and is less resource-intensive. Consider setting that option if you know your document sizes are more or less uniform.
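
For reference, switching the script above to the estimated mode only requires changing the dataSize call, for example:

// Faster but approximate: the size is derived from the collection's average document size
var dataSizeResult = db1.runCommand({datasize:chunk.ns, keyPattern:key, min:chunk.min, max:chunk.max, estimate:true});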

All that is left is to run the commands generated by the script, and the jumbo chunks should be gone. The balancer will then move chunks during the next balancing window as required to even out the data. Consider scheduling the balancing window outside the peak workload hours to reduce the impact.
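
As a minimal sketch, a balancing window can be configured by updating the balancer settings in the config database from a mongos connection (the times below are just placeholders):

// Allow balancing only between 02:00 and 06:00 (server local time)
db.getSiblingDB("config").settings.update(
  { _id: "balancer" },
  { $set: { activeWindow: { start: "02:00", stop: "06:00" } } },
  { upsert: true }
)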

Conclusion

Even though MongoDB 3.6 has been EOL for some time now, many people are still running it, mostly due to outdated drivers on the application side that are not compatible with newer releases. Nowadays, the autoSplitter should be doing a better job of keeping chunk size at bay.

If you are still running a MongoDB version older than 4.2, it would be a good idea to review your chunk stats from time to time to avoid some nasty surprises.

Percona Distribution for MongoDB is a freely available MongoDB database alternative, giving you a single solution that combines the best and most important enterprise components from the open source community, designed and tested to work together.

Download Percona Distribution for MongoDB Today!

Aug
24
2021
--

Resharding in MongoDB 5.0

Resharding in MongoDB 5.0

MongoDB 5.0 has been released with a bunch of new features. An important one is the capability to completely redefine the shard key of a collection in a sharded cluster. Resharding was a feature frequently requested by customers. MongoDB 4.4 only allowed refining the shard key; from now on, you can change your shard key using different fields as well.

When you use sharding in MongoDB and you fail to choose a good shard key for a collection, you can face issues like:

  • Unbalanced distribution of the data
  • Having jumbo chunks that cannot be split and cannot be migrated

These issues can make your cluster very slow and leave some of the shards extremely overloaded. The cluster can also become really hard to manage, and crashes could happen at some point.

The Problem of Changing a Shard Key

I had a customer running a 5-shard cluster with more than 50 TB of data. They had a few collections with sub-optimal shard keys: one of the shards held more than 75% of the data and workload, and there were thousands of jumbo chunks around. Some of the jumbo chunks reached 200 GB in size; really a lot. The cluster was terribly slow, chunk migrations were attempted over and over and failed very frequently, and the application was unresponsive most of the time due to timeouts. Manual splits and migrations of jumbo chunks were impossible. The only solution was to create a different shard key for those collections, but this wasn’t possible with that MongoDB version. The only suggested way to change a shard key was to drop and recreate the collection; remember you also have to dump and reload the data. This requires you to stop your application for an impressive amount of time if the dataset is large. That was the case.

In order to avoid stopping the application, we deployed a new, empty cluster with a few more shards as well. We created all the empty collections in the new cluster with the optimal shard key. Then we moved the data out of the old cluster with custom scripts, one piece at a time, using boundaries on indexed timestamp fields. The solution required the customer to deploy some application changes. The application was instructed to read/write from a different cluster depending on whether the affected data had already been migrated into the new cluster or not. The entire migration process took weeks to complete. In the end, it worked, and the customer now has a balanced cluster with no jumbo chunks.

Anyway, the solution came at some cost in terms of new machines, development effort, and time.

Fortunately, MongoDB 5.0 introduced the reshardCollection command. From now on changing a shard key should be less expensive than in the past. That’s awesome.

In this article, we’ll take a look at how resharding in MongoDB 5.0 works.

Internals

The way resharding works is simple. When you issue the reshardCollection command, a new implicit empty collection is created by MongoDB with the new shard key defined, and the data will be moved from the existing collection chunk by chunk. There is a two-second lock required by the balancer to determine the new data distribution, but during the copy phase, the application can continue to read and write the collection with no issues.

Due to this copy phase, the larger the collection is, the more time the resharding takes.

Prerequisites for Resharding

There are some requirements you need to validate before starting the resharding, and some could be expensive, in certain cases.

  • Disk space: be sure to have at least 1.2x the size of the collection you’re going to reshard. If the collection is 1TB, you need at least 1.2TB of free disk space.
  • I/O capacity should be below 50%
  • CPU load should be below 80%

The resources available must be evaluated on each shard.

If you fail to provide these resources, the database could run out of space or the resharding could take longer than expected.
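
A quick way to estimate the space requirement is to read the uncompressed collection size from its statistics; a rough sketch, using the testdb.testcoll collection from the test below:

// size is the uncompressed data size in bytes; you should have at least 1.2x that free on disk
var collSize = db.getSiblingDB("testdb").testcoll.stats().size;
print("Minimum free space required (bytes): " + Math.ceil(collSize * 1.2));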

Another important requirement is that you will need to deploy changes to your application queries. For the database to work best during the resharding process, the queries must use filters on both the current shard key and the new shard key. Only at the end of the resharding process may you drop the filters on the old shard key from your queries.

This requirement is very important and in some cases it could be a little expensive. Temporarily changing the application code is sometimes a hard task.
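
As a hypothetical sketch, if the shard key were being changed from { age: 1 } to { name: 1 } as in the test below, a query run during the resharding would need to carry both fields:

// During resharding: filter on both the current (age) and the new (name) shard key
db.testcoll.updateOne(
  { age: 30, name: "7gbk73hf42g" },
  { $set: { city: "updated" } }
)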

There are also other limitations you need to consider:

  • Resharding is not permitted if an index build is running
  • Some queries will return an error if you didn’t include both the current and new shard keys: deleteOne(), findAndModify(), updateOne() … check the manual for the full list
  • You can reshard only one collection at a time
  • You cannot use addShard(), removeShard(), dropDatabase(), db.createCollection() while resharding is running
  • The new shard key cannot have a uniqueness constraint
  • Resharding a collection with a uniqueness constraint is not supported

Let’s Test Resharding in MongoDB

To test the new feature, I created a sharded cluster using AWS instances: t2.large with 2 CPUs, 8 GB RAM, and an EBS volume. I deployed a two-shard cluster using just a single member for each replica set.

Let’s create a sharded collection and insert some sample data.

At the beginning, we create the shard key on the age field. To create random data, we use a few helper functions and a JavaScript loop to insert one million documents.

[direct: mongos] testdb> sh.shardCollection("testdb.testcoll", { age: 1} )
{
  collectionsharded: 'testdb.testcoll',
  ok: 1,
  '$clusterTime': {
    clusterTime: Timestamp({ t: 1628588848, i: 25 }),
    signature: {
      hash: Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0),
      keyId: Long("0")
    }
  },
  operationTime: Timestamp({ t: 1628588848, i: 21 })
}

[direct: mongos] testdb> function getRandomInt(min, max) {
...     return Math.floor(Math.random() * (max - min + 1)) + min;
... }
[Function: getRandomInt]

[direct: mongos] testdb> var day = 1000 * 60 * 60 * 24;

[direct: mongos] testdb> function getRandomDate() {
... return new Date(Date.now() - (Math.floor(Math.random() * day)))
... }
[Function: getRandomDate]

[direct: mongos] testdb> function getRandomString() {
... return (Math.random()+1).toString(36).substring(2)
... }
[Function: getRandomString]

[direct: mongos] testdb> for (var i=1; i<=1000000; i++) {
... db.testcoll.insert(
..... {
....... uid: i,
....... name: getRandomString(),
....... date_created: getRandomDate(),
....... age: getRandomInt(1,100),
....... address: getRandomString(),
....... city: getRandomString(),
....... country: getRandomString()
....... }
..... )
... }

Now we have our initial collection. Let’s take a look at the distribution of the chunks and the sharding status of the cluster.

{
  database: {
    _id: 'testdb',
    primary: 'sh1-rs',
    partitioned: true,
    version: {
    uuid: UUID("efda5b69-1ffc-42c3-abed-ad20e08e321d"),
    timestamp: Timestamp({ t: 1628527701, i: 21 }),
    lastMod: 1
  }
},
collections: {
  'testdb.testcoll': {
    shardKey: { age: 1 },
    unique: false,
    balancing: true,
    chunkMetadata: [
      { shard: 'sh0-rs', nChunks: 2 },
      { shard: 'sh1-rs', nChunks: 3 }
    ],
    chunks: [
      { min: { age: MinKey() }, max: { age: 1 }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 5, i: 0 }) },
      { min: { age: 1 }, max: { age: 42 }, 'on shard': 'sh0-rs', 'last modified': Timestamp({ t: 4, i: 0 }) },
      { min: { age: 42 }, max: { age: 72 }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 5, i: 2 }) },
      { min: { age: 72 }, max: { age: 100 }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 5, i: 3 }) },
      { min: { age: 100 }, max: { age: MaxKey() }, 'on shard': 'sh0-rs', 'last modified': Timestamp({ t: 5, i: 1 }) }
    ],
    tags: []
    }
  }
}

With the current shard key on the age field, we’ve got 5 chunks in total: 2 on sh0-rs and 3 on sh1-rs. The total size of the collection is around 200 MB (uncompressed).

It’s time to test the resharding. Let’s try to change the shard key to { name: 1 }. The following is the right syntax where we provide the namespace to reshard and the new shard key:

[direct: mongos] testdb> db.adminCommand({
... reshardCollection: "testdb.testcoll",
... key: { name: 1 }
... })

From another connection we can monitor the status of the resharding:

[direct: mongos] testdb> db.getSiblingDB("admin").aggregate([ 
{ $currentOp: { allUsers: true, localOps: false } }, 
{ $match: { type: "op", "originatingCommand.reshardCollection": "testdb.testcoll" } }])
[
  {
    shard: 'sh0-rs',
    type: 'op',
    desc: 'ReshardingRecipientService f86967fe-36a9-4836-8972-1061a61230df',
    op: 'command',
    ns: 'testdb.testcoll',
    originatingCommand: {
      reshardCollection: 'testdb.testcoll',
      key: { name: 1 },
      unique: false,
      collation: { locale: 'simple' }
    },
    totalOperationTimeElapsedSecs: Long("145"),
    remainingOperationTimeEstimatedSecs: Long("173"),
    approxDocumentsToCopy: Long("624135"),
    documentsCopied: Long("569868"),
    approxBytesToCopy: Long("95279851"),
    bytesCopied: Long("86996201"),
    totalCopyTimeElapsedSecs: Long("145"),
    oplogEntriesFetched: Long("0"),
    oplogEntriesApplied: Long("0"),
    totalApplyTimeElapsedSecs: Long("0"),
    recipientState: 'cloning',
    opStatus: 'running'
  },
  {
    shard: 'sh0-rs',
    type: 'op',
    desc: 'ReshardingDonorService f86967fe-36a9-4836-8972-1061a61230df',
    op: 'command',
    ns: 'testdb.testcoll',
    originatingCommand: {
      reshardCollection: 'testdb.testcoll',
      key: { name: 1 },
      unique: false,
      collation: { locale: 'simple' }
    },
    totalOperationTimeElapsedSecs: Long("145"),
    remainingOperationTimeEstimatedSecs: Long("173"),
    countWritesDuringCriticalSection: Long("0"),
    totalCriticalSectionTimeElapsedSecs: Long("0"),
    donorState: 'donating-initial-data',
    opStatus: 'running'
  }, 
  {
    shard: 'sh1-rs',
    type: 'op',
    desc: 'ReshardingDonorService f86967fe-36a9-4836-8972-1061a61230df',
    op: 'command',
    ns: 'testdb.testcoll',
    originatingCommand: {
      reshardCollection: 'testdb.testcoll',
      key: { name: 1 },
      unique: false,
      collation: { locale: 'simple' }
    },
    totalOperationTimeElapsedSecs: Long("145"),
    remainingOperationTimeEstimatedSecs: Long("276"),
    countWritesDuringCriticalSection: Long("0"),
    totalCriticalSectionTimeElapsedSecs: Long("0"),
    donorState: 'donating-initial-data', 
    opStatus: 'running'
  },
  {
    shard: 'sh1-rs',
    type: 'op',
    desc: 'ReshardingRecipientService f86967fe-36a9-4836-8972-1061a61230df',
    op: 'command',
    ns: 'testdb.testcoll',
    originatingCommand: {
      reshardCollection: 'testdb.testcoll',
      key: { name: 1 },
      unique: false,
      collation: { locale: 'simple' }
    },
    totalOperationTimeElapsedSecs: Long("145"),
    remainingOperationTimeEstimatedSecs: Long("276"),
    approxDocumentsToCopy: Long("624135"),
    documentsCopied: Long("430132"),
    approxBytesToCopy: Long("95279851"),
    bytesCopied: Long("65663031"),
    totalCopyTimeElapsedSecs: Long("145"),
    oplogEntriesFetched: Long("0"),
    oplogEntriesApplied: Long("0"),
    totalApplyTimeElapsedSecs: Long("0"),
    recipientState: 'cloning',
    opStatus: 'running'
  }
]

With totalOperationTimeElapsedSecs you can see the time elapsed so far, and remainingOperationTimeEstimatedSecs gives the estimated time to complete the operation.
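
If you prefer a single progress figure, it can be derived from the copy counters in that output; a small sketch using the numbers reported by the first recipient above:

// Rough copy progress for one recipient, from bytesCopied / approxBytesToCopy
var copied = 86996201, toCopy = 95279851;
print("Copy progress: " + (100 * copied / toCopy).toFixed(1) + " %");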

While resharding is running we are allowed to write into the collection. For example, I tried the following and it worked:

[direct: mongos] testdb> db.testcoll.insert({ name: "new doc" })
{
  acknowledged: true,
  insertedIds: { '0': ObjectId("61125fa53916a70b7fdd8f6c") }
}

The full resharding of the collection took around 5 minutes. Let’s now see the new status of the sharding.

{
  database: {
    _id: 'testdb',
    primary: 'sh1-rs',
    partitioned: true,
    version: {
      uuid: UUID("efda5b69-1ffc-42c3-abed-ad20e08e321d"),
      timestamp: Timestamp({ t: 1628527701, i: 21 }),
      lastMod: 1
    }
  },
  collections: {
    'testdb.testcoll': {
    shardKey: { name: 1 },
    unique: false,
    balancing: true,
    chunkMetadata: [
      { shard: 'sh0-rs', nChunks: 3 },
      { shard: 'sh1-rs', nChunks: 3 }
    ],
    chunks: [
      { min: { name: MinKey() }, max: { name: '7gbk73hf42g' }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 6, i: 0 }) },
      { min: { name: '7gbk73hf42g' }, max: { name: 'cv1oqoetetf' }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 5, i: 1 }) },
      { min: { name: 'cv1oqoetetf' }, max: { name: 'kuk1p3z8kc' }, 'on shard': 'sh0-rs', 'last modified': Timestamp({ t: 2, i: 2 }) },
      { min: { name: 'kuk1p3z8kc' }, max: { name: 'ppq4ubqwywh' }, 'on shard': 'sh0-rs', 'last modified': Timestamp({ t: 2, i: 3 }) },
      { min: { name: 'ppq4ubqwywh' }, max: { name: 'zsods6qhqp' }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 1, i: 3 }) },
      { min: { name: 'zsods6qhqp' }, max: { name: MaxKey() }, 'on shard': 'sh0-rs', 'last modified': Timestamp({ t: 6, i: 1 }) }
    ],
    tags: []
    }
  }
}

We can see the shard key has been changed, and the chunks are now evenly distributed.

[direct: mongos] testdb> db.testcoll.getShardDistribution()
Shard sh1-rs at sh1-rs/172.30.0.108:27017
{
  data: '92.72MiB',
  docs: 636938,
  chunks: 3,
  'estimated data per chunk': '30.9MiB',
  'estimated docs per chunk': 212312
}
---
Shard sh0-rs at sh0-rs/172.30.0.221:27017
{
  data: '82.96MiB',
  docs: 569869,
  chunks: 3,
  'estimated data per chunk': '27.65MiB',
  'estimated docs per chunk': 189956
}
---
Totals
{
  data: '175.69MiB',
  docs: 1206807,
  chunks: 6,
  'Shard sh1-rs': [
    '52.77 % data',
    '52.77 % docs in cluster',
    '152B avg obj size on shard'
  ],
  'Shard sh0-rs': [
    '47.22 % data',
    '47.22 % docs in cluster',  
    '152B avg obj size on shard'
  ]
}

Good. It worked.

Out of curiosity, during the resharding, the temporary collection being copied is visible. Just run sh.status():

    collections: {
      'testdb.system.resharding.defda80d-ea5f-4e52-9a7f-9b3e2ed8ddf1': {
        shardKey: { name: 1 },
        unique: false,
        balancing: true,
        chunkMetadata: [
          { shard: 'sh0-rs', nChunks: 3 },
          { shard: 'sh1-rs', nChunks: 3 }
        ],
        chunks: [
          { min: { name: MinKey() }, max: { name: '9bnfw8n23l' }, 'on shard': 'sh0-rs', 'last modified': Timestamp({ t: 2, i: 0 }) },
          { min: { name: '9bnfw8n23l' }, max: { name: 'etd3ho9vfwg' }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 2, i: 1 }) },
          { min: { name: 'etd3ho9vfwg' }, max: { name: 'kmht0br01l' }, 'on shard': 'sh0-rs', 'last modified': Timestamp({ t: 1, i: 2 }) },
          { min: { name: 'kmht0br01l' }, max: { name: 'sljcb1s70g' }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 1, i: 3 }) },
          { min: { name: 'sljcb1s70g' }, max: { name: 'zzi3ijuw2f' }, 'on shard': 'sh0-rs', 'last modified': Timestamp({ t: 1, i: 4 }) },
          { min: { name: 'zzi3ijuw2f' }, max: { name: MaxKey() }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 1, i: 5 }) }
        ],
        tags: []
      },
      'testdb.testcoll': {
        shardKey: { age: 1 },
        unique: false,
        balancing: true,
        chunkMetadata: [
          { shard: 'sh0-rs', nChunks: 2 },
          { shard: 'sh1-rs', nChunks: 3 }
        ],
        chunks: [
          { min: { age: MinKey() }, max: { age: 1 }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 5, i: 0 }) },
          { min: { age: 1 }, max: { age: 42 }, 'on shard': 'sh0-rs', 'last modified': Timestamp({ t: 4, i: 0 }) },
          { min: { age: 42 }, max: { age: 72 }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 5, i: 2 }) },
          { min: { age: 72 }, max: { age: 100 }, 'on shard': 'sh1-rs', 'last modified': Timestamp({ t: 5, i: 3 }) },
          { min: { age: 100 }, max: { age: MaxKey() }, 'on shard': 'sh0-rs', 'last modified': Timestamp({ t: 5, i: 1 }) }
        ],
        tags: []
      }
    }
  }

testdb.system.resharding.defda80d-ea5f-4e52-9a7f-9b3e2ed8ddf1 is the newly created collection.

Some Graphs from Percona Monitoring and Management

I deployed Percona Monitoring and Management (PMM) to monitor the cluster and evaluate the impact of the resharding operation. As mentioned, I used a small test environment with no workload at all, so what we see in the following graphs is just the impact of the resharding.

The cluster has one config server and two shards. Each replica set is just a single member running on a dedicated virtual host in AWS EC2.

Let’s see the CPU usage

percona monitoring and management mongodb

The node in the middle is the config server. The data-bearing nodes saw quite a relevant increase in CPU usage, while the config server was not impacted at all.

Let’s see the Disk utilization in terms of IOPS and latency:

Disk utilization

As with the CPU, the config server was not impacted. The shards, instead, increased their IOPS as expected, since a new collection was being created. Disk latency spiked to more than 20 ms at the beginning and at the end of the process.

MongoDB increased the latency of the read operations.

The WiredTiger storage engine increased the number of transactions processed and the cache activity as expected.

Other metrics increased but they are not included here. This behavior was expected and it’s normal for a resharding operation.

Resharding is quite expensive, even for a small collection, particularly in terms of CPU utilization. Anyway, more tests are needed with larger collections, in the GB/TB range.

Remember that the resharding time depends on the size of the collection. Be aware of that and expect a performance decrease on your servers for some time when you plan a resharding.

Conclusions

Resharding in MongoDB 5.0 is an amazing new feature, but it comes with some additional costs and limitations. Indeed, resharding requires resources in terms of disk space, I/O, and CPU. Be aware of that and plan your resharding operations carefully.

As a best practice, I suggest spending as much time as possible the first time you decide to shard a collection. If you choose the optimal shard key from the beginning, then you won’t need resharding. A good schema design is always the best approach when working with MongoDB.

I suggest continuously monitoring the status of the chunk distribution in your collections. As soon as you notice there is something wrong with the shard key and the distribution is uneven and getting worse, use reshardCollection as soon as you can, while the collection is not too large.
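
A simple way to do that periodically is the getShardDistribution() helper already used above, run from a mongos connection:

// Shows data size, document count, and chunk count per shard for the collection
db.testcoll.getShardDistribution()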

Another suggestion is to do resharding only during a maintenance window because the performance impact for your cluster could be significant in the case of very large collections.

Further readings:

https://docs.mongodb.com/manual/core/sharding-reshard-a-collection/

https://docs.mongodb.com/v5.0/core/sharding-choose-a-shard-key/

https://docs.mongodb.com/manual/reference/command/refineCollectionShardKey/

https://www.percona.com/blog/mongodb-5-0-is-coming-in-hot-what-do-database-experts-across-the-community-think/

Percona Distribution for MongoDB is a freely available MongoDB database alternative, giving you a single solution that combines the best and most important enterprise components from the open source community, designed and tested to work together.

Download Percona Distribution for MongoDB Today!

Jul
06
2021
--

Sharding With Zones Based on Compound Shard-Keys on MongoDB 4.4

Compound Shard-Keys on MongoDB 4.4

This article was written with the main purpose of showing you how to determine zones on a shard when using compound shard keys.

Defining Zones in Shards means pre-defining where certain chunks will be stored and amongst which set of particular shards they will be balanced according to the shard key definition.

MongoDB 4.4 brings the possibility to shard a collection and determine zones by compound keys, including mixing a hashed key with non-hashed keys. The hashed key may be placed in the prefix of the index (shard key) or not. Depending on the method chosen, different settings are needed to keep the collection in compliance with the balancer, and this article will show you a couple of examples.

Defining the Index Prior to Sharding

Even though it is not always required to create indexes in advance of running the “shardCollection” command, I am defining them here to better illustrate the procedure.

  • The collection colltest1 will be sharded based on a key containing the hashed field in the prefix
mongos> db.getSiblingDB("dbtest").colltest1.createIndex({_id:"hashed","location":1},{unique:false})
{
	"raw" : {
		"shard02/localhost:40003" : {
			"createdCollectionAutomatically" : true,
			"numIndexesBefore" : 1,
			"numIndexesAfter" : 2,
			"commitQuorum" : "votingMembers",
			"ok" : 1
		}
	},
	"ok" : 1,
	"operationTime" : Timestamp(1624357456, 7),
	"$clusterTime" : { ... }
}

  • The collection colltest2 will be sharded based on a key containing the non-hashed field in the prefix
mongos> db.getSiblingDB("dbtest").colltest2.createIndex({"location":1,_id:"hashed"},{unique:false})
{
	"raw" : {
		"shard02/localhost:40003" : {
			"createdCollectionAutomatically" : true,
			"numIndexesBefore" : 1,
			"numIndexesAfter" : 2,
			"commitQuorum" : "votingMembers",
			"ok" : 1
		}
	},
	"ok" : 1,
	"operationTime" : Timestamp(1624357534, 2),
	"$clusterTime" : { ... }
}

It is important to highlight a couple of points about creating the indexes for the shard keys:

  • If the collection is empty or new, the command used to shard the collection will automatically create the indexes if they are not present yet. This will be described in the upcoming sections.
  • If the shard key prefix is not a hashed value, the indexes must be created with the option {unique:false}

Creating the Initial Chunks Based on Hashed Key as a Prefix

There are, basically, a couple of requirements to ensure that the collection will be in compliance with the balancer and the initial chunk distribution will be optimally performed. 

  • For each non-hashed value, a range of the hashed key must be defined with upper and lower boundaries. 
  • The collection must be sharded with the option presplitHashedZones: true if there is a hashed field.

The below example shows how to shard the collection colltest1.

Defining the Zones and Assigning Them to the Shards (to keep things brief, only one zone, “LATAM”, is used in this example)

mongos> sh.addShardToZone("shard01", "LATAM")
{
	"ok" : 1,
	"operationTime" : Timestamp(1624359227, 38),
	"$clusterTime" : { ... }
}
mongos> sh.addShardToZone("shard02", "LATAM")
{
	"ok" : 1,
	"operationTime" : Timestamp(1624359231, 1),
	"$clusterTime" : {... }
}

If you check the output of the sh.status(), you will notice that tags were created and assigned to the shards according to the zones definition:

mongos> sh.status()
--- Sharding Status --- 
  sharding version: {
  	"_id" : 1,
  	"minCompatibleVersion" : 5,
  	"currentVersion" : 6,
  	"clusterId" : ObjectId("60d1be52be9f36000b01a9f0")
  }
  shards:
        {  "_id" : "shard01",  "host" : "shard01/localhost:40002",  "state" : 1,  "tags" : [ "LATAM" ] }
        {  "_id" : "shard02",  "host" : "shard02/localhost:40003",  "state" : 1,  "tags" : [ "LATAM" ] }        
<<the rest of status output was intentionally truncated to avoid unnecessary verbosity>>

Creating the Zone Ranges

mongos> sh.updateZoneKeyRange("dbtest.colltest1",{ "_id" : MinKey, "location" : MinKey },{ _id:MaxKey,"location" : MaxKey },"LATAM");
{
	"ok" : 1,
	"operationTime" : Timestamp(1624361452, 1),
	"$clusterTime" : { ... }
}

Enabling Shard

mongos> sh.enableSharding("dbtest")
{
	"ok" : 1,
	"operationTime" : Timestamp(1624361047, 6),
	"$clusterTime" : { ... }
}
mongos> sh.shardCollection("dbtest.colltest1",{_id:"hashed","location":1},false,{ presplitHashedZones: true })
{
	"collectionsharded" : "dbtest.colltest1",
	"collectionUUID" : UUID("30b2efff-2e2e-4c72-9a82-9593f227002f"),
	"ok" : 1,
	"operationTime" : Timestamp(1624361528, 31),
	"$clusterTime" : { ... }
}

In this example, the namespace dbtest.colltest1 will be evenly distributed across the zone LATAM, which spans the shards shard01 and shard02. Looking at sh.status() again, you will see that the initial chunks were created following the range defined above.

Creating the Initial Chunks Based on Non-Hashed Key as a Prefix

This example is a little more complicated, as making the shard key compliant with the balancer for the initial chunk distribution requires a few more steps.

  • You need to specify MinKey for each field in the shard key, which defines the lower boundary of each zone range.
  • Every zone must comprise a range, so at least one of the fields in the shard key must have an upper-boundary value larger than its MinKey
  • Every combination of values for the fields of the shard key must fall within the range of one of the defined zones.  
  • At the moment of sharding the collection, presplitHashedZones must be set to true.
  • Not required, but likely desirable: One of the zones should define its upper boundary as MaxKey so it acts as a catchall for out-of-range values.

Defining the Zones and Assigning Them to the Shards

The zones were defined differently for this example:

mongos> sh.status()
--- Sharding Status --- 
  sharding version: {
  	"_id" : 1,
  	"minCompatibleVersion" : 5,
  	"currentVersion" : 6,
  	"clusterId" : ObjectId("60d1be52be9f36000b01a9f0")
  }
  shards:
        {  "_id" : "shard01",  "host" : "shard01/localhost:40002",  "state" : 1,  "tags" : [ "EU" ] }
        {  "_id" : "shard02",  "host" : "shard02/localhost:40003",  "state" : 1,  "tags" : [ "LATAM" ] }
        {  "_id" : "shard03",  "host" : "shard03/localhost:40004",  "state" : 1,  "tags" : [ "AMER" ] }
        {  "_id" : "shard04",  "host" : "shard04/localhost:40005",  "state" : 1,  "tags" : [ "APAC" ] }

Creating the Zone Ranges

mongos> sh.updateZoneKeyRange("dbtest2.colltest2",{ "location": "DC01", "_id" : MinKey },{ "location": "DC02", "_id" : MinKey },"LATAM");
{
	"ok" : 1,
	"operationTime" : Timestamp(1624364375, 1),
	"$clusterTime" : { ... }
}
mongos> sh.updateZoneKeyRange("dbtest2.colltest2",{ "location": "DC02", "_id" : MinKey },{ "location": MaxKey, "_id" : MinKey },"EU");
{
	"ok" : 1,
	"operationTime" : Timestamp(1624364430, 1),
	"$clusterTime" : { ... }
}

Enabling Shard

mongos> sh.enableSharding("dbtest2")
{
	"ok" : 1,
	"operationTime" : Timestamp(1624366057, 5),
	"$clusterTime" : { ... }
}
mongos> sh.shardCollection("dbtest2.colltest2",{"location":1,_id:"hashed"},false,{ presplitHashedZones: true })
{
	"collectionsharded" : "dbtest2.colltest2",
	"collectionUUID" : UUID("b8a64972-97df-4791-8019-9526d2f8d405"),
	"ok" : 1,
	"operationTime" : Timestamp(1624366085, 37),
	"$clusterTime" : { ... }
}
mongos>

The above example basically describes how to use a non-hashed field in the prefix of a shard key to ensure that certain values of that field land in certain zones (defined as tags on shards), while the boundaries applied to the hashed field ensure even distribution. In this case, all the documents of the namespace dbtest2.colltest2 from the minimum _id of location DC01 up to the minimum _id of location DC02 will be placed in the zone LATAM (shard02), and the documents from the minimum _id of location DC02 onward will be placed in the zone EU (shard01).

It is very important to highlight that if the collection is not in compliance with the balancer, the migration of the chunks will never happen, and all the chunks will stay on the primary shard. It is possible to predict that situation right after enabling sharding on the collection by looking at the output of the command sh.balancerCollectionStatus():

mongos> sh.balancerCollectionStatus("dbtest.colltest1")
{
	"balancerCompliant" : true,
	"ok" : 1,
	"operationTime" : Timestamp(1623691376, 1),
	"$clusterTime" : { ... }
}

If balancerCompliant is true, it means the balancer will be able to split and migrate the chunks.

Conclusion

Defining the shard key is the most important step in deploying a healthy sharded cluster. Having a field in the shard key that contains only a few distinct values would compromise chunk distribution and, as a consequence, performance. Hence, this is a great improvement coming along in MongoDB 4.4, which makes it possible to have low-cardinality keys defining the boundaries while still ensuring that the shard distribution relies on a hashed, very selective key.

May
27
2021
--

Webinar June 29: Unlocking the Mystery of MongoDB Shard Key Selection

Choose the Right MongoDB Shard Key

Do You Know How to Choose the Right MongoDB Shard Key for Your Business?

In our upcoming panel, Percona MongoDB experts Mike Grayson, Kimberly Wilkins, and Vinicius Grippa will discuss the complex issue of MongoDB shard key selection and offer advice on the measures to take if things go wrong.

Selecting the right shard key is one of the most important MongoDB design decisions you make, as it impacts performance and data management. Choosing the wrong shard key can have a disastrous effect on both.

MongoDB 5.0 is due to be released this summer, and it is likely to include another big change around sharding. Following last year’s 4.4 release that included refinable shard keys, we expect to see a new feature that allows for fully changeable shard keys for the first time.

But, even with refinable and changeable options, shard key selection will continue to be a crucial MongoDB task.

Join us for the panel, where Mike, Kimberly, and Vinicius will highlight some of the perils and pitfalls to avoid, as well as offering shard key best practices such as:

* Factors to consider when selecting your shard key

* Hidden “gotcha’s” around shard selection and architecture

* Examples of the worst shard keys our experts have seen

* What happens when you have a busted shard key, and how you can mitigate the impact

* What’s happening “under MongoDB’s hood” with all the changes?

* The future of shard keys for MongoDB

Please join Percona Technical Experts Mike Grayson, Kimberly Wilkins, and Vinicius Grippa on June 29, 2021, at 1 pm EDT for their webinar Unlocking the Mystery of MongoDB Shard Key Selection.

Register for Webinar

If you can’t attend, sign up anyway, and we’ll send you the slides and recording afterward.

Mar
22
2021
--

Storing Kubernetes Operator for Percona Server for MongoDB Secrets in Github

storing kubernetes MongoDB secrets github

More and more companies are adopting GitOps as the way of implementing Continuous Deployment. Its elegant approach built upon a well-known tool wins the hearts of engineers. But even if your git repository is private, it’s strongly discouraged to store keys and passwords in unencrypted form.

This blog post will show how easy it is to use GitOps and keep Kubernetes secrets for Percona Kubernetes Operator for Percona Server for MongoDB securely in the repository with Sealed Secrets or Vault Secrets Operator.

Sealed Secrets

Prerequisites:

  • Kubernetes cluster up and running
  • Github repository (optional)

Install Sealed Secrets Controller

Sealed Secrets relies on asymmetric cryptography (also used in TLS), where the private key (which in our case is stored in Kubernetes) can decrypt a message encrypted with the public key (which can be stored safely in a public git repository). To make this task easier, Sealed Secrets provides the kubeseal tool, which helps with the encryption of the secrets.

Install the Sealed Secrets controller into your Kubernetes cluster:

kubectl apply -f https://github.com/bitnami-labs/sealed-secrets/releases/download/v0.15.0/controller.yaml

It will install the controller into the kube-system namespace and provide the Custom Resource Definition sealedsecrets.bitnami.com. All resources in Kubernetes with kind: SealedSecrets will be handled by this Operator.

Download the kubeseal binary:

wget https://github.com/bitnami-labs/sealed-secrets/releases/download/v0.15.0/kubeseal-linux-amd64 -O kubeseal
sudo install -m 755 kubeseal /usr/local/bin/kubeseal

Encrypt the Keys

In this example, I intend to store important secrets of the Percona Kubernetes Operator for Percona Server for MongoDB in git along with my manifests that are used to deploy the database.

First, I will seal the secret file with system users, which is used by the MongoDB Operator to manage the database. Normally it is stored in deploy/secrets.yaml.

kubeseal --format yaml < secrets.yaml  > blog-data/sealed-secrets/mongod-secrets.yaml

This command creates a file with the encrypted contents; you can see it in the blog-data/sealed-secrets repository here. It is safe to store it publicly, as it can only be decrypted with the private key.

Executing kubectl apply -f blog-data/sealed-secrets/mongod-secrets.yaml does the following:

  1. A sealedsecrets custom resource (CR) is created. You can see it by executing kubectl get sealedsecrets.
  2. The Sealed Secrets Operator receives the event that a new sealedsecrets CR is there and decrypts it with the private key.
  3. Once decrypted, a regular Secrets object is created, which can be used as usual.

$ kubectl get sealedsecrets
NAME               AGE
my-secure-secret   20m

$ kubectl get secrets my-secure-secret
NAME               TYPE     DATA   AGE
my-secure-secret   Opaque   10     20m

Next, I will also seal the keys for my S3 bucket that I plan to use to store backups of my MongoDB database:

kubeseal --format yaml < backup-s3.yaml  > blog-data/sealed-secrets/s3-secrets.yaml
kubectl apply -f blog-data/sealed-secrets/s3-secrets.yaml

Vault Secrets Operator

Sealed Secrets is the simplest approach, but it is possible to achieve the same result with HashiCorp Vault and Vault Secrets Operator. It is a more advanced, mature, and feature-rich approach.

Prerequisites:

Vault Secrets Operator also relies on Custom Resource, but all the keys are stored in HashiCorp Vault:

Preparation

Create a policy on the Vault for the Operator:

cat <<EOF | vault policy write vault-secrets-operator -
path "kvv2/data/*" {
  capabilities = ["read"]
}
EOF

The policy might look a bit different, depending on where your secrets are stored.

Create and fetch the token for the policy:

$ vault token create -period=24h -policy=vault-secrets-operator

Key                  Value                                                                                                                                                                                        
---                  -----                                                                                               
token                s.0yJZfCsjFq75GiVyKiZgYVOm
...

Write down the token, as you will need it in the next step.

Create the Kubernetes Secret so that the Operator can authenticate with the Vault:

export VAULT_TOKEN=s.0yJZfCsjFq75GiVyKiZgYVOm
export VAULT_TOKEN_LEASE_DURATION=86400

cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: Secret
metadata:
  name: vault-secrets-operator
type: Opaque
data:
  VAULT_TOKEN: $(echo -n "$VAULT_TOKEN" | base64)
  VAULT_TOKEN_LEASE_DURATION: $(echo -n "$VAULT_TOKEN_LEASE_DURATION" | base64)
EOF

Deploy Vault Secrets Operator

It is recommended to deploy the Operator with Helm, but before that we need to create a values.yaml file to configure the Operator.

environmentVars:
  - name: VAULT_TOKEN
    valueFrom:
      secretKeyRef:
        name: vault-secrets-operator
        key: VAULT_TOKEN
  - name: VAULT_TOKEN_LEASE_DURATION
    valueFrom:
      secretKeyRef:
        name: vault-secrets-operator
        key: VAULT_TOKEN_LEASE_DURATION
vault:
  address: "http://vault.vault.svc:8200"

The environment variables point to the Secret created in the previous section, which the Operator uses to authenticate with Vault. We also need to provide the Vault address so the Operator can retrieve the secrets.

Now we can deploy the Vault Secrets Operator:

helm repo add ricoberger https://ricoberger.github.io/helm-charts
helm repo update

helm upgrade --install vault-secrets-operator ricoberger/vault-secrets-operator -f blog-data/sealed-secrets/values.yaml

Give me the Secret

I have a key created in my HashiCorp Vault:

$ vault kv get kvv2/mongod-secret
…
Key                                 Value
---                                 -----                                                                                                                                                                         
MONGODB_BACKUP_PASSWORD             <>
MONGODB_CLUSTER_ADMIN_PASSWORD      <>
MONGODB_CLUSTER_ADMIN_USER          <>
MONGODB_CLUSTER_MONITOR_PASSWORD    <>
MONGODB_CLUSTER_MONITOR_USER        <>                                                                                                                                                               
MONGODB_USER_ADMIN_PASSWORD         <>
MONGODB_USER_ADMIN_USER             <>

It is time to create the secret out of it. First, we will create the Custom Resource object of kind: VaultSecret.

$ cat blog-data/sealed-secrets/vs.yaml
apiVersion: ricoberger.de/v1alpha1
kind: VaultSecret
metadata:
  name: my-secure-secret
spec:
  path: kvv2/mongod-secret
  type: Opaque

$ kubectl apply -f blog-data/sealed-secrets/vs.yaml

The Operator will connect to HashiCorp Vault and create a regular Secret object automatically:

$ kubectl get vaultsecret
NAME               SUCCEEDED   REASON    MESSAGE              LAST TRANSITION   AGE
my-secure-secret   True        Created   Secret was created   47m               47m

$ kubectl get secret  my-secure-secret
NAME               TYPE     DATA   AGE
my-secure-secret   Opaque   7      47m

Deploy MongoDB Cluster

Now that the secrets are in place, it is time to deploy the Operator and the DB cluster:

kubectl apply -f blog-data/sealed-secrets/bundle.yaml
kubectl apply -f blog-data/sealed-secrets/cr.yaml

The cluster will be up in a minute or two and use secrets we deployed.

By the way, my cr.yaml deploys a MongoDB cluster with two shards. Multiple shards support was added in version 1.7.0 of the Operator – I encourage you to try it out. Learn more about it here: Percona Server for MongoDB Sharding.

Mar
10
2021
--

A Peek at Percona Kubernetes Operator for Percona Server for MongoDB New Features

Percona Kubernetes Operator for Percona Server for MongoDB New Features

The latest 1.7.0 release of Percona Kubernetes Operator for Percona Server for MongoDB came out just recently and enables users to add multiple shards, attach custom sidecar containers, and have Persistent Volume Claims cleaned up automatically after cluster deletion.

Today we will look into these new features and their use cases, and highlight some architectural and technical decisions we made when implementing them.

Sharding

The 1.6.0 release of our Operator introduced single shard support, which we highlighted in this blog post and explained why it makes sense. But horizontal scaling is not possible without support for multiple shards.

Adding a Shard

A new shard is just a new ReplicaSet which can be added under spec.replsets in cr.yaml:

spec:
  ...
  replsets:
  - name: rs0
    size: 3
  ....
  - name: rs1
    size: 3
  ...

Read more on how to configure sharding.

In the Kubernetes world, a MongoDB ReplicaSet is a StatefulSet with the number of pods specified in the spec.replsets.[].size variable.

Once pods are up and running, the Operator does the following:

  • Initiates ReplicaSet by connecting to newly created pods running mongod
  • Connects to mongos and adds a shard with sh.addShard() command

    adding a shard mongodb operator

Then the output of db.adminCommand({ listShards:1 }) will look like this:

        "shards" : [
                {
                        "_id" : "replicaset-1",
                        "host" : "replicaset-1/percona-cluster-replicaset-1-0.percona-cluster-replicaset-1.default.svc.cluster.local:27017,percona-cluster-replicaset-1-1.percona-cluster-replicaset-1.default.svc.cluster.local:27017,percona-cluster-replicaset-1-2.percona-cluster-replicaset-1.default.svc.cluster.local:27017",
                        "state" : 1
                },
                {
                        "_id" : "replicaset-2",
                        "host" : "replicaset-2/percona-cluster-replicaset-2-0.percona-cluster-replicaset-2.default.svc.cluster.local:27017,percona-cluster-replicaset-2-1.percona-cluster-replicaset-2.default.svc.cluster.local:27017,percona-cluster-replicaset-2-2.percona-cluster-replicaset-2.default.svc.cluster.local:27017",
                        "state" : 1
                }
        ],


Deleting a Shard

Percona Operators are built to simplify the deployment and management of the databases on Kubernetes. Our goal is to provide resilient infrastructure, but the operator does not manage the data itself. Deleting a shard requires moving the data to another shard before removal, but there are a couple of caveats:

  • Sometimes data is not moved automatically by MongoDB – unsharded collections or jumbo chunks
  • We hit the storage problem – what if another shard does not have enough disk space to hold the data?

shard does not have enough disk space to hold the data

There are a few choices:

  1. Do not touch the data. The user needs to move the data manually and then the operator removes the empty shard.
  2. The operator decides where to move the data and deals with storage issues by upscaling if necessary.
    • Upscaling the storage can be tricky, as it requires certain capabilities from the Container Storage Interface (CSI) and the underlying storage infrastructure.

For now, we decided to pick option #1 and won’t touch the data, but in future releases, we would like to work with the community to introduce fully-automated shard removal.

When the user wants to remove the shard now, we first check if there are any non-system databases present on the ReplicaSet. If there are none, the shard can be removed:

func (r *ReconcilePerconaServerMongoDB) checkIfPossibleToRemove(cr *api.PerconaServerMongoDB, usersSecret *corev1.Secret, rsName string) error {
  systemDBs := map[string]struct{}{
    "local": {},
    "admin": {},
    "config":  {},
  }
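  // (excerpt truncated here: the function goes on to list the databases on the
  // ReplicaSet and returns an error if any non-system database is still present)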

delete a shard

Custom Sidecars

The sidecar container pattern allows users to extend the application without changing the main container image. They leverage the fact that all containers in the pod share storage and network resources.

Percona Operators have built-in support for Percona Monitoring and Management to gain monitoring insights for the databases on Kubernetes, but sometimes users may want to expose metrics to other monitoring systems. Let’s see how mongodb_exporter can expose metrics while running as a sidecar along with the ReplicaSet containers.

1. Create the monitoring user that the exporter will use to connect to MongoDB. Connect to mongod in the container and create the user:

> db.getSiblingDB("admin").createUser({
    user: "mongodb_exporter",
    pwd: "mysupErpassword!123",
    roles: [
      { role: "clusterMonitor", db: "admin" },
      { role: "read", db: "local" }
    ]
  })

2. Create the Kubernetes secret with this username and password. Encode both with base64:

$ echo -n mongodb_exporter | base64
bW9uZ29kYl9leHBvcnRlcg==
$ echo -n 'mysupErpassword!123' | base64
bXlzdXBFcnBhc3N3b3JkITEyMw==

Put these into the secret and apply:

$ cat mongoexp_secret.yaml
apiVersion: v1
kind: Secret
metadata:
  name: mongoexp-secret
data:
  username: bW9uZ29kYl9leHBvcnRlcg==
  password: bXlzdXBFcnBhc3N3b3JkITEyMw==

$ kubectl apply -f mongoexp_secret.yaml

3. Add a sidecar for mongodb_exporter into cr.yaml and apply:

replsets:
- name: rs0
  ...
  sidecars:
  - image: bitnami/mongodb-exporter:latest
    name: mongodb-exporter
    env:
    - name: EXPORTER_USER
      valueFrom:
        secretKeyRef:
          name: mongoexp-secret
          key: username
    - name: EXPORTER_PASS
      valueFrom:
        secretKeyRef:
          name: mongoexp-secret
          key: password
    - name: POD_IP
      valueFrom:
        fieldRef:
          fieldPath: status.podIP
    - name: MONGODB_URI
      value: "mongodb://$(EXPORTER_USER):$(EXPORTER_PASS)@$(POD_IP):27017"
    args: ["--web.listen-address=$(POD_IP):9216"

$ kubectl apply -f deploy/cr.yaml

All it takes now is to configure the monitoring system to fetch the metrics from each mongod Pod. For example, prometheus-operator will start fetching metrics once annotations are added to the ReplicaSet pods:

replsets:
- name: rs0
  ...
  annotations:
    prometheus.io/scrape: 'true'
    prometheus.io/port: '9216'

PVCs Clean Up

Running CI/CD pipelines that deploy MongoDB clusters on Kubernetes is a common thing. Once these clusters are terminated, the Persistent Volume Claims (PVCs) are not. We have now added automation that removes PVCs after cluster deletion. We rely on Kubernetes Finalizers – asynchronous pre-delete hooks. In our case, we hook the finalizer to the Custom Resource (CR) object which is created for the MongoDB cluster.

PVCs Clean Up

A user can enable the finalizer through cr.yaml in the metadata section:

metadata:
  name: my-cluster-name
  finalizers:
     - delete-psmdb-pvc

Conclusion

Percona is committed to providing production-grade database deployments on Kubernetes. Our Percona Kubernetes Operator for Percona Server for MongoDB is a feature-rich tool to deploy and manage your MongoDB clusters with ease. Our Operator is free and open source. Try it out by following the documentation here or help us to make it better by contributing your code and ideas to our Github repository.

May
24
2019
--

An Overview of Sharding in PostgreSQL and How it Relates to MongoDB’s

A couple of weeks ago I presented at Percona University São Paulo about the new features in PostgreSQL that allow the deployment of simple shards. I’ve tried to summarize the main points in this post, as well as providing an introductory overview of sharding itself. Please note I haven’t included any third-party extensions that provide sharding for PostgreSQL in my discussion below.

Partitioning in PostgreSQL

In a nutshell, until not long ago there wasn’t a dedicated, native feature in PostgreSQL for table partitioning. Not that that prevented people from doing it anyway: the PostgreSQL community is very creative. There’s a table inheritance feature in PostgreSQL that allows the creation of child tables with the same structure as a parent table. That, combined with the employment of proper constraints in each child table along with the right set of triggers in the parent table, has provided practical “table partitioning” in PostgreSQL for years (and still works). Here’s an example:

Using table inheritance

CREATE TABLE temperature (
  id BIGSERIAL PRIMARY KEY NOT NULL,
  city_id INT NOT NULL,
  timestamp TIMESTAMP NOT NULL,
  temp DECIMAL(5,2) NOT NULL
);

Figure 1a. Main (or parent) table

CREATE TABLE temperature_201901 (CHECK (timestamp >= DATE '2019-01-01' AND timestamp <= DATE '2019-01-31')) INHERITS (temperature);
CREATE TABLE temperature_201902 (CHECK (timestamp >= DATE '2019-02-01' AND timestamp <= DATE '2019-02-28')) INHERITS (temperature);
CREATE TABLE temperature_201903 (CHECK (timestamp >= DATE '2019-03-01' AND timestamp <= DATE '2019-03-31')) INHERITS (temperature);

Figure 1b. Child tables inherit the structure of the parent table and are limited by constraints

CREATE OR REPLACE FUNCTION temperature_insert_trigger()
RETURNS TRIGGER AS $$
BEGIN
    IF ( NEW.timestamp >= DATE '2019-01-01' AND NEW.timestamp <= DATE '2019-01-31' ) THEN INSERT INTO temperature_201901 VALUES (NEW.*);
    ELSIF ( NEW.timestamp >= DATE '2019-02-01' AND NEW.timestamp <= DATE '2019-02-28' ) THEN INSERT INTO temperature_201902 VALUES (NEW.*);
    ELSIF ( NEW.timestamp >= DATE '2019-03-01' AND NEW.timestamp <= DATE '2019-03-31' ) THEN INSERT INTO temperature_201903 VALUES (NEW.*);
    ELSE RAISE EXCEPTION 'Date out of range!';
    END IF;
    RETURN NULL;
END;
$$
LANGUAGE plpgsql;

Figure 1c. A function that controls in which child table a new entry should be added according to the timestamp field

CREATE TRIGGER insert_temperature_trigger
    BEFORE INSERT ON temperature
    FOR EACH ROW EXECUTE PROCEDURE temperature_insert_trigger();

Figure 1d. A trigger is added to the parent table that calls the function above when an INSERT is performed

The biggest drawbacks of such an implementation were the amount of manual work needed to maintain such an environment (even though a certain level of automation could be achieved through the use of third-party extensions such as pg_partman) and the lack of optimization/support for “distributed” queries. The PostgreSQL optimizer wasn’t advanced enough to have a good understanding of partitions at the time, though there were workarounds that could be used, such as employing constraint exclusion.

Declarative partitioning

About a year and a half ago, PostgreSQL 10 was released with a bunch of new features, among them native support for table partitioning through the new declarative partitioning feature. Here’s how we could partition the same temperature table using this new method:

CREATE TABLE temperature (
  id BIGSERIAL NOT NULL,
  city_id INT NOT NULL,
  timestamp TIMESTAMP NOT NULL,
  temp DECIMAL(5,2) NOT NULL
) PARTITION BY RANGE (timestamp);

Figure 2a. Main table structure for a partitioned table

CREATE TABLE temperature_201901 PARTITION OF temperature FOR VALUES FROM ('2019-01-01') TO ('2019-02-01');
CREATE TABLE temperature_201902 PARTITION OF temperature FOR VALUES FROM ('2019-02-01') TO ('2019-03-01');
CREATE TABLE temperature_201903 PARTITION OF temperature FOR VALUES FROM ('2019-03-01') TO ('2019-04-01');

Figure 2b. Tables defined as partitions of the main table; with declarative partitioning, there was no need for triggers anymore.

It still lacked the optimization and flexibility needed to be considered a complete partitioning solution. It wasn’t possible, for example, to perform an UPDATE that would result in moving a row from one partition to a different one, but the foundation had been laid. Fast forward another year and PostgreSQL 11 builds on top of this, delivering additional features like:

  • the possibility to define a default partition, to which any entry that doesn’t fit an existing partition is added (see the sketch after this list).
  • having indexes added to the main table “replicated” to the underlying partitions, which improved declarative partitioning usability.
  • support for Foreign Keys

These are just a few of the features that led to a more mature partitioning solution.
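To make the first two items concrete, here is a minimal sketch using the partitioned temperature table from Figure 2 (the partition and index names are illustrative):

CREATE TABLE temperature_default PARTITION OF temperature DEFAULT;  -- catches rows outside every defined range
CREATE INDEX ON temperature (city_id, timestamp);                   -- automatically created on each partition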

Sharding in PostgreSQL

By now you might reasonably be questioning my premise, pointing out that partitioning is not sharding, at least not in the sense and context you would have expected this post to cover. In fact, PostgreSQL has implemented sharding on top of partitioning by allowing any given partition of a partitioned table to be hosted by a remote server. The basis for this is PostgreSQL’s Foreign Data Wrapper (FDW) support, which has been part of the core of PostgreSQL for a long time. While technically possible, we just couldn’t make practical use of FDWs for sharding with the table inheritance + triggers approach. Declarative partitioning allowed for much better integration of these pieces, making sharding – partitioned tables hosted by remote servers – more of a reality in PostgreSQL.

CREATE TABLE temperature_201904 (
  id BIGSERIAL NOT NULL,
  city_id INT NOT NULL,
  timestamp TIMESTAMP NOT NULL,
  temp DECIMAL(5,2) NOT NULL
);

Figure 3a. On the remote server we create a “partition” – nothing but a simple table

CREATE EXTENSION postgres_fdw;
GRANT USAGE ON FOREIGN DATA WRAPPER postgres_fdw to app_user;
CREATE SERVER shard02 FOREIGN DATA WRAPPER postgres_fdw
    OPTIONS (dbname 'postgres', host 'shard02', port
    '5432');
CREATE USER MAPPING for app_user SERVER shard02
    OPTIONS (user 'fdw_user', password 'secret');

Figure 3b. On the local server the preparatory steps involve loading the postgres_fdw extension, allowing our local application user to use that extension, creating an entry to access the remote server, and finally mapping that user with a user in the remote server (fdw_user) that has local access to the table we’ll use as a remote partition.

CREATE FOREIGN TABLE temperature_201904 PARTITION OF temperature
    FOR VALUES FROM ('2019-04-01') TO ('2019-05-01')
    SERVER shard02;

Figure 3c. Now it’s simply a matter of creating a proper partition of our main table in the local server that will be linked to the table of the same name in the remote server
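As a quick sanity check (not part of the original example, and relying on PostgreSQL 11’s ability to route inserted tuples to foreign partitions), a row for April inserted through the parent table ends up stored on the remote server:

INSERT INTO temperature (city_id, timestamp, temp)
VALUES (1, '2019-04-10 12:00:00', 21.35);

SELECT * FROM temperature WHERE timestamp >= DATE '2019-04-01';
-- EXPLAIN on this SELECT would show a Foreign Scan on temperature_201904.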

You can read more about postgres_fdw in Foreign Data Wrappers in PostgreSQL and a closer look at postgres_fdw.

When does it make sense to partition a table?

There are several principal reasons to partition a table:

  1. When a table grows so big that searching it becomes impractical even with the help of indexes (which will invariably become too big as well).
  2. When data management is such that the target data is often the most recently added and/or older data is constantly being purged/archived, or even not being searched anymore (at least not as often).
  3. If you are loading data from different sources and maintaining it as a data warehouse for reporting and analytics.
  4. For a less expensive archiving or purging of massive data that avoids exclusive locks on the entire table.

When should we resort to sharding?

Here are a couple of classic cases:

  1. To scale out (horizontally), when even after partitioning a table the amount of data is too great or too complex to be processed by a single server.
  2. Use cases where the data in a big table can be divided into two or more segments that would benefit the majority of the search patterns. A common example used to describe a scenario like this is that of a company whose customers are evenly spread across the United States and whose searches against a target table involve the customer ZIP code. One shard could then host the entries of customers located on the East coast and another those of customers on the West coast.

Note, though, that this is by no means an exhaustive list.

How should we shard the data?

With sharding (in this context) being “distributed” partitioning, the essence for a successful (performant) sharded environment lies in choosing the right shard key – and by “right” I mean one that will distribute your data across the shards in a way that will benefit most of your queries. In the example above, using the customer ZIP code as shard key makes sense if an application will more often be issuing queries that will hit one shard (East) or the other (West). However, if most queries would filter by, say, birth date, then all queries would need to be run through all shards to recover the full result set. This could easily backfire on performance with the shard approach, by not selecting the right shard key or simply by having such a heterogeneous workload that no shard key would be able to satisfy it.
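To make the ZIP-code example a bit more concrete, here is a rough sketch of how such a split could look using the mechanisms described above; the table, the region values, and the server names (shard_east, shard_west) are purely illustrative:

CREATE TABLE customers (
  id BIGSERIAL NOT NULL,
  zip CHAR(5) NOT NULL,
  region TEXT NOT NULL,   -- e.g. 'east' or 'west', derived from the ZIP code
  name TEXT NOT NULL
) PARTITION BY LIST (region);

CREATE FOREIGN TABLE customers_east PARTITION OF customers
    FOR VALUES IN ('east') SERVER shard_east;
CREATE FOREIGN TABLE customers_west PARTITION OF customers
    FOR VALUES IN ('west') SERVER shard_west;

Queries that filter on the region touch a single shard; queries that filter only on other columns have to visit both.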

It only ever makes sense to shard if the nature of the queries involving the target table(s) is such that distributed processing will be the norm and will constitute an advantage far greater than any overhead caused by the minority of queries that rely on JOINs involving multiple shards. Due to the distributed nature of sharding, such queries will necessarily perform worse compared to having all the data hosted on the same server.

Why not simply rely on replication or clustering?

Sharding should be considered in those situations where you can’t efficiently break down a big table through data normalization or an alternative approach, and maintaining it on a single server is too demanding. The table is then partitioned and the partitions distributed across different servers to spread the load. It doesn’t need to be one partition per shard; often a single shard will host a number of partitions.

Note how sharding differs from traditional “share all” database replication and clustering environments: you may use, for instance, a dedicated PostgreSQL server to host a single partition from a single table and nothing else. However, these data scaling technologies may well complement each other: a PostgreSQL database may host a shard with part of a big table as well as replicate smaller tables that are often used for some sort of consultation (read-only), such as a price list, through logical replication.

How does sharding in PostgreSQL relate to sharding in MongoDB®?

MongoDB® tackles the matter of managing big collections straight through sharding: there is no concept of local partitioning of collections in MongoDB. In fact, the whole MongoDB scaling strategy is based on sharding, which takes a central place in the database architecture. As such, the sharding process has been made as transparent to the application as possible: all a DBA has to do is to define the shard key.

Instead of connecting to a reference database server, the application will connect to an auxiliary router server named mongos, which will process the queries and request the necessary information from the respective shards. A mongos knows which shard contains what because it maintains a copy of the metadata that maps chunks of data to shards, which it gets from a config server, another important and independent component of a MongoDB sharded cluster. Together, they also play a role in maintaining good data distribution across the shards, actively splitting and migrating chunks of data between shards as needed.

In PostgreSQL, the application will connect to and query the main database server. There isn’t an intermediary router such as the mongos, but PostgreSQL’s query planner will process the query and create an execution plan. When data requested from a partitioned table is found on a remote server, PostgreSQL will request the data from it, as shown in the EXPLAIN output below:

…
   Remote SQL: UPDATE public.emp SET sal = $2 WHERE ctid = $1
   ->  Nested Loop  (cost=100.00..300.71 rows=669 width=118)
     	Output: emp.empno, emp.ename, emp.job, emp.mgr, emp.hiredate, (emp.sal * '1.1'::double precision), emp.comm, emp.deptno, emp.ctid, salgrade.ctid
     	Join Filter: ((emp.sal > (salgrade.losal)::double precision) AND (emp.sal < (salgrade.hisal)::double precision)) ->  Foreign Scan on public.emp  (cost=100.00..128.06 rows=602 width=112)
           	Output: emp.empno, emp.ename, emp.job, emp.mgr, emp.hiredate, emp.sal, emp.comm, emp.deptno, emp.ctid
…

Figure 4: excerpt of an EXPLAIN plan that involves processing a query in a remote server

Note the mention of “Remote SQL” in the plan above. A lot of optimizations have been made to the execution of remote queries in PostgreSQL 10 and 11, which helped mature and improve the sharding solution. Among them is support for having grouping and aggregation operations executed on the remote server itself (“push down”) rather than retrieving all rows and having them processed locally.
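A hedged sketch of how you can check whether push-down is happening, using the foreign partition from Figure 3; the exact plan shape depends on the PostgreSQL version and planner costs:

EXPLAIN (VERBOSE)
SELECT city_id, avg(temp)
  FROM temperature_201904
 GROUP BY city_id;
-- When aggregate push-down kicks in, the plan shows a single Foreign Scan
-- whose "Remote SQL" line already contains the GROUP BY, instead of a local
-- aggregate on top of a plain remote scan.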

What is missing in PostgreSQL implementation?

There is, however, still room for improvement. In terms of remote execution, reports from the community indicate not all queries are performing as they should. For example, in some cases the PostgreSQL planner is not performing a full push-down, resulting in shards transferring more data than required. Parallel scheduling of queries that touch multiple shards is not yet implemented: for now, the execution is taking place sequentially, one shard at a time, which takes longer to complete. When it comes to the maintenance of partitioned and sharded environments, changes in the structure of partitions are still complicated and not very practical. For example, when you add a new partition to a partitioned table with an appointed default partition you may need to detach the default partition first if it contains rows that would now fit in the new partition, manually move those to the new partition, and finally re-attach the default partition back in place.

But that is all part of a maturing technology. We’re looking forward to PostgreSQL 12 and what it will bring on the partitioning and sharding fronts.


Image based on photos by Leonardo Quatrocchi from Pexels

 

Jul
18
2018
--

Webinar Wed 7/19: MongoDB Sharding

MongoDB shard zones

Please join Percona’s Senior Support Engineer, Adamo Tonete, as he presents MongoDB Sharding 101 on July 19th, 2018, at 12:30 PM PDT (UTC-7) / 3:30 PM EDT (UTC-4).

 

This tutorial is a continuation of advanced topics for the DBA. In it, we will share best practices and tips on how to perform the most common activities.

In this tutorial, we are going to cover MongoDB sharding.

Register Now

The post Webinar Wed 7/19: MongoDB Sharding appeared first on Percona Database Performance Blog.

Jul
05
2018
--

Configuring PMM Monitoring for MongoDB Cluster

In this blog, we will see how to configure Percona Monitoring and Management (PMM) monitoring for a MongoDB cluster. It’s very simple, like adding a replica set or standalone instances to PMM Monitoring. 

For this example, I have used docker to create the PMM Server and MongoDB sharded cluster containers. I have shared the steps I used to create the MongoDB® cluster environment with docker at the end of this blog, in case you would like to set up the same environment.

Configuring PMM Clients

You can check these links for PMM Server installation and pmm-client setup. The following are the members of the MongoDB cluster:

mongos:
 mongos1  
config db: (replSet name - mongors1conf)  
 mongocfg1  
 mongocfg2  
 mongocfg3
Shard1: (replSet name - mongors1)  
 mongors1n1  
 mongors1n2
 mongors1n3
Shard2: (replSet name - mongors2)  
 mongors2n1
 mongors2n2  
 mongors2n3

In this setup, I installed the pmm-client on the mongos1 server. Then I added an agent to monitor MongoDB metrics with the cluster option, as shown in the next code block. I named the cluster “mongoClusterPMM”. (Note: you have to be the root user or have sudo access to execute the pmm-admin command.)

root@90262f1360a0:/# pmm-admin add  mongodb --cluster mongoClusterPMM
[linux:metrics]   OK, now monitoring this system.
[mongodb:metrics] OK, now monitoring MongoDB metrics using URI localhost:27017
[mongodb:queries] OK, now monitoring MongoDB queries using URI localhost:27017
[mongodb:queries] It is required for correct operation that profiling of monitored MongoDB databases be enabled.
[mongodb:queries] Note that profiling is not enabled by default because it may reduce the performance of your MongoDB server.
[mongodb:queries] For more information read PMM documentation (https://www.percona.com/doc/percona-monitoring-and-management/conf-mongodb.html).
root@90262f1360a0:/# pmm-admin list
pmm-admin 1.11.0
PMM Server      | 172.17.0.2 (password-protected)
Client Name     | 90262f1360a0
Client Address  | 172.17.0.4 
Service Manager | unix-systemv
---------------- ------------- ----------- -------- ---------------- ------------------------
SERVICE TYPE     NAME          LOCAL PORT  RUNNING  DATA SOURCE      OPTIONS                 
---------------- ------------- ----------- -------- ---------------- ------------------------
mongodb:queries  90262f1360a0  -           YES      localhost:27017  query_examples=true     
linux:metrics    90262f1360a0  42000       YES      -                                        
mongodb:metrics  90262f1360a0  42003       YES      localhost:27017  cluster=mongoClusterPMM 
root@90262f1360a0:/#

As you can see, I used the pmm-admin add mongodb [options] command, which enables monitoring for the system, MongoDB metrics, and queries. You need to enable the profiler to monitor MongoDB queries. Use the next command to enable it at the database level:

use db_name
db.setProfilingLevel(1)
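If you want to confirm the profiler is now active for the database, a quick check (not part of the original steps) is:

db.getProfilingStatus()
// e.g. { "was" : 1, "slowms" : 100 }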

Check this blog to learn more about QAN setup and details. If you want to monitor only MongoDB metrics, rather than queries as well, you can use the command pmm-admin add mongodb:metrics [options]. After this, go to the PMM homepage (in my case localhost:8080) in your browser and select MongoDB Cluster Summary from the drop-down list under the MongoDB option. The screenshot below shows the statistics for the MongoDB cluster “mongoClusterPMM” collected by the agent that we added on the mongos1 server.

Percona Monitoring and Management for a MongoDB cluster

Did we miss something here? And do you see any metrics in the dashboard above except “Balancer Enabled” and “Chunks Balanced”?

No. This is because PMM doesn’t have enough data to show in the dashboard yet. The shards have not been added to the cluster, and as you can see the dashboard displays 0 under shards. Let’s add the two shard replica sets mongors1 and mongors2 through the mongos1 instance, and enable sharding on a database to complete the cluster setup as follows:

mongos> sh.addShard("mongors1/mongors1n1:27017,mongors1n2:27017,mongors1n3:27017")
{ "shardAdded" : "mongors1", "ok" : 1 }
mongos> sh.addShard("mongors2/mongors2n1:27017,mongors2n2:27017,mongors2n3:27017")
{ "shardAdded" : "mongors2", "ok" : 1 }

Now I’ll add some data and a collection, create the index to be used as the shard key, and enable sharding so that we can see some statistics in the dashboard:

use vinodh
db.setProfilingLevel(1)
db.testColl.insertMany([{id1:1,name:"test insert"},{id1:2,name:"test insert"},{id1:3,name:"insert"},{id1:4,name:"insert"}])
db.testColl.ensureIndex({id1:1})
sh.enableSharding("vinodh")
sh.shardCollection("vinodh.testColl", {id1:1})
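Before switching to the dashboard, you can also confirm from the mongos shell that sharding is in place for the collection (a quick check, not shown in the original steps):

mongos> sh.status()

In the output, the databases section should now list “vinodh” as sharding-enabled and show vinodh.testColl with its shard key and chunk distribution across mongors1 and mongors2.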

At last! Now you can see statistics in the graph for the MongoDB cluster:

PMM Statistics for a MongoDB Cluster 

Full Cluster Monitoring

We are not done yet. We have just added an agent to monitor the mongos1 instance, so only the cluster-related statistics and QAN are collected through this node. To monitor all nodes in the MongoDB cluster, we need to configure every node in PMM under the “mongoClusterPMM” cluster. This tells PMM that the configured nodes are part of the same cluster. We can also monitor the replica set related metrics for the members of the config DB and the shards. Let’s add the monitoring agents on the mongos1 server to monitor all MongoDB instances remotely. We’ll use these commands:

pmm-admin add mongodb:metrics --uri "mongodb://mongocfg1:27017/admin?replicaSet=mongors1conf" mongocfg1replSet --cluster  mongoClusterPMM
pmm-admin add mongodb:metrics --uri "mongodb://mongocfg2:27017/admin?replicaSet=mongors1conf" mongocfg2replSet --cluster  mongoClusterPMM
pmm-admin add mongodb:metrics --uri "mongodb://mongocfg3:27017/admin?replicaSet=mongors1conf" mongocfg3replSet --cluster  mongoClusterPMM
pmm-admin add mongodb:metrics --uri "mongodb://mongors1n1:27017/admin?replicaSet=mongors1" mongors1replSetn1 --cluster  mongoClusterPMM
pmm-admin add mongodb:metrics --uri "mongodb://mongors1n2:27017/admin?replicaSet=mongors1" mongors1replSetn2 --cluster  mongoClusterPMM
pmm-admin add mongodb:metrics --uri "mongodb://mongors1n3:27017/admin?replicaSet=mongors1" mongors1replSetn3 --cluster  mongoClusterPMM
pmm-admin add mongodb:metrics --uri "mongodb://mongors2n1:27017/admin?replicaSet=mongors2" mongors2replSetn1 --cluster  mongoClusterPMM
pmm-admin add mongodb:metrics --uri "mongodb://mongors2n2:27017/admin?replicaSet=mongors2" mongors2replSetn2 --cluster  mongoClusterPMM
pmm-admin add mongodb:metrics --uri "mongodb://mongors2n3:27017/admin?replicaSet=mongors2" mongors2replSetn3 --cluster  mongoClusterPMM

Once you have added them, you can check the agent’s (mongodb-exporter) status as follows:

root@90262f1360a0:/# pmm-admin list
pmm-admin 1.11.0
PMM Server      | 172.17.0.2 (password-protected)
Client Name     | 90262f1360a0
Client Address  | 172.17.0.4 
Service Manager | unix-systemv
---------------- ------------------ ----------- -------- ----------------------- ------------------------
SERVICE TYPE     NAME               LOCAL PORT  RUNNING  DATA SOURCE             OPTIONS                 
---------------- ------------------ ----------- -------- ----------------------- ------------------------
mongodb:queries  90262f1360a0       -           YES      localhost:27017         query_examples=true     
linux:metrics    90262f1360a0       42000       YES      -                                               
mongodb:metrics  90262f1360a0       42003       YES      localhost:27017         cluster=mongoClusterPMM 
mongodb:metrics  mongocfg1replSet   42004       YES      mongocfg1:27017/admin   cluster=mongoClusterPMM 
mongodb:metrics  mongocfg2replSet   42005       YES      mongocfg2:27017/admin   cluster=mongoClusterPMM 
mongodb:metrics  mongocfg3replSet   42006       YES      mongocfg3:27017/admin   cluster=mongoClusterPMM 
mongodb:metrics  mongors1replSetn1  42007       YES      mongors1n1:27017/admin  cluster=mongoClusterPMM 
mongodb:metrics  mongors1replSetn3  42008       YES      mongors1n3:27017/admin  cluster=mongoClusterPMM 
mongodb:metrics  mongors2replSetn1  42009       YES      mongors2n1:27017/admin  cluster=mongoClusterPMM 
mongodb:metrics  mongors2replSetn2  42010       YES      mongors2n2:27017/admin  cluster=mongoClusterPMM 
mongodb:metrics  mongors2replSetn3  42011       YES      mongors2n3:27017/admin  cluster=mongoClusterPMM 
mongodb:metrics  mongors1replSetn2  42012       YES      mongors1n2:27017/admin  cluster=mongoClusterPMM

So now you can monitor every member of the MongoDB cluster, including their replica set and shard statistics. The next screenshot shows one of the members from a replica set under the MongoDB replica set dashboard. You can select it from the dashboard like this: Cluster: mongoClusterPMM → Replica Set: mongors1 → Instance: mongors1replSetn2:

 PMM MongoDB Replica Set Dashboard

MongoDB cluster docker setup

As I said at the beginning of this blog, here are the steps for the MongoDB cluster setup. Since this is for testing, I have used a very simple configuration to set up the cluster environment using docker-compose. Before creating the MongoDB cluster, create a docker network so that the cluster nodes and PMM can connect to each other, like this:

Vinodhs-MBP:docker-mcluster vinodhkrish$ docker network create mongo-cluster-nw
7e3203aed630fed9b5f5f2b30e346301a58a068dc5f5bc0bfe38e2eef1d48787

I used the docker-compose.yaml file to create the docker environment here:

version: '2'
services:
  mongors1n1:
    container_name: mongors1n1
    image: mongo:3.4
    command: mongod --shardsvr --replSet mongors1 --dbpath /data/db --port 27017
    networks:
      - mongo-cluster
    ports:
      - 27047:27017
    expose:
      - "27017"
    environment:
      TERM: xterm
  mongors1n2:
    container_name: mongors1n2
    image: mongo:3.4
    command: mongod --shardsvr --replSet mongors1 --dbpath /data/db --port 27017
    networks:
      - mongo-cluster
    ports:
      - 27048:27017
    expose:
      - "27017"
    environment:
      TERM: xterm
  mongors1n3:
    container_name: mongors1n3
    image: mongo:3.4
    command: mongod --shardsvr --replSet mongors1 --dbpath /data/db --port 27017
    networks:
      - mongo-cluster
    ports:
      - 27049:27017
    expose:
      - "27017"
    environment:
      TERM: xterm
  mongors2n1:
    container_name: mongors2n1
    image: mongo:3.4
    command: mongod --shardsvr --replSet mongors2 --dbpath /data/db --port 27017
    networks:
      - mongo-cluster
    ports:
      - 27057:27017
    expose:
      - "27017"
    environment:
      TERM: xterm
  mongors2n2:
    container_name: mongors2n2
    image: mongo:3.4
    command: mongod --shardsvr --replSet mongors2 --dbpath /data/db --port 27017
    networks:
      - mongo-cluster
    ports:
      - 27058:27017
    expose:
      - "27017"
    environment:
      TERM: xterm
  mongors2n3:
    container_name: mongors2n3
    image: mongo:3.4
    command: mongod --shardsvr --replSet mongors2 --dbpath /data/db --port 27017
    networks:
      - mongo-cluster
    ports:
      - 27059:27017
    expose:
      - "27017"
    environment:
      TERM: xterm
  mongocfg1:
    container_name: mongocfg1
    image: mongo:3.4
    command: mongod --configsvr --replSet mongors1conf --dbpath /data/db --port 27017
    networks:
      - mongo-cluster
    ports:
      - 27025:27017
    environment:
      TERM: xterm
    expose:
      - "27017"
  mongocfg2:
    container_name: mongocfg2
    image: mongo:3.4
    command: mongod --configsvr --replSet mongors1conf --dbpath /data/db --port 27017
    networks:
      - mongo-cluster
    ports:
      - 27024:27017
    environment:
      TERM: xterm
    expose:
      - "27017"
  mongocfg3:
    container_name: mongocfg3
    image: mongo:3.4
    command: mongod --configsvr --replSet mongors1conf --dbpath /data/db --port 27017
    networks:
      - mongo-cluster
    ports:
      - 27023:27017
    environment:
      TERM: xterm
    expose:
      - "27017"
  mongos1:
    container_name: mongos1
    image: mongo:3.4
    depends_on:
      - mongocfg1
      - mongocfg2
    command: mongos --configdb mongors1conf/mongocfg1:27017,mongocfg2:27017,mongocfg3:27017 --port 27017
    networks:
      - mongo-cluster
    ports:
      - 27019:27017
    expose:
      - "27017"
networks:
  mongo-cluster:
    external:
      name: mongo-cluster-nw

Hint:
In the docker-compose file above, if you are using docker-compose version >= 3.x, then you can use the “networks” option in the yaml file like this:

networks:
  mongo-cluster:
    name: mongo-cluster-nw

Now start the containers and configure the cluster quickly as follows:

Vinodhs-MBP:docker-mcluster vinodhkrish$ docker-compose up -d
Creating mongocfg1 ... 
Creating mongos1 ... 
Creating mongors2n3 ... 
Creating mongocfg2 ... 
Creating mongors1n1 ... 
Creating mongocfg3 ... 
Creating mongors2n2 ... 
Creating mongors1n3 ... 
Creating mongors2n1 ... 
Creating mongors1n2 ...

ConfigDB replica set setup

Vinodhs-MBP:docker-mcluster vinodhkrish$ docker exec -it mongocfg1 mongo --quiet
> rs.initiate({  _id: "mongors1conf",configsvr: true,
...   members: [{ _id:0, host: "mongocfg1" },{_id:1, host: "mongocfg2" },{ _id:2, host: "mongocfg3"}]
...   })
{ "ok" : 1 }

Shard1 replica set setup

Vinodhs-MBP:docker-mcluster vinodhkrish$ docker exec -it mongors1n1 mongo --quiet
> rs.initiate( { _id: "mongors1", members:[ {_id: 0, host: "mongors1n1"}, {_id:1, host: "mongors1n2"}, {_id:2, host: "mongors1n3"}] })
{ "ok" : 1 }

Shard2 replica set setup:

Vinodhs-MBP:docker-mcluster vinodhkrish$ docker exec -it mongors2n1 mongo --quiet
> rs.initiate({ _id: "mongors2",  members: [ { _id: 0, host: "mongors2n1"}, { _id:1, host: "mongors2n2"}, { _id:2, host: "mongors2n3"}] })
{ "ok" : 1 }

I hope that this blog helps you to set up a MongoDB cluster and also to configure PMM to monitor it! Have a great day!

The post Configuring PMM Monitoring for MongoDB Cluster appeared first on Percona Database Performance Blog.

Jun
13
2018
--

Zone Based Sharding in MongoDB

MongoDB shard zones

In this blog post, we will discuss how to use zone based sharding to deploy a sharded MongoDB cluster in a customized manner, so that queries and data are routed according to geographical groupings. This feature of MongoDB is part of its Data Center Awareness, which allows queries to be routed to particular MongoDB deployments based on the physical locations or configurations of the mongod instances.

Before moving on, let’s have an overview of this feature. You might already have some questions about zone based sharding. Was it recently introduced? If zone based sharding is something we should use, then what about tag-aware sharding?

MongoDB has supported tag-aware sharding since its early versions. This means tagging a range of shard key values, associating that range with a shard, and redirecting operations to that specific tagged shard. Since version 3.4, tag-aware sharding has been referred to as zones. So the only change is the name, and this is why the sh.addShardTag(shard, tag) method is still being used.

How it works

  1. With the help of a shard key, MongoDB allows you to create zones of sharded data – also known as shard zones.
  2. Each zone can be associated with one or more shards.
  3. Similarly, a shard can associate with any number of non-conflicting zones.
  4. MongoDB migrates chunks to the zone range in the selected shards.
  5. MongoDB routes reads and writes that fall within a zone range to the shards associated with that zone.

Useful for what kind of deployments/applications?

  1. In cases where data needs to be routed to a particular shard due to some hardware configuration restrictions.
  2. Zones can be useful if there is the need to isolate specific data to a particular shard. For example, in the case of GDPR compliance that requires businesses to protect data and privacy for an individual within the EU.
  3. If an application is being used geographically and you want a query to route to the nearest shards for both reads and writes.

Let’s consider a Scenario

Consider the scenario of a school where some students specialize in Biology, but most students specialize in Maths, so we have more data for Maths students compared to Biology students. In this example, the deployment requires that the Maths students’ data is routed to the shard with the better configuration for handling a large amount of data, with both reads and writes served by that specific shard. All the Biology students will be served by another shard. To implement this, we will add tags to deploy the zones to the shards.

For this scenario we have an environment with:

DB: “school”

Collection: “students”

Fields: “sId”, “subject”, “marks” and so on..

Indexed Fields: “subject” and “sId”

We enable sharding:

sh.enableSharding("school")

And create a shard key on “subject” and “sId”:

sh.shardCollection("school.students", {subject: 1, sId: 1});

We have two shards in our test environment:

shards:

{  "_id" : "shard0000",  "host" : "127.0.0.1:27001",  "state" : 1 }
{  "_id" : "shard0001",  "host" : "127.0.0.1:27002",  "state" : 1 }

Zone Deployment

1) Disable balancer

To prevent migration of the chunks across the cluster, disable the balancer for the “students” collection:

mongos> sh.disableBalancing("school.students")
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })

Before proceeding further, make sure the balancer is not running. This is not a mandatory step, but it is always good practice to make sure no chunk migration takes place while configuring zones:

mongos> sh.isBalancerRunning()
false

2) Add shard to the zone

A zone can be associated with a particular shard in the form of a tag, using sh.addShardTag(), so a tag will be added to each shard. Here we are considering two zones, so the tags “MATHS” and “BIOLOGY” need to be added.

mongos> sh.addShardTag( "shard0000" , "MATHS");
{ "ok" : 1 }
mongos> sh.addShardTag( "shard0001" , "BIOLOGY");
{ "ok" : 1 }

We can see that the zones have been assigned, in the form of tags, to each shard as required.

mongos> sh.status()
 shards:
        {  "_id" : "shard0000",  "host" : "127.0.0.1:27001",  "state" : 1,  "tags" : [ "MATHS" ] }
        {  "_id" : "shard0001",  "host" : "127.0.0.1:27002",  "state" : 1,  "tags" : [ "BIOLOGY" ] }

3) Define ranges for each zone

Each zone covers one or more ranges of shard key values. Note: each range a zone covers is always inclusive of its lower boundary and exclusive of its upper boundary.

mongos> sh.addTagRange(
	"school.students",
	{ "subject" : "maths", "sId" : MinKey},
	{ "subject" : "maths", "sId" : MaxKey},
	"MATHS"
)
{ "ok" : 1 }
mongos> sh.addTagRange(
	"school.students",
	{ "subject" : "biology", "sId" : MinKey},
	{ "subject" : "biology", "sId" : MaxKey},
"BIOLOGY"
)
{ "ok" : 1 }

4) Enable balancer

Now enable the balancer so the chunks will migrate across the shards as required, and all the read and write queries will be routed to the appropriate shards.

mongos> sh.enableBalancing("school.students")
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })
mongos> sh.isBalancerRunning()
true

Let’s check how documents get routed as per the tags:

We have inserted 6 documents, 4 documents with “subject”:”maths” and 2 documents with “subject”:”biology”

mongos> db.students.find({"subject":"maths"}).count()
4
mongos> db.students.find({"subject":"biology"}).count()
2

Checking the shard distribution for the students collection:

mongos> db.students.getShardDistribution()
Shard shard0000 at 127.0.0.1:27003
data : 236B docs : 4 chunks : 4
estimated data per chunk : 59B
estimated docs per chunk : 1
Shard shard0001 at 127.0.0.1:27004
data : 122B docs : 2 chunks : 1
estimated data per chunk : 122B
estimated docs per chunk : 2

So in this test case, all the queries for the students collection have been routed according to the tags used, with four documents inserted into shard0000 and two documents inserted into shard0001.
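If you want to verify that such a query is targeted rather than broadcast, explain() on a query that includes the shard key prefix is a quick check (the exact output layout varies by MongoDB version):

mongos> db.students.find({"subject": "maths"}).explain()

In the output, only shard0000 should appear under the shards section, since the MATHS zone covers the entire “maths” range.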

Any queries related to MATHS will route to shard0000 and queries related to BIOLOGY will route to shard0001, so the load is distributed according to the configuration of each shard, keeping database performance optimized.

Zone based sharding is a great feature provided by MongoDB. With the help of zones, data can be isolated to specific shards. It is also a possible solution when hardware or configuration restrictions dictate which shards should serve certain operations.

The post Zone Based Sharding in MongoDB appeared first on Percona Database Performance Blog.
