Pipeline Architecture

For production systems requiring advanced retrieval and ranking, rusket provides the Pipeline class. This mirrors the "retrieve → rerank → filter" paradigm used by Twitter/X and modern ML stacks.

The Pipeline allows you to effortlessly combine the strengths of different memory-based and latent-factor models, optionally re-scoring their candidates with a slower, heavier model (like Factorization Machines) before applying final business logic filters.


The Retrieve → Rerank → Filter Funnel

from rusket import ALS, BPR, Pipeline

# Train multiple base models
als = ALS(factors=64).fit(interactions)
bpr = BPR(factors=128).fit(interactions)

# Compose the Pipeline
pipeline = Pipeline(
    retrieve=[als, bpr],
    merge_strategy="max",
    rerank=bpr,
)
  1. Retrieve: Pass one or more trained recommenders to the retrieve parameter, e.g. an ALS collaborative filter together with a BPR or ItemKNN model.
  2. Merge Strategy: When several retrievers are passed, merge_strategy determines how scores are combined when the same candidate item is returned by more than one retriever. Options:
     - "max": take the highest score
     - "sum": sum the scores
     - "mean": take the average
  3. Rerank (optional): Provide a secondary model to re-score the merged candidates. Rerankers are usually too slow to run over the entire catalogue, so they score only the narrowed-down candidate set.
  4. Filter (optional): Supply Python functions that apply business rules to the candidates (e.g. inventory control, blacklisting).
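
To make the three merge strategies concrete, here is a minimal pure-Python sketch of how scores for the same item could be combined across retrievers. It is an illustration only, not rusket's internal implementation; the helper name merge_candidates and the (item_id, score) input shape are assumptions made for this example.

```python
from collections import defaultdict
from statistics import fmean


def merge_candidates(candidate_lists, strategy="max"):
    """Merge (item_id, score) lists from several retrievers into one dict.

    Hypothetical helper for illustration; not part of the rusket API.
    """
    reducers = {"max": max, "sum": sum, "mean": fmean}
    if strategy not in reducers:
        raise ValueError(f"unknown merge strategy: {strategy!r}")

    # Collect every score each item received, across all retrievers.
    grouped = defaultdict(list)
    for candidates in candidate_lists:
        for item_id, score in candidates:
            grouped[item_id].append(score)

    # Collapse each item's score list with the chosen reducer.
    reduce_scores = reducers[strategy]
    return {item: float(reduce_scores(scores)) for item, scores in grouped.items()}


# Item 10 is returned by both retrievers; items 11 and 12 by one each.
als_candidates = [(10, 0.9), (11, 0.4)]
bpr_candidates = [(10, 0.5), (12, 0.7)]

merged_max = merge_candidates([als_candidates, bpr_candidates], "max")
merged_sum = merge_candidates([als_candidates, bpr_candidates], "sum")
merged_mean = merge_candidates([als_candidates, bpr_candidates], "mean")
```

Note that "sum" rewards items surfaced by several retrievers, while "max" and "mean" do not. Scores from different model families may also sit on different scales, which matters when choosing a strategy.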

Batch Prediction Optimizations

The Pipeline uses the same heavily optimized Rust-accelerated fast paths as standard rusket models whenever possible. If all configured models expose user_factors and item_factors (such as ALS and BPR) and share identical user mappings, .recommend_batch() skips Python loops entirely.

# Blazing-fast Batch Scoring utilizing Rust inner loops
batch_recs = pipeline.recommend_batch(
    user_ids=[1001, 1002, 1003],
    n=10,
    format="polars"  # Avoid pandas memory bloat
)

Performance Note: If any model configured inside the Pipeline does not support the Rust batch path (e.g., Python-level custom classes, or models such as ItemKNN that expose no factor matrices), rusket safely falls back to a multi-threaded Python loop.
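
The reason factor-based models can be batched is that scoring reduces to a dense matrix product between user and item factors. The NumPy sketch below illustrates that idea under stated assumptions; it is a stand-in for the concept, not rusket's Rust kernel, and batch_top_n is a hypothetical helper name.

```python
import numpy as np


def batch_top_n(user_factors, item_factors, user_rows, n):
    """Return the top-n item indices and scores for several users at once.

    Hypothetical helper for illustration; requires n <= number of items.
    """
    # One matrix product scores every item for every requested user.
    scores = user_factors[user_rows] @ item_factors.T  # shape (users, items)

    # argpartition finds the n best items per row without a full sort.
    top = np.argpartition(-scores, n - 1, axis=1)[:, :n]
    top_scores = np.take_along_axis(scores, top, axis=1)

    # Sort only the n survivors, best first.
    order = np.argsort(-top_scores, axis=1)
    return (
        np.take_along_axis(top, order, axis=1),
        np.take_along_axis(top_scores, order, axis=1),
    )


rng = np.random.default_rng(0)
user_factors = rng.normal(size=(5, 8))   # 5 users, 8 latent factors
item_factors = rng.normal(size=(20, 8))  # 20 items, 8 latent factors

items, scores = batch_top_n(user_factors, item_factors, np.array([0, 2, 4]), n=3)
```

A model without factor matrices has no such product to compute, which is consistent with the per-user fallback described in the performance note.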


Custom Filtering

You can supply a custom Python callback to the filter argument. The callback receives the candidate IDs and their scores as arrays and returns the filtered arrays.

import numpy as np

def exclude_premium_skus(candidate_ids: np.ndarray, candidate_scores: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
    """Drop premium-tier SKUs so they are not recommended to basic-tier users."""
    mask = ~np.isin(candidate_ids, [99, 104, 255])  # SKUs to exclude
    return candidate_ids[mask], candidate_scores[mask]

# Create pipeline with an ad-hoc custom filter
pipeline = Pipeline(
    retrieve=[als],
    filter=exclude_premium_skus
)

recs, scores = pipeline.recommend(user_id=1, n=5)
# The returned candidates have already passed through the filter.
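
When the excluded IDs are not known until runtime (for example, an out-of-stock list), one option is to build the callback with a small factory. The sketch below assumes the same (candidate_ids, candidate_scores) signature shown above; make_blocklist_filter is a hypothetical helper, not part of rusket.

```python
import numpy as np


def make_blocklist_filter(blocked_ids):
    """Build a filter callback that drops every ID in blocked_ids.

    Hypothetical helper for illustration; not part of the rusket API.
    """
    blocked = np.asarray(sorted(blocked_ids))

    def _filter(candidate_ids, candidate_scores):
        # Keep only candidates whose ID is absent from the blocklist.
        mask = ~np.isin(candidate_ids, blocked)
        return candidate_ids[mask], candidate_scores[mask]

    return _filter


exclude_out_of_stock = make_blocklist_filter({7, 42})

kept_ids, kept_scores = exclude_out_of_stock(
    np.array([1, 7, 3, 42]),
    np.array([0.9, 0.8, 0.7, 0.6]),
)
```

Because the callback applies a single boolean mask to both arrays, IDs and scores stay aligned and the original ranking order is preserved.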

Injecting Business Rules

While mathematical filters are great for removing items, you sometimes need to forcibly promote items based on curated business logic (e.g., "If a user bought a laptop, they should definitely see an extended warranty").

When you pass RuleBasedRecommender instances to the rules parameter, their item associations are evaluated and injected into the final candidate set after re-ranking, with a large fixed score boost (+1M). This guarantees that your curated rules always rank at the top.

import pandas as pd
from rusket import Pipeline, ALS, ItemKNN, RuleBasedRecommender

als = ALS(factors=64).fit(interactions)
knn = ItemKNN(k=50).fit(interactions)

# 1. Define specific item-to-item business rules
rules_df = pd.DataFrame({
    "antecedent": ["102", "102"],    # Laptop SKU
    "consequent": ["999", "888"],    # Warranty SKU & Premium Mouse SKU
    "score": [2.0, 1.5]
})

# 2. Build the rule model
rule_model = RuleBasedRecommender.from_transactions(
    interactions, 
    rules=rules_df, 
    user_col="user", 
    item_col="item"
).fit()

# 3. Inject it into the pipeline's `rules` phase
pipeline = Pipeline(
    retrieve=[als, knn],
    rerank=als,
    rules=rule_model,  # Injects forced correlations
)

# User 42's interaction history is checked during retrieval.
# If they previously interacted with Laptop "102", the warranty "999" 
# will bypass reranking logic and rank #1.
recs, scores = pipeline.recommend(user_id=42, n=5)
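
The effect of the +1M boost can be pictured with a small standalone sketch. It is a guess at the mechanics for illustration only: the inject_rules helper, the dict-based rule format, and the choice to add the boost to the rule's own score are all assumptions, not rusket's actual implementation.

```python
import numpy as np

RULE_BOOST = 1_000_000.0  # the +1M boost described in the text


def inject_rules(candidate_ids, candidate_scores, history, rules, n):
    """Inject rule consequents for items in the user's history, then re-rank.

    rules maps an antecedent item to a list of (consequent, rule_score).
    Hypothetical helper for illustration; not part of the rusket API.
    """
    scores = dict(zip(candidate_ids.tolist(), candidate_scores.tolist()))

    for antecedent in history:
        for consequent, rule_score in rules.get(antecedent, []):
            # Assumption: boosted score = fixed boost + the rule's own score,
            # so rules keep their relative order among themselves.
            boosted = RULE_BOOST + rule_score
            scores[consequent] = max(scores.get(consequent, float("-inf")), boosted)

    ranked = sorted(scores.items(), key=lambda pair: pair[1], reverse=True)[:n]
    ids = np.array([item for item, _ in ranked])
    vals = np.array([score for _, score in ranked])
    return ids, vals


rules = {"102": [("999", 2.0), ("888", 1.5)]}  # laptop -> warranty, mouse

boosted_ids, boosted_scores = inject_rules(
    np.array(["5", "6", "999"]),
    np.array([0.9, 0.8, 0.1]),
    history=["102"],  # the user previously interacted with the laptop
    rules=rules,
    n=3,
)
```

In this sketch a user whose history lacks the antecedent "102" gets the reranked candidates back unchanged, since no rule fires.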