The Hard Truth About Self-Hosting ClickHouse® Infra at Scale
Introduction
ReplicatedMergeTree broke my brain.
At Numia, we serve real-time blockchain APIs at near-petabyte scale. ClickHouse handles this workload beautifully. Columnar storage, vectorized execution, compression ratios that make storage costs almost irrelevant. For real-time analytical queries across billions of rows, nothing else comes close.
But here's what the documentation doesn't prepare you for: the distributed setup is where ClickHouse stops being a database and starts being a distributed systems problem you're responsible for solving if you don't want to pay for the cloud version.
We spent 12 months building tooling to manage ClickHouse clusters. A CLI that abstracted the complexity. AI skills that let our agents create tables, run migrations, set up pipelines. Our error rate dropped. And then we deleted all of it. Not refactored. Not simplified. Deleted. Two thousand lines of code, gone.
This post is about why. It's about the architectural problem that no amount of tooling could fix, and how we finally built something that made the complexity disappear.
If you've ever wondered why your ClickHouse cluster feels like it's held together with duct tape and optimism, you're in the right place. If you've ever debugged a replication queue at 3 AM while a client was complaining, this story might feel familiar. And if you're evaluating ClickHouse and trying to understand what you're getting into, consider this your field guide to the distributed setup nobody warns you about.
Let me show you exactly what went wrong.
The N*X+1 Table Problem
Every logical table in a distributed ClickHouse cluster becomes N*X+1 physical tables. One table in your mental model, a bunch of them in reality. Here's how it happens.
A distributed cluster spreads processing and storage across machines called shards. You want a table called events, but you can't just create events. You need a _local table on each shard that actually stores the data:
-- The local table on each shard (schema defined here)
CREATE TABLE events_local ON CLUSTER '{cluster}' (
chain_id UInt32,
block_number UInt64,
tx_hash String,
timestamp DateTime,
event_type LowCardinality(String),
payload String
) ENGINE = ReplicatedMergeTree(
'/clickhouse/tables/{shard}/events_local',
'{replica}'
)
PARTITION BY toYYYYMM(timestamp)
ORDER BY (chain_id, block_number, tx_hash);
With three shards, that's three physical tables: events_local on shard 1, shard 2, and shard 3. But shards alone don't give you fault tolerance. If one goes down, the data it holds becomes unavailable. So you add replicas, copies of each shard that can serve reads and take over if the primary fails. With three shards and two replicas each, you're now at six physical tables:
- events_local on shard 1, replica 1
- events_local on shard 1, replica 2
- events_local on shard 2, replica 1
- events_local on shard 2, replica 2
- events_local on shard 3, replica 1
- events_local on shard 3, replica 2
Six tables, but you still can't query them directly without knowing which shard holds which data. That's where the Distributed table comes in: a facade that reads from all local tables so you don't have to.
-- The Distributed table (just routing logic)
CREATE TABLE events ON CLUSTER '{cluster}' AS events_local
ENGINE = Distributed(
'{cluster}',
default,
events_local,
sipHash64(chain_id, tx_hash)
);
That sipHash64(chain_id, tx_hash) is your sharding key. It determines which local table receives each row based on the column values you choose. Pick columns with unbalanced value distribution and some shards will store more data than others, tanking performance.
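One way to see whether a key is balanced is to group by _shard_num, the virtual column ClickHouse exposes on Distributed tables. A minimal sketch against the events table above:
-- Rough shard-balance check: rows and payload bytes per shard.
SELECT
    _shard_num,
    count() AS rows,
    formatReadableSize(sum(length(payload))) AS payload_size
FROM events
GROUP BY _shard_num
ORDER BY _shard_num;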
Now queries hit the Distributed table, which parallelizes across shards and aggregates results. Inserts go through it too, routed to the correct shard. But that's one more entity to maintain. If schemas drift between local and Distributed tables, queries fail silently or return wrong results.
So here's where we land: seven physical entities for one logical table. N shards x X replicas + 1 Distributed facade. Scale that to dozens of tables and you understand why ClickHouse clusters feel fragile. You wanted a database; you got a distributed system to manage. And every hour spent on that complexity is an hour not spent on your actual product.
ALTER Hell and DROP Disasters
The N*X+1 problem gets worse the moment you need to change anything. Say you need to add a column. With a normal database, you'd run one ALTER statement and move on with your life. With ReplicatedMergeTree, you need two operations minimum. If you're unlucky, you need to fix a mess.
-- Step 1: ALTER the local tables on all shards
ALTER TABLE events_local ON CLUSTER '{cluster}'
ADD COLUMN new_field String DEFAULT '';
-- Step 2: ALTER the Distributed table separately
ALTER TABLE events ON CLUSTER '{cluster}'
ADD COLUMN new_field String DEFAULT '';
Why separately? Because the Distributed table isn't replicated. The ON CLUSTER ALTER you ran against the local tables never touches it; it's a routing layer that happens to carry its own schema definition, and that definition has to be updated in a second statement to stay in sync with the local tables.
The ON CLUSTER clause is supposed to make this easier. You run the command once, and ClickHouse propagates it to all shards. In practice, it has several failure modes:
- Partial application: The ALTER succeeds on some shards but fails on others. Now your cluster has an inconsistent schema.
- Timeout failures: Large clusters or slow networks cause the operation to time out, leaving you uncertain about what actually happened.
- ZooKeeper contention: Under load, ZooKeeper can become a bottleneck, causing ALTERs to queue or fail.
When ON CLUSTER fails partway through, you're in a recovery scenario. You need to figure out which shards have the new schema and which don't. Then you need to manually apply the ALTER to the failed shards and hope nothing else changed in the meantime.
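One way to see where the ALTER actually landed is to compare schemas straight from the system tables. A sketch of that kind of check, assuming a cluster named my_cluster (clusterAllReplicas and system.columns are standard ClickHouse; the cluster name and database are illustrative):
-- Which hosts actually have the new column? Compare column lists per host.
SELECT
    hostName() AS host,
    arraySort(groupArray(name)) AS columns
FROM clusterAllReplicas('my_cluster', system.columns)
WHERE database = 'default' AND table = 'events_local'
GROUP BY host
ORDER BY host;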
DROP operations have the same coordination problem, but the consequences differ. A Distributed table and its underlying local tables must be dropped together. Forgetting either one leaves your cluster in an inconsistent state.
-- Forgetting the local table
DROP TABLE events ON CLUSTER '{cluster}';
-- Result: Queries fail immediately with "Table doesn't exist" and space is still occupied on disk
-- Forgetting the Distributed table
DROP TABLE events_local ON CLUSTER '{cluster}';
-- Result: Distributed table still exists but queries fail with "Table events_local doesn't exist" on each shard
TRUNCATE on large tables is its own adventure. ReplicatedMergeTree stores data in parts, and truncating a table with thousands of parts can take hours. During that window, the table is in a weird intermediate state. We learned to get creative: sometimes it's faster to create a new table, swap the names, and drop the old one than to truncate in place.
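On Atomic databases, EXCHANGE TABLES is one way to do that swap. A sketch, assuming events_local_new was already created with the same DDL as events_local but pointing at a fresh ZooKeeper path:
-- Atomically swap the empty twin in place of the full table.
EXCHANGE TABLES events_local AND events_local_new ON CLUSTER '{cluster}';
-- The old parts now live under events_local_new; drop them when convenient.
DROP TABLE events_local_new ON CLUSTER '{cluster}' SYNC;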
Reingestions are the worst. Fix a pipeline bug, backfill a new field, correct bad data; you're deleting terabytes and reinserting them. The replication queue backs up as every replica syncs deletions, then inserts. At our scale, a full reingestion took days. The entire cluster slowed down and queries timed out. And if something failed halfway through? Partial data across replicas with no clean recovery. Start from scratch.
Every operation became a multi-step ceremony with rollback plans. We documented the exact sequence, the verification queries to run between each step, and the recovery procedures for partial failures. This documentation was longer than most feature specs.
ZooKeeper: The Hidden Tax
ReplicatedMergeTree doesn't just add complexity to your ClickHouse cluster. It adds an entirely separate component you need to run, monitor, and debug.
ZooKeeper is the coordination layer. It's what makes the "Replicated" part of ReplicatedMergeTree work. Every replicated table registers itself in ZooKeeper. Every replica tracks its state there. The replication queue lives there. Leadership elections happen there.
Here's what ZooKeeper actually does for your ClickHouse cluster:
Part tracking: When a replica ingests data, it creates a part. ZooKeeper tracks which replicas have which parts, so other replicas know what they need to fetch.
Replication queue: When replica A has a part that replica B doesn't, ZooKeeper queues a fetch task. Replica B pulls the part from A. The queue tracks progress, retries, and failures.
Leadership: For operations that need coordination (like merges), ZooKeeper handles leader election. Only one replica merges at a time to avoid conflicts.
Metadata: Table schemas, partition information, cluster topology. All stored in ZooKeeper.
This is elegant distributed systems design. It's also another component to run, and a dependency your entire cluster hangs on: lose ZooKeeper quorum and every replicated table goes read-only.
ZooKeeper needs its own high-availability setup. You need at least three ZooKeeper nodes for quorum. They need to be on separate machines for fault tolerance. They need monitoring and backup procedures. They need capacity planning as your ClickHouse cluster grows.
And they have their own failure modes:
Session expiration: If a ClickHouse shard loses connection to ZooKeeper for too long, its session expires. The shard thinks it's still part of the cluster, but ZooKeeper disagrees. Recovery means restarting services and hoping the replication queue catches up.
Queue backlog: Under heavy write load, the replication queue can grow faster than replicas can process it. Parts pile up. Replicas fall behind. Query results become inconsistent depending on which replica you hit. Background tasks stack up (a check for this is sketched below).
Split brain scenarios: Network partitions between ZooKeeper nodes can cause quorum issues. ClickHouse shards can't write because they can't coordinate. Your cluster is technically up but functionally down.
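That backlog is visible from SQL. A sketch of the kind of check that surfaces it, using the standard system.replicas table (the thresholds are arbitrary):
-- Replicas on this node with a backed-up queue or noticeable lag.
SELECT
    database,
    table,
    replica_name,
    queue_size,        -- pending fetches, merges, and mutations
    inserts_in_queue,
    merges_in_queue,
    absolute_delay     -- seconds behind the freshest replica
FROM system.replicas
WHERE queue_size > 100 OR absolute_delay > 60
ORDER BY absolute_delay DESC;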
I've spent more hours debugging ZooKeeper issues than I've spent debugging actual ClickHouse queries. You're no longer building a product. You're operating a distributed system. The database gave you primitives; you built the coordination layer yourself.
At 3 AM, when a client is complaining about missing data, the last thing you want to hear is "the replication queue is backed up." But that's what running ReplicatedMergeTree at scale means. You become a ZooKeeper expert whether you wanted to or not.
The Tooling Attempt
We didn't accept this complexity quietly. We built tooling to manage it. The first layer was a CLI. It abstracted the dual table creation pattern. Execute a single command and it would generate both the _local and Distributed tables. It handled the ON CLUSTER clauses. It managed the sharding key configuration. It verified that all shards received the schema before reporting success.
# Simplified version of what the abstraction layer looked like
class ClickHouseTableManager:
    def create_table(self, name: str, schema: Schema) -> Result:
        # Step 1: Create local table on cluster
        local_result = self.execute_on_cluster(
            f"CREATE TABLE {name}_local {schema.to_ddl()} "
            f"ENGINE = ReplicatedMergeTree(...)"
        )
        if not local_result.all_succeeded():
            return self.rollback_partial_create(name, local_result)

        # Step 2: Verify all shards have the table
        verification = self.verify_table_exists_all_shards(f"{name}_local")
        if not verification.passed():
            return self.rollback_and_report(name, verification)

        # Step 3: Create distributed table
        dist_result = self.execute(
            f"CREATE TABLE {name} AS {name}_local "
            f"ENGINE = Distributed(...)"
        )
        if not dist_result.succeeded():
            return self.rollback_distributed_failure(name, dist_result)

        # Step 4: Final verification
        return self.verify_full_setup(name)
The second layer was ClickHouse skills for our AI agents. We wanted Claude to be able to create tables, run migrations, set up real-time pipelines, and test queries. The skills wrapped the CLI, adding natural language understanding on top of the abstraction.
It helped, and our error rate dropped. New team members could work with ClickHouse without understanding the N*X+1 pattern. The happy path was genuinely happy. But distributed systems have a lot of unhappy paths.
The first edge case was partial ALTER failures. The ON CLUSTER command would time out on one shard but succeed on others. The CLI detected this, but what's the recovery? You can't just retry on the failed shard because maybe it actually succeeded and the acknowledgment was lost. You need to check the schema on every shard, compare them, and figure out what to do.
The second edge case was DROP cascading incorrectly. A materialized view referenced a table we were dropping. The DROP succeeded but the materialized view was now broken. The CLI didn't track view dependencies. We added that. Then we found views that referenced views.
The third edge case was replica drift. After a network partition, two replicas would have divergent data. ZooKeeper showed them as healthy. Queries returned different results depending on which replica you hit. The CLI had no way to detect this because from its perspective, everything was fine.
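One way to surface that kind of drift is to compare per-replica row counts within each shard. A sketch, assuming a cluster named my_cluster (_shard_num here is the virtual column exposed by the clusterAllReplicas table function):
-- Replicas within the same shard should converge to identical counts
-- once replication catches up; a persistent mismatch means drift.
SELECT
    _shard_num,
    hostName() AS replica,
    count() AS rows
FROM clusterAllReplicas('my_cluster', default.events_local)
GROUP BY _shard_num, replica
ORDER BY _shard_num, replica;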
Every bug we fixed revealed another one we hadn't considered. The tooling got more complex. We added retry logic with exponential backoff. We added schema comparison utilities. We added health checks that queried every replica and compared results. We added rollback procedures that could undo partial operations. We added logging so detailed it generated gigabytes per day.
The CLI grew to thousands of lines. The AI skills became brittle because they depended on the CLI behaving predictably, and the CLI was trying to handle unpredictable distributed system behavior.
We were building a distributed systems framework on top of a database that was supposed to handle distribution for us.
The Architecture Insight
The realization hit during yet another debugging session. A failed ALTER had left two shards with different schemas. The CLI detected it but couldn't automatically resolve it because we had concurrent writes happening. We needed to stop writes, fix the schema, verify consistency, and resume. This was the third time that month.
The problem wasn't our tooling. It was the architecture. ReplicatedMergeTree requires you to manage distribution at the application layer. The database gives you primitives: parts, replication queues, ZooKeeper coordination. You're responsible for assembling those primitives into a working distributed system. You handle sharding, consistency, and failure recovery.
No amount of tooling can fix an architecture mismatch. You can smooth the rough edges, automate the happy path, handle some failure cases. But you can't change the fact that you're operating a distributed system that happens to use ClickHouse for storage.
The fix was architectural: decouple storage and compute. Instead of each replica holding its own copy of the data, all compute nodes read from the same storage backend. No local replicas, no replication queues, no ZooKeeper coordination for data consistency. One table definition, one ALTER statement, one source of truth.
So we built it.
Cloud-Native MergeTree
Cloud-native MergeTree changes how ClickHouse works. Instead of distributing data across nodes that each maintain their own copy, it centralizes data in object storage and makes compute nodes stateless.
-- One table. That's it. No _local suffix. No Distributed facade.
CREATE TABLE events (
chain_id UInt32,
block_number UInt64,
tx_hash String,
timestamp DateTime,
event_type LowCardinality(String),
payload String
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (chain_id, block_number, tx_hash);
Same logical table, but now it's actually one table. Not seven.
Every compute node connects to the same S3-compatible storage backend. When a query comes in, the node reads the relevant parts directly from storage, processes them, and returns results. No local data storage beyond caching. No synchronization between nodes. Just read and compute.
With ReplicatedMergeTree, shards coordinated because each held different copies of data. ZooKeeper tracked who had what and replication queues ensured eventual consistency. With cloud-native MergeTree, there's nothing to coordinate. Every node sees the same data because they're all reading from the same place.
ALTERs become one statement, no ON CLUSTER. No dual operations. The schema lives in one place.
ALTER TABLE events ADD COLUMN new_field String DEFAULT '';
Adding capacity goes from days to minutes. With ReplicatedMergeTree, adding a shard meant copying 25TB+ of data. With cloud-native MergeTree, the new node connects to the shared storage and starts serving queries immediately.
We built this against S3-compatible storage, which lets compute scale independently from storage. It took months to validate at production scale, but we got there.
The Migration and Deletion
The migration was anticlimactic. New cluster alongside the old one, data pipelines pointed at both, validated identical results, switched over.
What happened next was more satisfying. The entire abstraction layer—dual table creation, dual ALTERs, shard routing, partial failure recovery—was deleted. Two thousand lines of code that existed only because the architecture required it.
# Before: 2,000 lines handling distributed complexity
def create_table(self, name, schema):
    local_result = self.execute_on_cluster(...)
    if not local_result.all_succeeded():
        return self.rollback_partial_create(name, local_result)
    # ... dozens more lines of edge case handling

# After: ~200 lines
def create_table(self, name, schema):
    return self.execute(f"CREATE TABLE {name} ... ENGINE = MergeTree()")
The AI skills we built for Claude actually work now. Adding a column used to mean orchestrating a multi-step process with verification and rollback. Now it's one statement.
Conclusion
This wasn't about saving money, though we did. It was about eliminating an entire category of problems.
Every hour debugging replication issues, every edge case in the CLI, every partial ALTER failure—that time compounds. The new architecture doesn't eliminate problems. It eliminates the problems that come from managing distribution at the application layer. What remains is data modeling, query optimization, and business logic. Those are problems worth having.
We could have kept running ReplicatedMergeTree. It worked. But "working" isn't the bar when you're building for the long term. We wanted something where the operational surface matched the logical model. One table should mean one table.
We're opening it up: ObsessionDB.
The complexity wasn't the price of scale. It was the cost of an architecture that made you responsible for problems the database should have solved.
Originally written for ObsessionDB. Read the original article here.