Resources
  • Journal
  • R&D Columns
Adopting Apache Iceberg for Our Data Lakehouse - Part 2: Leveraging Apache Iceberg
2025.07.23

✅ Title: Adopting Apache Iceberg for Our Data Lakehouse - Part 2: Leveraging Apache Iceberg



1. Why We Designed a New Architecture with Iceberg


Iceberg is more than just a file format. It emerged to address structural limitations in legacy Hive-based table formats, such as metadata consistency issues and rigid partitioning strategies. Now adopted by large-scale analytics systems, Iceberg provides reliable transactions and high-performance querying across distributed environments, with compatibility across major open-source processing engines.


Recognizing Iceberg’s structural capabilities and potential to meet our system requirements, we began designing a new data architecture around it. Before diving into the full pipeline design, we’ll first introduce what Apache Iceberg is and what makes it technically significant.



1-1. What is Apache Iceberg?


Apache Iceberg is a large-scale analytic table format initially developed at Netflix. It was built to overcome the limitations of traditional Hive table formats and is now compatible with popular processing engines such as Spark, Trino, and Flink.


Iceberg addresses problems that commonly arise in Hadoop-based data warehouses, including file tracking difficulties, rigid partitioning, and degraded query performance. It bridges the gap between data lakes and warehouses, making lakehouse architectures more feasible.


Key Features of Iceberg

  • Schema Evolution
    Add, remove, rename columns or adjust partition strategies at the table level with ease. This flexibility reduces maintenance overhead when evolving data structures.

  • Snapshot-Based Versioning (Time Travel)
    Rather than overwriting data, Iceberg maintains snapshots upon updates. This allows rollback to specific points in time, historical data analysis, and audit trails.

  • Flexible Partitioning (Hidden Partitioning)
    Partitioning strategies can be adjusted even after table creation, without compromising query performance.

  • Metadata Indexing for Query Optimization
    Each data file contains partition info, column stats, null ratios, etc., enabling selective scanning instead of full scans. This boosts query performance even on large datasets.

  • ACID Transaction Support
    Although file-based, Iceberg provides transactional guarantees for reads and writes. Concurrent writes are conflict-resolved automatically, and only the first commit is honored. Readers can continue to query safely during batch writes.

  • Cloud-Native Design
    Optimized for object storage like S3, GCS, and MinIO.


1-2. How We Structured Our Architecture Around Iceberg




The new pipeline is structured around Iceberg and consists of the following stages:


1. Raw Data Collection: Compressed archive files are directly stored in MinIO, along with metadata such as compression type and source path at the time of ingestion.


2. File Structure Analysis and Metadata Extraction: The archive is unpacked and individual files are analyzed for directory structure, file paths, extensions, metadata, and blob/text contents. These results are saved as Parquet to Iceberg tables.


3. Parallel Processing with Spark: Multiple Spark workers simultaneously process the files and run analysis modules in parallel, storing results in Iceberg.


4. SQL-Based Query and Indexing: Analysts or applications can query the Iceberg tables directly using Spark SQL. Partitioning and column-level indexing can be applied as needed.


5. Scalable Analysis and Pipeline Reusability: The structured data can be reused across multiple pipelines. This enables downstream refinements or productized analysis on top of the same Iceberg table.



1-3. Technologies and Implementation


To convert unpacked files into an Iceberg-compatible analytical structure, we had to address several technical considerations.


Handling large volumes of unstructured data isn’t just about storing files,it requires metadata indexing, conditional filtering, and high-performance querying.


In the following sections, we explain how we leveraged Iceberg’s structural capabilities to meet these needs.



1-3-1. Table Schema Design


Our initial design used two separate tables: one for metadata and one for blob content. Since the same file might be collected from different sources, the metadata table would track each ingestion history, while the blob table deduplicated using the file’s SHA-256 hash. These tables were joined by that hash.


However, for better query efficiency and simpler management, we moved to a unified table combining metadata and blob content. This unified structure allows each file record to include source archive information, full path, parent directory, and—in the case of text files—tokenized contents stored as an array. This enables efficient queries that retrieve both the file and its metadata in one scan.


We also experimented with different ways to store the text body. Initially, we stored the entire text in a single string field and applied Bloom filters, hoping for space and speed benefits. However, this approach slowed down keyword searches and failed to leverage filter pushdown. In contrast, storing tokenized arrays significantly improved search performance and worked better with Iceberg’s column-level statistics. We concluded that the array-based approach was optimal for storing text.



1-3-2. Partitioning Strategy




