November 3, 2023

How To Scale a Single-Host PostgreSQL Database With Citus

When it comes to Citus, successfully building out and scaling a PostgreSQL cluster across multiple nodes, and even across data centers, can at times feel like an art form because there are so many ways to put it together.

There’s an axiom that I think aptly applies to this situation describing the differences between science and art:

– Science: 1 problem -> 1 solution
– Art: 1 problem -> 1,000 solutions

While my two previous Citus blogs covered the fairly straightforward concepts of creating columnar tables and leveraging data redundancy, this one explores the architectural design considerations involved in using this freely downloadable and fully featured PostgreSQL extension.

Rather than listing the concepts, function calls, etc., available in Citus, which frankly is a bit boring, I’m going to explore scaling out a database system starting from a single host. I won’t cover every feature, but I’ll show just enough to make you want to discover what you can accomplish for yourself.

About the cluster

Following a step-by-step process, the objective is to create a four-node cluster consisting of:

  • PostgreSQL version 15
  • Citus extension (I’ll be using version 11, but there are newer ones available.)
  • PostgreSQL Cluster
    • One coordinator node
      • citus-coord-01
    • Three worker nodes
      • citus1
      • citus2
      • citus3
  • Hardware (AWS EC2 instance)
    • Ubuntu Server 20.04, SSD volume type
    • 64-bit (x86)
    • c5.xlarge: 4 vCPU, 8GB RAM
    • Storage: 80GB EBS root volume (gp2, IOPS 240/3000)

In addition, high availability will be integrated, guaranteeing cluster viability in the event that one worker node goes down.

Leveraging pgbench, a benchmarking utility bundled with PostgreSQL, I will put the cluster through its paces by executing a series of DML operations. Using its default TPC-B benchmark, one can stress test a database of any size, simulating anywhere from a few clients to thousands of them interacting with a system sized into the terabytes if need be.
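
For reference, here is the pgbench invocation I’ll repeat throughout this post, with its flags annotated; there is nothing Citus-specific about it:

-- 20 client sessions (-c), 3 worker threads (-j), a 60-second run (-T),
-- and a progress report every 3 seconds (-P), against database "pgbench"
pgbench -c 20 -j 3 -T 60 -P 3 pgbench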

Steps

Provisioning

The first step is to provision the four nodes with both PostgreSQL and Citus. Refer here or here for getting and installing the extension.

-- on each Postgres node, enable the extension
-- (a server restart is required)
alter system set shared_preload_libraries='citus';
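
A restart, not just a reload, is needed for shared_preload_libraries to take effect. A minimal sketch, assuming a systemd-managed PostgreSQL service on Ubuntu; adjust the service name to match your packaging:

# restart each node so the new setting takes effect
sudo systemctl restart postgresql

# verify the library is loaded
psql -c "show shared_preload_libraries;"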

On each of the four nodes, create the database with the Citus extension:

-- using psql
create database pgbench;
\c pgbench
create extension citus;
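
As an optional sanity check, confirm the extension is active on each node:

-- still in psql, connected to database pgbench
select citus_version();
\dx citus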

One worker node

Beginning with a single worker node, log onto citus-coord-01, declare the coordinator host, and add the first worker node, citus1:

-- execute on citus-coord-01
    psql pgbench <<_eof1_
        select citus_set_coordinator_host('citus-coord-01', 5432);
        select citus_add_node('citus1', 5432);
_eof1_

While still logged in database pgbench, get a list of the worker nodes, which at this time is just the one:

select * from master_get_active_worker_nodes();

node_name | node_port
-----------+-----------
 citus1    |      5432

Now things get a little interesting: pgbench’s normal initialization creates and populates the tables with the appropriate constraints. In this case, however, only the tables are created, without data or constraints:

-- execute the following command on citus-coord-01
pgbench -iI t pgbench

pgbench=# \d+
                                        List of relations
Schema |       Name       | Type  |  Owner   | Persistence | Access method |  Size   |
-------+------------------+-------+----------+-------------+---------------+---------+-----
public | citus_tables     | view  | postgres | permanent   |               | 0 bytes |
public | pgbench_accounts | table | postgres | permanent   | heap          | 0 bytes |
public | pgbench_branches | table | postgres | permanent   | heap          | 0 bytes |
public | pgbench_history  | table | postgres | permanent   | heap          | 0 bytes |
public | pgbench_tellers  | table | postgres | permanent   | heap          | 0 bytes |

This next set of commands, executed on the coordinator node, distributes the pgbench tables across node citus1:

psql pgbench <<_eof1_
    BEGIN;
        select create_distributed_table('pgbench_history', 'aid');
        select create_distributed_table('pgbench_accounts', 'aid');
        select create_distributed_table('pgbench_branches', 'bid');
        select create_distributed_table('pgbench_tellers', 'tid');
    COMMIT;
_eof1_

And now, we can begin populating the tables. The following invocation generates almost 4GB of data. Notice I still haven’t added any constraints:

pgbench -iI g -s 300 pgbench

This query lists the distributed tables along with their shard counts. The default number of shards generated for a given table is 32.

select * from citus_tables;

table_name    | citus_table_type | distribution_column | colocation_id | table_size | shard_count 
------------------+------------------+---------------------+---------------+------------+-------------
 pgbench_accounts | distributed      | aid                 |             1 | 3844 MB    |      32 
 pgbench_branches | distributed      | bid                 |             1 | 256 kB     |      32 
 pgbench_history  | distributed      | aid                 |             1 | 0 bytes    |      32 
 pgbench_tellers  | distributed      | tid                 |             1 | 256 kB     |      32
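
The default of 32 comes from the citus.shard_count setting. If you want a different shard granularity, change it on the coordinator before distributing the tables; a minimal sketch, with 64 being an arbitrary choice:

-- run on the coordinator BEFORE calling create_distributed_table;
-- tables that are already distributed keep their existing shard count
alter system set citus.shard_count = 64;
select pg_reload_conf();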

You’ll also notice that only some of the shards have been populated with data, which is normal when initializing pgbench with data:

select * from citus_shards;

table_name    | shardid |       shard_name        | citus_table_type | colocation_id | nodename | nodeport | shard_size
------------------+---------+-------------------------+------------------+---------------+----------+----------+------------
 pgbench_accounts |  102040 | pgbench_accounts_102040 | distributed      |             1 | citus1   |     5432 |  125853696
 pgbench_accounts |  102041 | pgbench_accounts_102041 | distributed      |             1 | citus1   |     5432 |  126173184
 pgbench_accounts |  102042 | pgbench_accounts_102042 | distributed      |             1 | citus1   |     5432 |  125739008
 pgbench_accounts |  102043 | pgbench_accounts_102043 | distributed      |             1 | citus1   |     5432 |  125968384
.
.
 pgbench_branches |  102072 | pgbench_branches_102072 | distributed      |             1 | citus1   |     5432 |       8192
 pgbench_branches |  102073 | pgbench_branches_102073 | distributed      |             1 | citus1   |     5432 |       8192
 pgbench_branches |  102074 | pgbench_branches_102074 | distributed      |             1 | citus1   |     5432 |       8192
 pgbench_branches |  102075 | pgbench_branches_102075 | distributed      |             1 | citus1   |     5432 |       8192
.
.
 pgbench_history  |  102008 | pgbench_history_102008  | distributed      |             1 | citus1   |     5432 |          0
 pgbench_history  |  102009 | pgbench_history_102009  | distributed      |             1 | citus1   |     5432 |          0
 pgbench_history  |  102010 | pgbench_history_102010  | distributed      |             1 | citus1   |     5432 |          0
 pgbench_history  |  102011 | pgbench_history_102011  | distributed      |             1 | citus1   |     5432 |          0
.
.
 pgbench_tellers  |  102104 | pgbench_tellers_102104  | distributed      |             1 | citus1   |     5432 |       8192
 pgbench_tellers  |  102105 | pgbench_tellers_102105  | distributed      |             1 | citus1   |     5432 |       8192
 pgbench_tellers  |  102106 | pgbench_tellers_102106  | distributed      |             1 | citus1   |     5432 |       8192
 pgbench_tellers  |  102107 | pgbench_tellers_102107  | distributed      |             1 | citus1   |     5432 |       8192
 pgbench_tellers  |  102108 | pgbench_tellers_102108  | distributed      |             1 | citus1   |     5432 |       8192
.
.

And now, execute the benchmark:

-- execute the following on the coordinator node
pgbench -c 20 -j 3 -T 60 -P 3 pgbench

The results are not pretty.

transaction type: <builtin: TPC-B (sort of)>
scaling factor: 300
query mode: simple
number of clients: 20
number of threads: 3
maximum number of tries: 1
duration: 60 s
number of transactions actually processed: 1524
number of failed transactions: 0 (0.000%)
latency average = 791.374 ms
latency stddev = 157.710 ms
initial connection time = 15.704 ms
tps = 25.197427 (without initial connection time)

ATTENTION: Refer to the bottom of this blog, which summarizes a tabulation of the various benchmarks as this cluster evolves.

In order to speed up the benchmark, indexes must be added. But this standard pgbench method of adding indexes fails:

postgres@citus-coord-01:~$ pgbench -iI p pgbench
creating primary keys...
pgbench: error: query failed: ERROR:  cannot create constraint without a name on a distributed table
pgbench: detail: Query was: alter table pgbench_branches add primary key (bid)

Successfully adding indexes and constraints requires them to be explicitly named and created in the following fashion. Note that table pgbench_history requires a REPLICA IDENTITY because it has no primary key:

# THIS WORKS!
psql pgbench <<_eof1_
--
-- indexes and constraints must be explicitly named
--
BEGIN;
create unique index pgbench_accounts_pk on pgbench_accounts(aid);
create unique index pgbench_branches_pk on pgbench_branches(bid);
create unique index pgbench_tellers_pk on pgbench_tellers(tid);

alter table pgbench_accounts
   add constraint pk_accounts primary key using index pgbench_accounts_pk;

alter table pgbench_branches
   add constraint pk_branches primary key using index pgbench_branches_pk;

alter table pgbench_tellers
   add constraint pk_tellers primary key using index pgbench_tellers_pk;

\qecho adding REPLICA IDENTITY (no PK present)
alter table pgbench_history replica identity full;
COMMIT;
_eof1_

Repeating the benchmark run yields a much better result:

-- execute the following on the coordinator node
pgbench -c 20 -j 3 -T 60 -P 3 pgbench

scaling factor: 300
query mode: simple
number of clients: 20
number of threads: 3
maximum number of tries: 1
duration: 60 s
number of transactions actually processed: 135273
number of failed transactions: 0 (0.000%)
latency average = 8.865 ms
latency stddev = 9.452 ms
initial connection time = 15.556 ms
tps = 2254.544852 (without initial connection time)

Two worker nodes

Adding a second worker node is straightforward, requiring only two steps:

  • Adding the node.
  • Rebalancing the shards across the two nodes.

Execute this on the coordinator node, citus-coord-01:

psql pgbench <<_eof1_
        --
        -- it's understood that database pgbench already exists on citus2
        --  and extension citus has been created
        --
        \qecho adding node citus2 ...
        select citus_add_node('citus2', 5432);
        \qecho rebalancing shards across TWO nodes ...
        select * from rebalance_table_shards();
        ANALYZE;
_eof1_

Here’s a partial output as the tables are rebalanced across the two nodes:

adding node citus2 ...
citus_add_node
----------------
             3

rebalancing shards across TWO nodes ...
NOTICE:  Moving shard 102008 from citus1:5432 to citus2:5432 ...
NOTICE:  Moving shard 102009 from citus1:5432 to citus2:5432 ...
NOTICE:  Moving shard 102010 from citus1:5432 to citus2:5432 ...
NOTICE:  Moving shard 102011 from citus1:5432 to citus2:5432 ...
.
.
NOTICE:  Moving shard 102020 from citus1:5432 to citus2:5432 ...
NOTICE:  Moving shard 102021 from citus1:5432 to citus2:5432 ...
NOTICE:  Moving shard 102022 from citus1:5432 to citus2:5432 ...
NOTICE:  Moving shard 102023 from citus1:5432 to citus2:5432 ...

Repeating the benchmark once again demonstrates that performance has degraded somewhat. This is to be expected, considering my POC consists of containers on a single machine, all sharing the same system resources:

-- execute the following on the coordinator node
pgbench -c 20 -j 3 -T 60 -P 3 pgbench

transaction type: <builtin: TPC-B (sort of)>
scaling factor: 300
query mode: simple
number of clients: 20
number of threads: 3
maximum number of tries: 1
duration: 60 s
number of transactions actually processed: 130190
number of failed transactions: 0 (0.000%)
latency average = 9.214 ms
latency stddev = 19.159 ms
initial connection time = 15.776 ms
tps = 2169.559969 (without initial connection time)

Three worker nodes

As per the two-worker node configuration, adding another node is straightforward.

psql pgbench <<_eof1_
        \qecho adding node citus3 ...
        select citus_add_node('citus3', 5432);
        \qecho rebalancing shards across THREE nodes ...
        select * from rebalance_table_shards();
_eof1_

Repeating the benchmark gives us the updated performance metrics.

-- execute the following on the coordinator node
pgbench -c 20 -j 3 -T 60 -P 3 pgbench

scaling factor: 300
query mode: simple
number of clients: 20
number of threads: 3
maximum number of tries: 1
duration: 60 s
number of transactions actually processed: 142630
number of failed transactions: 0 (0.000%)
latency average = 8.410 ms
latency stddev = 5.542 ms
initial connection time = 16.407 ms
tps = 2377.227920 (without initial connection time)

Three worker nodes, with redundancy/replication

This next step is interesting because it adds redundancy by increasing the replication factor from one to two, so that two copies of each shard of each table exist, placed on two distinct nodes.

ATTENTION: An updated replication factor affects only tables created after the change. Pre-existing tables are not affected by the new replication factor:

psql pgbench <<_eof1_
        \qecho increasing replication factor to TWO
        alter system set citus.shard_replication_factor=2;
        select pg_reload_conf();
_eof1_

TIP: In order to replicate tables, set the replication factor before creating them.

In order to replicate the existing tables across the nodes, they are first undistributed, which centralizes them back onto the coordinator node. Redistributing them across the three-node cluster then adds the replication automatically.

This centralizes the tables onto the coordinator node:

psql pgbench <<_eof1_
    BEGIN;
        select undistribute_table('pgbench_history');
        select undistribute_table('pgbench_accounts');
        select undistribute_table('pgbench_branches');
        select undistribute_table('pgbench_tellers');
    COMMIT;
_eof1_

The tables are redistributed and rebalanced across the three-node cluster with the correct redundancy of two:

psql pgbench <<_eof1_
BEGIN;
  select create_distributed_table('pgbench_history', 'aid');
  select create_distributed_table('pgbench_accounts', 'aid');
  select create_distributed_table('pgbench_branches', 'bid');
  select create_distributed_table('pgbench_tellers', 'tid');
COMMIT;

-- remove legacy tables from the coordinator node
BEGIN;
   select truncate_local_data_after_distributing_table($$public.pgbench_history$$);
   select truncate_local_data_after_distributing_table($$public.pgbench_accounts$$);
   select truncate_local_data_after_distributing_table($$public.pgbench_branches$$);
   select truncate_local_data_after_distributing_table($$public.pgbench_tellers$$);
COMMIT;
_eof1_

Repeating the benchmark gives us the updated performance metrics.

-- execute the following on the coordinator node
pgbench -c 20 -j 3 -T 60 -P 3 pgbench

transaction type: <builtin: TPC-B (sort of)>
scaling factor: 300
query mode: simple
number of clients: 20
number of threads: 3
maximum number of tries: 1
duration: 60 s
number of transactions actually processed: 68834
number of failed transactions: 0 (0.000%)
latency average = 17.430 ms
latency stddev = 12.818 ms
initial connection time = 16.321 ms
tps = 1147.093369 (without initial connection time)

Benchmark results

Here’s a summary of the various benchmarking runs, giving you an idea of relative performance.

 RUN | NUM NODES | WITH IDX | REPLICATION FACTOR |  TPS
-----+-----------+----------+--------------------+------
  1  |     1     |    no    |          1         |   25
  2  |     1     |   yes    |          1         | 2255
  3  |     2     |   yes    |          1         | 2170
  4  |     3     |   yes    |          1         | 2377
  5  |     3     |   yes    |          2         | 1147

Caveat

Although not mentioned in the blog, there are a few things you’ll need to keep in mind as you work with Citus:

  1. The password for the account used to propagate operations across the cluster must be stored in that account’s home-directory .pgpass file on all nodes.
  2. The “wal_level” is set to logical (a sketch of items 1 and 2 follows this list).
  3. Redundancy can potentially decrease overall performance.
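
As a rough sketch of items 1 and 2, each node would carry something like the following; the password is a placeholder, and the file must be owned by the postgres account with mode 0600:

# ~postgres/.pgpass  --  hostname:port:database:username:password
*:5432:pgbench:postgres:changeme

# wal_level set to logical on every node (a restart is required)
psql -c "alter system set wal_level = 'logical';"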

Keep in mind that the performance metrics depend not only on the cluster’s hardware and system resources but also on the level of tuning, which, in this case, has not been addressed.

A future blog will continue my exploration into Citus by scaling out pgbench into other architectures.

August 29, 2023

Data Redundancy With the PostgreSQL Citus Extension

Over the years, I’ve had the opportunity to architect all sorts of configurations using Postgres as a backend. I’ve always found it very cool and satisfying to implement sophisticated business rules, often in more ways than one has fingers and toes. So, it’s not an understatement when I say that Citus is one of the more interesting technologies that I’ve come across when scaling PostgreSQL.

Citus is an extension that horizontally scales PostgreSQL across multiple machines by sharding tables and creating redundant copies of them. Its query engine not only parallelizes incoming SQL queries, a natural fit for time series workloads, but can also create column-wise tables, making it ideal for OLAP analysis duties.

Among its many capabilities, a Citus cluster can:

  • Create distributed tables that are sharded across a cluster of PostgreSQL nodes, combining their CPU, memory, storage, and I/O capacity.
  • Replicate reference tables to all nodes, supporting joins and foreign keys from distributed tables and maximizing read performance.
  • Route and parallelize SELECT, DML, and other operations on distributed tables across the cluster with its distributed query engine.
  • Store tables column-wise, compressing data and speeding up scans while supporting fast projections, on both regular and distributed tables (a brief sketch follows this list).
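
As a brief illustration of the last point, a table is stored column-wise simply by selecting the columnar access method at creation time; the table and column names below are purely for demonstration:

-- a minimal sketch of a column-wise (append-mostly) table
create table events_columnar (
    event_id   bigserial,
    device_id  bigint,
    recorded   timestamptz default now(),
    payload    jsonb
) using columnar;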

Data redundancy, a database version of RAID

When pondering high availability and redundancy, the conventional approach is to replicate data by creating a replica via streaming replication.

Now let’s stretch our imagination and consider a second method of achieving high availability, à la Citus.

The best way to describe the Citus approach is to compare it to how data is managed by a disk RAID array. Depending on the configuration, one can tune a hardware RAID for either performance or redundancy. The same can be said for Citus data sharding.

Here is an example of a table named replica2x, which has 2X redundancy across a cluster of four (4) nodes. The colors indicate duplicated shards of the table. For example, if node citus1 goes offline, the shards it holds still have copies on nodes citus2 and citus4. Likewise, if node citus2 goes offline, the same data is still available on nodes 1, 3, and 4.

[Figure: shards of table replica2x duplicated 2X across the four worker nodes]

About this POC

I’ll be upfront: I love working with Linux Containers (LXD), and much of what I will show you makes heavy use of them. You won’t need LXD to replicate this POC, of course, but I can’t overstate how flexible and versatile such an environment is when prototyping a cluster of Postgres nodes, let alone an entire multi-data-center infrastructure, on a single workstation.

There are two parts to this POC:

  • Part A: Setup
  • Part B: Redundancy demonstration

Part A: POC setup

Step one: Getting and installing Citus

Referring to this earlier blog, you’ll see how to get and install Citus into your environment.

Step two: Creating the Citus nodes

The Citus cluster consists of five (5) nodes:

  • citus-coord-01: coordinator
  • citus1: worker
  • citus2: worker
  • citus3: worker
  • citus4: worker

By using LXD, I created a single templated container with Citus on Ubuntu 20.04, where the various nodes were copied from this template.

for u in citus-coord-01 citus1 citus2 citus3 citus4
do
    echo "==== $u ===="
    lxc rm --force $u 2>/dev/null
    lxc cp template-ubuntu-2004-citusdb $u
    lxc start $u
done

And here’s the resultant cluster:

lxc ls -c ns4 citus

+----------------+---------+----------------------+
|      NAME      |  STATE  |         IPV4         |
+----------------+---------+----------------------+
| citus1         | RUNNING | 10.231.38.140 (eth0) |
+----------------+---------+----------------------+
| citus2         | RUNNING | 10.231.38.151 (eth0) |
+----------------+---------+----------------------+
| citus3         | RUNNING | 10.231.38.171 (eth0) |
+----------------+---------+----------------------+
| citus4         | RUNNING | 10.231.38.204 (eth0) |
+----------------+---------+----------------------+
| citus-coord-01 | RUNNING | 10.231.38.34 (eth0)  |
+----------------+---------+----------------------+

It’s understood that on each of the five nodes:

  1. Database db01 has already been created.
  2. The postgresql.conf configuration file has been appropriately edited for remote access.
  3. The .pgpass file has been configured to supply the superuser password for all nodes (a sketch of items 2 and 3 follows this list).
  4. Extension citus has been created in database db01.
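
For reference, items 2 and 3 might look roughly like this on each node; the subnet and password below are assumptions specific to my LXD network:

# postgresql.conf -- accept remote connections
listen_addresses = '*'

# pg_hba.conf -- allow password authentication from the container subnet
host    db01    postgres    10.231.38.0/24    md5

# ~postgres/.pgpass, mode 0600
*:5432:db01:postgres:changeme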

Step three: Check packages

Inspecting the nodes confirms Citus has been correctly installed:

for u in citus1 citus2 citus3 citus4 citus-coord-01
do

echo "==== NODE: $u ===="

lxc exec $u -- su - postgres -c 'psql db01'<<_eof_
select extname, extversion from pg_extension;
_eof_

done | less -S

==== NODE: citus1 ====
    extname     | extversion 
----------------+------------
 plpgsql        | 1.0
 citus_columnar | 11.1-1
 citus          | 11.1-1

==== NODE: citus2 ====
    extname     | extversion 
----------------+------------
 plpgsql        | 1.0
 citus_columnar | 11.1-1
 citus          | 11.1-1

==== NODE: citus3 ====
    extname     | extversion 
----------------+------------
 plpgsql        | 1.0
 citus_columnar | 11.1-1
 citus          | 11.1-1

==== NODE: citus4 ====
    extname     | extversion 
----------------+------------
 plpgsql        | 1.0
 citus_columnar | 11.1-1
 citus          | 11.1-1

==== NODE: citus-coord-01 ====
    extname     | extversion 
----------------+------------
 plpgsql        | 1.0
 citus_columnar | 11.1-1
 citus          | 11.1-1

With Citus properly installed, its runtime variables are now available:

lxc exec citus1 -- su - postgres -c 'psql db01'<<_eof_ | less -S
    select name,setting,unit from pg_settings where name ~ 'citus' order by 1;
_eof_

name                        |     setting     | unit 
----------------------------------------------------+-----------------+------
 citus.all_modifications_commutative                | off             | 
 citus.background_task_queue_interval               | 5000            | ms
 citus.cluster_name                                 | default         | 
 citus.coordinator_aggregation_strategy             | row-gather      | 
 citus.count_distinct_error_rate                    | 0               | 
 citus.cpu_priority                                 | 0               | 
 citus.cpu_priority_for_logical_replication_senders | inherit         | 
 citus.defer_drop_after_shard_move                  | on              | 
 citus.defer_drop_after_shard_split                 | on              | 
 citus.defer_shard_delete_interval                  | 15000           | ms
 citus.desired_percent_disk_available_after_move    | 10              | 
 citus.distributed_deadlock_detection_factor        | 2               | 
 citus.enable_binary_protocol                       | on              | 
 citus.enable_create_role_propagation               | on              | 
 citus.enable_deadlock_prevention                   | on              | 
 citus.enable_local_execution                       | on              | 
 citus.enable_local_reference_table_foreign_keys    | on              | 
 citus.enable_repartition_joins                     | off             | 
 citus.enable_statistics_collection                 | off             | 
 citus.explain_all_tasks                            | off             | 
 citus.explain_analyze_sort_method                  | execution-time  | 
 citus.limit_clause_row_fetch_count                 | -1              | 
 citus.local_hostname                               | localhost       | 
 citus.local_shared_pool_size                       | 50              | 
 citus.local_table_join_policy                      | auto            | 
 citus.log_remote_commands                          | off             | 
 citus.max_adaptive_executor_pool_size              | 16              | 
 citus.max_cached_connection_lifetime               | 600000          | ms
 citus.max_cached_conns_per_worker                  | 1               | 
 citus.max_client_connections                       | -1              | 
 citus.max_high_priority_background_processes       | 2               | 
 citus.max_intermediate_result_size                 | 1048576         | kB
 citus.max_matview_size_to_auto_recreate            | 1024            | MB
 citus.max_shared_pool_size                         | 100             | 
 citus.max_worker_nodes_tracked                     | 2048            | 
 citus.multi_shard_modify_mode                      | parallel        | 
 citus.multi_task_query_log_level                   | off             | 
 citus.node_connection_timeout                      | 30000           | ms
 citus.node_conninfo                                | sslmode=require | 
 citus.propagate_set_commands                       | none            | 
 citus.recover_2pc_interval                         | 60000           | ms
 citus.remote_task_check_interval                   | 10              | ms
 citus.shard_count                                  | 32              | 
 citus.shard_replication_factor                     | 1               | 
 citus.show_shards_for_app_name_prefixes            |                 | 
 citus.skip_constraint_validation                   | off             | 
 citus.skip_jsonb_validation_in_copy                | on              | 
 citus.stat_statements_track                        | none            | 
 citus.task_assignment_policy                       | greedy          | 
 citus.task_executor_type                           | adaptive        | 
 citus.use_citus_managed_tables                     | off             | 
 citus.use_secondary_nodes                          | never           | 
 citus.values_materialization_threshold             | 100             | 
 citus.version                                      | 11.1.4          | 
 citus.worker_min_messages                          | notice          | 
 citus.writable_standby_coordinator                 | off             |

Step four: Define/configure the cluster

Log into the coordinator in order to declare and configure the cluster:

lxc exec citus-coord-01 -- su - postgres -c 'psql db01'<<_eof_
    select citus_set_coordinator_host('citus-coord-01', 5432);
    insert into pg_dist_node(nodename)
        values ('citus1')
              ,('citus2')
              ,('citus3')
              ,('citus4');
_eof_

And here’s the cluster’s organization:

db01=# select nodeid,nodename,groupid,isactive from pg_dist_node order by 1;
 nodeid |    nodename    | groupid | isactive 
--------+----------------+---------+----------
      1 | citus-coord-01 |       0 | t
      2 | citus1         |       1 | t
      3 | citus2         |       2 | t
      4 | citus3         |       3 | t
      5 | citus4         |       4 | t

Step five: Create, distribute, and populate a single table

Table myevents is created, and the newly inserted records are evenly distributed across the cluster of nodes.

Log in to the coordinator and execute the following commands. Notice that all DDL and DML statements are executed on the coordinator node:

lxc exec citus-coord-01 -- su - postgres -c 'psql db01'<<_eof_

-- create the table
    create table myevents (
        device_id   bigint,
        event_id    bigserial,
        event_time  timestamptz default now(),
        data        jsonb not null,
        primary key (device_id, event_id)
    );

-- distribute the events among the nodes
    select create_distributed_table('myevents', 'device_id');

-- populate the table
    insert into myevents (device_id, data)
        select s % 100, ('{"measurement":'||random()||'}')::jsonb
        from generate_series(1,1000000) s;
_eof_

Querying the coordinator:

lxc exec citus-coord-01 -- su - postgres -c 'psql db01'<<_eof_ | less -S
select *
    from myevents
    where device_id = 1
    order by event_time desc, event_id desc
    limit 10;
_eof_

device_id | event_id |          event_time           |                data                  
----------+----------+-------------------------------+-------------------------------------
        1 |   999901 | 2023-08-16 14:51:12.618467-07 | {"measurement": 0.2868659956537316}
        1 |   999801 | 2023-08-16 14:51:12.618467-07 | {"measurement": 0.7931493079697731}
        1 |   999701 | 2023-08-16 14:51:12.618467-07 | {"measurement": 0.4875322951757288}
        1 |   999601 | 2023-08-16 14:51:12.618467-07 | {"measurement": 0.6491362745752653}
        1 |   999501 | 2023-08-16 14:51:12.618467-07 | {"measurement": 0.9629266554851366}
        1 |   999401 | 2023-08-16 14:51:12.618467-07 | {"measurement": 0.1185674800281864}
        1 |   999301 | 2023-08-16 14:51:12.618467-07 | {"measurement": 0.5133762596297742}
        1 |   999201 | 2023-08-16 14:51:12.618467-07 | {"measurement": 0.7307634886202119}
        1 |   999101 | 2023-08-16 14:51:12.618467-07 | {"measurement": 0.2997471209159892}
        1 |   999001 | 2023-08-16 14:51:12.618467-07 | {"measurement": 0.9692520484104021}

Step six: A review of table distributions across the cluster

This demonstrates clearly that table myevents is hash-sharded across every node member:

  • Notice how well-balanced the sharded tables are in size.
  • Notice the table-name numbering: each successive shard number lands on the next node in round-robin order.

for u in citus1 citus2 citus3 citus4
do
echo "==== NODE: $u ===="
lxc exec $u -- su - postgres -c 'psql db01'<<_eof_
    \dt+
_eof_
done | less -S

==== NODE: citus1 ====
                                        List of relations
Schema |      Name       | Type  |  Owner   | Persistence | Access method |  Size   | Description  
-------+-----------------+-------+----------+-------------+---------------+---------+-------------
public | myevents_102008 | table | postgres | permanent   | heap          | 2832 kB |  
public | myevents_102012 | table | postgres | permanent   | heap          | 2840 kB |  
public | myevents_102016 | table | postgres | permanent   | heap          | 5624 kB |  
public | myevents_102020 | table | postgres | permanent   | heap          | 2840 kB |  
public | myevents_102024 | table | postgres | permanent   | heap          | 1904 kB |  
public | myevents_102028 | table | postgres | permanent   | heap          | 5632 kB |  
public | myevents_102032 | table | postgres | permanent   | heap          | 2840 kB |  
public | myevents_102036 | table | postgres | permanent   | heap          | 6560 kB |  

==== NODE: citus2 ====
                                        List of relations
Schema |      Name       | Type  |  Owner   | Persistence | Access method |  Size   | Description  
-------+-----------------+-------+----------+-------------+---------------+---------+-------------
public | myevents_102009 | table | postgres | permanent   | heap          | 4696 kB |  
public | myevents_102013 | table | postgres | permanent   | heap          | 976 kB  |  
public | myevents_102017 | table | postgres | permanent   | heap          | 1904 kB |  
public | myevents_102021 | table | postgres | permanent   | heap          | 3768 kB |  
public | myevents_102025 | table | postgres | permanent   | heap          | 1904 kB |  
public | myevents_102029 | table | postgres | permanent   | heap          | 2840 kB |  
public | myevents_102033 | table | postgres | permanent   | heap          | 1904 kB |  
public | myevents_102037 | table | postgres | permanent   | heap          | 2840 kB |  

==== NODE: citus3 ====
                                         List of relations
Schema |      Name       | Type  |  Owner   | Persistence | Access method |    Size    | Description  
-------+-----------------+-------+----------+-------------+---------------+------------+-------------
public | myevents_102010 | table | postgres | permanent   | heap          | 1904 kB    |  
public | myevents_102014 | table | postgres | permanent   | heap          | 1904 kB    |  
public | myevents_102018 | table | postgres | permanent   | heap          | 1904 kB    |  
public | myevents_102022 | table | postgres | permanent   | heap          | 4696 kB    |  
public | myevents_102026 | table | postgres | permanent   | heap          | 8192 bytes |  
public | myevents_102030 | table | postgres | permanent   | heap          | 2832 kB    |  
public | myevents_102034 | table | postgres | permanent   | heap          | 976 kB     |  
public | myevents_102038 | table | postgres | permanent   | heap          | 4696 kB    |  

==== NODE: citus4 ====
                                        List of relations
Schema |      Name       | Type  |  Owner   | Persistence | Access method |  Size   | Description  
-------+-----------------+-------+----------+-------------+---------------+---------+-------------
public | myevents_102011 | table | postgres | permanent   | heap          | 5632 kB |  
public | myevents_102015 | table | postgres | permanent   | heap          | 1904 kB |  
public | myevents_102019 | table | postgres | permanent   | heap          | 1904 kB |  
public | myevents_102023 | table | postgres | permanent   | heap          | 1904 kB |  
public | myevents_102027 | table | postgres | permanent   | heap          | 2840 kB |  
public | myevents_102031 | table | postgres | permanent   | heap          | 2832 kB |  
public | myevents_102035 | table | postgres | permanent   | heap          | 1904 kB |  
public | myevents_102039 | table | postgres | permanent   | heap          | 4696 kB |
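
Incidentally, the same placement information can be read from the coordinator itself, without logging into every worker, by querying the citus_shards view:

-- run on citus-coord-01, database db01
select table_name, shardid, nodename, shard_size
    from citus_shards
    where table_name = 'myevents'::regclass
    order by shardid;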

Part B: Redundancy demonstration

Method

  • Step 1: Update the shard replication factor from 1X to 2X
  • Step 2: Create table myevents2x with 2X redundancy
  • Step 3: Identify shard myevents2x_102040 on nodes citus1 and citus2
  • Step 4: Identify some records to query from shards known to be on nodes citus1 and citus2
  • Step 5: Test
    • Shut down citus1; run the previously identified query
    • Start citus1 back up and shut down citus2; run the same query
    • Restart citus2; run the query once more

ATTENTION: Please note that you may have to edit your own queries as the values may be different for your setup.

Step one:

Update shard replication factor from 1X to 2X:

lxc exec citus-coord-01 -- su - postgres -c 'psql db01'<<_eof_
    show citus.shard_replication_factor;
    alter system set citus.shard_replication_factor=2;
    select pg_reload_conf();
_eof_

# validate
lxc exec citus-coord-01 -- su - postgres -c 'psql db01'<<_eof_
    show citus.shard_replication_factor;
_eof_

citus.shard_replication_factor  
--------------------------------
2

Step two:

Table myevents2x is created and populated with a redundancy of 2X across the cluster:

lxc exec citus-coord-01 -- su - postgres -c 'psql db01'<<_eof_ | less -S
-- create a new table with 2X redundancy
    create table myevents2x (
        device_id   bigint,
        event_id    bigserial,
        event_time  timestamptz default now(),
        data        jsonb not null,
        primary key (device_id, event_id)
    );

-- distribute the events among the nodes
    select create_distributed_table('myevents2x', 'device_id');


-- confirm table has been added across the cluster
    select * from master_get_active_worker_nodes() order by 1;

-- populate the table
    insert into myevents2x (device_id, data)
        select s % 100, ('{"measurement":'||random()||'}')::jsonb
        from generate_series(1,1000000) s;
_eof_

Here’s the output:

CREATE TABLE
 create_distributed_table 
--------------------------
 
(1 row)

 node_name | node_port 
-----------+-----------
 citus1    |      5432
 citus2    |      5432
 citus3    |      5432
 citus4    |      5432
(4 rows)

INSERT 0 1000000

Step three:

Locate shard myevents2x_102040, which should be on nodes citus1 and citus2:

for u in citus1 citus2 citus3 citus4
do
echo "==== NODE: $u ===="
lxc exec $u -- su - postgres -c 'psql db01'<<_eof_
    select tablename from pg_tables where tablename~'myevents2x' order by 1
_eof_
done | less -S

Here’s the output:

==== NODE: citus1 ====
     tablename     
-------------------
 myevents2x_102040
 myevents2x_102043
 myevents2x_102044
 myevents2x_102047
 myevents2x_102048
 myevents2x_102051
 myevents2x_102052
 myevents2x_102055
 myevents2x_102056
 myevents2x_102059
 myevents2x_102060
 myevents2x_102063
 myevents2x_102064
 myevents2x_102067
 myevents2x_102068
 myevents2x_102071
(16 rows)

==== NODE: citus2 ====
     tablename     
-------------------
 myevents2x_102040
 myevents2x_102041
 myevents2x_102044
 myevents2x_102045
 myevents2x_102048
 myevents2x_102049
 myevents2x_102052
 myevents2x_102053
 myevents2x_102056
 myevents2x_102057
 myevents2x_102060
 myevents2x_102061
 myevents2x_102064
 myevents2x_102065
 myevents2x_102068
 myevents2x_102069

==== NODE: citus3 ====
     tablename     
-------------------
 myevents2x_102041
 myevents2x_102042
 myevents2x_102045
 myevents2x_102046
 myevents2x_102049
 myevents2x_102050
 myevents2x_102053
 myevents2x_102054
 myevents2x_102057
 myevents2x_102058
 myevents2x_102061
 myevents2x_102062
 myevents2x_102065
 myevents2x_102066
 myevents2x_102069
 myevents2x_102070

==== NODE: citus4 ====
     tablename     
-------------------
 myevents2x_102042
 myevents2x_102043
 myevents2x_102046
 myevents2x_102047
 myevents2x_102050
 myevents2x_102051
 myevents2x_102054
 myevents2x_102055
 myevents2x_102058
 myevents2x_102059
 myevents2x_102062
 myevents2x_102063
 myevents2x_102066
 myevents2x_102067
 myevents2x_102070
 myevents2x_102071
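
Rather than eyeballing pg_tables on every worker, you can also ask the coordinator which shard a given distribution-column value hashes to, and where its placements live. A small sketch using device_id = 8, the value queried in the next step:

-- run on citus-coord-01, database db01
select get_shard_id_for_distribution_column('myevents2x', 8);

-- list the two placements of that shard
select shardid, nodename, nodeport
    from citus_shards
    where shardid = get_shard_id_for_distribution_column('myevents2x', 8);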

Step four:

Locate and return the first three records of shard myevents2x_102040 on nodes citus1 and citus2:

lxc exec citus1 -- su - postgres -c 'psql db01'<<_eof_ | less -S
    \qecho ==== citus1 ====
    \qecho select * from myevents2x_102040 order by 1,2 limit 3
    select * from myevents2x_102040 order by 1,2 limit 3;
    \c 'host=citus2 dbname=db01 user=postgres'
    \qecho ==== citus2 ====
    \qecho select * from myevents2x_102040 order by 1,2 limit 3
    select * from myevents2x_102040 order by 1,2 limit 3;
    \c 'host=citus-coord-01 dbname=db01 user=postgres'
    \qecho ==== coordinator ====
    \qecho select * from myevents2x where device_id=8 and event_id in (8,108,208) order by 1,2
    select * from myevents2x where device_id=8 and event_id in (8,108,208) order by 1,2;
_eof_

Here’s the output:

==== citus1 ====
select * from myevents2x_102040 order by 1,2 limit 3
 device_id | event_id |          event_time           |                data                 
-----------+----------+-------------------------------+-------------------------------------
         8 |        8 | 2023-08-17 07:10:51.622082-07 | {"measurement": 0.5055415404961443}
         8 |      108 | 2023-08-17 07:10:51.622082-07 | {"measurement": 0.9145586163936634}
         8 |      208 | 2023-08-17 07:10:51.622082-07 | {"measurement": 0.8032392420487922}


You are now connected to database "db01" as user "postgres" on host "citus2" (address "fd42:cb6a:5384:9a60:216:3eff:fe38>
==== citus2 ====
select * from myevents2x_102040 order by 1,2 limit 3
 device_id | event_id |          event_time           |                data                 
-----------+----------+-------------------------------+-------------------------------------
         8 |        8 | 2023-08-17 07:10:51.622082-07 | {"measurement": 0.5055415404961443}
         8 |      108 | 2023-08-17 07:10:51.622082-07 | {"measurement": 0.9145586163936634}
         8 |      208 | 2023-08-17 07:10:51.622082-07 | {"measurement": 0.8032392420487922}


You are now connected to database "db01" as user "postgres" on host "citus-coord-01" (address "fd42:cb6a:5384:9a60:216:3>
==== coordinator ====
select * from myevents2x where device_id=8 and event_id in (8,108,208) order by 1,2
 device_id | event_id |          event_time           |                data                 
-----------+----------+-------------------------------+-------------------------------------
         8 |        8 | 2023-08-17 07:10:51.622082-07 | {"measurement": 0.5055415404961443}
         8 |      108 | 2023-08-17 07:10:51.622082-07 | {"measurement": 0.9145586163936634}
         8 |      208 | 2023-08-17 07:10:51.622082-07 | {"measurement": 0.8032392420487922}

Step five:

The next few steps demonstrate our ability to continuously query and return those records found in shard myevents2x_102040.

Step 5a: Test, shut down node citus1

lxc stop citus1

lxc exec citus-coord-01 -- su - postgres -c 'psql db01'<<_eof_ | less -S
    \qecho "==== citus1 is shutdown ===="
    \qecho ==== querying coordinator ====
    \qecho select * from myevents2x where device_id=8 and event_id in (8,108,208) order by 1,2
    select * from myevents2x where device_id=8 and event_id in (8,108,208) order by 1,2;
_eof_

Here’s the output:

"==== citus1 is shutdown ===="
==== querying coordinator ====
select * from myevents2x where device_id=8 and event_id in (8,108,208) order by 1,2
 device_id | event_id |          event_time           |                data                 
-----------+----------+-------------------------------+-------------------------------------
         8 |        8 | 2023-08-17 07:10:51.622082-07 | {"measurement": 0.5055415404961443}
         8 |      108 | 2023-08-17 07:10:51.622082-07 | {"measurement": 0.9145586163936634}
         8 |      208 | 2023-08-17 07:10:51.622082-07 | {"measurement": 0.8032392420487922}

Step 5b: Test, restart citus1, and shut down citus2

lxc start citus1
lxc stop citus2

lxc exec citus-coord-01 -- su - postgres -c 'psql db01'<<_eof_ | less -S
    \qecho "==== citus2 is shutdown ===="
    \qecho ==== querying coordinator ====
    \qecho select * from myevents2x where device_id=8 and event_id in (8,108,208) order by 1,2
    select * from myevents2x where device_id=8 and event_id in (8,108,208) order by 1,2;
_eof_

Here’s the output; note that it’s exactly the same as the previous test:

"==== citus2 is shutdown ===="
==== querying coordinator ====
select * from myevents2x where device_id=8 and event_id in (8,108,208) order by 1,2
 device_id | event_id |          event_time           |                data                 
-----------+----------+-------------------------------+-------------------------------------
         8 |        8 | 2023-08-17 07:10:51.622082-07 | {"measurement": 0.5055415404961443}
         8 |      108 | 2023-08-17 07:10:51.622082-07 | {"measurement": 0.9145586163936634}
         8 |      208 | 2023-08-17 07:10:51.622082-07 | {"measurement": 0.8032392420487922}

Step 5c: Test, restart node citus2

lxc start citus2

lxc exec citus-coord-01 -- su - postgres -c 'psql db01'<<_eof_ | less -S
    \qecho "==== cluster restored ===="
    \qecho ==== querying coordinator ====
    \qecho select * from myevents2x where device_id=8 and event_id in (8,108,208) order by 1,2
    select * from myevents2x where device_id=8 and event_id in (8,108,208) order by 1,2;
_eof_

Here’s the output, of course!

"==== cluster restored ===="
==== querying coordinator ====
select * from myevents2x where device_id=8 and event_id in (8,108,208) order by 1,2
device_id | event_id |          event_time           |                data                  
----------+----------+-------------------------------+-------------------------------------
        8 |        8 | 2023-08-17 07:10:51.622082-07 | {"measurement": 0.5055415404961443}
        8 |      108 | 2023-08-17 07:10:51.622082-07 | {"measurement": 0.9145586163936634}
        8 |      208 | 2023-08-17 07:10:51.622082-07 | {"measurement": 0.8032392420487922}

Conclusion

Data redundancy is the hallmark of high availability, but with Citus, we’ve raised the bar. Can you think of a better system, one that stays up without losing time initiating failovers when a node fails?

Have fun!

