How Apache Iceberg Achieves Atomic All-or-Nothing Writes
The Problem: Distributed Data Consistency
Writing data reliably across distributed systems is hard. In traditional Hive-based data warehouses, the write process breaks down into two distinct phases:
- Write phase: Data files land in object storage (S3, HDFS, etc.)
- Metadata update phase: The Hive Metastore is updated to reflect the change
If a writer crashes between these phases, the system enters an inconsistent state. Data files exist but the metastore doesn’t know about them. Readers might see stale data or no data at all. It’s a nightmare.
Apache Iceberg solves this with atomic pointer swaps — a technique database engineers have used for decades.
The Core Insight: The Pointer Swap (Atomic Commit)
Restaurant Menu System Analogy
Imagine managing a restaurant’s menu:
The problem: You print 500 new menus with updated prices. Customers arrive while you’re replacing old menus with new ones. Some see old prices, some see new prices. If interrupted, the restaurant is in chaos.
The Iceberg solution: Print all new menus in a back room. Keep one master menu at the front desk. When ready, swap the master menu once. All customers see the consistent new menu immediately. If you crash before the swap, the old menu stays intact.
Iceberg uses the same principle: metadata is the single source of truth.
Iceberg’s Atomic Write Architecture
Step 1: Write Data Files (Invisible)
The writer creates new data files in object storage:
s3://my-bucket/warehouse/
├── data/
│ ├── [existing parquet files]
│ ├── 00001-abc123.parquet ← NEW (invisible)
│ └── 00002-def456.parquet ← NEW (invisible)
Here’s the key: readers can’t see these files yet. They’re not referenced in any active metadata, so as far as the table is concerned, they don’t exist.
Step 2: Create Metadata File (Still Invisible)
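The writer then creates the metadata that describes the commit. It first writes manifest files, which list the new data files together with per-file statistics (row count, file size, null counts, column bounds) and partition values. It also writes a manifest list that indexes those manifests. Finally, it writes a new table metadata file (abridged example below) containing:
- A new snapshot that points to the manifest list, and through it to the data files of this commit
- Schema information
- Partition information (the partition spec)
- A snapshot summary (operation, number of added files and records)
- A log of previous metadata files, including the one this commit is based on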
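{
"format-version": 2,
"location": "s3://bucket/warehouse/my_table",
"last-updated-ms": 1702156800000,
"last-column-id": 10,
"current-schema-id": 0,
"schemas": [ { "schema-id": 0, "type": "struct", "fields": "..." } ],
"current-snapshot-id": 5,
"snapshots": [
{
"snapshot-id": 5,
"sequence-number": 5,
"timestamp-ms": 1702156800000,
"summary": {
"operation": "append",
"added-data-files": "2",
"added-records": "150000"
},
"manifest-list": "s3://bucket/warehouse/my_table/metadata/snap-5-1-abc123.avro"
}
],
"refs": {
"main": {
"snapshot-id": 5,
"type": "branch"
}
},
"metadata-log": [
{
"timestamp-ms": 1702150000000,
"metadata-file": "s3://bucket/warehouse/my_table/metadata/v1.metadata.json"
}
]
}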
Again, this file is invisible. The catalog still points to the previous metadata file, so readers don’t know it exists.
Step 3: The Atomic Pointer Swap
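This is the critical operation. The writer asks the catalog to atomically replace the table’s current metadata pointer, on the condition that the pointer still refers to the metadata file the writer started from. This compare-and-swap (CAS) is the heart of Iceberg’s optimistic concurrency control.

Where the pointer lives depends on the catalog:
- The Hive Metastore keeps it in the metadata_location table property.
- Catalogs such as AWS Glue, JDBC, REST, and Nessie keep it in their own stores.
- Filesystem-based Hadoop tables rely on an atomic rename that creates the next vN.metadata.json, and version-hint.text serves only as a hint.

This article uses "version hint" as shorthand for whichever pointer the catalog maintains.
The pointer changes from: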
v1.metadata.json
to:
v2.metadata.json
This single atomic operation is the transaction boundary. Nothing else matters — this is it.
Mermaid representation:
graph LR
A["🔒 Version Hint File<br/>(Points to v1.metadata.json)"]
B["📝 Writer Creates<br/>Data Files<br/>(Invisible)"]
C["📋 Writer Creates<br/>v2.metadata.json<br/>(Invisible)"]
D["🔄 Atomic Swap<br/>Version Hint<br/>v1 → v2"]
E["➜ Readers See<br/>New Snapshot<br/>(Consistent)"]
A -->|"Before Write"| B
B --> C
C --> D
D --> E
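The pointer swap can be modeled in a few lines. The following is a minimal in-memory sketch, not Iceberg’s implementation. A hypothetical Catalog class keeps the current metadata location in a java.util.concurrent.atomic.AtomicReference, and commit succeeds only if the pointer still holds the location the writer started from. File names are illustrative.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical in-memory catalog. The only mutable state is the pointer to the
// current metadata file; data and metadata files themselves are never modified.
final class Catalog(initial: String) {
  private val pointer = new AtomicReference[String](initial)

  def currentMetadata: String = pointer.get()

  // Compare-and-swap: move the pointer from `expected` to `next` only if no other
  // writer moved it first. AtomicReference compares by reference, so `expected`
  // must be the exact value this writer read earlier.
  def commit(expected: String, next: String): Boolean =
    pointer.compareAndSet(expected, next)
}

val catalog = new Catalog("v1.metadata.json")

// Steps 1 and 2: remember the base version, then write new (invisible) files.
val base = catalog.currentMetadata
val newMetadata = "v2.metadata.json"
val visibleBeforeSwap = catalog.currentMetadata // readers still resolve v1

// Step 3: the single atomic operation that publishes the commit.
val committed = catalog.commit(base, newMetadata)

println(s"before: $visibleBeforeSwap, committed: $committed, after: ${catalog.currentMetadata}")
```

Nothing the writer does before commit is visible. A writer whose base is stale simply gets false back, which is the situation Scenario 2 below walks through.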
Step 4: Readers Discover New Snapshot
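When a reader queries the table, it:
- Asks the catalog for the current metadata location (the version hint)
- Reads that metadata file (e.g., v2.metadata.json) and picks its current snapshot
- Follows the snapshot’s manifest list to the manifests, which list the data files to read (pruned using partition values and column statistics)
- Reads those data files

The reader sees a consistent snapshot because it plans the entire scan from the single metadata file it loaded. A commit that lands while the query is running does not change what that query reads.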
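A reader’s consistency comes from resolving the pointer once and planning the whole scan from that one metadata version. The sketch below is hypothetical: TableMetadata and its flat file list stand in for real metadata files, manifest lists, and manifests. It shows that a commit landing mid-query does not change what the running query reads.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical, simplified metadata: each version lists the data files of its
// current snapshot (real Iceberg goes through a manifest list and manifests).
final case class TableMetadata(version: Int, dataFiles: List[String])

val pointer = new AtomicReference(TableMetadata(1, List("a.parquet", "b.parquet")))

// A reader resolves the pointer exactly once and plans its scan from that value.
def planScan(): List[String] = {
  val pinned = pointer.get() // one read of the pointer = one consistent snapshot
  pinned.dataFiles
}

val scan = planScan()

// A writer commits v2 while the first scan is still running.
val base = pointer.get()
pointer.compareAndSet(base, TableMetadata(2, base.dataFiles :+ "c.parquet"))

// The running scan keeps its pinned file list; new queries see v2.
println(s"running scan reads: $scan")
println(s"next scan reads:    ${planScan()}")
```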
Fault Tolerance Scenarios
Scenario 1: Writer Crashes Before Atomic Swap
Timeline:
Time Event
---- -----
T1 Writer creates 00001-abc123.parquet
T2 Writer creates 00002-def456.parquet
T3 Writer creates v2.metadata.json
T4 ⚡ CRASH before swapping version hint
Result:
- Data files exist in S3 but are orphaned (not referenced)
- Metadata file exists but is not the active snapshot
- Version hint still points to v1.metadata.json
- Readers continue reading v1 (old data) — consistency is maintained
- Orphaned files can later be removed by an orphan-file cleanup job (for example, Iceberg’s remove_orphan_files Spark procedure)
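At its core, finding orphans is a set difference between what is in storage and what the table’s retained metadata references. The following is a simplified sketch of that idea, not the actual remove_orphan_files implementation. The file names, timestamps, and the olderThanMs safety window are illustrative assumptions. A window is needed so that files belonging to a commit that is still in progress are not deleted.

```scala
// Hypothetical listing of the table's storage location: path -> last-modified time (ms).
val storage: Map[String, Long] = Map(
  "data/00000-old.parquet"      -> 1000L,
  "data/00001-abc123.parquet"   -> 5000L, // written by the crashed writer
  "data/00002-def456.parquet"   -> 5000L, // written by the crashed writer
  "metadata/v1.metadata.json"   -> 1000L,
  "metadata/v2.metadata.json"   -> 5000L, // created but never published
  "data/00003-inflight.parquet" -> 9000L  // belongs to a commit still in progress
)

// Files reachable from the metadata the table still tracks (v1 only: the swap never happened).
val referenced: Set[String] = Set("data/00000-old.parquet", "metadata/v1.metadata.json")

// Orphan candidates: present in storage, unreferenced, and older than the safety cutoff.
def orphanCandidates(listing: Map[String, Long], live: Set[String], olderThanMs: Long): Set[String] =
  listing.collect { case (path, modified) if !live(path) && modified < olderThanMs => path }.toSet

val orphans = orphanCandidates(storage, referenced, olderThanMs = 8000L)
orphans.toList.sorted.foreach(println)
```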
Scenario 2: Pointer Swap Itself Fails (Conflict)
If two writers attempt concurrent commits:
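Writer A                          Writer B
--------                          --------
Read version hint: v1             Read version hint: v1
Creates v2.metadata.json
                                  Creates v3.metadata.json (concurrent)
CAS (compare-and-swap):
expect v1, set v2
✅ SUCCESS (swap v1→v2)
                                  CAS: expect v1, set v3
                                  ❌ CONFLICT (pointer is already v2)
Result:
- Writer A succeeds, and the snapshot is now v2
- Writer B’s commit is rejected, but B does not have to start over. Iceberg refreshes to v2, re-validates where the operation requires it, re-applies B’s changes on top of v2, and retries the swap. For an append, the same data files are reused. The number of attempts is set by the commit.retry.num-retries table property.
- B fails only if the changes genuinely conflict (for example, both rewrite the same data files) or the retries run out. It then gets an exception such as CommitFailedException and must recompute.
- Readers always see one consistent snapshot, with no partial writes

Iceberg’s conflict resolution is first-committer-wins: the first successful swap wins, and later writers either rebase and retry or fail.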
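For appends, Iceberg’s retry is cheap: the losing writer reloads the latest metadata and re-applies its list of new data files on top of it, without rewriting any data. The sketch below models that loop under these assumptions:
- A hypothetical in-memory Table.
- Appends that never conflict. Real Iceberg also validates conflicts for some operations before re-applying, which this sketch omits.
- A local CommitFailedException stand-in.
- A retry limit of 4, chosen to mirror the default of the commit.retry.num-retries table property.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical immutable table state: a version number and the live data files.
final case class Metadata(version: Int, files: Vector[String])

final class Table(initial: Metadata) {
  private val current = new AtomicReference[Metadata](initial)
  def refresh(): Metadata = current.get()
  // CAS against the exact Metadata instance the writer read (reference comparison).
  def swap(base: Metadata, next: Metadata): Boolean = current.compareAndSet(base, next)
}

// Local stand-in named after Iceberg's exception; not the real class.
final class CommitFailedException(msg: String) extends RuntimeException(msg)

// Optimistic append: refresh the base, re-apply the same new files on top of it,
// and try to swap. Data files are reused across attempts, never rewritten.
// `beforeSwap` is a test hook that lets another writer commit in between.
def appendWithRetry(
    table: Table,
    newFiles: Vector[String],
    maxRetries: Int = 4,
    beforeSwap: Int => Unit = _ => ()
): (Metadata, Int) = {
  var attempt = 0
  while (attempt <= maxRetries) {
    val base = table.refresh()
    val next = Metadata(base.version + 1, base.files ++ newFiles)
    beforeSwap(attempt)
    if (table.swap(base, next)) return (next, attempt)
    attempt += 1 // lost the race: someone else committed first
  }
  throw new CommitFailedException(s"gave up after $maxRetries retries")
}

val table = new Table(Metadata(1, Vector("old.parquet")))

// Writer B's first attempt is overtaken by writer A, who commits between
// B's refresh and B's swap.
val (result, retries) = appendWithRetry(
  table,
  Vector("b-1.parquet"),
  beforeSwap = attempt =>
    if (attempt == 0) {
      val a = table.refresh()
      table.swap(a, Metadata(a.version + 1, a.files :+ "a-1.parquet"))
    }
)

println(s"B committed version ${result.version} after $retries retry: ${result.files}")
```

Writer B ends up at version 3 with both writers’ files. It retried once without touching its data file.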
How Iceberg Replaces Hive Metastore
Traditional Hive:
ALTER TABLE my_table ADD PARTITION (year=2024, month=12)
LOCATION 's3://bucket/year=2024/month=12'
This updates the Hive Metastore with a new partition pointer.
Iceberg replaces this:
Hive Metastore now only stores a pointer to Iceberg’s current metadata file, in the metadata_location table property (e.g., s3://bucket/metadata/v2.metadata.json).
The actual table structure, files, and snapshots are entirely managed by Iceberg, not Hive.
Hive Metastore:
┌─────────────────────────────┐
│ Table: my_table │
│ Location: s3://bucket │
│ Current Version: v2 │ ← Atomic pointer
└─────────────────────────────┘
↓
v2.metadata.json (Iceberg manages)
↓
┌─────────────────────────────┐
│ Snapshot #5 │
│ - manifest-list.avro │
│ - Data files: [...] │
│ - Statistics │
└─────────────────────────────┘
Atomicity Guarantees in Detail
Visibility Guarantees
Before atomic swap:
graph LR
subgraph "Invisible"
A["Data Files<br/>(new)"]
B["v2.metadata.json"]
end
subgraph "Readers See"
C["v1.metadata.json<br/>(old data)"]
end
A -.-> B
C -->|Points to| D["Old Data Files"]
After atomic swap:
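graph LR
subgraph "Now Visible"
A["Data Files<br/>(new)"]
B["v2.metadata.json"]
end
subgraph "Still Accessible"
C["v1.metadata.json<br/>(time-travel)"]
end
D["Existing data files"]
B -->|Points to| A
B -->|Points to| D
C -->|Points to| D
For an append, the new snapshot references both the new files and the existing ones. Old files become unreferenced only after a later delete or overwrite. They are physically removed only once the snapshots that still reference them are expired.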
Write Isolation
No writer can see another writer’s uncommitted changes:
Writer A (Session 1)              Writer B (Session 2)
─────────────────                 ─────────────────
Read v1.metadata.json
Compute changes
Write data files
Create v2.metadata.json
                                  Read v1.metadata.json (v1 still current)
Attempt atomic swap
✅ Swap succeeded                  B keeps working from v1
(snapshot is now v2)
                                  Compute changes
                                  Write data files
                                  Create v3.metadata.json
                                  Attempt atomic swap (expects v1, finds v2)
                                  Refresh to v2, re-apply changes, retry
                                  ✅ Swap succeeded
                                  (snapshot is now v3)
Key insight: Each writer operates on a snapshot, not on live data.
Why This Matters for Data Engineering
Consistency Without Locks
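Iceberg achieves serializable commits without pessimistic locking. Readers never take locks, and writers hold no locks while they compute and write files. At most, the catalog takes a brief lock or performs a conditional update at commit time (the Hive catalog, for example, uses a short-lived metastore lock).
Instead, it uses optimistic concurrency control:
- Writers assume no one else will commit a conflicting change while they work
- At the end, they detect concurrent commits during the atomic swap
- On a failed swap, they refresh, check for real conflicts, and retry
This is more efficient for distributed systems, where long-held locks and lock timeouts are problematic.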
S3-Safe Atomicity
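Traditional distributed databases require consensus protocols (Raft, Paxos) or global locks.
Iceberg itself uses only:
- Atomic file writes (S3’s PutObject makes an object visible completely or not at all)
- An atomic metadata pointer update (a compare-and-swap provided by the catalog, such as a database transaction or a conditional update)

Iceberg does not run a consensus protocol of its own; it delegates the one atomic step to the catalog. S3 has no atomic rename, so the file-based version-hint approach of Hadoop tables is not safe on S3. Use a catalog there instead.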
Support for Time Travel
Because metadata files are immutable and every commit creates a new snapshot, readers can query old snapshots until those snapshots are expired:
// Read current version
spark.table("db.my_table").show()
// Read as of a specific time (epoch milliseconds; 1733047200000 = 2024-12-01T10:00:00Z)
spark.read.option("as-of-timestamp", "1733047200000").format("iceberg").load("db.my_table").show()
// Read a specific snapshot
spark.read.option("snapshot-id", 3L).format("iceberg").load("db.my_table").show()
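A timestamp-based time-travel read has to pick one snapshot: the latest one committed at or before the requested time. The sketch below expresses that rule. The Snapshot type and the history are hypothetical, and this is not Iceberg’s lookup code. Timestamps are epoch milliseconds, as in Iceberg’s timestamp-ms fields.

```scala
// Hypothetical snapshot log entry: snapshot ID and commit time (epoch ms).
final case class Snapshot(id: Long, timestampMs: Long)

// Return the latest snapshot committed at or before `asOfMs`, if any.
def snapshotAsOf(history: Seq[Snapshot], asOfMs: Long): Option[Snapshot] =
  history.filter(_.timestampMs <= asOfMs).sortBy(_.timestampMs).lastOption

val history = Seq(
  Snapshot(3L, 1733040000000L), // 2024-12-01T08:00:00Z
  Snapshot(4L, 1733050800000L), // 2024-12-01T11:00:00Z
  Snapshot(5L, 1733061600000L)  // 2024-12-01T14:00:00Z
)

// 2024-12-01T10:00:00Z falls between snapshots 3 and 4, so snapshot 3 is read.
println(snapshotAsOf(history, 1733047200000L))
// A time before the first snapshot has no snapshot to read.
println(snapshotAsOf(history, 1733000000000L))
```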
Summary: The Atomic Guarantee in One Diagram
graph TD
A["📝 Write Phase<br/>(Data + Metadata)"] -->|"Invisible to readers"| B["🔲 Uncommitted State"]
B -->|"One Atomic Operation"| C["🔄 Atomic Pointer Swap<br/>(Version Hint)"]
C -->|"Success"| D["➜ New Snapshot Visible<br/>(All-or-Nothing)"]
C -->|"Failure/Conflict"| E["⚠ Old Snapshot Unchanged<br/>(Automatic Rollback)"]
E -->|"Retry"| A
Streaming Checkpoints vs. Iceberg Commits
The Old Problem: Offset Management Without ACID
Traditional streaming systems (Kafka + Hive/Parquet) had a critical problem:
Checkpoint stored: offset = 1000
Batch processing:
1. Read records 0-999 from Kafka
2. Write data files to S3
3. Update offset to 1000 in checkpoint
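If the crash comes after step 2 but before step 3: data is written but the offset is not updated, so the batch is re-read and written again → duplicates
If the offset were updated first and the crash came before the write: the offset has advanced but the data is missing → data gaps
Two independent operations, two points of failure.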
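How Iceberg Solves This: Offset Log + Batch-Aware Atomic Commits
With Spark Structured Streaming writing to Iceberg, the offsets and the table metadata live in two separate places:
Checkpoint directory (Spark):
├── offsets/
│ └── 5 (offset range for batch 5, written BEFORE the batch runs)
└── commits/
    └── 5 (written AFTER the Iceberg commit succeeds)
Table metadata directory (Iceberg):
├── v1.metadata.json (old snapshot)
├── v2.metadata.json (new snapshot with data from batch 5; its summary records the query ID and batch ID 5)
└── ...
The key difference: the two are not written in one atomic operation, but the gap between them is harmless. The Iceberg commit itself is atomic, and it records which batch it contains, so a replayed batch can be recognized and skipped.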
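// Pseudo-code: one micro-batch of a Spark + Iceberg streaming write
// (planNextRange, readSource, writeDataFiles, lastCommittedEpoch, createMetadata,
//  atomicSwap and the two log writers are illustrative helpers)
runBatch(batchId: Long):
  // Step 0: write-ahead log of the offset range (reused as-is when the batch is replayed)
  range = offsetLog.getOrElse(batchId, writeOffsetLog(batchId, planNextRange()))
  // Step 1: Write data (invisible)
  dataFiles = writeDataFiles(readSource(range))
  // Step 2: Skip the table commit if this batch already reached the table
  if lastCommittedEpoch(queryId) < batchId:
    // Step 3: ATOMIC - create metadata recording (queryId, batchId) and swap the pointer
    newMetadata = createMetadata(dataFiles, summary = (queryId, batchId))
    atomicSwap(expected = currentMetadata, next = newMetadata)
  // Step 4: mark the batch done in the checkpoint
  writeCommitLog(batchId)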
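The table commit either lands completely or not at all. Suppose the process dies after the commit but before commits/N is written. The replayed batch then finds its batch ID already in the table and skips the commit, so readers never see a middle ground.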
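The interplay of the offset log, the table commit, and the commit marker can be simulated without Spark or Kafka. Everything in this sketch is a hypothetical stand-in:
- In-memory maps play the offsets/ and commits/ directories.
- A set of committed epochs plays the batch IDs that Iceberg’s streaming writer records in snapshot summaries.
- crashAt injects a failure right after the table commit and before the commit marker. This is the one step the atomic swap does not cover.

```scala
import scala.collection.mutable

// Hypothetical source: a Kafka-like log where record i sits at offset i.
val source: Vector[Int] = Vector.range(0, 30)

// Checkpoint stand-ins: offsets/ (batch -> logged range) and commits/ (finished batches).
val offsetLog = mutable.Map.empty[Long, (Int, Int)]
val commitLog = mutable.Set.empty[Long]

// Iceberg table stand-in: visible rows plus the batch (epoch) IDs recorded in snapshot summaries.
val tableRows = mutable.ArrayBuffer.empty[Int]
val committedEpochs = mutable.Set.empty[Long]

final class Crash extends RuntimeException

// Epoch-aware table commit. In the real system the rows and the epoch record
// become visible together in one atomic pointer swap; here they are one method call.
def commitToTable(epoch: Long, rows: Seq[Int]): Unit =
  if (!committedEpochs(epoch)) {
    tableRows ++= rows
    committedEpochs += epoch
  }

// Runs the query until the source is drained; `crashAt` kills it right after the
// table commit of that batch, before commits/ is written.
def runQuery(batchSize: Int, crashAt: Option[Long]): Unit = {
  // Recovery: resume at the last logged batch if it never got a commit marker.
  var batch: Long = if (offsetLog.isEmpty) 0L else offsetLog.keys.max
  if (commitLog(batch)) batch += 1
  while (true) {
    val (start, end) = offsetLog.get(batch) match {
      case Some(range) => range // replay: reuse exactly the logged range
      case None =>
        val s = if (batch == 0L) 0 else offsetLog(batch - 1)._2
        val e = math.min(s + batchSize, source.size)
        if (s == e) return // nothing new to read
        offsetLog(batch) = (s, e) // write-ahead: log the range before reading it
        (s, e)
    }
    commitToTable(batch, source.slice(start, end))
    if (crashAt.contains(batch)) throw new Crash
    commitLog += batch
    batch += 1
  }
}

try runQuery(batchSize = 10, crashAt = Some(1L)) catch { case _: Crash => println("crashed after the table commit of batch 1") }
println(s"after crash:   rows=${tableRows.size}, commits=${commitLog.toList.sorted}")

runQuery(batchSize = 10, crashAt = None) // restart
println(s"after restart: rows=${tableRows.size}, distinct=${tableRows.distinct.size}, commits=${commitLog.toList.sorted}")
```

After the restart, batch 1 is replayed over the same logged range. Its commit is skipped because epoch 1 is already in the table, and the table ends with each of the 30 records exactly once.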
What Happens During a Single Batch Failure
Scenario: Stream Crashes Mid-Batch
Timeline:
T1 Batch 5 planned: offsets/5 logs records 950-1050
T2 Batch 5 writes 50 parquet files (invisible)
T3 Batch 5 creates v5.metadata.json (still invisible)
T4 ⚡ CRASH during atomic swap to v5.metadata.json (the swap never lands)
T5 Process restarts
Result:
- v4.metadata.json still active (readers see up to record 949)
- Batch 5 data files are orphaned in S3 (unreferenced, invisible)
- offsets/5 exists but commits/5 does not, so batch 5 is known to be unfinished
On restart:
1. Read checkpoint: the last commit marker is batch 4, and offsets/5 holds the range 950-1050
2. Kafka seek to offset 950
3. Batch 5 retries: read exactly records 950-1050 again
4. Write new parquet files (different file names than before)
5. Atomic swap succeeds this time; then commits/5 is written
6. Readers now see data through record 1050
Zero data loss. Zero duplicates. However many times the crash repeats, the restart re-runs the same range and commits it once.
What Happens During Full Application Restart
Scenario: Entire Streaming App Crashes
Timeline:
T1 App running: batches 1-50 already committed (commits/50 written)
T2 Batch 51 planned: offsets/51 logs range 50000-55000; it writes 100 parquet files
T3 Batch 52 has not started: in the default micro-batch mode, Spark plans the next batch only after the previous one commits
T4 ⚡ ENTIRE APP CRASHES (batch 51 mid-flight)
- Batch 51 data: ORPHANED (no snapshot references it)
- Checkpoint: offsets/51 present, commits/51 missing; the last commit is batch 50, offset 50000
T5 App restarts after 1 hour
Result:
- Only the snapshot from batch 50 is current
- Readers see data through offset 50000
- Batch 51’s files sit invisible in S3
On restart:
1. Last committed batch = 50 (offset 50000); offsets/51 is unfinished
2. Kafka seek to offset 50000 and re-read the logged range 50000-55000
3. Batch 51 runs again (with new file names)
4. Its Iceberg commit lands and commits/51 is written
5. Batch 52 is then planned from offset 55000 and runs normally
6. The orphaned files from the first attempt never conflict with the new ones
Old systems would:
- Have half-written data interleaved with offsets
- Not know if records 50000-55000 were processed or not
- Risk re-reading duplicates or skipping records on restart
Iceberg guarantees:
- Exactly which records were committed (offset log plus commit markers)
- Exactly which data belongs to those records (the metadata snapshot, tagged with its batch ID)
- Orphaned files never interfere with new writes
Multi-Sink Streaming (The Hard Case)
Scenario: Write to 5 Sinks in One Batch
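In Spark Structured Streaming, the naive approach runs one query per sink:
// BAD: five independent queries, each with its own checkpoint (paths and table names are illustrative)
parsed.writeStream.format("iceberg").option("checkpointLocation", "/chk/q1").toTable("db.table1") // Query 1
parsed.writeStream.format("iceberg").option("checkpointLocation", "/chk/q2").toTable("db.table2") // Query 2
parsed.writeStream.format("iceberg").option("checkpointLocation", "/chk/q3").toTable("db.table3") // Query 3
parsed.writeStream.format("iceberg").option("checkpointLocation", "/chk/q4").toTable("db.table4") // Query 4
parsed.writeStream.format("iceberg").option("checkpointLocation", "/chk/q5").toTable("db.table5") // Query 5
Each is independent:
- Each query reads Kafka and parses the records on its own (five times the work)
- Query 1 may already have committed batch 5 while query 2 is still processing it
- Query 3 may crash and stay behind until it is restarted
- Each table is exactly-once on its own, but at any moment the five tables can reflect different Kafka positions
Different sinks see different versions of the data. Chaos.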
A Better Pattern: Single Checkpoint, Multiple Outputs
// GOOD: one query and one checkpoint; read and parse once, write every table per batch
import org.apache.spark.sql.DataFrame
stream.writeStream
.option("checkpointLocation", "/chk/multi")
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
// Single read from Kafka, single parse
val parsed = transformOnce(batchDF)
// Split to multiple outputs
val sink1Data = parsed.select("field1", "field2")
val sink2Data = parsed.select("field3", "field4")
val sink3Data = parsed.select("field5", "field6")
val sink4Data = parsed.select("field7", "field8")
val sink5Data = parsed.select("field9", "field10")
// Each write is its own atomic Iceberg commit, made one after another
sink1Data.writeTo("db.table1").append()
sink2Data.writeTo("db.table2").append()
sink3Data.writeTo("db.table3").append()
sink4Data.writeTo("db.table4").append()
sink5Data.writeTo("db.table5").append()
}
.start()
If the batch fails partway:
Table 1: commit succeeded (visible)
Table 2: commit succeeded (visible)
Table 3: ⚡ CRASH before its atomic metadata swap
Table 4: not written
Table 5: not written
Result:
- Tables 1 and 2 already contain batch 5; tables 3-5 are still at their previous snapshot
- Checkpoint: offsets/5 present, commits/5 missing
- On restart: Spark calls foreachBatch again with the same batchId (5)
- Without a guard, tables 1 and 2 receive batch 5 a second time, because foreachBatch only guarantees at-least-once

Iceberg commits are atomic per table, not across tables. To make this pattern effectively exactly-once, make every write idempotent: for example, record the batchId with each table commit and skip tables that already have it. All-or-nothing across several tables needs a catalog with multi-table transactions (for example, Project Nessie).
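A guard for the multi-table foreachBatch case can be as simple as remembering, per table, which batch IDs were already committed. The sketch below models that in memory with a hypothetical FakeTable; it is not Spark or Iceberg code. In a real job, the batch ID might be stored in each snapshot summary and read back before writing. Iceberg’s Spark writer accepts snapshot-property.<key> write options for custom summary entries, but verify that wiring against your Iceberg version.

```scala
import scala.collection.mutable

// Hypothetical table: visible rows plus the batch IDs recorded in its snapshot summaries.
final class FakeTable(val name: String) {
  val rows = mutable.ArrayBuffer.empty[String]
  val batchIds = mutable.Set.empty[Long]

  // One atomic commit per table: rows and batch ID become visible together.
  def appendIfNew(batchId: Long, newRows: Seq[String]): Boolean =
    if (batchIds(batchId)) false // already committed by an earlier attempt: skip
    else { rows ++= newRows; batchIds += batchId; true }
}

final class Crash extends RuntimeException

val tables = (1 to 5).map(i => new FakeTable(s"table$i"))

// Body of a foreachBatch call: write every table, optionally dying before table index `crashAfter`.
def processBatch(batchId: Long, records: Seq[String], crashAfter: Option[Int]): Unit =
  tables.zipWithIndex.foreach { case (t, i) =>
    if (crashAfter.contains(i)) throw new Crash // dies before table i + 1 commits
    t.appendIfNew(batchId, records.map(r => s"${t.name}:$r"))
  }

val batch5 = Seq("r1", "r2")
try processBatch(5L, batch5, crashAfter = Some(2)) catch { case _: Crash => () }
val afterCrash = tables.map(_.rows.size)    // tables 1-2 committed, 3-5 did not
processBatch(5L, batch5, crashAfter = None) // Spark replays batch 5 with the same batchId
val afterReplay = tables.map(_.rows.size)

println(s"after crash:  $afterCrash")
println(s"after replay: $afterReplay")
```

The crash leaves tables 1 and 2 one batch ahead of tables 3 to 5 until the replay finishes. This gives per-table exactly-once, not cross-table atomicity.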
Checkpoint Location Structure
Understanding what lives where: the Iceberg table and the Spark checkpoint are separate directories.
s3://my-bucket/warehouse/my_table/ (Iceberg table)
├── data/
│ └── ... (parquet files)
└── metadata/
    ├── v1.metadata.json
    ├── v2.metadata.json
    ├── v3.metadata.json
    ├── v4.metadata.json (immutable, versioned history)
    └── version-hint.text (Hadoop tables only; contains "4", so v4.metadata.json is current)

s3://my-bucket/checkpoints/my_query/ (Spark checkpoint)
├── offsets/
│ ├── 0 (batch 0: end offset = 100)
│ ├── 1 (batch 1: end offset = 200)
│ ├── 2 (batch 2: end offset = 300)
│ └── 3 (batch 3: end offset = 400)
└── commits/
    └── 0, 1, 2, 3 (one marker per finished batch)
Key insight: offsets and metadata versions are linked by batch ID, not by a shared atomic write. Each snapshot the stream creates records its batch ID in its summary:
Before batch 3: offsets/3 = 400 is logged (range 300-400); table at v3
Iceberg commit: pointer v3 → v4; snapshot summary records batch ID 3
After commit: commits/3 is written
Recovery: commits/3 present → continue with batch 4 from offset 400; commits/3 missing → re-run batch 3 over 300-400 and skip the commit if v4 already records batch ID 3
The Actual Checkpoint Directory on Disk
Here’s roughly what a Spark streaming checkpoint looks like after 4 successful batches. The example assumes one hypothetical Kafka topic, my-topic, with a single partition; file contents are abridged.
.checkpoint/
├── metadata
│ └── contents: {"id":"<query UUID>"}
├── sources/
│ └── 0/ (initial offsets of the Kafka source)
├── offsets/
│ ├── 0
│ │ └── contents: v1 + batch metadata + {"my-topic":{"0":100}}
│ ├── 1
│ │ └── contents: v1 + batch metadata + {"my-topic":{"0":200}}
│ ├── 2
│ │ └── contents: v1 + batch metadata + {"my-topic":{"0":300}}
│ └── 3
│   └── contents: v1 + batch metadata + {"my-topic":{"0":400}}
│
└── commits/
    ├── 0
    │ └── contents: v1 + {"nextBatchWatermarkMs":0}
    ├── 1
    ├── 2
    └── 3 (same format)
Each batch creates:
- An offset file, written before the batch runs. It holds the end offset of every source partition, which pins the exact range the batch will read.
- A commit file, written after the sink commit succeeds. It is a marker that the batch is done and does not name a metadata file.

The link to the table is the batch ID itself: Iceberg’s streaming writer records the query ID and batch (epoch) ID in the summary of each snapshot it commits.
Recovery Scenario: Offset 23, Last Commit 22
Let me show what happens on restart when a crash occurs mid-batch:
graph TD
A["⚡ CRASH During Batch 23<br/>offsets/23 logged, data written to S3<br/>But Iceberg commit not done"] -->|"Process restarts"| B["Read Checkpoint"]
B -->|"Check commits/"| C["Last commit marker: commits/22<br/>Table at v23.metadata.json<br/>End offset of batch 22 = 500"]
C -->|"Check offsets/23"| D["offsets/23 exists (range 500-600)<br/>but commits/23 does not<br/>Batch 23 was in flight"]
D -->|"Kafka seek to offset 500"| E["Read from Kafka<br/>offset 500 onward"]
E -->|"Process batch 23 again"| F["New data files written<br/>Same content, new file names"]
F -->|"Atomic swap"| G["version-hint → v24.metadata.json<br/>Summary records batch 23<br/>Then commits/23 written"]
G -->|"Success"| H["Table now shows<br/>Snapshot 24<br/>Records 0-599"]
Detailed view of what’s happening:
On Crash at Batch 23:
══════════════════════
Checkpoint state on disk:
├── offsets/22 → 500 ← End offset of the last committed batch
├── commits/22 → present (batch 22 done; the table is at v23.metadata.json)
├── offsets/23 → 600 (logged before batch 23 started)
└── commits/23 → DOES NOT EXIST (batch 23 in flight)
S3 state:
└── s3://bucket/warehouse/data/
├── batch-22-part-0.parquet (committed)
├── batch-22-part-1.parquet (committed)
├── batch-23-part-0.parquet (⚡ ORPHANED, not in metadata)
└── batch-23-part-1.parquet (⚡ ORPHANED, not in metadata)
Version hint still points to v23.metadata.json (not updated)
On Restart:
═══════════
1. Read checkpoint directory:
max_committed_batch = 22
last_offset = 500
2. Check: does commits/23 exist?
NO → batch 23 was never committed; offsets/23 holds its range (500-600)
3. Kafka seek(500):
Re-read exactly the logged range
4. Batch 23 processing (SECOND ATTEMPT):
Read Kafka records 500-599 (same as first attempt)
Write new parquet files:
- batch-23-part-0-attempt2.parquet (new, unique file names)
- batch-23-part-1-attempt2.parquet
5. Atomic commit:
├─ Check: no snapshot records batch 23 yet → proceed
├─ Write v24.metadata.json (includes the new batch-23 files; summary records batch 23)
├─ Update version-hint → v24.metadata.json
└─ Then save commits/23
6. Result:
├─ Readers see snapshot 24
├─ Table contains records 0-599
└─ Orphaned batch-23 files from the first attempt stay until an orphan-file cleanup job removes them
This is the exactly-once guarantee in action.
Multi-Batch Restart: Can Batches 22, 23, 24 All Be In-Flight?
In the default micro-batch mode they cannot. Spark plans batch N+1 only after commits/N is written. At most one batch is unfinished when the app dies, and everything before it is already committed.
graph TD
A["Batch 21: Committed<br/>end offset = 500"] -->|"Batch 22"| B["Committed<br/>commits/22 written<br/>end offset = 550"]
B -->|"Batch 23"| C["Committed<br/>commits/23 written<br/>end offset = 600"]
C -->|"Batch 24"| D["offsets/24 logged (600-650)<br/>Data written<br/>⚡ FULL APP CRASH"]
D -->|"Restart reads<br/>commits/23 = last marker<br/>offsets/24 unfinished"| E["Kafka seek(600)"]
E -->|"Reprocess logged range"| F["Batch 24 retries<br/>New files, same content<br/>Then commits/24 written"]
F -->|"Result"| G["Only batch 24's first attempt orphaned<br/>Batches 22-24 each committed once<br/>Table snapshot = 25"]
The Commit Directory Deep Dive
After 25 batches, your commit log looks like:
commits/
├── 0 (batch 0 done)
├── 1
├── 2
...
├── 21
├── 22 (batch 22 done; it created snapshot 23)
├── 23 (batch 23 done; it created snapshot 24)
├── 24 (batch 24 done; it created snapshot 25)
└── ... (continuing up to current)
Each commit file is a small marker: a version line plus watermark information.
v1
{"nextBatchWatermarkMs":0}
What batch 23 actually wrote is recorded on the Iceberg side, in the summary of the snapshot it created (abridged):
{
"snapshot-id": 24,
"timestamp-ms": 1733739332000,
"summary": {
"operation": "append",
"spark.sql.streaming.queryId": "<query UUID>",
"spark.sql.streaming.epochId": "23",
"added-data-files": "2",
"added-records": "100000"
},
"manifest-list": "s3://bucket/warehouse/my_table/metadata/snap-24-1-abc123.avro"
}
When a stream restarts:
- Find the highest batch ID in the commits/ directory
- If offsets/ holds a higher batch ID, that batch was in flight: re-run it over its logged range
- Otherwise, plan the next batch starting from the last logged end offset
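The restart rule reduces to a small decision function. This is a hedged sketch rather than Spark’s code. It assumes a single source partition and reduces the two logs to hypothetical in-memory structures: a map from batch ID to logged end offset, and the set of batch IDs that have a commit marker.

```scala
// Hypothetical, reduced view of a checkpoint with a single source partition:
// offsets/ as batch ID -> logged end offset, commits/ as the set of finished batch IDs.
sealed trait NextStep
final case class Replay(batchId: Long, startOffset: Long, endOffset: Long) extends NextStep
final case class PlanNew(batchId: Long, startOffset: Long) extends NextStep

def planRestart(offsetLog: Map[Long, Long], commitLog: Set[Long], initialOffset: Long = 0L): NextStep =
  if (offsetLog.isEmpty) PlanNew(0L, initialOffset) // fresh query
  else {
    val last = offsetLog.keys.max
    if (commitLog(last)) PlanNew(last + 1, offsetLog(last)) // everything logged has finished
    else {
      // Logged but never marked done: replay exactly the same range.
      val start = offsetLog.getOrElse(last - 1, initialOffset)
      Replay(last, start, offsetLog(last))
    }
  }

val offsets = Map(21L -> 450L, 22L -> 500L, 23L -> 600L)
println(planRestart(offsets, commitLog = Set(21L, 22L)))      // crash during batch 23
println(planRestart(offsets, commitLog = Set(21L, 22L, 23L))) // clean stop after batch 23
```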
Failover Scenarios: Iceberg vs. Previous Tech
Scenario 1: Single Executor Fails in Multi-Executor Batch
Old tech (Spark + Hive partitions, with hand-rolled offset tracking):
Executor 1: writes records 0-2500 to s3://bucket/part-0.parquet ✓
Executor 2: writes records 2500-5000 to s3://bucket/part-1.parquet ✓
Executor 3: ⚡ CRASHES writing records 5000-7500
part-2.parquet is half-written (corrupt; possible on HDFS or with a non-atomic output committer)
Offset checkpoint updated anyway: records 0-7500
Result:
- Records 0-5000 readable
- Records 5000-7500 unreadable (corrupt file)
- Offset says 7500 are safe (LIE)
- Next batch starts from 7500, so 5000-7500 are skipped forever
Iceberg:
Executor 1: writes 0-2500, reports the new data file to the driver (invisible)
Executor 2: writes 2500-5000, reports the new data file to the driver (invisible)
Executor 3: ⚡ CRASHES mid-write
All executor outputs are invisible (not in metadata yet)
Spark retries the failed task; if it keeps failing, the job is aborted and the driver never attempts the atomic swap
Result:
- Metadata still points to previous snapshot
- No commit marker for this batch in the checkpoint
- On restart: batch retries completely
- No data loss, no corruption, no skipped records
Scenario 2: Network Partition During Commit
Old tech:
Commit phase:
1. Register the new partitions in the Hive Metastore, one ALTER TABLE ADD PARTITION call each
- Network times out after some calls succeed
- Metastore holds only part of the batch
Readers see: an inconsistent partition list
Some partitions: visible
Others from the same batch: missing
Iceberg:
Commit phase:
1. Write v3.metadata.json, then ask the catalog to swap the pointer from v2 to v3
- Network partition between the writer and the catalog
- The metadata file is written, but the swap request fails or its response is lost
Result:
- If the swap never reached the catalog: v3.metadata.json is orphaned (unreferenced)
- version-hint still points to v2.metadata.json
- ALL readers: see v2.metadata.json (consistent)
- If the writer knows the swap failed, the commit can be retried once the network recovers
- If it cannot tell whether the swap landed, Iceberg raises CommitStateUnknownException instead of guessing, and does not delete the new files
- Either way, the pointer holds v2 or v3, never a "partially visible" state
Scenario 3: Coordinator Crashes After Metadata Swap, Before Writing the Commit Marker
Old tech (hard to reason about):
1. Data written to multiple locations
2. Hive metastore updated
3. Coordinator crashes before offset checkpoint
4. Restart reads offset = old_value
5. Re-processes same batch
6. Data duplicated in table
Iceberg:
1. Data written (invisible)
2. Metadata created (invisible)
3. Atomic swap: table pointer updated; the snapshot summary records batch ID N
4. Coordinator crashes before writing commits/N
5. Restart finds offsets/N without commits/N and re-runs batch N over the same range
6. Before committing, the writer finds batch N already recorded in the table’s snapshot history
7. The commit is skipped; the files written by the replay are never referenced
8. commits/N is written and the stream moves on
The table can briefly be one batch ahead of the checkpoint, but the batch-ID check makes that gap harmless.
Idempotency: The Secret Weapon
Iceberg’s streaming sink is idempotent by design. If a batch runs twice:
Batch 5 (first run):
- Kafka offset 500-599
- Generates file: batch-5-part-0-a1b2.parquet
- Commits to the Iceberg table
- The snapshot summary records batch (epoch) ID 5 for this query
Batch 5 (second run, after restart):
- Kafka offset 500-599 (same offsets, read from offsets/5)
- Generates file: batch-5-part-0-c3d4.parquet (a new, unique name)
- Attempts to commit to the Iceberg table
Result:
- Iceberg finds epoch 5 for this query already in the snapshot history
- Skips the duplicate commit (the new file is never referenced)
- The batch completes without error
This is why Iceberg’s built-in streaming sink is safe for streaming, unlike plain Hive tables:
- Hive: has no deduplication (data gets duplicated)
- Delta Lake: uses the same idea, recording the query ID and batch ID as a transaction identifier in its log
- Iceberg: atomic, batch-aware commits prevent duplicates at the source

The check applies to writeStream with the Iceberg sink. Writes issued inside foreachBatch need their own batchId guard.
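On the Iceberg side, the duplicate check amounts to walking the table’s snapshot history back from the current snapshot and finding the newest epoch this query committed. The Snapshot type below is hypothetical. The two summary keys (spark.sql.streaming.queryId and spark.sql.streaming.epochId) are, as far as we know, the ones Iceberg’s Spark streaming writer uses; treat them as an assumption to check against your version.

```scala
// Hypothetical snapshot with a parent pointer and a string-to-string summary, like Iceberg's.
final case class Snapshot(id: Long, parentId: Option[Long], summary: Map[String, String])

val QueryIdKey = "spark.sql.streaming.queryId" // assumed key names; verify for your version
val EpochIdKey = "spark.sql.streaming.epochId"

// Walk from the current snapshot back through parents; return the newest epoch this query committed.
def lastCommittedEpoch(snapshots: Map[Long, Snapshot], currentId: Option[Long], queryId: String): Option[Long] = {
  var cursor: Option[Snapshot] = currentId.flatMap(snapshots.get)
  while (cursor.isDefined) {
    val s = cursor.get
    if (s.summary.get(QueryIdKey).contains(queryId)) return s.summary.get(EpochIdKey).map(_.toLong)
    cursor = s.parentId.flatMap(snapshots.get)
  }
  None
}

// The sink's commit step: only commit an epoch newer than the last committed one.
def shouldCommit(epoch: Long, lastCommitted: Option[Long]): Boolean =
  lastCommitted.forall(epoch > _)

val history = Map(
  4L -> Snapshot(4L, None, Map("operation" -> "append", QueryIdKey -> "q-1", EpochIdKey -> "4")),
  5L -> Snapshot(5L, Some(4L), Map("operation" -> "append")), // a batch job, not the stream
  6L -> Snapshot(6L, Some(5L), Map("operation" -> "append", QueryIdKey -> "q-1", EpochIdKey -> "5"))
)

val last = lastCommittedEpoch(history, currentId = Some(6L), queryId = "q-1")
println(s"last committed epoch: $last")
println(s"replayed epoch 5 -> commit? ${shouldCommit(5L, last)}")
println(s"new epoch 6      -> commit? ${shouldCommit(6L, last)}")
```

Walking parents rather than looking only at the current snapshot matters because other writers (like the batch job in snapshot 5) may commit to the same table in between.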
Summary: Checkpoint Consistency with Iceberg
┌────────────────────────────────────────────────────┐
│ Streaming Checkpoint Around the Atomic Swap        │
├────────────────────────────────────────────────────┤
│                                                    │
│ Before the batch runs:                             │
│ ├─ offsets/5 ← end offset 1000 (write-ahead)       │
│ └─ Data: invisible in S3                           │
│                                                    │
│ Iceberg commit (atomic):                           │
│ ├─ Table pointer ← v6.metadata.json                │
│ └─ Snapshot summary records batch ID 5             │
│                                                    │
│ After the commit:                                  │
│ ├─ commits/5 written                               │
│ ├─ Data: readers now see records 0-999             │
│ └─ Replay of batch 5 is detected and skipped       │
│                                                    │
└────────────────────────────────────────────────────┘
Key differences from previous tech:
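| Aspect | Spark + Hive | Delta Lake | Iceberg |
|---|---|---|---|
| Atomic table commit? | No | Yes | Yes |
| Replayed batch skipped by the built-in streaming sink? | No | Yes (query ID + batch ID in the transaction log) | Yes (query ID + epoch ID in the snapshot summary) |
| Atomicity across tables in foreachBatch? | No | No | No (per table only) |
| Orphaned files cleanup? | Manual | Scheduled (VACUUM) | Scheduled (remove_orphan_files) |
| Duplicate handling on retry in foreachBatch? | Duplicates occur | Idempotent with txnAppId/txnVersion options | Needs your own batchId check |
| Checkpoint-to-data drift? | Common | Brief, resolved on replay | Brief, resolved on replay |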
Key Takeaways
- Iceberg separates data writes from metadata commits
  - Data files written first (invisible)
  - Metadata created next (still invisible)
  - Atomic pointer swap last (single source of truth)
- The metadata pointer swap is the transaction boundary
  - Changing this pointer atomically, via the catalog, is the entire transaction
  - No partial writes visible to readers
- Faults before the swap are safe
  - Data files become orphaned but are invisible
  - Readers see consistent old snapshot
  - Cleanup jobs later remove orphaned files
- Conflicts during swap are detected and handled
  - CAS (compare-and-swap) ensures only one writer wins each swap
  - Losers refresh, re-apply their changes, and retry; only genuine conflicts fail
  - No reader ever sees a partial write
- This enables ACID table commits on object storage
  - Without Iceberg running its own consensus protocol
  - Without global locks
  - With time-travel as a bonus feature
- Streaming replays are safe
  - Offsets are logged before each batch and commit markers after it
  - Each Iceberg snapshot records its batch ID, so replayed batches are skipped
  - Multi-table writes are atomic per table, not across tables
- Failover scenarios are robust
  - Single executor crashes: no data loss
  - Full app crashes: the unfinished batch retries cleanly
  - Network partitions: metadata stays consistent
  - Duplicates on retry: handled automatically by the Iceberg streaming sink
Exactly-Once Semantics: How Iceberg and Spark Structured Streaming Achieve It
The gold standard for streaming systems is exactly-once semantics: each record is processed and written exactly one time, no duplicates, no skips.
The Challenge
Most systems achieve only at-least-once or at-most-once:
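- At-least-once: If a crash happens, replay the batch. Problem: duplicates if the data was written but the offset was not yet committed
- At-most-once: Commit the offset first and never replay. Problem: data loss if the crash comes before the write
Achieving exactly-once requires three operations to behave as if they happened together:
- Read from source (Kafka) at offset X
- Write data to sink (S3/Iceberg)
- Commit offset X

They are not literally one atomic operation. Instead, three pieces make the combination act all-or-nothing: a replayable source, a write-ahead offset log, and an idempotent sink commit. A batch that is retried still takes effect exactly once.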
Iceberg’s Exactly-Once Architecture
graph TD
A["Batch N Planned<br/>offsets/N logs Kafka 500-599"] -->|"Read & Transform"| B["Data Files Created<br/>S3: batch-N-part-*.parquet"]
B -->|"Invisible to readers"| C["Metadata File Created<br/>v100.metadata.json<br/>References batch N files"]
C -->|"Still invisible"| D{"Batch N already<br/>in table history?"}
D -->|"No"| E["Atomic Swap<br/>version-hint → v100<br/>Summary records batch N"]
D -->|"Yes (replay)"| S["Skip the commit"]
E -->|"Success"| F["Readers see batch N data"]
F --> H["commits/N written<br/>Batch N done"]
S --> H
E -->|"Failure"| G["Nothing changes<br/>commits/N never created<br/>version-hint still old"]
G -->|"Retry same range"| A
The key is: the offset range is logged before the batch runs, and the Iceberg commit records which batch it contains.
Checkpoint Directory State Before and After Atomic Swap
BEFORE the Iceberg commit (batch 22 in flight):
.checkpoint/
├── offsets/
│ ├── 20 → 450
│ ├── 21 → 500
│ └── 22 → 550 ← logged before batch 22 started
│
├── commits/
│ ├── 20
│ └── 21
│
└── Meanwhile, in the table (Batch 22):
├── Data files written to S3 (not yet referenced)
├── v23.metadata.json created (unreferenced)
└── commits/22 NOT YET WRITTEN
AFTER the Iceberg commit (batch 22 committed):
.checkpoint/
├── offsets/
│ ├── 20 → 450
│ ├── 21 → 500
│ └── 22 → 550
│
├── commits/
│ ├── 20
│ ├── 21
│ └── 22 ← written right after the table commit
│
└── S3 readers can now see:
Snapshot v23 with batch 22 data (its summary records batch 22)

The critical insight: offsets/22 appears before the table commit, and commits/22 after it. If the process dies in between, the replay of batch 22 finds it already recorded in the table and skips the commit.
Recovery Decision Tree
graph TD
A["Process Restarts"] -->|"Read checkpoint"| B["Check Max Committed Batch"]
B -->|"Example: commits/22 exists"| C["Last committed: batch 22"]
C -->|"End offset from offsets/22"| D["offsets/22 = 550"]
D -->|"Does offsets/23 exist?"| E{File Found?}
E -->|"Yes"| F["Batch 23 was planned but not committed<br/>Reuse its logged range<br/>Skip the table commit if batch 23 is already in the table"]
E -->|"No"| G["Batch 23 never started<br/>Plan it from offset 550<br/>Log offsets/23 first"]
F --> H["Read Kafka 550-599<br/>Write data<br/>Atomic swap (or skip)"]
G --> H
H -->|"Success"| I["commits/23 created<br/>Table snapshot updated"]
Why This Is Exactly-Once (Not At-Least-Once)
The traditional streaming architecture (Spark + Kafka + Parquet + Hive):
Spark reads: offsets 0-99 ✓
Spark writes: part-0.parquet ✓
Hive metastore updates: partition added ✓
Offset checkpoint: 100 ⚡ CRASH HERE
On restart:
- Partition exists (readers see it)
- Offset is still 0 (retry logic restarts at 0)
- Result: records 0-99 READ TWICE
→ AT-LEAST-ONCE (duplicates)
Iceberg’s approach (offset log + batch-aware atomic commit):
Spark logs: offsets/0 = 100 (records 0-99) ✓
Iceberg writes: data files ✓
Iceberg writes: v1.metadata.json ✓
ATOMIC SWAP: table pointer → v1, summary records batch 0 ✓
⚡ CRASH HERE (after the swap, before commits/0 is written)
On restart:
- offsets/0 exists, commits/0 does not → batch 0 is replayed over records 0-99
- The table’s snapshot history already records batch 0 → commit skipped
- commits/0 is written; the next batch starts at offset 100
- Result: records 0-99 WRITTEN EXACTLY ONCE
→ EXACTLY-ONCE (no duplicates)
The Three Guarantees
Let’s visualize how each guarantee differs:
graph LR
A["Source<br/>offsets 0-99"] -->|"Read"| B["Process<br/>transform"]
B -->|"Write"| C["Sink<br/>S3 files"]
C -->|"Commit?"| D["Checkpoint<br/>offset = 99"]
style A fill:#e1f5ff
style B fill:#fff3e0
style C fill:#f3e5f5
style D fill:#e8f5e9
At-Most-Once (lose data):
Offset committed before the write, then a crash during the write?
→ Restart seeks to offset 100
→ Records 0-99 never re-attempted
→ Data permanently lost
At-Least-Once (duplicates):
Crash after write, before checkpoint?
→ Restart seeks to offset 0 (checkpoint says 0)
→ Re-read and re-write records 0-99
→ Now in sink TWICE
→ Duplicates in table
Exactly-Once (Iceberg):
Crash any time?
→ Restart checks: does commits/N exist for the last logged batch?
→ If yes: batch N is done → plan the next batch
→ If no: re-run batch N over the range logged in offsets/N
→ The Iceberg commit is skipped if batch N is already in the table’s snapshot history
→ NO duplicates, NO data loss
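The three behaviors can be reproduced with one toy pipeline and a single crash point. In this hypothetical sketch, offsets 0-99 form batch 0, the process dies once, and a clean restart follows. The strategies differ in only two ways: whether the offset is committed before or after the write, and whether the sink remembers which batch IDs it has already committed.

```scala
import scala.collection.mutable

sealed trait Strategy
case object AtMostOnce extends Strategy  // commit the offset, then write
case object AtLeastOnce extends Strategy // write, then commit the offset
case object ExactlyOnce extends Strategy // write through a batch-aware sink, then commit the offset

final class Crash extends RuntimeException

// One crashed run followed by one clean restart; returns what ends up in the sink.
def simulate(strategy: Strategy): Seq[Int] = {
  val records = 0 until 100                 // offsets 0-99, processed as batch 0
  val sink = mutable.ArrayBuffer.empty[Int]
  val sinkBatches = mutable.Set.empty[Long] // batch IDs the sink has already committed
  var committedOffset = 0                   // where a restart resumes reading

  def write(batchId: Long, rows: Seq[Int]): Unit = strategy match {
    case ExactlyOnce if sinkBatches(batchId) => () // replayed batch: skip
    case ExactlyOnce =>
      sink ++= rows
      sinkBatches += batchId
    case _ =>
      sink ++= rows
  }

  def runBatch(crash: Boolean): Unit = {
    val rows = records.drop(committedOffset)
    if (rows.isEmpty) return
    strategy match {
      case AtMostOnce =>
        committedOffset = records.size // offset first...
        if (crash) throw new Crash     // ...crash before the write: rows are lost
        write(0L, rows)
      case _ =>
        write(0L, rows)                // write first...
        if (crash) throw new Crash     // ...crash before the offset commit: rows replayed
        committedOffset = records.size
    }
  }

  try runBatch(crash = true) catch { case _: Crash => () }
  runBatch(crash = false) // restart
  sink.toSeq
}

for (strategy <- Seq(AtMostOnce, AtLeastOnce, ExactlyOnce)) {
  val out = simulate(strategy)
  println(s"$strategy: ${out.size} rows in sink, ${out.distinct.size} distinct")
}
```

The results match the three cases above. The at-most-once sink ends up empty. The at-least-once sink holds 200 rows (100 distinct). The batch-aware sink holds each of the 100 records once.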
Spark Structured Streaming’s Exactly-Once with Iceberg
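Spark Structured Streaming gets exactly-once with Iceberg by using Iceberg as the streaming sink:
import java.util.concurrent.TimeUnit
import org.apache.spark.sql.streaming.Trigger
streamingDF.writeStream
.format("iceberg")
.outputMode("append")
.trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
.option("checkpointLocation", "/path/to/checkpoint")
.toTable("db.my_table")

Writing from foreachBatch instead (batchDF.write ... .save(...)) bypasses the sink’s batch-ID check. A replayed batch is then appended again unless you guard it with the batchId yourself.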
Behind the scenes, Spark and Iceberg do:
For each micro-batch:
1. Plan the batch: write offsets/N with the Kafka end offsets (write-ahead log)
2. Read exactly that range from Kafka
3. Transform (deterministic = same output if re-run)
4. Write to Iceberg table:
- Tasks write data files (invisible)
- The driver writes manifests and a new metadata file (invisible)
5. Atomic swap of the table pointer; the snapshot summary records the query ID and batch N (skipped if batch N is already in the table)
6. If the commit succeeds (or is skipped as a replay):
- commits/N is written to the checkpoint
- Next batch starts from the new offset
7. If anything fails before commits/N:
- Next restart re-runs batch N over the same logged range
- Same input → same output, committed at most once
→ Exactly-once achieved

The replayable input is crucial. Batch N must read the same offsets every time it runs, and the offset log guarantees it. Iceberg helps here because:
- The table commit is all-or-nothing
- Each snapshot records which batch produced it, so a batch that already committed is skipped on replay
- Files from failed attempts are never referenced (retries write new, uniquely named files)
- No double-writing
Mermaid: End-to-End Exactly-Once Flow
sequenceDiagram
participant KA as Kafka
participant SP as Spark Driver
participant CP as Checkpoint Dir
participant S3 as S3 / Iceberg Table
participant RE as Readers
SP ->> CP: Batch 5: write offsets/5 (range 500-599)
KA ->> SP: Read offsets 500-599
SP ->> S3: Write data files (invisible)
SP ->> S3: Create v25.metadata.json (invisible)
SP ->> S3: Atomic pointer swap → v25 (summary records batch 5)
Note over S3: The table commit is the only atomic step
SP ->> CP: Write commits/5
RE ->> S3: Query returns batch 5 data
Note over KA,RE: If a crash hits between the swap and commits/5,<br/>batch 5 is replayed and its commit is skipped
KA ->> SP: Batch 6: starts at offset 600
Note over KA,RE: EXACTLY-ONCE: each record lands in the table once
Why Not Just Use Spark + Hive?
Old architecture vs. new:
OLD (Spark + Hive Partitions):
┌──────────────────────────────────────┐
│ Batch Processing                     │
├──────────────────────────────────────┤
│ 1. Read Kafka offsets 0-99           │
│ 2. Write to s3://data/part-0         │
│ 3. INSERT INTO table PARTITION       │ ← Hive metastore call
│ 4. Write offset checkpoint           │ ← Separate operation
│                                      │
│ Problem: 2 independent ops (#3, #4)  │
│ Crash between them = mess            │
└──────────────────────────────────────┘
NEW (Spark + Iceberg):
┌──────────────────────────────────────┐
│ Batch Processing                     │
├──────────────────────────────────────┤
│ 1. Log offsets/N (Kafka 0-99)        │
│ 2. Write to s3://data/...            │
│ 3. Create v5.metadata.json           │
│ 4. ATOMIC: swap table pointer;       │
│    snapshot records batch N          │
│ 5. Write commits/N marker            │
│                                      │
│ Advantage: one atomic table commit   │
│ Gap 4→5 is harmless (replay skipped) │
└──────────────────────────────────────┘