Partitioning in Iceberg logically divides data in a table, enabling faster and more cost-efficient queries by scanning only relevant partitions. For large tables, this significantly impacts performance. Therefore, we designed partitioning strategies based on actual query patterns and data distribution. Choosing the right partition key is critical for maximizing query performance.


Initially, we used `timestamp_store` for metadata table partitioning and 16-bucket SHA-256 hashes for the blob table. After combining into a single table, we kept only `timestamp_store` as the partition key, and SHA-256 was used solely for filtering.


This setup maintained stable performance even with large datasets and allowed quick access to specific files.


We deliberately avoided composite partitioning (e.g., `timestamp_store + sha256`) due to the following potential issues:


  • Metadata Overhead:
    Since `timestamp_store` was daily and SHA-256 split into 16 buckets, up to 16 partitions could be created per day. This could lead to thousands of partitions, degrading Iceberg’s snapshot commit and manifest merging performance.

  • Small File Explosion & Poor Compression:
    Dispersed data across partitions based on SHA-256 would result in many small Parquet files, reducing compression efficiency and increasing compaction and read overhead.


1-3-3. Optimizing Data Ingestion


We considered two ingestion methods for loading unpacked files into Iceberg: MERGE INTO and INSERT.


MERGE INTO updates existing data or inserts new rows based on specified conditions. Typically, it involves preparing a view or temp table, joining with the target table on a key, and applying updates or inserts accordingly. While useful for deduplication and conditional updates, this approach is resource-intensive due to full scans and evaluation logic—especially for large datasets.


By contrast, INSERT is a simpler, faster append-only operation that imposes less load on the system.


Aspect MERGE INTO INSERT
Operation Conditional UPDATE or INSERT Simple INSERT (append only)
Performance Costly due to full scan & comparison Fast and lightweight
Deduplication Possible (with defined conditions) Not supported (duplicates allowed)
Complexity High (requires view and logic) Simple
Schema Evolution Supported Supported


Initially, we used MERGE INTO to prevent duplicates, but performance degraded as data volume increased due to the overhead of evaluation and merging.


By executing the rewrite command below, we were able to reduce ingestion time from 10 minutes to about 30 seconds:



spark.sql(f"CALL <catalog_name>.system.rewrite_data_files(table => '{table_name}', options => map('rewrite-all', 'true'), strategy => 'sort', sort_order => 'sha256 ASC')")

Still, performance would degrade again as data grew, and regular rewrites were not a sustainable long-term solution.




We ultimately replaced MERGE INTO with INSERT, combined with pre-insertion deletion of older batches. This simplified the process while maintaining consistency in ingestion.


This change stabilized ingestion performance and improved query usability in analytic workflows.



2. What Issues Did We Encounter During Operation, and How Did We Solve Them?


2-1. Key Error Cases


While operating the Iceberg-based analytic pipeline, we encountered several unexpected issues when handling large datasets—particularly when processing large batches of metadata in parallel. These included Spark executor OutOfMemory (OOM) errors and task serialization size overflows.


For example, we saw the following error message:



Error during batch 5 metadata insertion: Job aborted due to stage failure:
Serialized task 81:0 was 1316422320 bytes, which exceeds max allowed: spark.rpc.message.maxSize (536870912 bytes).
Consider increasing spark.rpc.message.maxSize or using broadcast variables for large values.

java.lang.OutOfMemoryError
The executor with id 1 exited with exit code 52(JVM OOM)

This occurred because a single Spark task was processing too much data, either exceeding the 512MB serialization limit or running out of executor memory.


2-2. How We Addressed the Problem


To resolve the issues above, we applied the following strategies:


Reduced Batch Size
We lowered the metadata batch size from 5,000 to 1,000 to reduce the volume handled per task.


Changed List Initialization
Instead of using .clear() to reuse the same list, we created a new ArrayList<>() to reduce GC overhead.


Adopted KryoSerializer
We switched from the default JavaSerializer to Kryo to reduce memory usage and improve serialization speed.


Repartition for Task Parallelism
We repartitioned the dataset into 56 partitions to match our Spark cluster’s structure (2 cores × 7 instances × 4). The default partition count (8) created a bottleneck.


Increased spark.rpc.message.maxSize
We increased spark.rpc.message.maxSize to the maximum value of 2047MB to accommodate larger serialized payloads.


Used Broadcast Variables as Fallback
When Dataset loading failed, we passed data through SparkContext’s broadcast variable to minimize executor-to-executor network transmission. However, we excluded cases where the broadcast data exceeded 50MB.



Broadcast<List<IcebergMetadata>> broadcastBatch =
    sparkSession.sparkContext().broadcast(batch, ClassTag$.MODULE$.apply(List.class));

