Introduction
OutOfMemory (OOM) errors are one of the most common failure modes in PySpark when working with large datasets or complex data pipelines. They occur when Spark processes consume more memory than they were allocated, causing job failures or degraded performance. The good news? You can avoid most of them with proper configuration and smart coding practices.
This guide walks you through easy-to-implement strategies to prevent OOM errors and keep your PySpark jobs running smoothly.
1. Adjust Spark Configuration (Memory Management)
The first step in avoiding OOM errors is allocating sufficient memory and CPU resources for Spark components like executors and drivers.
🔧 Key Configurations
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.config("spark.executor.memory", "8g") \
.config("spark.driver.memory", "4g") \
.config("spark.executor.cores", "2") \
.getOrCreate()
- spark.executor.memory: Memory allocated to each executor.
- spark.driver.memory: Memory for the driver program.
- spark.executor.cores: Number of cores per executor.
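For PySpark in particular, the Python worker processes run outside the JVM heap, so executors can be killed for exceeding container memory even when spark.executor.memory looks generous. A minimal sketch, assuming a 2g overhead suits your workload (the value is an assumption, not a rule):
# spark.executor.memoryOverhead reserves off-heap room for Python workers and shuffle buffers
spark = SparkSession.builder \
    .config("spark.executor.memory", "8g") \
    .config("spark.executor.memoryOverhead", "2g") \
    .getOrCreate()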
💾 Use Disk Persistence
Large datasets can be persisted to disk to reduce memory pressure:
from pyspark import StorageLevel
df.persist(StorageLevel.DISK_ONLY)
This offloads storage from RAM to disk, reducing the chance of OOM errors.
2. Enable Dynamic Allocation
Dynamic Allocation allows Spark to increase or decrease the number of executors at runtime depending on workload demand. This ensures efficient memory usage and minimizes resource overuse.
spark.conf.set("spark.dynamicAllocation.enabled", "true")
spark.conf.set("spark.dynamicAllocation.minExecutors", "1")
spark.conf.set("spark.dynamicAllocation.initialExecutors", "2")
spark.conf.set("spark.dynamicAllocation.maxExecutors", "10")
- minExecutors: Minimum number of executors Spark can use.
- initialExecutors: Number of executors to start with.
- maxExecutors: Maximum number of executors Spark can scale to.
This approach automatically adapts resources to avoid OOM errors in varying data loads.
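One caveat worth noting: when executors are removed, their shuffle data must still be reachable. On Spark 3.x without an external shuffle service, shuffle tracking is the usual companion setting, and like the flags above it generally needs to be in place before the SparkSession starts (e.g. via spark-submit --conf or the session builder):
# Track shuffle data so executors holding it are not removed prematurely
spark.conf.set("spark.dynamicAllocation.shuffleTracking.enabled", "true")
# Alternatively, clusters that run an external shuffle service set:
# spark.conf.set("spark.shuffle.service.enabled", "true")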
3. Enable Adaptive Query Execution (AQE)
Adaptive Query Execution (AQE) optimizes query plans at runtime based on actual data metrics. It helps prevent unnecessary memory usage by:
- Merging small shuffle partitions
- Selecting better join strategies
- Handling skewed joins intelligently
Enable AQE with:
spark.conf.set("spark.sql.adaptive.enabled", "true")
AQE helps Spark be smarter with memory during query execution, especially when dealing with unexpected data sizes.
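The behaviors listed above map to their own switches (enabled by default in recent Spark 3.x releases); it can be worth confirming or tuning them explicitly:
# Merge many small shuffle partitions into fewer, reasonably sized ones
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
# Split heavily skewed partitions during sort-merge joins
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")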
4. Enforce Schema for Semi-Structured Data
When reading semi-structured data (e.g., JSON), Spark tries to infer the schema, which can be expensive and memory-consuming. You can avoid this by specifying the schema explicitly.
👇 Example:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True)
])
df = spark.read.schema(schema).json("path/to/data.json")
This reduces overhead and speeds up the reading process while saving memory.
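If hand-writing a large schema is impractical, one common pattern is to infer it once from a small sample file and reuse the result; a sketch, assuming a representative sample exists at a path of your choosing:
# Infer the schema once from a small, representative sample...
sample_schema = spark.read.json("path/to/sample.json").schema
# ...then reuse it for the full dataset, skipping inference over all the data
df = spark.read.schema(sample_schema).json("path/to/data.json")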
5. Tune the Number of Partitions
Too few partitions overload some executors, while too many cause task-scheduling overhead. Use repartition() to balance the load:
df = df.repartition(200, "column_name")
You can start with 2 to 3 times the total number of CPU cores in your cluster. Monitor performance and adjust accordingly.
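If you prefer not to hard-code 200, a rough starting point is to derive the count from the cluster's default parallelism, which normally reflects the total number of cores available to the application:
# Heuristic starting point: 2-3x the total cores, then adjust based on monitoring
target_partitions = spark.sparkContext.defaultParallelism * 3
df = df.repartition(target_partitions, "column_name")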
6. Handle Data Skew Dynamically
Data skew means some keys appear more frequently than others, overloading the corresponding executor. This often leads to OOM errors during joins.
One common solution is salting:
from pyspark.sql import functions as F
df = df.withColumn("join_key_salted", F.concat(F.col("join_key"), F.lit("_"), F.rand()))
This randomizes the key distribution, balancing the workload during joins.
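A minimal sketch of that expansion, where other_df is a hypothetical name for the table being joined against: each of its rows is duplicated once per salt value, so every salted key finds a partner.
# Duplicate each row of the other table once per salt value (0-9)
salts = spark.range(10).withColumnRenamed("id", "salt")
other_salted = other_df.crossJoin(salts).withColumn(
    "join_key_salted",
    F.concat(F.col("join_key"), F.lit("_"), F.col("salt").cast("string"))
)
result = df.join(other_salted, "join_key_salted")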
7. Limit Cache Usage for Large DataFrames
Caching everything can fill memory quickly. Instead, cache only what’s needed — and use disk when necessary:
df.persist(StorageLevel.MEMORY_AND_DISK)
After use, clean up to free memory:
df.unpersist()
Always monitor memory usage when caching.
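If you lose track of what has been cached during an interactive session, Spark can drop everything at once:
# Remove all cached tables and DataFrames from memory and disk
spark.catalog.clearCache()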
8. Optimize Joins for Large DataFrames
When joining a large table with a small one, a broadcast join usually works best. Spark ships a copy of the smaller table to every executor, so the large table never has to be shuffled across the network, avoiding most of the memory pressure a shuffle join would create.
from pyspark.sql.functions import broadcast
df_join = large_df.join(broadcast(small_df), "join_key", "left")
Use this only when the small table fits comfortably in executor (and driver) memory; as a reference point, Spark's automatic broadcast threshold defaults to 10 MB.
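Relatedly, Spark broadcasts small tables on its own whenever they fall under spark.sql.autoBroadcastJoinThreshold. If your dimension tables are a bit larger but still fit comfortably in memory, you can raise it; the 50 MB figure below is only an example:
# Raise the automatic broadcast threshold from the default 10 MB to 50 MB
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(50 * 1024 * 1024))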
9. Monitor Spark Jobs
Use the Spark UI (usually available at http://localhost:4040) to monitor:
- Memory usage per executor
- Stages and tasks
- Shuffle read/write
- Cached data
- Job execution time
Regular monitoring helps spot inefficiencies early and avoid OOM crashes.
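If port 4040 is already taken, Spark moves the UI to the next free port; the running session can tell you where it ended up:
# Prints something like http://<driver-host>:4040
print(spark.sparkContext.uiWebUrl)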
10. Consider Partitioning Strategy When Writing Data
When saving large datasets, writing partitioned files helps Spark read only relevant data during future queries:
df.write.partitionBy("region").parquet("output/path")
This reduces read-time memory usage and improves query performance by pruning irrelevant partitions.
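On the read side, filtering on the partition column lets Spark prune entire directories instead of loading them; a small sketch, using a hypothetical region value:
from pyspark.sql import functions as F
# Only the region=EU directory is scanned; other partitions are skipped entirely
eu_df = spark.read.parquet("output/path").filter(F.col("region") == "EU")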
Conclusion
Resolving OutOfMemory (OOM) Errors in PySpark is all about smart memory management, efficient code practices, and leveraging Spark’s built-in optimization features. By following these 10 best practices:
- Tune memory and cores
- Use dynamic and adaptive settings
- Optimize joins and schema
- Manage caching and partitions carefully
You’ll make your PySpark jobs more reliable, scalable, and performant — no more surprise crashes or memory bottlenecks.
Further reading: Partitioning and Bucketing in Spark, and Spark Join Strategies Explained, with useful tips on optimizing joins.