PySpark Integration¶
Distributed execution of rusket algorithms across Databricks or on-prem Hadoop clusters.
rusket integrates with PySpark via zero-copy Apache Arrow transfers, enabling distributed execution of all of its core algorithms without manual serialisation.
All distributed functions live in rusket.spark and use applyInArrow (Spark 3.4+) with applyInPandas as a fallback for older versions.
Setup¶
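rusket must be importable on the driver and on every executor, for example by installing it as a cluster library on Databricks or through pip in a cluster init script (the exact mechanism depends on your platform). A minimal session sketch, assuming stock PySpark, with Arrow transfers enabled explicitly:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("rusket-demo")
    # Arrow is the transport rusket.spark uses for zero-copy exchange
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .getOrCreate()
)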
mine_grouped — Distributed Market Basket Analysis per Store / Region¶
import rusket.spark

# One-hot encoded baskets: one row per basket, one column per item
spark_df = spark.table("gold.baskets_ohe")
freq_df = rusket.spark.mine_grouped(
    df=spark_df,
    group_col="store_id",
    min_support=0.05,
    method="auto",
    use_colnames=True,
    max_len=3,
)
# Output: store_id (string) | support (double) | itemsets (array<string>)
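The result is an ordinary Spark DataFrame, so it composes with the standard DataFrame API. For example, to inspect the strongest item pairs for a single store (plain PySpark, nothing rusket-specific):
from pyspark.sql import functions as F

(
    freq_df
    .filter((F.col("store_id") == "store_A") & (F.size("itemsets") == 2))
    .orderBy(F.col("support").desc())
    .show(10, truncate=False)
)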
rules_grouped — Distributed Association Rules per Segment¶
rules_df = rusket.spark.rules_grouped(
    df=freq_df,
    group_col="store_id",
    # Number of transactions per group, used for support-derived metrics
    num_itemsets={"store_A": 45_000, "store_B": 12_300},
    metric="confidence",
    min_threshold=0.6,
)
# Output: store_id | antecedents | consequents | confidence | lift | ...
Full end-to-end regional pipeline:
freq_df = rusket.spark.mine_grouped(spark_df, group_col="store_id", min_support=0.05)
rules_df = rusket.spark.rules_grouped(freq_df, group_col="store_id", num_itemsets=20_000)
rules_df.write.mode("overwrite").saveAsTable("gold.per_store_rules")
prefixspan_grouped — Distributed Customer Journey Analysis¶
seq_df = rusket.spark.prefixspan_grouped(
    # Long-format events, one row per (customer, timestamp, item);
    # the table name is illustrative
    df=spark.table("silver.customer_events"),
    group_col="region",
    user_col="customer_id",
    time_col="event_ts",
    item_col="product_id",
    min_support=100,
    max_len=4,
)
# Output: region | support (long) | sequence (array<string>)
# Example:
# EMEA | 432 | [broadband, mobile, tv_bundle, cancel]
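Since sequence is a plain array&lt;string&gt; column, downstream filtering is ordinary Spark SQL. A short sketch that isolates journeys ending in a cancellation, like the example row above:
from pyspark.sql import functions as F

# Keep only sequences whose last event is "cancel"
churn_paths = seq_df.filter(F.element_at("sequence", -1) == "cancel")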
hupm_grouped — Distributed High-Profit Bundle Discovery¶
hupm_df = rusket.spark.hupm_grouped(
    # Long-format receipt lines with a per-line margin; table name illustrative
    df=spark.table("silver.receipt_lines"),
    group_col="region",
    transaction_col="receipt_id",
    item_col="product_id",
    utility_col="margin",
    min_utility=500.0,
    max_len=3,
)
# Output: region | utility (double) | itemset (array<long>)
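To rank the top bundles within each region, a window over the grouped output works as usual (standard PySpark, not part of rusket):
from pyspark.sql import Window, functions as F

w = Window.partitionBy("region").orderBy(F.col("utility").desc())
top_per_region = (
    hupm_df
    .withColumn("rank", F.row_number().over(w))
    .filter(F.col("rank") <= 5)
)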
recommend_batches — Overnight Batch Personalisation at Scale¶
from rusket import ALS

# user_item_csr: a sparse user x item interaction matrix built upstream
als = ALS(factors=64, iterations=15).fit(user_item_csr)

rec_df = rusket.spark.recommend_batches(
    df=spark.table("silver.user_sessions"),
    model=als,
    user_col="customer_id",
    k=10,
)
# Output: customer_id (string) | recommended_items (array<int>)

rec_df.write.mode("overwrite").saveAsTable("gold.daily_recommendations")
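For consumers that expect one row per (customer, item) pair, the array column can be exploded with standard PySpark before the write:
from pyspark.sql import functions as F

flat_recs = rec_df.select(
    "customer_id",
    F.explode("recommended_items").alias("item_id"),
)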
mine_spark — Global Mining via FPMiner Streaming¶
from rusket.streaming import mine_spark
freq_df = mine_spark(
    spark_df=spark.table("silver.order_lines"),
    n_items=200_000,
    txn_col="order_id",
    item_col="sku_index",
    min_support=0.001,
    max_len=3,
)
to_spark¶
Convert a Pandas or Polars DataFrame to a PySpark DataFrame:
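A minimal sketch, assuming to_spark accepts the local frame plus the active SparkSession (the exact signature is an assumption; check the API reference):
import pandas as pd
import rusket.spark

pdf = pd.DataFrame({"order_id": [1, 1, 2], "sku_index": [10, 42, 10]})

# Assumed signature: to_spark(local_df, spark_session)
spark_df = rusket.spark.to_spark(pdf, spark)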