dataset = sparkSession.createDataset(
    broadcastBatch.value(),
    Encoders.bean(IcebergMetadata.class)
).toDF();

2-3. Other Relevant Settings


The following Spark settings are not directly related to the errors above, but may be helpful in similar environments. While we mostly used defaults, we list them here for reference:


  • spark.memory.fraction = 0.6
    Uses 60% of JVM heap for execution and storage.

  • spark.memory.storageFraction = 0.5
    Allocates half of that 60% (i.e., 30% of heap) for storage.

  • spark.memory.offHeap.enabled = true
    Enables use of off-heap memory outside the JVM.

  • spark.memory.offHeap.size = 8g
    Sets off-heap memory size to 8GB
  • spark.shuffle.spill = true
    Spills intermediate data to disk when memory is insufficient.

  • spark.shuffle.spill.compress = true
    Compresses spilled data for better I/O efficiency.

  • spark.executor.memory = 32g
    Executor heap memory.

  • spark.executor.memoryOverhead = 8g
    Memory for off-heap and shuffle handling.


2-4. Results


These measures allowed us to stabilize memory usage during parallel processing of large datasets. As a result, data ingestion pipeline failures dropped significantly, and ingestion performance became much more consistent. This also contributed to overall reliability of the batch pipeline.



3. What Benefits Did We Gain After Adopting Iceberg?


By adopting an Iceberg-based architecture, we built a structure that allows users to quickly filter and analyze required data using SQL-based queries. Users can now filter data based on file paths, formats, or metadata, and directly access the corresponding files. This greatly improves analysis efficiency.


Since each file and its related metadata are structurally organized, users can retrieve target data using queries alone, without having to manually navigate complex directory hierarchies.


Additionally, we preserved the key advantages of traditional data lakes, such as reduced storage costs and the ability to retain raw data. All original compressed files are stored in MinIO, ensuring high reliability and recoverability even if issues arise during table construction or transformation logic fails.


Because extracted data is saved as Iceberg tables, it’s now reusable across other pipelines and analytic workflows. This improves data reusability and operational efficiency. For example, a parsed file can be reused in downstream tasks without needing reprocessing. Multiple analytic pipelines can operate in parallel on the same table.


Iceberg’s metadata-driven design also enables analysts, planners, and non-developer users to access data more intuitively. Through SQL interfaces, meaningful analysis can be performed without relying on developer support. Tasks such as data extraction and conditional filtering, which previously required engineering assistance, are now part of a self-service model. This shift has significantly enhanced cross-team collaboration.




These improvements in structural flexibility and performance also helped accelerate product feature development and analytic pipeline expansion. In the past, each new analytic requirement required redesigning the data structure or creating a new pipeline. Now, we can simply reuse structured data in Iceberg tables to build new analytic environments quickly and flexibly.



4. What Insights Did This Experience Provide, and Where Are We Headed Next?


Through this Iceberg-based architecture design, we gained practical insights into how to manage and analyze large-scale unstructured data efficiently in a distributed environment. In particular, we realized how crucial it is to maintain a balance between rawness and structure while also ensuring scalability and query performance.


  • By separating data structure from processing flow, we improved pipeline maintainability and extensibility.
  • We fine-tuned critical performance-related elements like file metadata indexing, snapshot-based versioning, and partitioning strategies—confirming the strengths of Iceberg in real-world scenarios.

Going forward, we plan to expand in the following directions:


  • Ongoing Performance Optimization: Redesign partitions, schedule regular compactions, and clean up snapshots to improve operational efficiency.
  • Refine Airflow-based Pipelines: Modularize current tasks, enhance DAG flexibility and reusability, and build a more reliable workflow foundation.
  • Split Analysis Tasks by Module: Isolate parsing, preprocessing, and indexing into separate tasks to improve fault isolation, simplify reruns, and support scalable architecture.
  • Design Secondary Pipelines on Iceberg: Extend modular analytics using the existing table structure and leverage Iceberg’s abstraction layer for flexible pipeline design.

Iceberg is more than just a storage format; it serves as a solid foundation for designing analytic strategies and operational data flows. This project reaffirmed the importance of building architectures that balance structural flexibility with technical scalability. With continued hands-on experience, we aim to develop even more refined and resilient analytic infrastructure in the future.

 

Read <Adopting Apache Iceberg for Our Data Lakehouse - Part 1>
 


🧑‍💻 Author: S2W KE Team


👉 Contact Us: https://s2w.inc/en/contact


*Discover more about SAIP, S2W’s Generative AI Platform, in the details below.


List