FOUNDRY · Transforms · Official · Feb 22, 2026 · 56 views

PySpark Transforms in Foundry — Complete Guide

#pyspark #transforms #python #spark #data-processing

PySpark Transforms in Foundry — Complete Guide

Overview

PySpark transforms are the primary way to process large-scale data in Foundry. They run on managed Apache Spark clusters and support the full PySpark API plus Foundry-specific extensions.

Basic Transform Structure

from transforms.api import transform, Input, Output
from pyspark.sql import functions as F, types as T

@transform(
    output=Output('/path/to/output'),
    input_data=Input('/path/to/input'),
)
def compute_transform(output, input_data):
    """Aggregate ACTIVE records per department and creation year.

    Keeps only rows whose status column equals 'ACTIVE', then emits one
    row per (department, year-of-created_date) with the row count, the
    summed amount, and the average amount, ordered for stable output.
    """
    source_df = input_data.dataframe()

    # Drop inactive rows before grouping so the aggregation sees less data.
    active = source_df.filter(F.col('status') == 'ACTIVE')

    grouped = active.groupBy('department', F.year('created_date').alias('year'))
    aggregated = grouped.agg(
        F.count('*').alias('count'),
        F.sum('amount').alias('total_amount'),
        F.avg('amount').alias('avg_amount'),
    )

    output.write_dataframe(aggregated.orderBy('department', 'year'))

Incremental Transforms

For large datasets, use incremental processing to only compute changed data:

from transforms.api import incremental, transform, Input, Output

@incremental()
@transform(
    output=Output('/my/output'),
    source=Input('/my/source'),
)
def incremental_transform(output, source):
    """Incremental pass-through: forward only the delta to the output.

    Under @incremental, source.dataframe() exposes just the records added
    or changed since the last successful build, so each run is cheap.
    """
    delta = source.dataframe()
    output.write_dataframe(delta)

Working with Multiple Inputs

@transform(
    output=Output('/path/to/joined_output'),
    orders=Input('/data/orders'),
    customers=Input('/data/customers'),
    products=Input('/data/products'),
)
def join_transform(output, orders, customers, products):
    """Enrich orders with customer and product attributes.

    Left joins keep every order even when a customer or product lookup
    misses (the enrichment columns come through as null). Adds a
    line_total column computed as quantity * unit_price.
    """
    # Join order of the lookups does not matter; both are left joins
    # keyed on their respective foreign keys.
    enriched = (
        orders.dataframe()
        .join(customers.dataframe(), 'customer_id', 'left')
        .join(products.dataframe(), 'product_id', 'left')
    )

    projected = enriched.select(
        'order_id', 'order_date',
        'customer_name', 'customer_email',
        'product_name', 'quantity', 'unit_price',
        (F.col('quantity') * F.col('unit_price')).alias('line_total'),
    )

    output.write_dataframe(projected)

Schema Definition

Define explicit schemas to avoid type inference overhead on large datasets:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, DoubleType

# (column name, Spark type, nullable) — only `id` is mandatory.
_OUTPUT_FIELDS = [
    ('id', StringType(), False),
    ('name', StringType(), True),
    ('value', DoubleType(), True),
    ('updated_at', TimestampType(), True),
    ('count', IntegerType(), True),
]

output_schema = StructType(
    [StructField(name, dtype, nullable) for name, dtype, nullable in _OUTPUT_FIELDS]
)

@transform(
    output=Output('/path/to/output', schema=output_schema),
    source=Input('/path/to/source'),
)
def typed_transform(output, source):
    """Copy the source through under the explicit output_schema.

    Supplying the schema up front avoids type-inference overhead on
    large datasets.
    """
    output.write_dataframe(source.dataframe(schema=output_schema))

UDFs (User-Defined Functions)

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import re

# Compile once at module import. The UDF body runs once per row, so the
# hot path should not repeat the regex-cache lookup that a bare re.sub
# with a pattern literal performs on every call.
_NON_ALNUM = re.compile(r'[^a-zA-Z0-9\s]')

@udf(returnType=StringType())
def clean_text(text):
    """Trim, lowercase, and strip non-alphanumeric characters.

    Returns None for falsy input (None or empty string); otherwise the
    cleaned string containing only [a-zA-Z0-9] and whitespace.
    """
    if not text:
        return None
    return _NON_ALNUM.sub('', text.strip().lower())

@transform(output=Output('/clean/text'), source=Input('/raw/text'))
def clean_transform(output, source):
    """Derive a `clean_text` column from `raw_text` using the clean_text UDF."""
    raw_df = source.dataframe()
    cleaned = raw_df.withColumn('clean_text', clean_text(F.col('raw_text')))
    output.write_dataframe(cleaned)

Performance Best Practices

  1. Partition your output by low-cardinality columns commonly used in filters (high-cardinality partition keys produce many tiny files)
  2. Cache intermediate DataFrames used more than once
  3. Avoid UDFs when built-in Spark functions exist (10-100x faster)
  4. Use broadcast joins for small lookup tables (<100MB)
  5. Filter early — reduce data volume before joins and aggregations
# ✅ Good: filter early, broadcast small table.
# broadcast() hints Spark to ship the small lookup to every executor,
# replacing a shuffle join with a map-side join; the filter shrinks the
# large side before the join runs.
small_lookup = F.broadcast(lookup_df)
result = large_df.filter(F.col('active') == True).join(small_lookup, 'id')

# ❌ Bad: join first, filter after — the join shuffles and matches rows
# that the filter then throws away.
result = large_df.join(lookup_df, 'id').filter(F.col('active') == True)

Common Patterns

Slowly Changing Dimensions (SCD Type 2)

from pyspark.sql import Window

# Order each entity's history newest-first so row number 1 is the
# latest version of that entity.
window = Window.partitionBy('entity_id').orderBy(F.desc('effective_date'))

numbered = df.withColumn('rn', F.row_number().over(window))
flagged = numbered.withColumn('is_current', F.col('rn') == 1)
scd2 = flagged.drop('rn')

Deduplication

# Keep only the newest record per id, ranked by updated_at descending.
_latest_per_id = Window.partitionBy('id').orderBy(F.desc('updated_at'))

deduplicated = (
    df.withColumn('rn', F.row_number().over(_latest_per_id))
    .filter(F.col('rn') == 1)
    .drop('rn')
)