FOUNDRY · Transforms · Official · Feb 22, 2026 · 56 views
PySpark Transforms in Foundry — Complete Guide
#pyspark #transforms #python #spark #data-processing
PySpark Transforms in Foundry — Complete Guide
Overview
PySpark transforms are the primary way to process large-scale data in Foundry. They run on managed Apache Spark clusters and support the full PySpark API plus Foundry-specific extensions.
Basic Transform Structure
from transforms.api import transform, Input, Output
from pyspark.sql import functions as F, types as T
@transform(
    input_data=Input('/path/to/input'),
    output=Output('/path/to/output'),
)
def compute_transform(output, input_data):
    """Aggregate ACTIVE records by department and creation year.

    Emits one row per (department, year) with record count, total amount,
    and average amount, ordered for stable, readable output.
    """
    source_df = input_data.dataframe()

    # Filter before grouping so only ACTIVE rows are shuffled.
    active_rows = source_df.filter(F.col('status') == 'ACTIVE')

    aggregated = active_rows.groupBy(
        'department',
        F.year('created_date').alias('year'),
    ).agg(
        F.count('*').alias('count'),
        F.sum('amount').alias('total_amount'),
        F.avg('amount').alias('avg_amount'),
    )

    output.write_dataframe(aggregated.orderBy('department', 'year'))
Incremental Transforms
For large datasets, use incremental processing to only compute changed data:
from transforms.api import incremental, transform, Input, Output
@incremental()
@transform(
    output=Output('/my/output'),
    source=Input('/my/source'),
)
def incremental_transform(output, source):
    """Append records that arrived since the previous successful build."""
    # Under @incremental, dataframe() yields only the unprocessed delta,
    # not the full historical dataset.
    delta = source.dataframe()
    output.write_dataframe(delta)
Working with Multiple Inputs
@transform(
    output=Output('/path/to/joined_output'),
    orders=Input('/data/orders'),
    customers=Input('/data/customers'),
    products=Input('/data/products'),
)
def join_transform(output, orders, customers, products):
    """Denormalize orders with customer and product attributes.

    Left joins keep every order row even when the customer or product
    lookup has no matching record.
    """
    enriched = (
        orders.dataframe()
        .join(customers.dataframe(), 'customer_id', 'left')
        .join(products.dataframe(), 'product_id', 'left')
    )

    # Derived column: per-line revenue.
    line_total = (F.col('quantity') * F.col('unit_price')).alias('line_total')

    projected = enriched.select(
        'order_id', 'order_date',
        'customer_name', 'customer_email',
        'product_name', 'quantity', 'unit_price',
        line_total,
    )
    output.write_dataframe(projected)
Schema Definition
Define explicit schemas to avoid type inference overhead on large datasets:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, DoubleType
# Declaring the schema up front avoids Spark's type-inference pass.
output_schema = StructType([
    StructField('id', StringType(), False),  # non-nullable key
    StructField('name', StringType(), True),
    StructField('value', DoubleType(), True),
    StructField('updated_at', TimestampType(), True),
    StructField('count', IntegerType(), True),
])


@transform(
    # NOTE(review): assumes Output(...) and dataframe(...) accept a `schema`
    # kwarg in this transforms.api version — confirm against the Foundry docs.
    output=Output('/path/to/output', schema=output_schema),
    source=Input('/path/to/source'),
)
def typed_transform(output, source):
    """Pass the source dataset through with an explicitly declared schema."""
    typed_df = source.dataframe(schema=output_schema)
    output.write_dataframe(typed_df)
UDFs (User-Defined Functions)
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import re
@udf(returnType=StringType())
def clean_text(text):
    """Normalize free text: trim, lowercase, drop non-alphanumerics.

    Whitespace is preserved; None or empty input yields None.
    """
    if text:
        normalized = text.strip().lower()
        return re.sub(r'[^a-zA-Z0-9\s]', '', normalized)
    return None
@transform(output=Output('/clean/text'), source=Input('/raw/text'))
def clean_transform(output, source):
    """Add a `clean_text` column derived from `raw_text` via the clean_text UDF."""
    raw_df = source.dataframe()
    cleaned = raw_df.withColumn('clean_text', clean_text(F.col('raw_text')))
    output.write_dataframe(cleaned)
Performance Best Practices
- Partition your output by low-cardinality columns commonly used in filters (partitioning on high-cardinality columns produces an explosion of tiny files)
- Cache intermediate DataFrames used more than once
- Avoid UDFs when built-in Spark functions exist (10-100x faster)
- Use broadcast joins for small lookup tables (<100MB)
- Filter early — reduce data volume before joins and aggregations
# ✅ Good: filter early, broadcast small table
# Broadcasting ships the lookup to every executor, replacing a shuffle join
# with a local hash join.
small_lookup = F.broadcast(lookup_df)
# (`== True` here builds a Spark Column predicate, not a Python bool compare.)
result = large_df.filter(F.col('active') == True).join(small_lookup, 'id')
# ❌ Bad: join first, filter after
# All of large_df is shuffled and joined before any rows are discarded.
result = large_df.join(lookup_df, 'id').filter(F.col('active') == True)
Common Patterns
Slowly Changing Dimensions (SCD Type 2)
from pyspark.sql import Window

# Rank each entity's versions newest-first; row_number() == 1 is the current row.
window = Window.partitionBy('entity_id').orderBy(F.desc('effective_date'))
scd2 = (
    df
    .withColumn('rn', F.row_number().over(window))
    .withColumn('is_current', F.col('rn') == 1)  # flag the latest version
    .drop('rn')  # the rank is only scaffolding; keep the output schema clean
)
Deduplication
# Keep only the most recent record per id, ranked by updated_at descending.
deduplicated = (
    df
    .withColumn('rn', F.row_number().over(
        Window.partitionBy('id').orderBy(F.desc('updated_at'))
    ))
    .filter(F.col('rn') == 1)  # rn == 1 is the newest row within each id
    .drop('rn')
)