pyspark.sql.DataFrame.lateralJoin

DataFrame.lateralJoin(other, on=None, how=None)

Performs a lateral join with another DataFrame, using the given join expression.

A lateral join (also known as a correlated join) is a type of join where each row from one DataFrame is used as input to a subquery or a derived table that computes a result specific to that row. The right side DataFrame can reference columns from the current row of the left side DataFrame, allowing for more complex and context-dependent results than a standard join.

New in version 4.0.0.
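As a minimal sketch of the mechanics (the left/right DataFrames and their columns below are made up for illustration), the right side filters on a column of the current left row, marked as a correlated reference with Column.outer(), while how="left" preserves left rows with no match:

>>> from pyspark.sql import functions as sf
>>> left = spark.createDataFrame([(1,), (2,)], ["key"])
>>> right = spark.createDataFrame([(1, "a"), (1, "b"), (3, "c")], ["key", "val"])
>>> left.alias("l").lateralJoin(
...     right.alias("r")
...     .where(sf.col("r.key") == sf.col("l.key").outer())  # correlated reference
...     .select("val"),
...     how="left"
... ).orderBy("key", "val").show()
+---+----+
|key| val|
+---+----+
|  1|   a|
|  1|   b|
|  2|NULL|
+---+----+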

Parameters
other : DataFrame

Right side of the join.

on : Column, optional

a join expression (Column); see Example 3 below.

how : str, optional

default inner. Must be one of: inner, cross, left, leftouter, or left_outer.

Returns
DataFrame

Joined DataFrame.

Examples

Set up the sample DataFrames.

>>> from pyspark.sql import functions as sf
>>> from pyspark.sql import Row
>>> customers_data = [
...     Row(customer_id=1, name="Alice"), Row(customer_id=2, name="Bob"),
...     Row(customer_id=3, name="Charlie"), Row(customer_id=4, name="Diana")
... ]
>>> customers = spark.createDataFrame(customers_data)
>>> orders_data = [
...     Row(order_id=101, customer_id=1, order_date="2024-01-10",
...         items=[Row(product="laptop", quantity=5), Row(product="mouse", quantity=12)]),
...     Row(order_id=102, customer_id=1, order_date="2024-02-15",
...         items=[Row(product="phone", quantity=2), Row(product="charger", quantity=15)]),
...     Row(order_id=105, customer_id=1, order_date="2024-03-20",
...         items=[Row(product="tablet", quantity=4)]),
...     Row(order_id=103, customer_id=2, order_date="2024-01-12",
...         items=[Row(product="tablet", quantity=8)]),
...     Row(order_id=104, customer_id=2, order_date="2024-03-05",
...         items=[Row(product="laptop", quantity=7)]),
...     Row(order_id=106, customer_id=3, order_date="2024-04-05",
...         items=[Row(product="monitor", quantity=1)]),
... ]
>>> orders = spark.createDataFrame(orders_data)
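
For intuition, the same correlated semantics can be written in Spark SQL as a lateral subquery in the FROM clause. A sketch using the DataFrames above (the temp view names are illustrative only, not part of this API):

>>> customers.createOrReplaceTempView("customers_v")
>>> orders.createOrReplaceTempView("orders_v")
>>> spark.sql("""
...     SELECT c.name, o.order_id
...     FROM customers_v c,
...     LATERAL (SELECT order_id FROM orders_v WHERE customer_id = c.customer_id) o
...     ORDER BY c.name, o.order_id
... """).show()
+-------+--------+
|   name|order_id|
+-------+--------+
|  Alice|     101|
|  Alice|     102|
|  Alice|     105|
|    Bob|     103|
|    Bob|     104|
|Charlie|     106|
+-------+--------+

Diana does not appear because a plain lateral (comma) join behaves as an inner join.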

Example 1 (using a TVF): Expanding the Items in Each Order into Separate Rows

>>> customers.join(orders, "customer_id").lateralJoin(
...     spark.tvf.explode(sf.col("items").outer()).select("col.*")
... ).select(
...     "customer_id", "name", "order_id", "order_date", "product", "quantity"
... ).orderBy("customer_id", "order_id", "product").show()
+-----------+-------+--------+----------+-------+--------+
|customer_id|   name|order_id|order_date|product|quantity|
+-----------+-------+--------+----------+-------+--------+
|          1|  Alice|     101|2024-01-10| laptop|       5|
|          1|  Alice|     101|2024-01-10|  mouse|      12|
|          1|  Alice|     102|2024-02-15|charger|      15|
|          1|  Alice|     102|2024-02-15|  phone|       2|
|          1|  Alice|     105|2024-03-20| tablet|       4|
|          2|    Bob|     103|2024-01-12| tablet|       8|
|          2|    Bob|     104|2024-03-05| laptop|       7|
|          3|Charlie|     106|2024-04-05|monitor|       1|
+-----------+-------+--------+----------+-------+--------+
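
In Example 1, sf.col("items").outer() marks items as a reference to the current row of the left side, explode turns each array element into a row in a column named col, and select("col.*") flattens that struct into product and quantity. Since how defaults to inner, an order with an empty or NULL items array would produce no rows; Diana is absent because the preceding join with orders is itself an inner join.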

Example 2 (using a subquery): Finding the Two Most Recent Orders for Each Customer

>>> customers.alias("c").lateralJoin(
...     orders.alias("o")
...     .where(sf.col("o.customer_id") == sf.col("c.customer_id").outer())
...     .select("order_id", "order_date")
...     .orderBy(sf.col("order_date").desc())
...     .limit(2),
...     how="left"
... ).orderBy("customer_id", "order_id").show()
+-----------+-------+--------+----------+
|customer_id|   name|order_id|order_date|
+-----------+-------+--------+----------+
|          1|  Alice|     102|2024-02-15|
|          1|  Alice|     105|2024-03-20|
|          2|    Bob|     103|2024-01-12|
|          2|    Bob|     104|2024-03-05|
|          3|Charlie|     106|2024-04-05|
|          4|  Diana|    NULL|      NULL|
+-----------+-------+--------+----------+
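
Example 3 (using a join condition): Filtering Each Customer's Orders with the on Parameter

The on parameter supplies a join expression evaluated between each left row and the rows returned by its lateral subquery, analogous to SQL's LEFT JOIN LATERAL ... ON. A sketch reusing the DataFrames above; the expected output assumes those semantics (Bob's January order fails the condition, while Diana, having no orders, is kept with NULLs by the left join):

>>> customers.alias("c").lateralJoin(
...     orders.alias("o")
...     .where(sf.col("o.customer_id") == sf.col("c.customer_id").outer())
...     .select("order_id", "order_date"),
...     on=sf.col("order_date") > "2024-02-01",  # condition on the subquery output
...     how="left"
... ).orderBy("customer_id", "order_id").show()
+-----------+-------+--------+----------+
|customer_id|   name|order_id|order_date|
+-----------+-------+--------+----------+
|          1|  Alice|     102|2024-02-15|
|          1|  Alice|     105|2024-03-20|
|          2|    Bob|     104|2024-03-05|
|          3|Charlie|     106|2024-04-05|
|          4|  Diana|    NULL|      NULL|
+-----------+-------+--------+----------+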