Pipeline Architecture
For production systems requiring advanced retrieval and ranking, rusket provides the Pipeline class. This mirrors the "retrieve → rerank → filter" paradigm used by Twitter/X and modern ML stacks.
The Pipeline lets you combine the strengths of different memory-based and latent-factor models, optionally re-scoring their candidates with a slower, heavier model (such as Factorization Machines) before applying final business-logic filters.
The Retrieve → Rerank → Filter Funnel
from rusket import ALS, BPR, Pipeline
# Train multiple base models
als = ALS(factors=64).fit(interactions)
bpr = BPR(factors=128).fit(interactions)
# Compose the Pipeline
pipeline = Pipeline(
    retrieve=[als, bpr],
    merge_strategy="max",
    rerank=bpr,
)
- Retrieve: Pass one or more trained recommenders to the retrieve parameter, e.g. retrieving candidates from an ALS collaborative filter and an ItemKNN model.
- Merge Strategy: When passing multiple retrievers, merge_strategy determines how scores are combined if a candidate item is generated by more than one retriever. Options:
  - "max": take the highest score
  - "sum": sum the scores
  - "mean": take the average
- Rerank (optional): Provide a secondary model to re-score the generated candidates. Rerankers are usually too slow to run on the entire catalogue, so they score only the narrowed-down candidate set.
- Filter (optional): Define Python functions that apply business rules (e.g. inventory control, blacklisting).
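To make the merge strategies listed above concrete, here is a minimal pure-Python sketch of how scores for an item returned by several retrievers could be combined. The merge_candidates helper is hypothetical and is not part of rusket's API; in particular, it assumes "mean" averages only over the retrievers that actually produced the item, which may differ from rusket's behaviour.

```python
from collections import defaultdict
from statistics import fmean


# Hypothetical helper for illustration only; not part of rusket's API.
def merge_candidates(retriever_outputs, strategy="max"):
    """Combine (item_id, score) lists from several retrievers into one dict."""
    reducers = {"max": max, "sum": sum, "mean": fmean}
    if strategy not in reducers:
        raise ValueError(f"unknown merge strategy: {strategy!r}")

    # Collect every score each item received across all retrievers.
    scores_by_item = defaultdict(list)
    for candidates in retriever_outputs:
        for item_id, score in candidates:
            scores_by_item[item_id].append(score)

    # Reduce each item's score list to a single merged score.
    reduce_scores = reducers[strategy]
    return {item: reduce_scores(scores) for item, scores in scores_by_item.items()}


# Item 10 is returned by both retrievers; items 11 and 12 by one each.
als_candidates = [(10, 0.75), (11, 0.5)]
bpr_candidates = [(10, 0.25), (12, 0.5)]

merged_max = merge_candidates([als_candidates, bpr_candidates], "max")
merged_sum = merge_candidates([als_candidates, bpr_candidates], "sum")
merged_mean = merge_candidates([als_candidates, bpr_candidates], "mean")
```

Note that "sum" rewards items that several retrievers agree on, while "max" keeps scores on the scale of a single retriever. Either way, scores from different model families are not necessarily comparable, which is one reason a rerank stage is useful.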
Batch Prediction Optimizations
The Pipeline uses the same heavily optimized Rust-accelerated fast paths as standard rusket models whenever possible.
If all configured models expose user_factors and item_factors (such as ALS and BPR) and share identical user mappings, .recommend_batch() skips Python loops entirely.
# Blazing-fast Batch Scoring utilizing Rust inner loops
batch_recs = pipeline.recommend_batch(
    user_ids=[1001, 1002, 1003],
    n=10,
    format="polars",  # Avoid pandas memory bloat
)
Performance Note: If any of the models configured inside the Pipeline does not support the Rust batch path (e.g., Python-level custom classes or ItemKNN without factor graphs), rusket safely falls back to a multi-threaded Python loop.
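If you want to anticipate which path a given set of models is likely to take, the attribute condition described above can be approximated with a small check. This is a sketch under stated assumptions: supports_factor_fast_path and the two stand-in classes are hypothetical, rusket's internal dispatch may apply additional conditions, and the requirement that models share identical user mappings is not checked here.

```python
# Hypothetical helper for illustration only; rusket's internal dispatch may differ.
def supports_factor_fast_path(models):
    """Return True if every model exposes both user_factors and item_factors."""
    return all(
        getattr(model, "user_factors", None) is not None
        and getattr(model, "item_factors", None) is not None
        for model in models
    )


class FactorModel:
    """Stand-in for a latent-factor model such as ALS or BPR."""

    def __init__(self):
        self.user_factors = [[0.1, 0.2]]
        self.item_factors = [[0.3, 0.4]]


class NeighbourModel:
    """Stand-in for a model that has no factor matrices."""


all_factor = supports_factor_fast_path([FactorModel(), FactorModel()])
mixed = supports_factor_fast_path([FactorModel(), NeighbourModel()])
```

Here all_factor is True and mixed is False, since a single model without factor matrices is enough to rule out the factor-based path in this sketch.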
Custom Filtering
You can supply custom Python callbacks to the filter argument. Each callback accepts the candidate IDs and their scores and returns the filtered arrays.
import numpy as np
def exclude_premium_skus(candidate_ids: np.ndarray, candidate_scores: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
    """Remove premium-tier items for basic-tier users."""
    mask = ~np.isin(candidate_ids, [99, 104, 255])  # SKUs to exclude
    return candidate_ids[mask], candidate_scores[mask]
# Create pipeline with an ad-hoc custom filter
pipeline = Pipeline(
    retrieve=[als],
    filter=exclude_premium_skus,
)
recs, scores = pipeline.recommend(user_id=1, n=5)
# The returned candidates never include the SKUs excluded by the filter.
Injecting Business Rules
While filters are good at removing items, you sometimes need to forcibly promote items based on curated business logic (e.g., "If a user bought a laptop, they should definitely see an extended warranty").
By passing RuleBasedRecommender instances to the rules parameter, their item associations are evaluated and injected into the final candidate set after re-ranking, with a large score boost (+1M) so that your curated rules always rank at the top.
import pandas as pd
from rusket import Pipeline, ALS, ItemKNN, RuleBasedRecommender
als = ALS(factors=64).fit(interactions)
knn = ItemKNN(k=50).fit(interactions)
# 1. Define specific item-to-item business rules
rules_df = pd.DataFrame({
    "antecedent": ["102", "102"],  # Laptop SKU
    "consequent": ["999", "888"],  # Warranty SKU & Premium Mouse SKU
    "score": [2.0, 1.5],
})
# 2. Build the rule model
rule_model = RuleBasedRecommender.from_transactions(
    interactions,
    rules=rules_df,
    user_col="user",
    item_col="item",
).fit()
# 3. Inject it into the pipeline's `rules` phase
pipeline = Pipeline(
    retrieve=[als, knn],
    rerank=als,
    rules=rule_model,  # Injects forced correlations
)
# User 42's interaction history is checked against the rule antecedents.
# If they previously interacted with laptop "102", the warranty "999"
# bypasses the reranking logic and ranks #1.
recs, scores = pipeline.recommend(user_id=42, n=5)
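The injection step described above can be sketched in plain Python to show why boosted rule items end up ahead of model-scored items. This is an illustration, not rusket's implementation: inject_rules is a hypothetical helper, the reranked scores are made-up example values, and it assumes the +1M boost is added to each rule's own score so that rule items keep their relative order.

```python
RULE_BOOST = 1_000_000.0  # boost size taken from the description above


# Hypothetical helper for illustration only; not rusket's implementation.
def inject_rules(candidates, user_history, rules, n):
    """Add rule consequents to reranked candidates and return the top n.

    candidates:   dict mapping item_id -> reranked score
    user_history: set of item_ids the user has interacted with
    rules:        list of (antecedent, consequent, score) tuples
    """
    boosted = dict(candidates)
    for antecedent, consequent, score in rules:
        if antecedent in user_history:
            # The boost places rule items above every model-scored item;
            # the rule's own score only orders rule items among themselves.
            current = boosted.get(consequent, float("-inf"))
            boosted[consequent] = max(current, RULE_BOOST + score)
    ranked = sorted(boosted.items(), key=lambda pair: pair[1], reverse=True)
    return ranked[:n]


# Made-up reranked scores for illustration.
reranked = {"301": 0.92, "302": 0.85, "999": 0.10}
rules = [("102", "999", 2.0), ("102", "888", 1.5)]

# A user who interacted with laptop "102" triggers both rules.
top = inject_rules(reranked, user_history={"102", "301"}, rules=rules, n=3)

# A user without the antecedent in their history gets the reranked order unchanged.
untouched = inject_rules(reranked, user_history={"555"}, rules=rules, n=3)
```

With these inputs the warranty "999" and the mouse "888" occupy the first two slots for the laptop buyer, leaving a single slot for model-scored items. This shows the trade-off of a large fixed boost: every triggered rule displaces one organic recommendation, so rule sets are best kept small relative to n.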