Foundations · 12 Stages

Python for AI & Data.

From language core to NumPy, Pandas, async, and performance — every stage built for Data Scientists and AI Engineers, with production pitfalls and interview Q&A throughout.

NumPy · Pandas · asyncio · mypy · pytest · functools · itertools · dataclasses
Python for Data Work 01–02
01

Data Types Every Data Scientist Needs

The types you work with daily — numbers, strings, lists, dicts, None/NaN — have subtle behaviours that cause silent bugs in ML metrics, NLP pipelines, and data quality checks. Get these right before reaching for any library.

Numbers & Float Precision in ML
IEEE 754 floats in metrics, math.isclose for threshold comparisons, int vs float64 in NumPy

Python floats are IEEE 754 double-precision (64-bit binary). Most decimal fractions cannot be represented exactly, so 0.1 + 0.2 != 0.3. In ML this matters whenever you compare a metric to a threshold: accuracy >= 0.95 can pass or fail on rounding in the last bit when the metric sits right at the threshold. Use math.isclose() for float comparisons in evaluation code. For counts (correct predictions, sample sizes) prefer int arithmetic before dividing. In NumPy, integer arrays such as int64 cannot hold NaN; only floating-point dtypes can, which is why Pandas silently promotes int columns to float64 when NaN is introduced.

Python — float precision in ML metrics, math.isclose, NumPy dtypes
import math, numpy as np, pandas as pd

# ── Float precision in metric comparisons ────────────────
correct, total = 19, 20
accuracy = correct / total           # 0.95

a = 0.1 + 0.2
print(a == 0.3)                      # False (0.30000000000000004)
print(math.isclose(a, 0.3, rel_tol=1e-9))  # True ✓

# Safe threshold comparison for model evaluation
threshold = 0.95
if math.isclose(accuracy, threshold, rel_tol=1e-6) or accuracy > threshold:
    print("Model meets target")

# ── RMSE comparison ───────────────────────────────────────
rmse_v1, rmse_v2 = 0.4230000000001, 0.423
# Lower RMSE is better; ignore differences smaller than the tolerance
improved = rmse_v2 < rmse_v1 and not math.isclose(rmse_v1, rmse_v2, rel_tol=1e-6)
print("V2 improved:", improved)      # False: same within tolerance

# ── NumPy dtype: int64 cannot hold NaN ───────────────────
arr_int = np.array([1, 2, 3], dtype=np.int64)
# arr_int[0] = np.nan  → ValueError: cannot convert float NaN to integer

arr_float = np.array([1, 2, 3], dtype=np.float64)
arr_float[0] = np.nan               # works — float64 holds NaN

# ── Pandas silently promotes int → float64 with NaN ──────
s = pd.Series([1, 2, 3])
print(s.dtype)    # int64
s[0] = None
print(s.dtype)    # float64 ← automatic promotion!
print(s)          # [NaN, 2.0, 3.0]

# Use nullable Int64 to keep integers with NaN
s2 = pd.array([1, 2, None], dtype=pd.Int64Dtype())
print(s2)         # [1, 2, <NA>]  ← no float promotion
Pitfall Comparing float metrics with ==

if accuracy == 0.95: silently fails when accuracy is 0.9500000000001 due to floating-point arithmetic. The model passes all logic checks but the evaluation branch never fires.

Fix Use math.isclose(accuracy, 0.95, rel_tol=1e-6) or numpy.isclose(). Set rel_tol based on how much precision your metric actually carries.
Pitfall Expecting int columns to stay int when NaN is present

A Pandas int64 column gets silently cast to float64 when you assign NaN or None. Downstream dtype checks like dtype == "int64" then fail unexpectedly.

Fix Use pd.Int64Dtype() (capital I, nullable integer) for columns that may contain missing values. Or use pd.array([...], dtype=pd.Int64Dtype()) at creation time.
Pitfall Using / for integer division expecting int output

In Python 3, 5 / 2 == 2.5 (true division always). If you expect an integer index from a division like mid = (lo + hi) / 2, you get a float, and list indexing raises TypeError (NumPy raises IndexError).

Fix Use // for floor division: mid = (lo + hi) // 2. Always use // when the result must be an integer index or count.
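
As a minimal sketch of why floor division matters for indices, the binary search below computes its midpoint with // so the result can index a list directly. The helper name binary_search and the sample thresholds are illustrative assumptions, not part of the original material.

```python
def binary_search(sorted_vals: list[float], target: float) -> int:
    """Return the index of target in sorted_vals, or -1 if absent."""
    lo, hi = 0, len(sorted_vals) - 1
    while lo <= hi:
        mid = (lo + hi) // 2          # floor division keeps mid an int
        if sorted_vals[mid] == target:
            return mid
        if sorted_vals[mid] < target:
            lo = mid + 1
        else:
            hi = mid - 1
    return -1

thresholds = [0.1, 0.25, 0.5, 0.75, 0.9]
print(binary_search(thresholds, 0.75))   # 3
print(binary_search(thresholds, 0.6))    # -1
```

Replacing // with / in the midpoint line makes the first lookup fail with a TypeError, because a float cannot index a list.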

Python floats follow IEEE 754 double-precision binary. Most decimal fractions have no exact binary representation — 0.1 is stored as 0.1000000000000000055511... Arithmetic accumulates rounding errors. In ML: use math.isclose(a, b, rel_tol=1e-9) for threshold comparisons, numpy.isclose() for array-level comparisons, and round(metric, 4) when logging to suppress spurious precision in experiment reports.
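
The accumulation of rounding error can be seen in a few lines. This is a sketch only: meets_threshold is a hypothetical helper name, and the tolerance should be chosen to match the precision your metric actually carries.

```python
import math

def meets_threshold(metric: float, threshold: float, rel_tol: float = 1e-9) -> bool:
    """True when metric is above threshold or equal to it within rel_tol."""
    return metric > threshold or math.isclose(metric, threshold, rel_tol=rel_tol)

# Summing ten 0.1 steps accumulates rounding error and lands just under 1.0
total = sum([0.1] * 10)
print(total == 1.0)                  # False
print(total >= 1.0)                  # False: a naive threshold check misses
print(meets_threshold(total, 1.0))   # True

# Round only for display; keep full precision for the comparison itself
print(f"metric={round(total, 4)}")   # metric=1.0
```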

NumPy int64 has no NaN representation — NaN is an IEEE 754 float concept. When you assign None or np.nan to a Pandas int64 Series, Pandas must promote the dtype to float64 to accommodate it. In Pandas 1.0+ use pd.Int64Dtype() for nullable integers. Always check df.dtypes after loading data to catch unexpected promotions before they affect model inputs.
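
A short sketch of recovering integer semantics after a promotion, assuming pandas is installed. The column names clicks and user_id are made up for illustration.

```python
import pandas as pd

df = pd.DataFrame({"clicks": [3, None, 7], "user_id": [101, 102, 103]})
print(df.dtypes["clicks"])     # float64: the None forced a promotion

# Convert to the nullable integer dtype; missing entries become <NA>
df["clicks"] = df["clicks"].astype("Int64")
print(df.dtypes["clicks"])     # Int64
print(int(df["clicks"].isna().sum()))   # 1
```

The conversion only succeeds when every non-missing value is a whole number, which makes it a useful sanity check in its own right.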

Use int for counts, class labels, indices, and batch sizes — integer arithmetic is exact and avoids NaN-promotion issues. Use float for ratios, probabilities, and continuous values. Key rule: if the column will ever contain NaN, it must be float64 or a nullable integer type. Mixing silently (e.g., label 0 becomes 0.0) causes issues in model APIs that expect integer labels for classification tasks.

Strings & Text for NLP/LLM Work
Text normalisation, f-strings for logging and prompts, regex for feature extraction

Strings are immutable Unicode sequences. For NLP and LLM work the core skills are: (1) text normalisation — lowercase, strip, remove special characters; (2) f-strings for building log messages and LLM prompts efficiently; (3) join() not + in loops for string building; (4) regex for extracting structured features from raw text. In LLM engineering, prompt strings are your primary interface to the model — treat them with the same rigour as code.

Python — text cleaning, LLM prompt building, regex feature extraction
import re

# ── Text normalisation pipeline ───────────────────────────
def clean_text(text: str) -> str:
    text = text.lower().strip()
    text = re.sub(r'[^a-z0-9 ]', '', text)   # keep alphanumeric + spaces
    return ' '.join(text.split())             # collapse whitespace

raw = "  Hello, World! This is a TEST.  "
print(clean_text(raw))    # "hello world this is a test"

# ── f-strings for experiment logging ─────────────────────
epoch, loss, acc = 5, 0.234, 0.912
log_line = f"Epoch {epoch:02d} | loss={loss:.4f} | acc={acc:.2%}"
print(log_line)           # "Epoch 05 | loss=0.2340 | acc=91.20%"

# ── LLM prompt building ───────────────────────────────────
def build_prompt(query: str, context: list[str], max_ctx: int = 3) -> str:
    ctx_block = "\n".join(f"  - {c}" for c in context[:max_ctx])
    return (
        "You are a helpful assistant. Answer based on the context below.\n\n"
        f"Context:\n{ctx_block}\n\n"
        f"Question: {query}\nAnswer:"
    )

prompt = build_prompt(
    "What is RAG?",
    ["RAG retrieves relevant documents", "LLMs use retrieved context to answer"]
)

# ── Efficient string building (join not +) ────────────────
tokens = ["the", "model", "predicted", "positive"]
# BAD — O(n²): creates a new string object on every iteration
result = ""
for t in tokens: result += t + " "

# GOOD — O(n): one allocation, one pass
result = " ".join(tokens)

# ── Regex for feature extraction ──────────────────────────
text = "Order #12345 confirmed. Contact support@example.com"   # placeholder address
order_ids = re.findall(r'#(\d+)', text)     # ['12345']
emails    = re.findall(r'[\w.]+@[\w.]+', text)  # ['support@example.com']
Pitfall String concatenation with + inside a loop

for token in tokens: result += token builds a new string object every iteration — O(n²) time. For a 10,000-token NLP preprocessing loop this is measurably slow.

Fix Collect into a list and join once: " ".join(tokens). join() makes a single allocation and one pass — the standard Python pattern for building strings from parts.
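
When the parts are produced incrementally and collecting them in a list first is awkward, a text buffer is an alternative to join(). The sketch below uses io.StringIO from the standard library; render_report and its row format are hypothetical.

```python
import io

def render_report(rows: list[tuple[str, float]]) -> str:
    """Build a multi-line report incrementally in a text buffer."""
    buf = io.StringIO()
    buf.write("metric,value\n")
    for name, value in rows:
        buf.write(f"{name},{value:.4f}\n")
    return buf.getvalue()

report = render_report([("accuracy", 0.912), ("f1", 0.8875)])
print(report)
```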
Pitfall Forgetting .strip() before comparisons or dict lookups

Text from CSV or API responses often carries trailing newlines or spaces. if label == "positive" fails silently when the value is "positive\n", causing missed detections in evaluation pipelines.

Fix Always apply .strip() (or .strip().lower()) at the point data enters your pipeline — in the CSV reader or API response parser, not scattered throughout the code.
Pitfall Treating bytes objects as strings

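open("file.txt", "rb") returns bytes. bytes has its own .lower() and .split(), so the mistake often stays hidden until bytes meet str: b"positive" == "positive" is silently False, and b"a b".split(" ") or a str regex pattern applied to bytes raises TypeError. This surfaces when reading binary model files or when a text file is opened in binary mode by accident.

Fix Use open("file.txt", "r", encoding="utf-8") for text files. Always specify encoding rather than relying on the platform default, which is typically UTF-8 on Linux but often cp1252 on Windows.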

Use f-strings or str.format() templates for parameterised prompts. Write a dedicated function that takes context and query as typed arguments and returns the prompt string. Strip and normalise user input before interpolating it into prompts; this keeps prompts tidy but is not a defence against prompt injection on its own, so also keep user text clearly delimited from your instructions. For production systems, consider Jinja2 templates for versioning and validation. Store prompt templates separately from code so they can be updated without a redeploy.

"".join(list_of_parts) is the correct idiom — a single allocation for the final string. Concatenation with += in a loop is O(n²) because each step allocates a new string. For very large outputs, io.StringIO acts as a mutable buffer: write parts with sio.write(), retrieve with sio.getvalue(). In Pandas, use vectorised string operations (df["col"].str.lower()) instead of apply with a lambda for column-wise transformations.

Python 3 strings are Unicode by default. Use unicodedata.normalize("NFC", text) to canonicalise accented characters. To strip accents: normalize("NFD", text) then filter out combining characters (unicodedata.category(c) == "Mn"). Use re with the default Unicode-aware mode for Python 3. For tokenising non-Latin scripts, use dedicated tokenisers (spaCy, HuggingFace tokenizers) rather than split(), which only handles whitespace.
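
The two normalisation steps can be sketched with the standard library alone. strip_accents is an illustrative helper name; the sample strings are made up.

```python
import unicodedata

def strip_accents(text: str) -> str:
    """Decompose characters (NFD), then drop combining marks (category Mn)."""
    decomposed = unicodedata.normalize("NFD", text)
    return "".join(c for c in decomposed if unicodedata.category(c) != "Mn")

# The same visible string can arrive precomposed or as base letter + combining mark
composed = "caf\u00e9"        # 'é' as one code point
decomposed = "cafe\u0301"     # 'e' followed by a combining acute accent
print(composed == decomposed)                                   # False
print(unicodedata.normalize("NFC", decomposed) == composed)     # True
print(strip_accents("Crème Brûlée"))                            # Creme Brulee
```

Normalising to NFC at the point where text enters the pipeline keeps dict lookups and deduplication from treating the two encodings as different strings.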

Lists, Dicts & Sets in Data Pipelines
Label encoding with dicts, deduplication with sets, list comprehensions for feature extraction

Lists (ordered sequences), dicts (hash maps, O(1) lookup), and sets (hash sets, O(1) membership) are the core preprocessing structures. Use dicts for label-to-integer mappings, use sets for deduplication and vocabulary membership, use lists for feature vectors and batch items. Choosing the wrong structure — a list where a set belongs — can silently turn an O(n) operation into O(n²) at scale.

Python — label encoding, deduplication, feature extraction patterns
from collections import defaultdict

# ── Label encoding with dict ─────────────────────────────
labels = ["cat", "dog", "cat", "bird", "dog", "cat"]

label2id = {label: idx for idx, label in enumerate(sorted(set(labels)))}
id2label = {v: k for k, v in label2id.items()}
# label2id = {'bird': 0, 'cat': 1, 'dog': 2}

encoded = [label2id[l] for l in labels]        # [1, 2, 1, 0, 2, 1]
decoded = [id2label[i] for i in encoded]

# Safe lookup for unknown labels at inference time
def encode_label(label: str, fallback: int = -1) -> int:
    return label2id.get(label, fallback)

encode_label("fish")    # -1 — unknown class, no KeyError

# ── Deduplication ─────────────────────────────────────────
urls = ["http://a.com", "http://b.com", "http://a.com", "http://c.com"]
unique_urls = list(dict.fromkeys(urls))   # keeps first-seen order (dicts are ordered in 3.7+)
unique_set  = set(urls)                   # fine when order does not matter

# ── O(1) vocabulary membership check ────────────────────
vocab = {"cat", "dog", "bird", "fish"}         # set, not list!
word = "fish"
print(word in vocab)    # O(1)   ← hashed lookup
# AVOID: word in ["cat", "dog", "bird", "fish"] → O(n) linear scan

# ── List comprehension for feature extraction ─────────────
records = [
    {"text": "great product", "stars": 5},
    {"text": "terrible",      "stars": 1},
    {"text": "it's ok",       "stars": 3},
]
texts   = [r["text"]                      for r in records]
ratings = [r["stars"]                     for r in records]
is_pos  = [r["stars"] >= 4                for r in records]  # binary label

# ── defaultdict for group-by ──────────────────────────────
buckets = defaultdict(list)
for r in records:
    key = "positive" if r["stars"] >= 4 else "negative"
    buckets[key].append(r["text"])
# {"positive": ["great product"], "negative": ["terrible", "it's ok"]}
Pitfall Using list for membership testing in a hot loop

if token in stop_words_list — O(n) per lookup. For a 50,000-word vocabulary and 1M tokens, that is 50 billion comparisons. This looks correct but is catastrophically slow.

Fix Convert once: stop_words = set(stop_words_list). Then if token in stop_words is O(1). The O(n) conversion cost is paid once; every subsequent lookup is O(1).
Pitfall dict[key] instead of dict.get(key) at inference time

label2id[unknown_label] raises KeyError if the model encounters a category not seen during training. In a production serving endpoint this crashes the request.

Fix Use label2id.get(label, fallback_value) for lookups that may fail. Handle the unknown-class case explicitly — return a default class ID, log the unknown label, or raise a meaningful business error.
Pitfall Modifying a list while iterating over it

for i, item in enumerate(data): if bad(item): data.pop(i) — skips items because indices shift as elements are removed, silently letting bad data through.

Fix Filter with a comprehension: data = [item for item in data if not bad(item)]. Cleaner, faster, and correct. Never modify a collection you are currently iterating.
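
The skipped-item behaviour is easy to reproduce. In this sketch is_bad is a hypothetical data-quality rule standing in for whatever check your pipeline applies.

```python
def is_bad(x: int) -> bool:
    return x < 0          # hypothetical data-quality rule

data = [1, -2, -3, 4]

# Buggy: popping during iteration shifts later items to the left
buggy = list(data)
for i, item in enumerate(buggy):
    if is_bad(item):
        buggy.pop(i)
print(buggy)              # [1, -3, 4]  (-3 was skipped, never checked)

# Correct: build a new list
clean = [item for item in data if not is_bad(item)]
print(clean)              # [1, 4]
```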

Build a dict from sorted unique values: label2id = {label: idx for idx, label in enumerate(sorted(set(labels)))}. Encode with a comprehension: [label2id[l] for l in data]. Sorted order makes encoding reproducible across runs. For unknown labels at inference, use .get(label, -1). For production, serialise the mapping dict to JSON so encoding is consistent between training and serving. LabelEncoder from scikit-learn builds the same sorted mapping for target labels, but it raises ValueError on labels unseen during fit; for input features inside an sklearn Pipeline, use OrdinalEncoder or OneHotEncoder with handle_unknown configured.
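
A minimal sketch of the JSON round trip follows. It serialises to a string rather than a file to stay self-contained; in practice the payload would be written next to the model artefact.

```python
import json

labels = ["cat", "dog", "cat", "bird"]
label2id = {label: idx for idx, label in enumerate(sorted(set(labels)))}

# Training side: persist the mapping (a file write in practice; a string here)
payload = json.dumps(label2id, sort_keys=True)

# Serving side: reload and rebuild the inverse map
loaded = json.loads(payload)
id2label = {idx: label for label, idx in loaded.items()}

print(loaded)                       # {'bird': 0, 'cat': 1, 'dog': 2}
print(loaded.get("fish", -1))       # -1
print(id2label[1])                  # cat
```

Persisting label2id rather than id2label is deliberate: JSON object keys are always strings, so integer keys would come back as "0", "1", "2".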

list: O(n) — linear scan through all elements. set: O(1) average — hash lookup. dict key check: O(1) average — hash lookup. This matters for vocabulary lookups, stop-word filtering, and deduplication over large datasets. Rule: if you test membership more than once, convert to set first. The O(n) conversion is paid once; every subsequent check is O(1).

defaultdict(list) is ideal for group-by accumulation — no KeyError if the key is new, just creates an empty list. defaultdict(int) builds counters. defaultdict(set) builds inverted indexes (word → document IDs). Use a regular dict when keys are known in advance and a missing key should raise (an error signals a bug). Use .get() with a default when absence is expected but you should not auto-create keys.
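
The counter and inverted-index patterns can be sketched together. The three toy documents are invented for illustration.

```python
from collections import defaultdict

docs = {0: "the cat sat", 1: "the dog sat", 2: "a cat ran"}

counts = defaultdict(int)     # word -> number of occurrences
index = defaultdict(set)      # word -> ids of documents containing it
for doc_id, text in docs.items():
    for word in text.split():
        counts[word] += 1
        index[word].add(doc_id)

print(counts["cat"])          # 2
print(sorted(index["sat"]))   # [0, 1]

# .get() reads without creating a key; indexing a defaultdict would add one
print(index.get("fish"))      # None
print("fish" in index)        # False
```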

None vs NaN — Missing Data Handling
pd.isna(), fillna, dropna, comparison traps, model input validation before predict()

None is Python's null singleton. NaN is IEEE 754 "not a number" — a float value. In Pandas both represent missing data but behave differently. The critical trap: np.nan == np.nan is always False by IEEE 754 design. Always use pd.isna() to check for missing values, never ==. Before feeding data to a model, validate and handle missing values explicitly — sklearn raises ValueError on NaN inputs by default.

Python — None vs NaN, pd.isna, fillna, dropna, pre-predict validation
import numpy as np, pandas as pd

# ── None vs NaN comparison ────────────────────────────────
print(None == None)         # True
print(np.nan == np.nan)     # False  ← IEEE 754: NaN != NaN always
print(np.nan is np.nan)     # True   ← same object; identity is not a NaN test

value = np.nan
print(value == np.nan)      # False — always False, even when it IS NaN!
print(pd.isna(value))       # True  ✓  use pd.isna()

# pd.isna handles both None and NaN uniformly
print(pd.isna(None))        # True
print(pd.isna(np.nan))      # True
print(pd.isna(0))           # False
print(pd.isna(""))          # False — empty string is NOT missing!

# ── Handling missing values in a DataFrame ────────────────
df = pd.DataFrame({
    "age":    [25.0, None, 30.0, np.nan],
    "income": [50000, 60000, None, 80000],
    "label":  [1, 0, 1, 0],
})

print(df.isna().sum())          # missing count per column

# Strategy 1: drop rows with any missing value
df_clean = df.dropna()

# Strategy 2: fill with statistics
df["age"]    = df["age"].fillna(df["age"].median())
df["income"] = df["income"].fillna(df["income"].mean())

# ── Validate before model.predict() ──────────────────────
def validate_features(X: pd.DataFrame) -> None:
    missing = X.isna().any()
    if missing.any():
        cols = missing[missing].index.tolist()
        raise ValueError(f"Missing values in columns: {cols}")
    if not np.isfinite(X.to_numpy(dtype=float)).all():
        raise ValueError("Non-finite values (inf / -inf) detected")

validate_features(df[["age", "income"]])   # must pass before predict()
Pitfall Checking for NaN with == or is

x == np.nan is always False, even when x is NaN. This is IEEE 754 behaviour. x is np.nan is no better: it is True only when x happens to be that exact np.nan object, and False for NaNs produced by arithmetic, read from an array, or created with float("nan").

Fix Use pd.isna(x) for Pandas / mixed types, np.isnan(x) for NumPy floats (raises on None), or math.isnan(x) for pure Python floats. pd.isna() is the safest — it handles None, np.nan, pd.NaT, and pd.NA.
Pitfall Treating empty string as missing

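pd.isna("") returns False: an empty string is not considered missing by Pandas. pd.read_csv converts empty fields to NaN by default, but text columns from JSON, databases, or CSVs read with keep_default_na=False can carry "" as the missing marker, silently passing your isna() validation.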

Fix Replace empty strings explicitly: df.replace("", np.nan, inplace=True). Add this to your standard data loading pipeline before any missing-value checks.
Pitfall Not checking for inf after division or log operations

np.float64(1.0) / 0.0 returns inf in NumPy with only a RuntimeWarning, whereas plain Python 1.0 / 0.0 raises ZeroDivisionError. Inf values propagate silently through arithmetic and can produce NaN in softmax or log operations, crashing model training with an opaque loss=nan.

Fix After any division or log transform: np.isfinite(arr).all(). Replace inf: arr = np.where(np.isfinite(arr), arr, 0.0). Add this check to your validate_features() utility.
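
One way to package the check, assuming NumPy is installed. safe_ratio and the fill value of 0.0 are illustrative choices; whether 0.0 is the right replacement depends on the feature.

```python
import numpy as np

def safe_ratio(numer: np.ndarray, denom: np.ndarray, fill: float = 0.0) -> np.ndarray:
    """Element-wise numer / denom with non-finite results replaced by fill."""
    with np.errstate(divide="ignore", invalid="ignore"):
        ratio = numer / denom
    return np.where(np.isfinite(ratio), ratio, fill)

clicks = np.array([5.0, 0.0, 3.0])
views = np.array([10.0, 0.0, 0.0])
ctr = safe_ratio(clicks, views)    # 0/0 gives NaN and 3/0 gives inf before the fill
print(ctr.tolist())                # [0.5, 0.0, 0.0]
```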

None is Python's null object — appears in object-dtype columns (strings, mixed types). NaN is IEEE 754 "not a number" — appears in float64 columns. Pandas uses NaN for numeric missing because None cannot be stored in a NumPy float array. When you assign None to a float column, Pandas silently converts it to NaN. pd.isna() detects both, making it the universal safe check. Pandas 1.0+ introduces pd.NA for nullable integer and boolean extensions.

Most scikit-learn estimators raise ValueError on NaN input. Strategy: (1) dropna() if missing is rare and random; (2) fillna(median/mean) for numeric features; (3) use SimpleImputer inside a Pipeline so imputation statistics are fit on training data only and applied consistently at inference, which prevents data leakage. For tree models like XGBoost and LightGBM that handle NaN natively, pass NaN directly and let the model learn missingness as a split criterion.

By the IEEE 754 standard, NaN compares unequal to every value, including itself: two undefined results (0/0, sqrt(-1)) are not necessarily the same undefined result. Python floats follow IEEE 754, so float("nan") != float("nan") is True. The correct checks are math.isnan(), np.isnan(), or pd.isna(). NaN is hashable, so it can be stored as a dict key or set member, but it is unreliable there: a lookup with a different NaN object never matches, and a set can accumulate several distinct NaN entries.
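
The comparison rules, and how NaN behaves as a dict key, can be checked with plain Python floats:

```python
import math

a = float("nan")
b = float("nan")

print(a == a)                 # False
print(a != b)                 # True
print(math.isnan(a))          # True

# NaN is hashable, but a lookup with a different NaN object misses,
# because the two objects are neither identical nor equal
table = {a: "first"}
print(a in table)             # True  (same object, found by identity)
print(b in table)             # False
table[b] = "second"
print(len(table))             # 2
```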

Most ML bugs are not algorithmic. They are float precision in a metric threshold, None vs NaN confusion in a missing-value check, or a list membership test that silently runs in O(n) for 1M tokens. These basics prevent a week of debugging.
02

Functions, Comprehensions & Decorators for Data Work

Functions are the building blocks of data pipelines. Comprehensions process data concisely and fast. Decorators add retry, caching, and profiling without cluttering business logic. These are daily tools for production DS/AI code.

Data Transformation Functions
Single-responsibility transforms, pipeline composition, *args/**kwargs for flexible configs

In data work, functions should do one thing: normalise, encode, clip, or validate. Compose them in sequence rather than writing monolithic preprocessing blocks. Use *args for transforms that operate on variable numbers of columns, **kwargs for passing model hyperparameters through a config dict. Keep functions pure (same input → same output, no side effects) — pure functions are easy to test and safe to parallelise with multiprocessing.

Python — preprocessing functions, pipeline composition, **kwargs for model config
import pandas as pd, numpy as np
from functools import partial, reduce
from typing import Callable

# ── Single-responsibility transforms ─────────────────────
def clip_outliers(series: pd.Series, lower=0.01, upper=0.99) -> pd.Series:
    lo, hi = series.quantile(lower), series.quantile(upper)
    return series.clip(lo, hi)

def log_transform(series: pd.Series) -> pd.Series:
    return np.log1p(series)         # log(1 + x) — safe for zeros

def standardise(series: pd.Series) -> pd.Series:
    return (series - series.mean()) / series.std()

# ── Pipeline composition ──────────────────────────────────
def apply_pipeline(series: pd.Series,
                   *transforms: Callable[[pd.Series], pd.Series]) -> pd.Series:
    return reduce(lambda s, fn: fn(s), transforms, series.copy())

income = pd.Series([0, 150000, 1_000_000, 50000, 75000])
income_clean = apply_pipeline(income, clip_outliers, log_transform, standardise)

# ── **kwargs for passing model config ─────────────────────
def train_model(X_train, y_train, model_class, **model_params):
    model = model_class(**model_params)
    model.fit(X_train, y_train)
    return model

from sklearn.ensemble import RandomForestClassifier
config = {"n_estimators": 200, "max_depth": 10, "random_state": 42}
# X_train, y_train: training features and labels, assumed to be defined earlier
model = train_model(X_train, y_train, RandomForestClassifier, **config)

# ── Partial application: pre-configure transforms ─────────
clip_top5pct = partial(clip_outliers, upper=0.95)
age_clean    = apply_pipeline(df["age"], clip_top5pct, standardise)

# ── Mutable default argument bug (common in data code) ────
def add_feature(df, new_cols=[]):   # BUG: shared across calls
    new_cols.append("new")          # silently grows each call

def add_feature_fixed(df, new_cols=None):   # CORRECT
    if new_cols is None: new_cols = []
    new_cols.append("new")
    return new_cols
Pitfall Functions that modify DataFrames in place and return None

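def preprocess(df): df["col"] = ... mutates the caller's DataFrame and, with no return statement, returns None. Writing df = preprocess(df) then rebinds df to None, and the next line raises AttributeError. The same happens when the result of a Pandas call made with inplace=True is returned, since such calls return None.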

Fix Always return the result: def preprocess(df): df = df.copy(); ...; return df. Use .copy() inside the function to avoid mutating the caller's DataFrame. Treat DataFrames as immutable inside transforms.
Pitfall Using apply() with a lambda for numeric column transforms

df["col"].apply(lambda x: x * 2) is Python-level iteration, often 10–100x or more slower than vectorised NumPy/Pandas operations for numeric data.

Fix Use vectorised operations: df["col"] * 2 or np.log1p(df["col"]). Reserve apply() for string processing or complex custom transforms that have no vectorised equivalent.
Pitfall Mutable default argument in preprocessing functions

def extract_features(df, feature_list=[]): feature_list.append("new") — feature_list is evaluated once at function definition and shared across all calls, growing silently.

Fix Use None as the default sentinel: if feature_list is None: feature_list = []. This creates a fresh list on every call that does not supply an argument.

Two approaches: (1) function pipeline — a list of callables applied with reduce(lambda df, fn: fn(df), steps, initial_df); (2) scikit-learn Pipeline — Pipeline([("impute", SimpleImputer()), ("scale", StandardScaler()), ("clf", model)]) which handles fit/transform separation, preventing data leakage. Prefer sklearn Pipeline for production: it serialises with joblib, integrates with GridSearchCV, and guarantees that transform statistics are fit on training data only.

*args collects extra positional arguments as a tuple — use for functions that accept a variable number of inputs (apply_pipeline(*transforms)). **kwargs collects extra keyword arguments as a dict — use for forwarding hyperparameters to sklearn, PyTorch, or XGBoost without knowing their names in advance. In practice, **config is the standard pattern for building model-agnostic training wrappers.

Pure functions (same input → same output, no side effects) are: (1) safe to parallelise with multiprocessing.Pool.map() — no shared state to corrupt; (2) trivial to unit test — assert output == expected; (3) composable in any order; (4) cacheable with lru_cache or joblib.Memory. Side effects like logging belong at the pipeline boundary. If a transform must log, use the logging module with the function name — not print().

Comprehensions & Generators for Data
Feature extraction with comprehensions, memory-efficient batch processing with generators

List comprehensions are usually somewhat faster than equivalent for loops that call .append() (the comprehension uses a dedicated bytecode instruction and skips the repeated method lookup and call) and are the idiomatic way to transform sequences. Generator expressions compute one element at a time, holding only the current element in memory, which makes them essential for processing large files, streaming datasets, or computing embeddings without OOM. Understanding when to materialise (list) vs keep lazy (generator) is a practical production skill.

Python — feature extraction, generator batching, dict comprehensions
# ── List comprehension: feature extraction ───────────────
records = [
    {"text": "great product", "stars": 5},
    {"text": "terrible",      "stars": 1},
    {"text": "it's ok",       "stars": 3},
]

texts    = [r["text"]           for r in records]
ratings  = [r["stars"]          for r in records]
is_pos   = [r["stars"] >= 4     for r in records]   # binary label
lengths  = [len(r["text"])      for r in records]   # text length feature

# Dict comprehension: build label encoding in one line
words  = ["cat", "dog", "cat", "bird"]
vocab  = {w: i for i, w in enumerate(sorted(set(words)))}
# {'bird': 0, 'cat': 1, 'dog': 2}

# ── Generator: memory-efficient large-file processing ─────
import csv

def batch_csv(filepath: str, batch_size: int = 1000):
    """Yield batches of rows — O(batch_size) memory, not O(file)."""
    with open(filepath, encoding="utf-8") as f:
        reader = csv.DictReader(f)
        batch = []
        for row in reader:
            batch.append(row)
            if len(batch) == batch_size:
                yield batch
                batch = []        # start a new list; clear() would empty the batch just yielded
        if batch:
            yield batch

# Process a 10M-row file with constant memory
for chunk in batch_csv("train.csv", batch_size=500):
    feats = extract_features(chunk)
    model.partial_fit(feats)         # online/incremental learning

# ── Generator expression: lazy embedding pipeline ─────────
texts = load_all_texts()             # could be millions
# Compute embeddings one-by-one — never all in RAM
embedding_gen = (embed(t) for t in texts)

import numpy as np
# Materialise only when you need the full matrix
X = np.array(list(embedding_gen))   # fits in memory? materialise.
Pitfall Consuming a generator twice

gen = (x*2 for x in range(5)); list(gen) → [0,2,4,6,8]; list(gen) → [] — the second call returns nothing because the generator is exhausted. Silently empties your dataset.

Fix If you need to iterate multiple times, materialise with list(gen) once and reuse the list. Or wrap in a function that returns a fresh generator each time it is called.
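
A small sketch of the factory approach, where doubled is a hypothetical function that builds a new generator on each call:

```python
from typing import Iterator

def doubled(n: int) -> Iterator[int]:
    """Return a new generator on every call."""
    return (x * 2 for x in range(n))

gen = doubled(5)
print(list(gen))          # [0, 2, 4, 6, 8]
print(list(gen))          # []  (exhausted)

# Calling the factory again gives an independent, unconsumed generator
print(sum(doubled(5)))    # 20
print(max(doubled(5)))    # 8
```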
Pitfall list() on a large generator defeats the purpose

embeddings = list(embed(t) for t in million_texts) loads all embeddings into RAM at once — exactly what the generator was meant to avoid. This causes OOM on large datasets.

Fix Process in the generator loop: for emb in (embed(t) for t in texts): store(emb). Or use batch generators and process chunk-by-chunk. Only materialise when the full result fits comfortably in memory.
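
Chunk-by-chunk processing can be sketched with itertools.islice. Both batched and fake_embed are hypothetical names; fake_embed stands in for a real embedding call.

```python
from itertools import islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")

def batched(items: Iterable[T], size: int) -> Iterator[list[T]]:
    """Yield successive lists of at most size items from any iterable."""
    it = iter(items)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk

def fake_embed(text: str) -> int:
    return len(text)          # hypothetical stand-in for a real embedding call

stream = (f"doc {i}" for i in range(7))       # lazy source, never fully in memory
batch_sizes = []
for chunk in batched(stream, 3):
    vectors = [fake_embed(t) for t in chunk]  # only one batch is alive at a time
    batch_sizes.append(len(vectors))

print(batch_sizes)            # [3, 3, 1]
```

Python 3.12 added itertools.batched, which does the same job but yields tuples instead of lists.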
Pitfall Deeply nested comprehensions that are unreadable

[f(x) for row in matrix for x in row if pred(x)] with another nesting level becomes unmaintainable. The cleverness costs more in future debugging than it saves now.

Fix Extract the inner loop into a named helper function or break into two list comprehensions. Code readability in a team setting beats one-liner cleverness every time.

Use a generator when: (1) the dataset does not fit in memory; (2) you process items one at a time (streaming inference, online learning); (3) you iterate only once; (4) items are expensive to compute and early stopping is possible. Use a list when: you need random access, you iterate multiple times, or you need the full dataset for batch NumPy operations. Rule: default to generators for file I/O and API calls; materialise to list only when the data is small enough and you need to reuse it.

A list comprehension still runs as bytecode, one iteration at a time, but CPython compiles it to a dedicated LIST_APPEND instruction, so it skips the attribute lookup and method call that result.append(x) costs on every iteration. It does not pre-allocate the result; the list grows with the same amortised over-allocation that .append() uses. For a simple scalar transform on 1M items, a comprehension can be up to roughly 1.5–2x faster than an explicit loop with .append(), with the gap depending on the Python version and the work done per item. For numeric data, NumPy vectorised operations are faster still because they avoid Python-level iteration entirely.

A generator expression is a concise one-liner: (expr for x in iterable if cond). A generator function uses yield inside a def block and can have complex multi-step logic, multiple yields, cleanup in finally, and yield from for delegation. Both return a generator object; both are lazy. Use expressions for simple one-step transformations. Use generator functions when you need: state between yields, try/finally cleanup, multiple yield points, or yield from to delegate to a sub-generator.
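
The features that need a generator function rather than an expression can be sketched as follows. read_source and read_all are hypothetical names, and the events list exists only to make the open/close order visible.

```python
from typing import Iterable, Iterator

events: list[str] = []

def read_source(name: str, rows: Iterable[str]) -> Iterator[str]:
    """Yield cleaned rows; the finally block runs when the generator finishes or is closed."""
    events.append(f"open {name}")
    try:
        for row in rows:
            yield row.strip()
    finally:
        events.append(f"close {name}")

def read_all(sources: dict[str, list[str]]) -> Iterator[str]:
    """Chain several sources by delegating to a sub-generator."""
    for name, rows in sources.items():
        yield from read_source(name, rows)

rows = list(read_all({"a": [" x ", "y\n"], "b": ["z"]}))
print(rows)      # ['x', 'y', 'z']
print(events)    # ['open a', 'close a', 'open b', 'close b']
```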

Decorators for DS/AI Code
@timer for profiling, @retry for LLM APIs, @lru_cache for embedding caching

Three decorator patterns every DS/AI engineer uses in production: @timer for profiling preprocessing and inference bottlenecks, @retry with exponential backoff for LLM API rate limits and transient errors, @lru_cache for memoising expensive computations like embeddings. All three are implemented with functools.wraps to preserve the original function's identity in logs and tooling.

Python — @timer profiler, @retry for LLM APIs, @lru_cache for embeddings
import functools, time, logging
from typing import Any

log = logging.getLogger(__name__)

# ── @timer: profile any function ─────────────────────────
def timer(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        t0 = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - t0
        log.info(f"{func.__name__} completed in {elapsed:.3f}s")
        return result
    return wrapper

@timer
def preprocess_batch(texts: list[str]) -> list[str]:
    return [t.lower().strip() for t in texts]

# ── @retry: LLM API with exponential backoff ─────────────
def retry(max_attempts: int = 3, delay: float = 1.0,
          exceptions: tuple = (Exception,)):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            last_exc = RuntimeError("No attempts made")
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
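                except exceptions as e:
                    last_exc = e
                    if attempt == max_attempts - 1:
                        log.error(f"{func.__name__} failed after {max_attempts} attempts: {e}")
                        break                  # no retry left; re-raise below
                    wait = delay * (2 ** attempt)
                    log.warning(f"{func.__name__} attempt {attempt+1} failed: {e}."
                                f" Retrying in {wait:.1f}s...")
                    time.sleep(wait)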
            raise last_exc
        return wrapper
    return decorator

import openai

@retry(max_attempts=5, delay=2.0,
       exceptions=(openai.RateLimitError, openai.APITimeoutError))
def call_llm(prompt: str) -> str:
    resp = openai.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}]
    )
    return resp.choices[0].message.content

# ── @lru_cache: memoise embedding computations ────────────
from functools import lru_cache

@lru_cache(maxsize=10_000)    # str is hashable — works directly
def get_embedding(text: str) -> tuple[float, ...]:
    return tuple(embedding_model.encode(text))  # tuple is hashable

import numpy as np

def embed_text(text: str) -> np.ndarray:
    return np.array(get_embedding(text))  # back to array

print(get_embedding.cache_info())
# CacheInfo(hits=42, misses=100, maxsize=10000, currsize=100)
Pitfall Caching functions that take mutable arguments

@lru_cache requires all arguments to be hashable. Passing a list, dict, or numpy array raises TypeError: unhashable type. Many DS/AI functions take arrays or DataFrames as input.

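Fix Cache at a higher level where inputs are strings or tuples (hashable). For small array inputs, convert to a hashable key first: tuple(arr.tolist()), or arr.tobytes() combined with arr.shape and arr.dtype so that arrays with identical bytes but a different layout do not collide. For disk-level caching, use joblib.Memory, which handles arbitrary objects.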
Pitfall Retry without jitter in high-concurrency systems

If 50 workers all hit a rate limit and retry with the same fixed exponential delay, they all hammer the API again at the same moment — a thundering herd. The rate limit fires again immediately.

Fix Add jitter: wait = delay * (2 ** attempt) + random.uniform(0, 1). This spreads retries across time. The tenacity library handles jitter, stop conditions, and retry hooks out of the box.
Pitfall Forgetting @functools.wraps on the inner wrapper

Without @wraps, the decorated function's __name__ becomes "wrapper". Production logs show "wrapper completed in 0.5s" instead of "preprocess_batch completed in 0.5s". Debugging becomes opaque.

Fix Always apply @functools.wraps(func) to the inner wrapper function. This preserves __name__, __doc__, and sets __wrapped__ = func, enabling inspect.unwrap() to recover the original.

Implement a parametrised @retry decorator catching specific exception types (RateLimitError, APITimeoutError) with exponential backoff and jitter. Or use the tenacity library: @retry(wait=wait_exponential(min=1, max=60), stop=stop_after_attempt(5), retry=retry_if_exception_type(RateLimitError)). The OpenAI Python SDK v1.x also has built-in retries via the max_retries parameter. Always set a max_attempts ceiling, log each retry at WARNING level, and re-raise the last exception if all retries fail.
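A minimal standard-library sketch of the backoff-plus-jitter pattern described above. The name retry_with_jitter, its parameters, and the injectable sleep argument are illustrative assumptions, not part of any library; the flaky function stands in for a real API call.

```python
import functools
import random
import time


def retry_with_jitter(max_attempts=3, base_delay=1.0, max_jitter=1.0,
                      exceptions=(Exception,), sleep=time.sleep):
    """Retry on the given exceptions with exponential backoff plus random jitter."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt == max_attempts - 1:
                        raise  # out of attempts: re-raise the last exception
                    # Exponential backoff plus a random offset so workers desynchronise
                    wait = base_delay * (2 ** attempt) + random.uniform(0, max_jitter)
                    sleep(wait)
        return wrapper
    return decorator


# Demo: a hypothetical flaky call that fails twice, then succeeds.
calls = {"n": 0}
waits = []  # waits are recorded here instead of actually sleeping


@retry_with_jitter(max_attempts=5, base_delay=1.0,
                   exceptions=(ConnectionError,), sleep=waits.append)
def flaky_call():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return "ok"


result = flaky_call()
print(result, calls["n"], len(waits))
```

Passing sleep as a parameter is a design choice for testability: unit tests can record the waits rather than pausing for real.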

For in-process caching of string inputs: @lru_cache(maxsize=N) works directly since strings are hashable. For persistent caching across runs: joblib.Memory caches function results to disk keyed by arguments — works with arrays. For production multi-process/multi-machine: cache in Redis, keying on a hash of the text (md5/sha256). Always invalidate the cache when the embedding model version changes — otherwise stale embeddings from the old model pollute the index.

@functools.wraps(func) copies __name__, __qualname__, __doc__, __module__, __annotations__ from the wrapped function to the wrapper, and sets __wrapped__ = func. Without it: production logs show "wrapper" instead of the real function name, help() shows no documentation, FastAPI generates wrong operation IDs, and pytest fixtures may not resolve. It also enables inspect.unwrap(decorated_fn) to recover the original function — useful for testing the un-decorated behaviour.
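A short comparison makes the effect of functools.wraps visible. The decorator and function names here are made up for the demonstration.

```python
import functools
import inspect


def without_wraps(func):
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper


def with_wraps(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper


def normalise(text: str) -> str:
    """Lower-case and strip a string."""
    return text.lower().strip()


bad = without_wraps(normalise)
good = with_wraps(normalise)

print(bad.__name__)                        # wrapper
print(good.__name__)                       # normalise
print(good.__doc__)                        # Lower-case and strip a string.
print(inspect.unwrap(good) is normalise)   # True
```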

Essential Standard Library for Data Work
pathlib for file paths, json for configs/results, csv chunking, logging for experiment tracking

Four standard library modules every DS/AI engineer uses daily: pathlib for readable, cross-platform file path operations; json for reading/writing model configs and evaluation results; csv with chunked iteration for large files; logging for structured experiment tracking that replaces print(). These form the plumbing of data pipelines — mastering them prevents brittle, platform-specific, hard-to-debug code.

Python — pathlib, json results, csv chunking, logging for ML
from pathlib import Path
import json, csv, logging
import pandas as pd            # used for the per-chunk DataFrames below
from datetime import datetime

# ── pathlib: readable cross-platform paths ────────────────
DATA_DIR  = Path("data") / "raw"          # works on Win & Linux
MODEL_DIR = Path("models")
RESULTS   = Path("results") / "metrics.json"

MODEL_DIR.mkdir(parents=True, exist_ok=True)  # create if missing

csvs = list(DATA_DIR.glob("*.csv"))           # find all CSVs

# Read and write without open()
text   = (DATA_DIR / "readme.txt").read_text(encoding="utf-8")
config = {"lr": 0.001, "epochs": 50}
(MODEL_DIR / "config.json").write_text(json.dumps(config, indent=2))

# ── json: persist model results ───────────────────────────
results = {
    "model":      "RandomForest",
    "accuracy":   0.924,
    "f1":         0.891,
    "trained_at": datetime.now().isoformat(),
    "params":     {"n_estimators": 200, "max_depth": 10},
}

with open(RESULTS, "w", encoding="utf-8") as f:
    json.dump(results, f, indent=2)

with open(RESULTS, encoding="utf-8") as f:
    loaded = json.load(f)
print(loaded["accuracy"])     # 0.924

# ── csv: chunked iteration for large files ────────────────
def iter_csv_chunks(path: Path, chunk_size: int = 5000):
    with open(path, encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f)
        batch = []
        for row in reader:
            batch.append(row)
            if len(batch) >= chunk_size:
                yield batch
                batch = []      # new list: the caller may still hold the yielded one
        if batch:
            yield batch

for chunk in iter_csv_chunks(DATA_DIR / "train.csv"):
    df = pd.DataFrame(chunk)   # process chunk, not full file

# ── logging: structured experiment tracking ───────────────
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)-8s | %(name)s | %(message)s",
    handlers=[logging.FileHandler("run.log"), logging.StreamHandler()],
)
log = logging.getLogger("trainer")

log.info("Training started: model=rf, n_samples=50000")
log.warning("Class imbalance: positive_rate=%.3f", 0.05)

for epoch in range(10):
    loss = train_one_epoch()
    log.info(f"Epoch {epoch+1:02d} | loss={loss:.4f}")
Pitfall Using print() instead of logging in production ML code

print("Training complete") has no severity level, no timestamp, no file handler, and cannot be filtered or redirected. In production you cannot distinguish an info message from an error, and logs cannot be ingested by aggregation systems.

Fix Use logging.getLogger(__name__) and the appropriate level: info for milestones, warning for recoverable issues, error for failures. Configure once at the entry point; all module loggers inherit the configuration.
Pitfall Hardcoded file paths with string concatenation

"data/" + "raw/" + filename fails on Windows (backslash separator), breaks on extra/missing slashes, and is hard to refactor when directories change.

Fix Use Path("data") / "raw" / filename. pathlib handles separator differences across operating systems, resolves relative paths, and makes path manipulation readable. Pass Path objects directly to open(), pd.read_csv(), and most other APIs that accept a file path.
Pitfall Loading an entire large CSV with pd.read_csv() without chunking

pd.read_csv("10GB_file.csv") loads all rows into RAM. On a machine with 16GB RAM this causes OOM or severe swapping, crashing the job silently overnight.

Fix Use pd.read_csv(path, chunksize=10000) for an iterator of DataFrames, or the csv module with a generator as shown above. For very large datasets, consider Polars or DuckDB which process data lazily without loading into memory.

Configure once at the entry point: logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s", handlers=[logging.FileHandler("run.log"), logging.StreamHandler()]). In each module: log = logging.getLogger(__name__). Log training milestones at INFO, anomalies at WARNING, failures at ERROR. For structured JSON logging, use python-json-logger. For experiment tracking beyond logging, use MLflow or Weights & Biases to record metrics, parameters, and artefacts and browse them in web dashboards.

Three approaches: (1) csv module with a generator: yield rows or batches from a DictReader loop, keeping O(batch_size) memory; (2) pandas chunksize: pd.read_csv(path, chunksize=N) returns an iterator of DataFrames; (3) Polars lazy frame: pl.scan_csv(path).filter(...).collect() lets the query optimiser push the filter into the scan, so only matching rows are materialised. For analytical queries on very large files (>10GB), DuckDB is often the fastest option: duckdb.execute("SELECT * FROM read_csv('file.csv') WHERE label = 1").
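The first approach can be sketched with itertools.islice, which avoids manual buffer management altogether. The helper name iter_batches is an assumption, and the io.StringIO object stands in for a large file on disk.

```python
import csv
import io
from itertools import islice


def iter_batches(rows, batch_size):
    """Yield lists of up to batch_size rows from any iterable of rows."""
    it = iter(rows)
    while True:
        batch = list(islice(it, batch_size))  # a fresh list for every batch
        if not batch:
            return
        yield batch


# In-memory stand-in for a large CSV file
sample = io.StringIO("id,label\n1,0\n2,1\n3,1\n4,0\n5,1\n")
reader = csv.DictReader(sample)

positives = 0
batch_sizes = []
for batch in iter_batches(reader, batch_size=2):
    batch_sizes.append(len(batch))
    positives += sum(1 for row in batch if row["label"] == "1")

print(batch_sizes, positives)   # [2, 2, 1] 3
```

Each batch is a new list, so a caller that stores batches for later does not see them change underneath it.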

pathlib provides an object-oriented interface. Advantages: (1) / operator for joining — Path("data") / "raw" / "file.csv" is readable and cross-platform; (2) .read_text(), .write_text() for simple reads/writes without open(); (3) .glob() and .rglob() for pattern matching; (4) .stem, .suffix, .parent for path components; (5) automatic OS separator handling. os.path is string-based, requires memorising many function names, and is error-prone for concatenation. pathlib is the modern standard since Python 3.4.
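The listed features fit in a few lines. PurePosixPath is used for the component examples so the printed form is the same on every OS; the file names are invented for the demo, and the glob part runs inside a temporary directory.

```python
import tempfile
from pathlib import Path, PurePosixPath

# Path components (pure paths never touch the file system)
p = PurePosixPath("data") / "raw" / "train.csv"
print(p)                            # data/raw/train.csv
print(p.stem, p.suffix, p.parent)   # train .csv data/raw
parquet_path = p.with_suffix(".parquet")

# Pattern matching and one-line reads/writes on real files
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "raw").mkdir()
    (root / "raw" / "a.csv").write_text("x\n1\n", encoding="utf-8")
    (root / "raw" / "b.csv").write_text("x\n2\n", encoding="utf-8")
    (root / "notes.txt").write_text("not a csv", encoding="utf-8")

    found = sorted(f.name for f in root.rglob("*.csv"))   # recursive search
    first_text = (root / "raw" / "a.csv").read_text(encoding="utf-8")

print(found)   # ['a.csv', 'b.csv']
```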

A well-structured data pipeline is just functions composing functions. Master these three patterns and you can build preprocessing, training, and inference pipelines that are readable, testable, and production-ready.
Data Structures & OOP 03–04
03

Built-in Data Structures for Data Work

Lists, dicts, sets, tuples, and the collections module are the building blocks of every data pipeline. Choosing the wrong one — a list where a set belongs — silently converts an O(1) operation into O(n) at scale.

Lists & Tuples in Data Work
Feature vectors, batch containers, NamedTuples for typed records, sorting predictions by score

Lists are ordered mutable sequences: the default container for feature vectors, batch items, and accumulated results. Tuples are immutable; use them for fixed-length records such as (label, confidence) pairs and as dict keys or set members. NamedTuple combines tuple performance with named field access, producing lightweight typed records with _asdict() for JSON serialisation. batch.clear() empties a list in place and keeps the same list object. It is O(n) in the number of elements removed, as is discarding the old list after batch = [], so choose between them on aliasing semantics rather than speed.

Python — NamedTuple records, sorting by confidence, batch accumulation
from typing import NamedTuple

# ── NamedTuple: typed readable prediction records ─────────
class Prediction(NamedTuple):
    label:       str
    confidence:  float
    latency_ms:  float

pred = Prediction("dog", 0.91, 12.5)
print(pred.label)       # "dog"
print(pred.confidence)  # 0.91
print(pred._asdict())   # {'label': 'dog', 'confidence': 0.91, 'latency_ms': 12.5}

# ── Rank predictions by confidence ────────────────────────
preds = [
    Prediction("cat",  0.72, 11.2),
    Prediction("dog",  0.91, 12.5),
    Prediction("bird", 0.43, 10.8),
]
ranked = sorted(preds, key=lambda p: p.confidence, reverse=True)
top1   = ranked[0]   # Prediction(label='dog', confidence=0.91, ...)

# Single best: max() is O(n) — faster than sort for top-1
best = max(preds, key=lambda p: p.confidence)

# ── Slicing for train/val/test split ──────────────────────
data = list(range(1000))
n    = len(data)
train = data[:int(0.70 * n)]
val   = data[int(0.70 * n):int(0.85 * n)]
test  = data[int(0.85 * n):]

# ── Batch accumulation with clear() ──────────────────────
BATCH_SIZE = 64
batch: list = []
for record in data_stream:
    batch.append(record)
    if len(batch) >= BATCH_SIZE:
        flush_batch(batch)
        batch.clear()          # empties in place; safe because flush_batch() is done with it
if batch:
    flush_batch(batch)         # handle final partial batch
Pitfall Using list for membership testing in data loops

if word in stop_words_list — O(n) scan per check. For 50,000 stop words and 1M tokens, that is 50 billion comparisons. Code is correct but catastrophically slow.

Fix Convert once: stop_words = set(stop_words_list). Every "in" check is then O(1). The O(n) conversion cost is paid once at startup; all subsequent lookups are O(1).
Pitfall Using sort() on the full list to get top-k results

sorted(scores, reverse=True)[:k] sorts all n items to get k — O(n log n). For n=1M and k=10 you sort a million items unnecessarily.

Fix Use heapq.nlargest(k, scores) — O(n log k), much faster when k << n. For top-1 only, max() is O(n) — fastest of all.
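Pitfall Treating batch.clear() and batch = [] as interchangeable

batch.clear() empties the existing list object, so every other reference to it (a queue, a yielded chunk, a list of batches) is emptied too. batch = [] rebinds the name to a new list and leaves the old one intact for whoever still holds it. Neither is O(1): both end up releasing n element references, so the speed difference is marginal.

Fix Use batch.clear() to reuse one buffer when the consumer has finished with the batch before the loop continues (as with the synchronous flush_batch above). Use batch = [] whenever the batch is yielded, queued, or stored, so later iterations cannot wipe data the consumer still needs.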

NamedTuple when: you need tuple behaviour (positional unpacking, indexing, hashable for set/dict use), the data is immutable, and you want _asdict() for JSON serialisation. @dataclass when: you need mutability, custom methods (to_json, validate), __post_init__ validation, or inheritance. For simple immutable records like (label, confidence, latency), NamedTuple is lighter and just as readable. For complex mutable configs with validation, @dataclass wins.
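A side-by-side sketch of the two choices. The class and field names (Prediction, TrainConfig, tags) are illustrative only.

```python
from dataclasses import dataclass, field
from typing import NamedTuple


class Prediction(NamedTuple):
    """Immutable record: unpackable, indexable, hashable."""
    label: str
    confidence: float


@dataclass
class TrainConfig:
    """Mutable config with validation at construction time."""
    lr: float = 0.001
    epochs: int = 10
    tags: list = field(default_factory=list)   # safe mutable default

    def __post_init__(self):
        if self.lr <= 0:
            raise ValueError("lr must be positive")


pred = Prediction("dog", 0.91)
label, conf = pred            # positional unpacking
seen = {pred}                 # hashable, so usable in sets and as dict keys

cfg = TrainConfig(lr=0.01)
cfg.epochs = 20               # dataclass fields can be reassigned

try:
    TrainConfig(lr=-1.0)
    rejected = False
except ValueError:
    rejected = True

print(pred._asdict(), cfg, rejected)
```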

append(): O(1) amortised. pop() from end: O(1). pop(0) or insert(0, x): O(n) — shifts all elements. "x in list": O(n) linear scan. sort(): O(n log n) stable Timsort. slice [a:b]: O(b-a). list(range(n)): O(n). For O(1) pop from the front, use collections.deque. For O(1) membership, use a set or dict. These differences matter in hot data preprocessing loops at scale.

heapq.nlargest(k, predictions, key=lambda p: p.confidence) is O(n log k), a good fit when k << n. For top-1: max(predictions, key=...) is O(n). For full ranking: sorted(..., reverse=True) is O(n log n). For numpy arrays: np.argsort(scores)[-k:][::-1] gives the top-k indices in descending score order via a full O(n log n) sort; np.argpartition(scores, -k)[-k:] finds the top-k indices in O(n) but returns them in no particular order, so sort just those k afterwards if ranking matters.
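The three pure-Python options, shown on a small made-up list of (label, score) pairs so the results can be compared directly:

```python
import heapq
from operator import itemgetter

scored = [("cat", 0.72), ("dog", 0.91), ("bird", 0.43), ("fox", 0.88), ("owl", 0.15)]
by_score = itemgetter(1)

top2 = heapq.nlargest(2, scored, key=by_score)               # O(n log k)
best = max(scored, key=by_score)                             # O(n), top-1 only
full_ranking = sorted(scored, key=by_score, reverse=True)    # O(n log n)

print(top2)   # [('dog', 0.91), ('fox', 0.88)]
print(best)   # ('dog', 0.91)
```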

Dictionaries — Hash Maps for Data
O(1) encoder lookups, Counter for class distribution, nested result dicts, config merging

Python dicts are hash maps with O(1) average-case lookup, insertion, and deletion. In DS/AI: vocabulary encoders, label maps, model config storage, and experiment result tracking. Counter (a dict subclass) handles frequency analysis — class distribution, token counts, most-common labels. Dict comprehensions build encoders in one line. Python 3.9+ merge operator | makes config inheritance clean without mutation.

Python — Counter for class distribution, model result dicts, config merging
from collections import Counter

# ── Counter: class distribution analysis ─────────────────
y_train = ["pos"] * 500 + ["neg"] * 150 + ["neutral"] * 350
dist    = Counter(y_train)
print(dist.most_common())   # [('pos', 500), ('neutral', 350), ('neg', 150)]

total = sum(dist.values())
# Inverse-frequency class weights for imbalanced training
class_weights = {k: total / (len(dist) * v) for k, v in dist.items()}
# ≈ {'pos': 0.67, 'neg': 2.22, 'neutral': 0.95}  (insertion order, values rounded)

# Token frequency for NLP vocabulary
tokens = "the quick brown fox jumps over the lazy dog".split()
freq   = Counter(tokens)
top3   = freq.most_common(3)   # [('the', 2), ('quick', 1), ...]

# ── Nested dict for model comparison ─────────────────────
results = {
    "random_forest": {"accuracy": 0.924, "f1": 0.891, "train_s": 12.3},
    "xgboost":       {"accuracy": 0.941, "f1": 0.908, "train_s": 45.1},
    "logistic_reg":  {"accuracy": 0.871, "f1": 0.855, "train_s":  0.8},
}
best    = max(results, key=lambda m: results[m]["accuracy"])
best_acc = results[best]["accuracy"]
print(f"Best: {best} ({best_acc:.3f})")

# Safe nested access — no KeyError on missing key
gbm_f1 = results.get("gbm", {}).get("f1", 0.0)  # 0.0 because "gbm" is missing

# ── Config inheritance (Python 3.9+) ─────────────────────
base_cfg  = {"lr": 0.001, "epochs": 50, "batch": 32, "seed": 42}
overrides = {"lr": 0.01,  "epochs": 10}
final_cfg = base_cfg | overrides   # overrides win; base not mutated
# {'lr': 0.01, 'epochs': 10, 'batch': 32, 'seed': 42}

# Pre-3.9 equivalent: {**base_cfg, **overrides}

# ── Simple in-memory feature store ───────────────────────
_cache: dict = {}
def get_features(user_id: str) -> dict:
    if user_id not in _cache:
        _cache[user_id] = compute_features(user_id)
    return _cache[user_id]
Pitfall Counter("hello") counts characters, not words

Counter("the cat sat") returns character frequencies — Counter({" ": 2, "t": 3, ...}). A very common NLP mistake when doing word frequency analysis.

Fix Always split first: Counter(text.split()) for whitespace-delimited word counts, or Counter(text.lower().split()) for case-insensitive. For proper tokenisation, use nltk or spaCy.
Pitfall KeyError on missing model name in result dicts

results["gbm"]["accuracy"] raises KeyError if "gbm" was never added. In evaluation pipelines this crashes the reporting step after a long training run.

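Fix Use results.get("gbm", {}).get("accuracy", 0.0) for safe nested access. A defaultdict(dict) removes the KeyError on the outer key only: results["gbm"] returns (and inserts) an empty dict, but results["gbm"]["accuracy"] still raises KeyError, so keep .get() for the inner lookup.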
Pitfall Using dict.update() when you want a new merged dict

base_cfg.update(overrides) modifies base_cfg in place. If base_cfg is a shared default, this mutates it for all future callers that expected the original defaults.

Fix Use final = base_cfg | overrides (Python 3.9+) or final = {**base_cfg, **overrides} to produce a new merged dict without mutating either source.

O(1) average for lookup, insert, and delete, because Python dicts are hash tables. The key is hashed to a slot index and lookup probes from that slot without scanning the other keys. Worst case is O(n) when many keys collide, but Python's hashing (SipHash for str and bytes, plus perturbed probing) makes that extremely rare in practice. Only hashable objects can be dict keys: str, int, float, frozenset, and tuples whose elements are all hashable. Lists, dicts, and sets are not.

Counter(y_train) gives exact class counts in one pass. dist.most_common() ranks classes. Imbalance ratio = max_count / min_count; if it exceeds roughly 10, consider class weighting or oversampling. For sklearn: pass class_weight="balanced" to let the estimator compute weights automatically as total / (n_classes * class_count). For manual weights, pass a dict to class_weight (for example class_weight={"neg": 2.22, "pos": 0.67}), or expand the weights to one value per row and pass them as sample_weight in model.fit(). Always check class balance before any classification task: with a 99/1 split, a model that always predicts the majority class achieves 99% accuracy.

Python 3.9+: final = base | override — clean, returns a new dict, neither source is mutated, override keys win. Python 3.5+: final = {**base, **override} — equivalent. For deep merging (nested dicts): the | operator only merges one level — nested dicts are replaced, not merged. Write a recursive merge or use the deepmerge library for nested configs. Never use update() when you need the original preserved; it mutates in place.
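A minimal recursive merge, as one possible way to write the helper mentioned above. The function name deep_merge and the config keys are illustrative; the shallow merge uses the {**a, **b} form so the sketch also runs on Python versions before 3.9.

```python
def deep_merge(base: dict, override: dict) -> dict:
    """Return a new dict; nested dicts are merged key by key, override wins."""
    merged = dict(base)  # shallow copy: untouched nested dicts are shared with base
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged


base = {"model": {"lr": 0.001, "layers": 4}, "seed": 42}
override = {"model": {"lr": 0.01}}

shallow = {**base, **override}      # same result as base | override
deep = deep_merge(base, override)

print(shallow)   # {'model': {'lr': 0.01}, 'seed': 42}
print(deep)      # {'model': {'lr': 0.01, 'layers': 4}, 'seed': 42}
```

The shallow merge loses "layers" because the whole nested dict is replaced; the recursive version keeps it and leaves base unchanged.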

Sets & Set Algebra for Data Work
O(1) membership, vocabulary analysis, stop-word filtering, train/test leakage detection

Sets are unordered hash sets with O(1) average membership testing; union (|), intersection (&), and difference (-) run in time proportional to the sizes of the sets involved. In DS/AI: build vocabularies, filter stop words in O(1) per token, deduplicate IDs, and use set algebra to detect data leakage (the same IDs in both train and test). frozenset is immutable and hashable, so it is usable as a dict key for caching feature combination scores.

Python — vocabulary ops, stop-word filter, leakage detection, frozenset keys
# ── Vocabulary analysis with set algebra ─────────────────
train_vocab = set("the cat sat on the mat".split())
test_vocab  = set("the dog ran on the mat quickly".split())

oov       = test_vocab - train_vocab      # words only in test
overlap   = test_vocab & train_vocab      # words in both
all_vocab = test_vocab | train_vocab      # union

coverage  = len(overlap) / len(test_vocab)
print(f"Test vocab coverage: {coverage:.1%}")   # 50.0% (3 of 6 test words seen in train)
print(f"OOV words: {oov}")                       # {'dog', 'ran', 'quickly'}

# ── Stop-word filtering — O(1) per token ─────────────────
STOP_WORDS = {"the", "a", "an", "is", "are", "was", "in", "on", "of"}

tokens  = "the cat sat on the mat in the garden".split()
clean   = [t for t in tokens if t not in STOP_WORDS]
# ['cat', 'sat', 'mat', 'garden']

# ── Deduplication — preserving insertion order ────────────
urls = ["a.com", "b.com", "a.com", "c.com"]
unique = list(dict.fromkeys(urls))    # preserves order (Python 3.7+)
# ['a.com', 'b.com', 'c.com']

# ── Data leakage detection ────────────────────────────────
train_ids = {101, 102, 103, 104, 105}
test_ids  = {104, 105, 106, 107, 108}
leakage   = train_ids & test_ids

if leakage:
    raise ValueError(f"Data leakage: {len(leakage)} IDs in both splits: {leakage}")

# ── frozenset as hashable dict key ────────────────────────
feature_cache: dict = {}

def score_feature_combo(features: frozenset[str]) -> float:
    if features not in feature_cache:
        feature_cache[features] = run_ablation(features)
    return feature_cache[features]

result = score_feature_combo(frozenset({"age", "income", "education"}))
Pitfall Using a list literal for stop words

STOP_WORDS = ["the", "a", ...] — "token in STOP_WORDS" is O(n) per check. With 10K stop words and 10M tokens, you perform 100 billion element comparisons silently.

Fix STOP_WORDS = {"the", "a", ...} — a set literal (curly braces, no key:value). Or convert at module level: STOP_WORDS = set(stop_word_list). Every membership check is then O(1).
Pitfall Relying on set iteration order

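for word in vocab_set: iterates in hash-table order, not insertion order. For strings that order depends on the per-process hash seed (PYTHONHASHSEED), so it can differ between two runs of the same script, not only across Python versions and environments.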

Fix If order matters, sort explicitly: sorted(vocab_set). For reproducible vocabulary indices: {w: i for i, w in enumerate(sorted(vocab_set))} — sorting before ID assignment guarantees identical mappings across runs.
Pitfall Adding unhashable items (list, array) to a set

vocab.add(["word", "phrase"]) raises TypeError: unhashable type: list. Lists, dicts, and NumPy arrays cannot be set members because their hash could change if their contents change.

Fix Convert to a hashable type: vocab.add(tuple(phrase_list)). For feature sets, use frozenset. For numpy arrays: tuple(arr.tolist()) if they are small; hash the bytes for large arrays.

Convert IDs to sets and check the intersection: leakage = set(train_ids) & set(test_ids). If len(leakage) > 0, examples appear in both splits. This is critical for: user-level splits (same user in train and test means the model memorises the user), time-series splits (future data leaking into the past), and row-level augmentation (augmented copies of the same original appearing in both folds). For group-aware cross-validation, use GroupKFold, which guarantees no group appears in both train and validation; StratifiedGroupKFold additionally tries to preserve class proportions.
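For a single train/test split without sklearn, one option is to assign each group by hashing its ID, so every row of a user lands on the same side by construction. This is a sketch under assumptions: the function name assign_split, the 20% fraction, and the sample rows are all hypothetical.

```python
import hashlib


def assign_split(group_id: str, test_fraction: float = 0.2) -> str:
    """Deterministically map a group ID to 'train' or 'test'."""
    digest = hashlib.sha256(group_id.encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:8], "big") / 2**64   # uniform value in [0, 1)
    return "test" if bucket < test_fraction else "train"


# (user_id, example) rows; a user can appear many times
rows = [("u1", "a"), ("u2", "b"), ("u1", "c"), ("u3", "d"), ("u2", "e")]

train_rows = [r for r in rows if assign_split(r[0]) == "train"]
test_rows = [r for r in rows if assign_split(r[0]) == "test"]

train_users = {user for user, _ in train_rows}
test_users = {user for user, _ in test_rows}
leakage = train_users & test_users   # empty: each user hashes to one side only

print(len(train_rows), len(test_rows), leakage)
```

hashlib is used instead of the built-in hash() because string hashes from hash() change between processes, which would reshuffle the split on every run.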

set is mutable — you can add(), discard(), update(), and clear() it. frozenset is immutable — created once, cannot be modified. Because it is immutable, frozenset is hashable and can be a dict key or a set member (a set of frozensets). Use frozenset for: feature combination caching keyed by the set of features used, elements of a set-of-sets (e.g., frequent itemsets in association rule mining), or any context requiring hashability. frozenset is created with frozenset(iterable) — there is no {} literal syntax for frozenset.

Step 1: collect unique tokens in a set during streaming (O(1) add). Step 2: filter by frequency; Counter(all_tokens).most_common(50000) keeps the top-N tokens. Step 3: sort before assigning IDs; sorted(vocab) gives alphabetical order, producing identical mappings across runs and environments. Final: word2id = {w: i for i, w in enumerate(sorted(vocab))}. Never skip sorting: set iteration order for strings can change from one process to the next because of hash randomisation.
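The steps above can be sketched as a single helper. The name build_vocab and the toy documents are assumptions for illustration.

```python
from collections import Counter


def build_vocab(token_lists, max_size):
    """Count tokens, keep the most frequent, then assign IDs in sorted order."""
    counts = Counter(tok for toks in token_lists for tok in toks)
    kept = [tok for tok, _ in counts.most_common(max_size)]
    return {tok: i for i, tok in enumerate(sorted(kept))}


docs = [["the", "cat", "sat"], ["the", "dog", "sat"], ["the", "cat", "ran"]]
word2id = build_vocab(docs, max_size=3)

print(word2id)   # {'cat': 0, 'sat': 1, 'the': 2}
```

Sorting the kept tokens before enumerating is what makes the IDs reproducible; the frequency filter alone only decides which tokens survive.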

collections Module for Data Work
Counter for frequency, defaultdict for group-by & inverted index, deque for sliding windows, heapq for top-k

Four standard-library tools cover most of this work. From collections: Counter for frequency analysis and class distribution, defaultdict for group-by accumulation without KeyError, and deque(maxlen=N) for sliding-window time-series features (O(1) append and auto-eviction). From the separate heapq module: nlargest/nsmallest for top-k retrieval without full sorting.

Python — Counter, defaultdict inverted index, deque window, heapq top-k
from collections import Counter, defaultdict, deque
import heapq

# ── Counter: class distribution ───────────────────────────
y_train = ["pos"] * 500 + ["neg"] * 150 + ["neutral"] * 350
dist    = Counter(y_train)
print(dist.most_common())   # [('pos', 500), ('neutral', 350), ('neg', 150)]

total = sum(dist.values())
class_weight = {k: total / (len(dist) * v) for k, v in dist.items()}
# ≈ {'pos': 0.67, 'neg': 2.22, 'neutral': 0.95}  (insertion order, values rounded)

# ── defaultdict: group-by ─────────────────────────────────
records = [("alice", "nlp"), ("bob", "cv"), ("alice", "mlops")]
by_person = defaultdict(list)
for person, topic in records:
    by_person[person].append(topic)
# {'alice': ['nlp', 'mlops'], 'bob': ['cv']}

# ── defaultdict: inverted index ───────────────────────────
corpus  = {0: "cat sat mat", 1: "dog ran away", 2: "cat and dog"}
inverted = defaultdict(set)
for doc_id, text in corpus.items():
    for token in text.split():
        inverted[token].add(doc_id)
# {'cat': {0, 2}, 'dog': {1, 2}, 'sat': {0}, ...}

# Boolean AND retrieval: docs containing "cat" AND "dog"
cat_and_dog = inverted["cat"] & inverted["dog"]   # {2}

# ── deque(maxlen): sliding window feature ─────────────────
WINDOW = 5
window = deque(maxlen=WINDOW)   # auto-evicts oldest when full
rolling_means = []
for price in stock_prices:
    window.append(price)        # old item auto-removed when maxlen exceeded
    if len(window) == WINDOW:
        rolling_means.append(sum(window) / WINDOW)

# ── heapq: top-k — O(n log k) vs sort O(n log n) ─────────
scores = [(0.72, "cat"), (0.91, "dog"), (0.43, "bird"), (0.88, "fox")]
top2   = heapq.nlargest(2, scores)    # [(0.91, 'dog'), (0.88, 'fox')]
bot2   = heapq.nsmallest(2, scores)   # [(0.43, 'bird'), (0.72, 'cat')]
Pitfall Counter on a string counts characters

Counter("the cat") → Counter({"t": 2, "h": 1, "e": 1, " ": 1, "c": 1, "a": 1}) — character frequencies, not word frequencies. This is the most common Counter mistake in NLP.

Fix Counter(text.split()) for word frequencies, Counter(text.lower().split()) for case-insensitive. For bigrams or trigrams, use nltk.ngrams or a sliding window over the token list.
Pitfall deque without maxlen grows unboundedly

window = deque() with no maxlen: every .append() keeps the element forever. After 1M ticks of a time-series stream the deque holds all 1M elements, and on an unbounded stream memory use keeps growing until the process is killed.

Fix Always specify maxlen for sliding windows: deque(maxlen=N). With maxlen set, appending beyond capacity automatically removes from the left — both operations are O(1). maxlen is a required discipline for streaming data.
Pitfall Using sorted()[:k] when heapq.nlargest would be faster

sorted(million_scores, reverse=True)[:10] sorts all million items to get 10 — O(n log n). For large n and small k this is wasteful, especially in real-time inference ranking.

Fix heapq.nlargest(k, scores) is O(n log k), which is meaningfully faster than a full sort when k is much smaller than n. As k approaches n, sorted() becomes the better choice. For top-1, max() is a single O(n) pass.

Counter: when working with pure Python lists before any Pandas is involved, when you need fast in-memory counting without DataFrame overhead, or when you want counter arithmetic (Counter + Counter adds counts, Counter - Counter subtracts and drops non-positive results, Counter & Counter keeps minimums, which is useful for co-occurrence analysis). Pandas value_counts(): when data is already a DataFrame column, when you need normalize=True for proportions, or when the result feeds into further Pandas operations. Counter is typically faster for pure Python iterables; value_counts() integrates seamlessly into Pandas pipelines.
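Counter arithmetic in practice, on two small made-up label batches:

```python
from collections import Counter

train = Counter(["pos", "pos", "neg", "neutral"])   # pos=2, neg=1, neutral=1
new = Counter(["pos", "neg", "neg"])                # pos=1, neg=2

combined = train + new     # add counts per key
removed = combined - new   # subtract; zero or negative results are dropped
shared = train & new       # per-key minimum; keys missing from either side vanish

print(combined)   # Counter({'pos': 3, 'neg': 3, 'neutral': 1})
print(shared)     # Counter({'pos': 1, 'neg': 1})
```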

Use defaultdict(set): for each (doc_id, text) pair, tokenise and add the doc_id to the set for each token. Retrieval: "cat AND dog" → inverted["cat"] & inverted["dog"] — documents containing both. "cat OR dog" → inverted["cat"] | inverted["dog"]. Ranking: use term frequency (Counter per document) to score results. This is the conceptual core of keyword search. For production: Elasticsearch or OpenSearch handle tokenisation, stemming, TF-IDF, distributed storage, and updates. The Python version is useful for small in-memory corpora and understanding the algorithm.
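A compact sketch of AND/OR retrieval plus term-frequency ranking. The helper names (search_and, search_or, rank) and the three-document corpus are invented for the example, and the scoring is raw term counts rather than TF-IDF.

```python
from collections import Counter, defaultdict

corpus = {0: "cat sat on the mat", 1: "dog chased the cat", 2: "cat and cat and dog"}

inverted = defaultdict(set)   # token -> set of doc IDs
term_freq = {}                # doc ID -> Counter of tokens
for doc_id, text in corpus.items():
    tokens = text.split()
    term_freq[doc_id] = Counter(tokens)
    for token in tokens:
        inverted[token].add(doc_id)


def search_and(*terms):
    postings = [inverted.get(t, set()) for t in terms]   # .get avoids inserting keys
    return set.intersection(*postings) if postings else set()


def search_or(*terms):
    return set().union(*(inverted.get(t, set()) for t in terms))


def rank(doc_ids, terms):
    """Order docs by total count of the query terms (ties broken by doc ID)."""
    return sorted(doc_ids, key=lambda d: (-sum(term_freq[d][t] for t in terms), d))


both = search_and("cat", "dog")      # {1, 2}
either = search_or("mat", "dog")     # {0, 1, 2}
ranked = rank(both, ["cat", "dog"])  # [2, 1]: doc 2 mentions "cat" twice
print(both, either, ranked)
```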

deque(maxlen=N): append() is O(1), popleft() is O(1), and the oldest element is evicted automatically when maxlen is reached. list: append() is O(1) amortised, but pop(0) is O(n) because it shifts all remaining elements. For a rolling window that processes 1M time-series ticks, list.pop(0) at every step is catastrophically slow. Always use deque(maxlen=N) for sliding windows. For NumPy-based rolling statistics over large arrays, use np.convolve or pandas Series.rolling(), which are vectorised and usually faster still.
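A rolling mean that also avoids re-summing the window on every tick can be sketched by tracking a running total next to the deque. The function name rolling_mean is an assumption; for long float streams the running total can accumulate rounding error, so periodic re-summing may be worthwhile.

```python
from collections import deque


def rolling_mean(values, window):
    """Mean of the last `window` values, O(1) work per new value."""
    buf = deque(maxlen=window)
    total = 0.0
    means = []
    for v in values:
        if len(buf) == window:
            total -= buf[0]      # this value is evicted by the append below
        buf.append(v)
        total += v
        if len(buf) == window:
            means.append(total / window)
    return means


print(rolling_mean([1, 2, 3, 4, 5], window=3))   # [2.0, 3.0, 4.0]
```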

Counter for class distribution, defaultdict for group-by, deque for sliding-window features, heapq for top-k retrieval: these four cover a large share of the data structure problems in production DS/AI pipelines.
04

Classes & OOP for DS/AI Engineers

In DS/AI you write classes for custom sklearn estimators, PyTorch datasets, data containers, and model wrappers. Python OOP rewards composition — build behaviour by mixing in small, focused classes rather than deep inheritance hierarchies.

Class Anatomy for Data Work
Instance vs class attributes, method chaining, @classmethod constructors, @staticmethod validators

Instance attributes (self.x) hold per-object state — fitted statistics, model parameters. Class attributes (ClassName.x) are shared constants — column lists, default configs. @classmethod receives the class as its first argument, enabling alternative constructors (from_config, from_file). @staticmethod is a pure utility function that belongs to the class but needs neither instance nor class — input validation, schema checking. Return self from fit() to enable method chaining.

Python — FeatureEngineer class, @classmethod, @staticmethod, method chaining
import pandas as pd, numpy as np

class FeatureEngineer:
    # ── Class attribute: shared constant ─────────────────
    NUMERIC_COLS    = ["age", "income", "score"]
    CATEGORICAL_COLS = ["city", "category"]

    def __init__(self, clip_outliers: bool = True):
        # ── Instance attributes: per-object state ────────
        self.clip_outliers = clip_outliers
        self._stats: dict  = {}     # filled during fit()
        self._is_fitted    = False

    def fit(self, df: pd.DataFrame) -> "FeatureEngineer":
        FeatureEngineer.validate_columns(df, self.NUMERIC_COLS)
        for col in self.NUMERIC_COLS:
            self._stats[col] = {"mean": df[col].mean(), "std": df[col].std()}
        self._is_fitted = True
        return self     # enables: fe.fit(train).transform(test)

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        if not self._is_fitted:
            raise RuntimeError("Call fit() before transform()")
        df = df.copy()
        for col in self.NUMERIC_COLS:
            mu, sd = self._stats[col]["mean"], self._stats[col]["std"]
            df[col] = (df[col] - mu) / sd
        return df

    def fit_transform(self, df: pd.DataFrame) -> pd.DataFrame:
        return self.fit(df).transform(df)

    @classmethod
    def from_config(cls, config: dict) -> "FeatureEngineer":
        """Alternative constructor — build from a JSON config dict."""
        return cls(clip_outliers=config.get("clip_outliers", True))

    @staticmethod
    def validate_columns(df: pd.DataFrame, required: list) -> None:
        """Pure validator — needs neither self nor cls."""
        missing = set(required) - set(df.columns)
        if missing:
            raise ValueError(f"Missing required columns: {missing}")

# Usage
fe = FeatureEngineer.from_config({"clip_outliers": False})
train_out = fe.fit_transform(train_df)    # fit on train
test_out  = fe.transform(test_df)         # apply same stats to test
Pitfall Mutating a class attribute via an instance

class MyModel: FEATURES = []. my_model.FEATURES.append("new") — this modifies the class-level list, affecting all instances. The list is mutated in place through the instance; it is not reassigned to a new instance attribute.

Fix Use immutable class attributes (tuples, frozensets) for constants. If mutation is needed, do it on the class explicitly: MyModel.FEATURES.append(...) so the intent is visible. Better: make FEATURES an instance attribute initialised in __init__.
Pitfall Not returning self from fit() breaks method chaining

sklearn and many DS libraries expect fit() to return self so callers can write fe.fit(X).transform(X). Returning None forces two separate lines and breaks the sklearn estimator contract.

Fix Always end fit() with return self. TransformerMixin.fit_transform() and Pipeline chain fit(...).transform(...), so returning self is part of the sklearn estimator contract.
Pitfall Doing computation in __init__ instead of fit()

Computing statistics, loading large files, or making API calls in __init__ makes the object expensive to create and impossible to clone. The computation also runs before any data is available.

Fix __init__ stores only hyperparameters (fast, cheap). All computation that depends on data belongs in fit(). Fitted attributes have a trailing underscore (self.mean_) by sklearn convention.

Instance attributes (self.x = value in __init__) are stored in each object's __dict__ — independent per instance. Class attributes (ClassName.x at class body level) are stored once in the class __dict__ and shared across all instances. Python looks up self.x in the instance __dict__ first, then the class __dict__. So self.COLS = new_list rebinds only the instance attribute; MyClass.COLS = new_list changes the class attribute for everyone. Mutating (not rebinding) a class-level mutable (list.append()) through self affects all instances.
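
The lookup order can be seen in a few lines. This is a minimal sketch using a hypothetical Model class (not one of the classes above): appending through an instance mutates the shared class list, while assigning through an instance creates a separate per-instance attribute.

```python
class Model:
    FEATURES = ["age"]            # class attribute: one list shared by all instances

a, b = Model(), Model()

# Mutation through an instance: lookup finds the class list and appends to it.
a.FEATURES.append("income")
print(b.FEATURES)                 # ['age', 'income']
print("FEATURES" in a.__dict__)   # False

# Rebinding through an instance: creates a new entry in a.__dict__ only.
a.FEATURES = ["height"]
print(a.FEATURES, b.FEATURES)     # ['height'] ['age', 'income']
print(Model.FEATURES)             # ['age', 'income']
```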

@classmethod receives cls — use for alternative constructors: FeatureEngineer.from_config(cfg) needs to call cls(...) to create an instance (works correctly for subclasses). @staticmethod receives neither self nor cls — use for pure utility: validate_columns(df, cols) only validates; it does not need to create an instance or access class state. Rule: if you need to instantiate the class (or a subclass), use @classmethod. If you need neither self nor cls, use @staticmethod. If you need self (instance state or methods), use a regular method.
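
A short sketch of why the alternative constructor should use cls. The classes BaseScaler and ClippingScaler and the config key with_mean are hypothetical names chosen for illustration.

```python
class BaseScaler:
    def __init__(self, with_mean: bool = True):
        self.with_mean = with_mean

    @classmethod
    def from_config(cls, config: dict) -> "BaseScaler":
        # cls is whichever class the method was called on, so subclasses work.
        return cls(with_mean=config.get("with_mean", True))

    @staticmethod
    def is_valid_config(config: dict) -> bool:
        # Pure check: needs neither an instance nor the class.
        return isinstance(config.get("with_mean", True), bool)

class ClippingScaler(BaseScaler):
    pass

cfg = {"with_mean": False}
scaler = ClippingScaler.from_config(cfg)
print(type(scaler).__name__)              # ClippingScaler
print(BaseScaler.is_valid_config(cfg))    # True
```

Had from_config hard-coded BaseScaler(...) instead of cls(...), the call on ClippingScaler would have returned the parent type.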

Inherit from BaseEstimator (gives get_params/set_params) and the appropriate mixin: TransformerMixin (fit_transform), ClassifierMixin (score for accuracy), or RegressorMixin (score for R²). Contract: (1) __init__ stores hyperparameters only — no computation; (2) __init__ parameter names must exactly match self.param_name assignments; (3) fit() returns self; (4) fitted attributes use trailing underscores (self.coef_, self.classes_). This enables clone(), cross_val_score(), Pipeline, and GridSearchCV compatibility automatically.

Inheritance for sklearn Estimators
BaseEstimator + TransformerMixin, fit/transform contract, Pipeline and GridSearchCV compatibility

The most common inheritance pattern in DS/AI is building sklearn-compatible custom transformers. BaseEstimator gives get_params/set_params (required for GridSearchCV and clone()). TransformerMixin gives fit_transform() for free. MRO (Method Resolution Order) determines which method runs in cooperative multiple inheritance — use ClassName.__mro__ to inspect it. Pipeline passes data through each step's transform(), calling fit() only on training data.

Python — custom sklearn LogTransformer, Pipeline, GridSearchCV tuning
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import GridSearchCV
import numpy as np

class LogTransformer(BaseEstimator, TransformerMixin):
    """Log(x + offset) — sklearn-compatible transformer."""

    def __init__(self, offset: float = 1.0):
        self.offset = offset      # name MUST match exactly — get_params uses it

    def fit(self, X, y=None):
        self.min_val_ = float(np.min(X))   # trailing _ = fitted attribute
        return self

    def transform(self, X):
        return np.log(np.abs(X) + self.offset)

# Inherits for free: get_params, set_params, fit_transform, clone
lt = LogTransformer(offset=0.5)
print(lt.get_params())    # {'offset': 0.5}

# ── sklearn Pipeline ───────────────────────────────────────
from sklearn.preprocessing import StandardScaler

pipe = Pipeline([
    ("log",   LogTransformer()),
    ("scale", StandardScaler()),
    ("clf",   LogisticRegression(max_iter=1000)),
])

pipe.fit(X_train, y_train)          # fit() on each step with training data
preds = pipe.predict(X_test)         # transform() then predict on test data

# ── GridSearchCV tunes transformer params inside pipeline ──
grid = GridSearchCV(
    pipe,
    param_grid={"log__offset": [0.5, 1.0, 2.0]},  # step__param syntax; offset 0.0 would give log(0) = -inf on zero inputs
    cv=5,
    scoring="f1",
)
grid.fit(X_train, y_train)
print(grid.best_params_)    # e.g. {'log__offset': 1.0} (depends on the data)
print(grid.best_score_)     # e.g. 0.912 (mean cross-validated F1)

# ── Inspect MRO ───────────────────────────────────────────
print(LogTransformer.__mro__)
# Abridged order: LogTransformer, BaseEstimator, TransformerMixin, object.
# Recent sklearn versions also list private helper mixins in between.
Pitfall __init__ parameter name must exactly match the instance attribute name

def __init__(self, n_trees): self.n_estimators = n_trees. BaseEstimator.get_params() reads the parameter names from the __init__ signature and finds "n_trees", then looks up self.n_trees, which does not exist because the value was stored as n_estimators. In current sklearn versions this raises AttributeError, so clone() and GridSearchCV fail.

Fix The __init__ parameter name and the self.x assignment name must be identical: def __init__(self, n_estimators): self.n_estimators = n_estimators. No aliases, no renaming.
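
The mechanism can be imitated without sklearn. The MiniEstimator class below is a simplified stand-in written for illustration, an assumption about the general approach and not sklearn's actual code: it reads parameter names from the __init__ signature and then looks up an attribute of the same name.

```python
import inspect

class MiniEstimator:
    """Simplified stand-in for signature-based parameter discovery (not sklearn code)."""

    def get_params(self) -> dict:
        sig = inspect.signature(type(self).__init__)
        names = [p for p in sig.parameters if p != "self"]
        # Each parameter must be stored under an attribute with the same name.
        return {name: getattr(self, name) for name in names}

class Good(MiniEstimator):
    def __init__(self, n_estimators: int = 100):
        self.n_estimators = n_estimators

class Renamed(MiniEstimator):
    def __init__(self, n_trees: int = 100):
        self.n_estimators = n_trees       # stored under a different name

print(Good(50).get_params())              # {'n_estimators': 50}
try:
    Renamed(50).get_params()
except AttributeError as exc:
    print("AttributeError:", exc)
```
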
Pitfall Using the same name for a hyperparameter and a fitted attribute

def __init__(self, mean): self.mean = mean. Then in fit(): self.mean = X.mean(). fit() overwrites the hyperparameter. get_params() now returns the training mean, and clone() creates an object with the wrong initial mean.

Fix Fitted attributes use trailing underscores: self.mean_ = X.mean(). The underscore is a sklearn convention that explicitly separates hyperparameters (__init__, no underscore) from learned attributes (fit(), trailing _).
Pitfall Doing computation or loading data in __init__

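def __init__(self, X): self.mean = X.mean() breaks clone() and GridSearchCV: both rebuild the estimator from get_params(), which looks for an attribute named X that was never stored. Storing the data itself as an attribute would instead make every clone and every pickle carry a full copy of the dataset.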

Fix Only store hyperparameters in __init__. All data-dependent computation belongs in fit(). __init__ should accept scalars, strings, and booleans — never arrays or DataFrames.

TransformerMixin provides fit_transform(X, y=None, **fit_params) for free — it calls self.fit(X, y, **fit_params).transform(X). This means your transformer automatically supports the fit_transform() pattern expected by Pipeline, without writing it manually. BaseEstimator provides get_params() (used by GridSearchCV and clone()) and set_params() (used to update hyperparameters, e.g., during grid search). Together they give you full sklearn ecosystem compatibility with fewer than 20 lines of code.

Fitted attributes (computed from data during fit()) are named with a trailing underscore: self.coef_, self.mean_, self.classes_, self.feature_importances_. This separates hyperparameters (set in __init__, no underscore, the parameters you tune) from learned parameters (set in fit(), trailing underscore, the model state). sklearn's check_is_fitted() utility uses this convention to verify that fit() has been called before transform() or predict() — raising NotFittedError if no trailing-underscore attributes are found.

Python uses C3 linearisation. For class C(A, B) with unrelated bases A and B, the MRO is C → A → B → object. For LogTransformer, attribute lookup searches LogTransformer first, then BaseEstimator, then TransformerMixin, then object. super() follows the same order: each class that calls super() passes control to the next class in the MRO. TransformerMixin's fit_transform() reaches your fit() and transform() for a simpler reason: it calls them through self, and lookups on self start at the instance's own class. Inspect with LogTransformer.__mro__ or LogTransformer.mro().
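
A small sketch with hypothetical classes Base, Mixin, and Doubler shows both points: the left-to-right search order, and a method inherited from a mixin dispatching to methods defined on the subclass because it calls them through self.

```python
class Base:
    def describe(self) -> str:
        return "Base"

class Mixin:
    def describe(self) -> str:
        return "Mixin"

    def fit_transform(self, x):
        # Calls go through self, so the subclass's fit/transform are used.
        return self.fit(x).transform(x)

class Doubler(Base, Mixin):
    def fit(self, x):
        return self

    def transform(self, x):
        return [2 * v for v in x]

print([c.__name__ for c in Doubler.__mro__])  # ['Doubler', 'Base', 'Mixin', 'object']
print(Doubler().describe())                   # Base  (first match in MRO order)
print(Doubler().fit_transform([1, 2, 3]))     # [2, 4, 6]
```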

@property for Lazy Loading & Validation
Lazy-load large datasets on first access, validate hyperparameters at assignment, cached_property

@property turns a method into a computed attribute — attribute syntax (obj.data) instead of method syntax (obj.get_data()). In DS/AI: lazy-load large datasets only when first accessed (avoid 10GB in memory at import time), validate hyperparameter ranges at assignment so bad values fail immediately instead of hours into a training run, and expose computed statistics without precomputing them in __init__.

Python — lazy dataset loading, early hyperparameter validation, cached_property
import pandas as pd
from functools import cached_property

# ── Lazy loading: load on first access, cache result ──────
class Dataset:
    def __init__(self, filepath: str):
        self._filepath = filepath
        self._df: pd.DataFrame | None = None   # not loaded yet

    @property
    def df(self) -> pd.DataFrame:
        if self._df is None:
            print(f"Loading {self._filepath}...")
            self._df = pd.read_csv(self._filepath)
        return self._df    # subsequent accesses return cached result instantly

    @property
    def n_rows(self) -> int:
        return len(self.df)    # triggers load if not yet loaded

# Alternative: functools.cached_property — auto-caches after first call
class Dataset2:
    def __init__(self, filepath: str):
        self._filepath = filepath

    @cached_property
    def df(self) -> pd.DataFrame:
        return pd.read_csv(self._filepath)    # called once, stored in __dict__

ds = Dataset("train.csv")
# Nothing loaded yet — object creation is instant
print(ds.n_rows)   # loads CSV here, prints count
print(ds.n_rows)   # instant — already in self._df

# ── @property setter: validate hyperparameters early ──────
class ModelConfig:
    def __init__(self, n_estimators: int = 100, learning_rate: float = 0.001):
        self.n_estimators  = n_estimators    # routes through setter
        self.learning_rate = learning_rate

    @property
    def n_estimators(self) -> int:
        return self._n_estimators

    @n_estimators.setter
    def n_estimators(self, value: int) -> None:
        if not isinstance(value, int) or value < 1:
            raise ValueError(f"n_estimators must be int >= 1, got {value!r}")
        self._n_estimators = value

    @property
    def learning_rate(self) -> float:
        return self._learning_rate

    @learning_rate.setter
    def learning_rate(self, value: float) -> None:
        if value <= 0:
            raise ValueError(f"learning_rate must be > 0, got {value}")
        self._learning_rate = value

cfg = ModelConfig(n_estimators=200)
cfg.n_estimators = -5   # raises ValueError immediately — not 6 hours later
Pitfall Setting the backing attribute directly in __init__, bypassing the setter

def __init__(self, n): self._n_estimators = n — this skips the setter entirely. The validation code in the @property setter is never called, even for obviously invalid values like -1.

Fix Always set via the public name: self.n_estimators = n (without the underscore prefix). This routes through the setter and runs validation. The _n_estimators name is internal backing storage — only the setter should write to it directly.
Pitfall @property with expensive computation called repeatedly

@property def embeddings(self): return [compute(t) for t in self.texts] — recomputes all embeddings on every access. If called in a loop over batches, this multiplies the compute cost.

Fix Add caching: check if self._embeddings is None, compute and store, then return the cached value. Or use @functools.cached_property — it calls the method once and stores the result in __dict__ for subsequent O(1) lookups.
Pitfall Stale cached data when the underlying source changes

Dataset._df is cached from the original CSV. If the CSV is updated or re-generated, the cached DataFrame is stale. No automatic invalidation occurs.

Fix Add an explicit reload() method: def reload(self): self._df = None (or del self.df for cached_property). For production data workflows, prefer DVC or Delta Lake for data versioning rather than relying on file modification checks.
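
A minimal sketch of explicit invalidation with cached_property. The Source class and its version counter are hypothetical stand-ins for a file on disk. Popping the key from __dict__ is used here because del obj.data raises AttributeError when nothing has been cached yet.

```python
from functools import cached_property

class Source:
    def __init__(self):
        self.version = 1                  # stands in for the file contents

    @cached_property
    def data(self) -> str:
        return f"rows@v{self.version}"

    def reload(self) -> None:
        # Drop the cached value if present; the next access recomputes it.
        self.__dict__.pop("data", None)

s = Source()
first = s.data
print(first)         # rows@v1
s.version = 2
stale = s.data
print(stale)         # rows@v1  (stale: still the cached value)
s.reload()
fresh = s.data
print(fresh)         # rows@v2
```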

@property exposes computed values with attribute syntax (ds.n_rows) instead of method call syntax (ds.get_n_rows()) — cleaner for "read a value" semantics where the caller should not care whether it is stored or computed. The setter enables validation on simple assignment (cfg.n_estimators = -1 raises immediately). In DS/AI configs, catching bad hyperparameters at assignment time — rather than hours into a training run — saves real wall-clock time. Use @functools.cached_property (Python 3.8+) when the computation is expensive and the result should be computed once and memoised.

@property calls the method on every access — appropriate when the value could change (depends on mutable state). @cached_property calls the method once and stores the result in the instance __dict__, making subsequent accesses O(1) dictionary lookups. @cached_property is ideal for expensive computations that depend only on immutable instance state (loading a dataset, computing embeddings, building a vocabulary index). Note: cached_property requires a writable __dict__ — incompatible with __slots__.
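
The difference in call counts is easy to observe. In this sketch the Corpus class and its two counters are hypothetical and exist only to make the behaviour visible.

```python
from functools import cached_property

class Corpus:
    def __init__(self, texts: list[str]):
        self.texts = texts
        self.property_calls = 0
        self.cached_calls = 0

    @property
    def lengths(self) -> list[int]:
        self.property_calls += 1          # runs on every access
        return [len(t) for t in self.texts]

    @cached_property
    def vocab(self) -> set[str]:
        self.cached_calls += 1            # runs once, result stored in __dict__
        return {w for t in self.texts for w in t.split()}

c = Corpus(["a b", "b c"])
for _ in range(3):
    c.lengths
    c.vocab

print(c.property_calls, c.cached_calls)   # 3 1
print("vocab" in c.__dict__)              # True
```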

Three approaches: (1) @property setter — validates immediately when the attribute is set, even in __init__; (2) __post_init__ in a @dataclass — validates after __init__ completes; (3) Pydantic BaseModel with validators — validates at model construction with automatic type coercion. All three catch errors at object creation time instead of when model.fit() eventually touches the bad value. Always validate: type (isinstance check), range (positive, bounded), and mutual constraints (min_child_samples < n_samples). Early validation is cheap insurance against wasted GPU hours.

Dataclasses for ML Configs & Results
@dataclass for typed configs, frozen=True for hashable cache keys, field() for mutable defaults, asdict()

@dataclass auto-generates __init__, __repr__, and __eq__ from field annotations, eliminating boilerplate. In DS/AI: typed experiment configurations (replacing untyped dicts), model result containers (serialisable to JSON with asdict()), and hashable experiment keys. frozen=True makes instances immutable and hashable — usable as dict keys for result caches. field(default_factory=list) solves the mutable default argument problem inside dataclasses.

Python — TrainingConfig, frozen ExperimentKey, ModelResults with asdict()
from dataclasses import dataclass, field, asdict
import json

@dataclass
class TrainingConfig:
    """Typed training configuration — replaces an untyped dict."""
    model_name:    str
    n_estimators:  int   = 100
    max_depth:     int   = 10
    learning_rate: float = 0.001
    random_state:  int   = 42
    feature_cols:  list  = field(default_factory=list)  # NOT =[] !

    def __post_init__(self):
        if self.learning_rate <= 0:
            raise ValueError(f"learning_rate must be > 0, got {self.learning_rate}")
        if self.n_estimators < 1:
            raise ValueError(f"n_estimators must be >= 1, got {self.n_estimators}")

@dataclass(frozen=True)
class ExperimentKey:
    """Immutable, hashable key for caching experiment results."""
    model:        str
    dataset:      str
    n_estimators: int
    max_depth:    int
    # frozen=True → __hash__ auto-generated → usable as dict key

@dataclass
class ModelResults:
    model_name:         str
    accuracy:           float
    f1_score:           float
    train_time_s:       float
    feature_importance: dict = field(default_factory=dict)

    def to_json(self) -> str:
        return json.dumps(asdict(self), indent=2)  # asdict() converts nested dataclasses recursively

# Usage
cfg = TrainingConfig("xgboost", n_estimators=200, max_depth=6)
print(cfg)
# TrainingConfig(model_name='xgboost', n_estimators=200, max_depth=6, ...)

key   = ExperimentKey("xgboost", "train_v2", 200, 6)
cache: dict[ExperimentKey, ModelResults] = {}
cache[key] = ModelResults("xgboost", 0.941, 0.908, 45.1)

# Serialize results to disk
print(cache[key].to_json())

# "Copy with changes" for frozen dataclasses
from dataclasses import replace
key_v2 = replace(key, n_estimators=300)   # new key, original unchanged
Pitfall Mutable default directly in a dataclass field

@dataclass class Config: cols: list = [] raises ValueError: mutable default <class 'list'> is not allowed. Python detects this because a shared mutable default would cause the same bug as in regular functions.

Fix Use field(default_factory=list): cols: list = field(default_factory=list). Python calls list() for each new instance, giving every object its own fresh list. Same applies to dict: field(default_factory=dict).
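
Both behaviours can be checked directly. BadConfig and GoodConfig are hypothetical names; the first definition is wrapped in try/except because the error is raised when the class is defined, not when it is instantiated.

```python
from dataclasses import dataclass, field

try:
    @dataclass
    class BadConfig:
        cols: list = []                   # rejected at class-definition time
except ValueError as exc:
    error = str(exc)
    print("ValueError:", error)

@dataclass
class GoodConfig:
    cols: list = field(default_factory=list)

a, b = GoodConfig(), GoodConfig()
a.cols.append("age")
print(a.cols, b.cols)                     # ['age'] []
```
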
Pitfall Trying to modify a frozen dataclass field

key.n_estimators = 300 on a frozen=True dataclass raises FrozenInstanceError: cannot assign to field. frozen=True is intentional immutability.

Fix Use dataclasses.replace(key, n_estimators=300) to create a new instance with the modified field. The original remains unchanged. This is the correct pattern for "modified copy" of an immutable config.
Pitfall Using a plain dict for ML configs instead of a typed dataclass

config = {"n_estimators": 100, "lr": 0.001} — no IDE completion, no type checking, typos ("learing_rate") go undetected until runtime, no __post_init__ validation, no clear schema.

Fix Use @dataclass (or Pydantic BaseModel for JSON schema/API configs). You get IDE completion, type annotations, __repr__ for logging, __eq__ for comparison, and __post_init__ for validation — all with less code than a manual class.

@dataclass generates: __init__ (from field annotations and defaults), __repr__ (shows all fields), __eq__ (compares all fields by value). With frozen=True: __hash__ (from all fields). With order=True: __lt__, __le__, __gt__, __ge__. It does NOT generate: custom validation (use __post_init__), __hash__ when only eq=True and not frozen (instances are unhashable by default when __eq__ is defined), JSON serialisation (use asdict() + json.dumps), or JSON schema (use Pydantic for that).
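
A quick check of the generated methods, using two hypothetical dataclasses Plain and Ranked:

```python
from dataclasses import dataclass

@dataclass
class Plain:
    score: float

@dataclass(frozen=True, order=True)
class Ranked:
    score: float
    name: str = ""

print(Plain(0.9) == Plain(0.9))            # True  (generated __eq__)
print(Plain.__hash__ is None)              # True  (eq without frozen: unhashable)
print(Ranked(0.8) < Ranked(0.9))           # True  (order=True compares fields as a tuple)
print(len({Ranked(0.9), Ranked(0.9)}))     # 1     (frozen + eq: hashable)
```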

Use frozen=True when: (1) the object must be hashable, for use as a dict key (ExperimentKey → result cache) or set member; (2) you want immutable configs, which prevents accidental mutation after creation; (3) you need thread-safety on the config object. frozen=True auto-generates __hash__ from all fields. Downside: you cannot modify fields after creation, so use dataclasses.replace() for modified copies. Do NOT use frozen=True when any field contains a mutable object (list, dict): the attribute cannot be rebound, but the contents remain mutable, and hash() raises TypeError because lists and dicts are unhashable. Use tuples or frozensets for such fields.
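
The mutable-field caveat in practice, as a sketch with hypothetical ListKey and TupleKey classes: a list inside a frozen dataclass can still be mutated and cannot be hashed, while a tuple field works as a cache key.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class ListKey:
    cols: list                             # mutable field inside a frozen dataclass

@dataclass(frozen=True)
class TupleKey:
    cols: tuple

k = ListKey(["age"])
k.cols.append("income")                    # allowed: only rebinding k.cols is blocked
print(k.cols)                              # ['age', 'income']

hashable = True
try:
    hash(k)
except TypeError as exc:
    hashable = False
    print("TypeError:", exc)               # unhashable type: 'list'

cache = {TupleKey(("age", "income")): 0.91}
print(cache[TupleKey(("age", "income"))])  # 0.91
```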

Plain dict: no type hints, no IDE completion, typos undetected, easy JSON (json.dumps). Quick scripts only. @dataclass: typed fields, IDE completion, __post_init__ validation, asdict() for serialisation, no JSON schema. Best for internal configs. Pydantic BaseModel: full type coercion (str "42" → int 42), rich validators, automatic JSON schema, FastAPI integration. Best for API payloads, YAML/JSON config files, and production ML serving. Rule: internal training configs → @dataclass; API request/response schemas → Pydantic; throwaway exploration → dict.

Every sklearn estimator follows the same contract: store hyperparameters in __init__, fit on training data and return self, transform or predict on new data. Once you internalise this, you can write sklearn-compatible code that works with Pipeline, GridSearchCV, and cross_val_score — and read any estimator source.
Advanced OOP & Type System 05–06
05

Type Hints, Protocols & ABCs

Type hints are documentation that mypy can verify at CI time. Protocols define interfaces structurally — any class with the right methods satisfies them, no inheritance required. ABCs enforce interfaces through explicit inheritance.

Type Annotations for DS/AI Code
Annotating feature extractors, model wrappers, Optional, Union, Callable, TypeAlias

Type hints are optional but invaluable in production DS/AI code: they document function contracts, enable IDE autocompletion, and let mypy catch argument-type bugs before runtime. Key types for data work: Optional[X] for values that may be None, Union[X, Y] (or X | Y in Python 3.10+) for multiple accepted types, Callable[[arg], return] for typed function arguments, and numpy.typing.NDArray[np.float64] for typed arrays.

Python — annotating preprocessing, Optional, Callable, TypeAlias, NDArray
import pandas as pd
import numpy as np
import numpy.typing as npt
from typing import Optional, Callable, TypeAlias   # TypeAlias: Python 3.10+

# TypeAlias: give complex types a readable name
Features: TypeAlias = npt.NDArray[np.float64]
Labels:   TypeAlias = npt.NDArray[np.int64]
TextList: TypeAlias = list[str]

# ── Annotate preprocessing functions ─────────────────────
def clean_text(text: str) -> str:
    return text.lower().strip()

def extract_features(texts: TextList) -> Features:
    return np.array([[len(t), t.count(" ")] for t in texts], dtype=np.float64)

# ── Optional: value may be absent ────────────────────────
def load_labels(path: str, default: Optional[Labels] = None) -> Optional[Labels]:
    try:
        return np.load(path)
    except FileNotFoundError:
        return default

result = load_labels("missing.npy")    # None: the file is missing, so the default is returned

# ── Callable: typed function arguments ───────────────────
def apply_transform(
    series: pd.Series,
    fn: Callable[[pd.Series], pd.Series],
) -> pd.Series:
    return fn(series)

apply_transform(df["age"], np.log1p)   # passes a numpy ufunc as Callable

# ── Union / 3.10+ X | Y syntax ───────────────────────────
LABEL2ID: dict[str, int] = {"neg": 0, "pos": 1}   # hypothetical label map for illustration

def encode_label(label: str | int) -> int:
    return label if isinstance(label, int) else LABEL2ID.get(label, -1)

# ── Annotate class methods ────────────────────────────────
class Scaler:
    def fit(self, X: Features) -> "Scaler": ...
    def transform(self, X: Features) -> Features: ...
    def fit_transform(self, X: Features) -> Features:
        return self.fit(X).transform(X)
Pitfall Using np.ndarray instead of npt.NDArray[np.float64]

def predict(X: np.ndarray) — np.ndarray accepts any dtype, so mypy allows passing an int32 array where float64 is expected. The bug surfaces at runtime, not at type-check time.

Fix Use numpy.typing.NDArray[np.float64] for float arrays, NDArray[np.int64] for label arrays. This lets mypy catch dtype mismatches wherever the dtype is statically known; arrays whose dtype is only decided at runtime are still not checked.
Pitfall Forgetting to handle the None branch of Optional

def process(labels: Optional[Labels]): return labels.mean() — if labels is None, this raises AttributeError. mypy will flag this as "Item None of Optional[NDArray] has no attribute mean".

Fix Add a guard: if labels is None: return 0.0. Or use assert labels is not None to narrow the type. mypy tracks these guards and removes None from the type inside the if-branch.
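
A minimal sketch of the guard pattern, using a plain list instead of an array so it runs without NumPy. The function name mean_label is hypothetical.

```python
from typing import Optional

def mean_label(labels: Optional[list[int]]) -> float:
    if labels is None:           # guard: handles the None branch explicitly
        return 0.0
    # From here on a type checker treats labels as list[int].
    return sum(labels) / len(labels) if labels else 0.0

print(mean_label(None))          # 0.0
print(mean_label([1, 0, 1, 1]))  # 0.75
```
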
Pitfall Overusing Any — it disables all type checking

from typing import Any; def f(x: Any) -> Any — Any is compatible with everything, so mypy never reports errors. The whole point of type hints is lost.

Fix Use Any only at genuine boundaries (JSON parsing, dynamic attribute access). For collections with mixed types, prefer Union or overload. For unknown types from external APIs, use object (accepts any object but prevents arbitrary attribute access).

def preprocess(df: pd.DataFrame) -> pd.DataFrame: ... is the straightforward annotation. For typed column schemas, use TypedDict: class TrainRow(TypedDict): age: float; label: int. Then list[TrainRow] for a list of records. For generic DataFrame operations that should work on any column subset, use pd.DataFrame. mypy does not type-check individual column names, even with pandas-stubs installed: the stubs describe the pandas API, not your schema. Runtime schema libraries such as pandera cover that gap.
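
The TypedDict approach, written out as a runnable sketch. The function positive_rate and the sample rows are hypothetical. Note that TypedDict is checked by the type checker only; at runtime the rows are ordinary dicts.

```python
from typing import TypedDict

class TrainRow(TypedDict):
    age: float
    label: int

def positive_rate(rows: list[TrainRow]) -> float:
    # A type checker verifies the keys "age" and "label" and their value types.
    return sum(r["label"] for r in rows) / len(rows)

rows: list[TrainRow] = [
    {"age": 31.0, "label": 1},
    {"age": 45.5, "label": 0},
]
print(positive_rate(rows))   # 0.5
```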

Optional[X] and Union[X, None] are identical: Optional[X] is defined as Union[X, None]. Optional[X] is preferred for readability when None is the only alternative. Use X | None in Python 3.10+ for the same meaning with cleaner syntax. All three spellings require the caller to handle the None case explicitly. If you forget, mypy reports "Item None of Optional[X] has no attribute y". Always narrow the type with an if guard or assert before accessing attributes.

Use Callable[[pd.DataFrame], pd.DataFrame] for each step and list[Callable[[pd.DataFrame], pd.DataFrame]] for a pipeline: def run_pipeline(df: pd.DataFrame, steps: list[Callable[[pd.DataFrame], pd.DataFrame]]) -> pd.DataFrame: return functools.reduce(lambda d, fn: fn(d), steps, df). For generic pipelines that work on types other than DataFrame, use TypeVar: T = TypeVar("T"); Callable[[T], T]. mypy infers T separately at each call site and checks that the data and every step agree on it.
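
The generic variant as a runnable sketch. It uses strings and lists instead of DataFrames so that it needs only the standard library; the argument order (data first, then steps) is a choice made for this example.

```python
from functools import reduce
from typing import Callable, Sequence, TypeVar

T = TypeVar("T")

def run_pipeline(data: T, steps: Sequence[Callable[[T], T]]) -> T:
    # Fold the steps over the data: step_n(...step_2(step_1(data))).
    return reduce(lambda acc, fn: fn(acc), steps, data)

print(run_pipeline("  Hello World  ", [str.strip, str.lower]))   # hello world
print(run_pipeline([3, 1, 2], [sorted, lambda xs: xs[:2]]))      # [1, 2]
```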

TypeVar & Generic Types
TypeVar for type-preserving transforms, Generic classes for typed pipelines, Sequence vs List

TypeVar lets you write functions that accept multiple types while telling mypy the return type is the same as the input type. Generic classes (class MyPipe(Generic[T]), or class MyPipe[T] in Python 3.12+) parametrise the type throughout the class. Sequence[T] accepts any ordered collection (list, tuple, range), while List[T] requires a list. Prefer Sequence for read-only function arguments that should accept any sequence type. A NumPy ndarray is not a Sequence for type-checking purposes; annotate arrays with NDArray instead.

Python — TypeVar for transforms, Generic pipeline, Sequence vs List
from typing import TypeVar, Generic, Sequence, Callable, Iterator
import pandas as pd, numpy as np

T  = TypeVar("T")
DF = TypeVar("DF", pd.DataFrame, pd.Series)   # constrained TypeVar

# ── TypeVar: return type mirrors input type ───────────────
def first(items: Sequence[T]) -> T:
    """Works on any sequence — returns the same element type."""
    return items[0]

x: int  = first([1, 2, 3])        # mypy infers int
s: str  = first(("a", "b", "c"))  # mypy infers str

# ── Constrained TypeVar: accept DataFrame or Series ───────
def clip_values(data: DF, lo: float, hi: float) -> DF:
    return data.clip(lo, hi)   # return type matches input

clipped_df = clip_values(train_df, 0.0, 1.0)   # returns DataFrame
clipped_s  = clip_values(series,   0.0, 1.0)   # returns Series

# ── Generic class: typed pipeline ────────────────────────
class DataPipeline(Generic[T]):
    def __init__(self) -> None:
        self._steps: list[Callable[[T], T]] = []

    def add(self, step: Callable[[T], T]) -> "DataPipeline[T]":
        self._steps.append(step)
        return self

    def run(self, data: T) -> T:
        for step in self._steps:
            data = step(data)
        return data

    def __iter__(self) -> Iterator[Callable[[T], T]]:
        return iter(self._steps)

# Typed as DataPipeline[str] — each step must be str→str
text_pipe: DataPipeline[str] = DataPipeline()
text_pipe.add(str.lower).add(str.strip)
result = text_pipe.run("  Hello World  ")   # "hello world"

# ── Sequence vs List ──────────────────────────────────────
# Sequence[str]: accepts list and tuple (not ndarray); read-only access
def count_tokens(texts: Sequence[str]) -> list[int]:
    return [len(t.split()) for t in texts]

count_tokens(["hello world", "hi"])   # list — OK
count_tokens(("hi there",))          # tuple — OK (if List, this would fail mypy)
Pitfall Using List[str] for read-only function arguments

def f(items: List[str]) — callers with a tuple or any other sequence get a mypy error, even though the function only reads items. This forces unnecessary conversions.

Fix Use Sequence[str] for read-only inputs (supports list, tuple, and range-like containers; note that a plain str also counts as a Sequence[str]). Use Iterable[str] if you only iterate once (allows generators). Reserve List[str] for when you need append/pop/sort, the mutation-specific operations.
Pitfall TypeVar without constraints accepts everything including incompatible types

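T = TypeVar("T"); def process(x: T) -> T. At call sites mypy accepts process(None), process(42), and process({}), because an unconstrained T matches any type. Inside the body x is treated as a plain object, so x.split() is rejected even if every caller passes a str.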

Fix Constrain TypeVar to valid types: T = TypeVar("T", str, bytes) for text types, or use an upper bound: T = TypeVar("T", bound=pd.DataFrame) to accept DataFrame and subclasses only.
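
A sketch of the upper-bound form. Frame and LabelledFrame are hypothetical stand-ins for pd.DataFrame and a subclass, used so the example runs without pandas.

```python
from typing import TypeVar

class Frame:
    def __init__(self, rows: list[dict]):
        self.rows = rows

class LabelledFrame(Frame):
    label_col = "y"

F = TypeVar("F", bound=Frame)     # Frame or any subclass; the exact subclass is preserved

def drop_empty(frame: F) -> F:
    # type(frame) keeps the caller's subclass, matching the F -> F promise.
    return type(frame)([r for r in frame.rows if r])

out = drop_empty(LabelledFrame([{"y": 1}, {}, {"y": 0}]))
print(type(out).__name__, len(out.rows))   # LabelledFrame 2
```
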
Pitfall Returning a new type inside a TypeVar-annotated function

def transform(df: DF) -> DF: return df.to_dict() — the return is dict, not DF. mypy flags this as an incompatible return type.

Fix Ensure the return type genuinely matches the input type. If the function may return a different type, use Union or separate overloads. TypeVar promises "same type out as in" — a promise you must keep.

TypeVar establishes a relationship between input and output types: T = TypeVar("T"); def identity(x: T) -> T promises "the return type is the same type as the input". Union[str, int] means "accepts str OR int but may return either — no relationship". Use TypeVar when the output type depends on the input type (first(), copy(), sorted()). Use Union when the function accepts multiple types but the return type is fixed or independent.

List[T]: mutable, supports indexing, append, sort — accepts only list. Sequence[T]: read-only ordered access with len() and indexing — accepts list, tuple, range, str. Iterable[T]: only guarantees for-loop iteration — accepts generators, files, anything with __iter__. Use the most permissive type your function actually needs: Iterable for a single-pass loop, Sequence for random access, List only if you call append/pop/sort. This makes functions more reusable and tests easier to write.
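
One function per level of permissiveness, as a short sketch. The three function names are hypothetical.

```python
from typing import Iterable, Sequence

def total_tokens(texts: Iterable[str]) -> int:
    # Single pass only, so generators are welcome.
    return sum(len(t.split()) for t in texts)

def middle(items: Sequence[str]) -> str:
    # Needs len() and indexing, so a generator would not do.
    return items[len(items) // 2]

def add_padding(tokens: list[str], size: int) -> None:
    # Mutates in place, so the argument really must be a list.
    tokens.extend(["<pad>"] * (size - len(tokens)))

print(total_tokens(line for line in ["a b", "c"]))   # 3
print(middle(("x", "y", "z")))                        # y
toks = ["hi"]
add_padding(toks, 3)
print(toks)                                           # ['hi', '<pad>', '<pad>']
```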

A generic DataPipeline[T] ensures all steps are Callable[[T], T] — mypy catches if you accidentally add a str→int step to a DataFrame pipeline at analysis time, not at runtime. Without generics, you'd use List[Any] for steps and lose all type safety. In production DS/AI code, typed pipelines document the data contract at each stage, making it safe to refactor step implementations without worrying about breaking the data flow.

Protocol — Structural Typing for DS/AI
Define Predictor/Transformer interfaces that sklearn models satisfy automatically, runtime_checkable

Protocol defines an interface structurally — any class that has the required methods satisfies it, with no inheritance needed. This is "duck typing with static checking". In DS/AI: define a Predictor protocol (fit + predict) and a Transformer protocol (fit + transform). sklearn's RandomForestClassifier and StandardScaler satisfy these protocols automatically, without any changes to sklearn.

Python — Predictor/Transformer protocols, sklearn compatibility, runtime_checkable
from typing import Protocol, runtime_checkable
import numpy as np

# ── Define structural interfaces ──────────────────────────
@runtime_checkable
class Predictor(Protocol):
    """Any class with fit() and predict() satisfies this — including all sklearn classifiers."""
    def fit(self, X: np.ndarray, y: np.ndarray) -> "Predictor": ...
    def predict(self, X: np.ndarray) -> np.ndarray: ...

@runtime_checkable
class Transformer(Protocol):
    def fit(self, X: np.ndarray, y=None) -> "Transformer": ...
    def transform(self, X: np.ndarray) -> np.ndarray: ...

# ── sklearn models satisfy these without any changes ──────
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler

rf     = RandomForestClassifier()
scaler = StandardScaler()

print(isinstance(rf,     Predictor))    # True — rf has fit() and predict()
print(isinstance(scaler, Transformer))  # True — scaler has fit() and transform()

# ── Write functions that accept any conforming model ──────
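def cross_validate(model: Predictor, X: np.ndarray,
                   y: np.ndarray, n_folds: int = 5) -> float:
    """Mean K-fold accuracy using only fit() and predict()."""
    # cross_val_score would also need get_params() (for clone) and a scorer.
    from sklearn.model_selection import KFold
    scores = []
    for train_idx, test_idx in KFold(n_splits=n_folds).split(X):
        preds = model.fit(X[train_idx], y[train_idx]).predict(X[test_idx])
        scores.append(np.mean(preds == y[test_idx]))
    return float(np.mean(scores))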

# Works with sklearn, XGBoost, custom models — anything that fits
cross_validate(rf, X_train, y_train)

# ── Custom model satisfies Protocol without inheriting ────
class NaiveBayes:
    def fit(self, X: np.ndarray, y: np.ndarray) -> "NaiveBayes":
        self.class_priors_ = np.bincount(y) / len(y)
        return self
    def predict(self, X: np.ndarray) -> np.ndarray:
        return np.zeros(len(X), dtype=int)   # stub

print(isinstance(NaiveBayes(), Predictor))  # True — structurally compatible
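# Type-checks under mypy. At runtime, cross_val_score also clones and scores the
# estimator, so a custom model additionally needs get_params() and score().
cross_validate(NaiveBayes(), X_train, y_train)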
Pitfall Protocol without @runtime_checkable cannot be used in isinstance()

Calling isinstance(model, Predictor) on a Protocol that lacks @runtime_checkable raises TypeError: Instance and class checks can only be used with @runtime_checkable protocols. Without the decorator, Protocol conformance is checked only statically (at mypy time).

Fix Add @runtime_checkable to the Protocol class if you need isinstance() checks at runtime. Note: runtime_checkable only checks method existence, not their signatures. mypy checks signatures at analysis time.
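To make the "method existence, not signatures" caveat concrete, here is a minimal sketch. WrongSignature and MissingPredict are hypothetical classes invented for the illustration; only the standard typing module is used.

```python
from typing import Protocol, runtime_checkable

@runtime_checkable
class Predictor(Protocol):
    def fit(self, X, y): ...
    def predict(self, X): ...

class WrongSignature:
    """Hypothetical class: right method names, incompatible signatures."""
    def fit(self):              # takes no data at all
        return self
    def predict(self):
        return []

class MissingPredict:
    """Hypothetical class: has fit() but no predict()."""
    def fit(self, X, y):
        return self

# The runtime check only looks for the method names.
shallow_ok = isinstance(WrongSignature(), Predictor)   # names match
missing    = isinstance(MissingPredict(), Predictor)   # predict() absent
print(shallow_ok, missing)
```

A static checker such as mypy would reject WrongSignature where a Predictor is expected, which is why the runtime check is best treated as a coarse guard rather than a contract.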
Pitfall Confusing Protocol (structural) with ABC (nominal)

Both define interfaces, but an ABC is nominal: a class must inherit from it (or be registered with MyABC.register()) to count as conforming. Unless the ABC defines __subclasshook__, a class with the right methods but no inheritance or registration still fails isinstance(obj, MyABC).

Fix Use Protocol when you want structural typing — any class with the right methods qualifies, no inheritance needed. Use ABC when you want to enforce inheritance and guarantee that abstract methods are implemented (raises TypeError at instantiation if not).
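The structural versus nominal difference can be sketched in a few lines. ThirdPartyModel is a hypothetical stand-in for a library class you cannot edit; PredictorABC and PredictorProto are names chosen for this example.

```python
from abc import ABC, abstractmethod
from typing import Protocol, runtime_checkable

class PredictorABC(ABC):
    @abstractmethod
    def predict(self, X): ...

@runtime_checkable
class PredictorProto(Protocol):
    def predict(self, X): ...

class ThirdPartyModel:
    """Hypothetical library class: has predict() but inherits from nothing."""
    def predict(self, X):
        return [0 for _ in X]

m = ThirdPartyModel()
nominal_ok    = isinstance(m, PredictorABC)     # no inheritance, so not an instance
structural_ok = isinstance(m, PredictorProto)   # has predict(), so it conforms

# register() declares a "virtual subclass" without editing the class.
# It does not verify that the abstract methods exist.
PredictorABC.register(ThirdPartyModel)
registered_ok = isinstance(m, PredictorABC)
print(nominal_ok, structural_ok, registered_ok)
```

Registration is an explicit opt-in per class, so Protocol is usually the lighter choice when many external classes need to be accepted.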
Pitfall Protocol methods with incorrect signatures

Protocol.fit(self, X) but a class implements fit(self, X, y, sample_weight): extra parameters are compatible only when they have defaults. Extra required parameters make the class fail the static check, because code that calls fit(X) through the Protocol type would break. The reverse also bites: if the Protocol declares fit(self, X) and you call model.fit(X, y) on a Protocol-typed variable, mypy flags the call for too many arguments.

Fix Declare in the Protocol exactly the parameters your pipeline passes. Implementations may add further parameters as long as those have default values (for example sample_weight=None).

Protocol (structural): no inheritance required — any class with the specified methods satisfies it. Works with existing classes (sklearn models) without modification. Checked statically by mypy, optionally at runtime with @runtime_checkable. ABC (nominal): requires explicit inheritance from the ABC. If a subclass forgets an abstract method, TypeError is raised at instantiation — a runtime safety net. Use Protocol for external/third-party code you cannot modify. Use ABC for internal interfaces where you control all implementations and want enforcement via TypeError.

Annotate your pipeline with the Protocol type: def train_and_evaluate(model: Predictor, X, y). mypy then verifies that the code inside uses only fit() and predict() on the model. You can pass RandomForestClassifier, XGBClassifier, LogisticRegression, or any custom model; all satisfy Predictor structurally. This is the dependency inversion principle: the pipeline depends on the Predictor abstraction, not on specific model classes. Swapping models does not require changing the pipeline code.

Only with @runtime_checkable. isinstance(rf, Predictor) returns True if rf has all methods listed in the Protocol — it checks method existence, not signatures. This is useful for input validation at the boundary of an inference endpoint: if not isinstance(model, Predictor): raise TypeError. However, runtime isinstance checks are shallow (method names only); full signature compatibility is only verified by mypy at static analysis time.

ABCs for Custom DS/AI Interfaces
Abstract base classes with @abstractmethod, template method pattern, enforced interface contracts

Abstract Base Classes (ABCs) define interfaces through inheritance. @abstractmethod marks methods that every subclass must implement — Python raises TypeError at instantiation if any abstract method is missing. In DS/AI: define a BaseDataLoader with abstract load() and preprocess(), or a BaseEvaluator with abstract compute_metrics(). The template method pattern implements the common orchestration logic in the base class, calling abstract methods that subclasses customise.

Python — BaseDataLoader ABC, template method, multiple implementations
from abc import ABC, abstractmethod
import pandas as pd

class BaseDataLoader(ABC):
    """Abstract interface — all loaders must implement load() and preprocess()."""

    def __init__(self, source: str, batch_size: int = 256):
        self.source     = source
        self.batch_size = batch_size

    @abstractmethod
    def load(self) -> pd.DataFrame:
        """Load raw data from source (CSV, API, DB, S3...)."""
        ...

    @abstractmethod
    def preprocess(self, df: pd.DataFrame) -> pd.DataFrame:
        """Clean, validate, and transform raw data."""
        ...

    def load_and_process(self) -> pd.DataFrame:
        """Template method — orchestrates abstract steps."""
        raw = self.load()
        return self.preprocess(raw)

# Trying to instantiate the ABC directly raises TypeError
# BaseDataLoader("path.csv")  →  TypeError: Can't instantiate abstract class

class CsvLoader(BaseDataLoader):
    def load(self) -> pd.DataFrame:
        return pd.read_csv(self.source)

    def preprocess(self, df: pd.DataFrame) -> pd.DataFrame:
        return df.dropna().reset_index(drop=True)

class S3Loader(BaseDataLoader):
    def load(self) -> pd.DataFrame:
        import boto3, io
        obj = boto3.client("s3").get_object(Bucket="ml-data", Key=self.source)
        return pd.read_parquet(io.BytesIO(obj["Body"].read()))

    def preprocess(self, df: pd.DataFrame) -> pd.DataFrame:
        df["text"] = df["text"].str.lower().str.strip()
        return df.dropna(subset=["text", "label"])

# Both loaders share the same orchestration
for loader in [CsvLoader("train.csv"), S3Loader("train.parquet")]:
    data = loader.load_and_process()
    train_model(data)

# isinstance works without @runtime_checkable on ABC
print(isinstance(CsvLoader("f.csv"), BaseDataLoader))  # True
Pitfall Forgetting @abstractmethod — subclass skips implementation silently

class BaseLoader(ABC): def load(self): pass (no @abstractmethod) — Python treats this as a concrete method with an empty body, not abstract. Subclasses that forget to implement load() will call the base method (which does nothing) and return None silently.

Fix Always decorate abstract methods with @abstractmethod. Optionally raise NotImplementedError inside the body to give a clear message if called directly (rarely needed since TypeError at instantiation time is usually sufficient).
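A small sketch of the difference, using hypothetical loader classes (LooseLoader, StrictLoader and their subclasses are invented for this example):

```python
from abc import ABC, abstractmethod

class LooseLoader(ABC):
    def load(self):            # no @abstractmethod: a concrete no-op
        pass

class StrictLoader(ABC):
    @abstractmethod
    def load(self): ...

class ForgotLoose(LooseLoader):
    pass                       # forgot to implement load()

class ForgotStrict(StrictLoader):
    pass                       # same mistake, but the base is strict

# The loose base lets the mistake through: load() runs and returns None.
silent_result = ForgotLoose().load()

# The strict base refuses to build the object at all.
try:
    ForgotStrict()
    strict_error = None
except TypeError as exc:
    strict_error = str(exc)    # message names the missing method

print(silent_result, strict_error)
```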
Pitfall Putting too much logic in abstract methods

An abstract method with a large default implementation that subclasses must call via super().preprocess(df) creates fragile coupling. If a subclass forgets super(), the base logic is silently skipped.

Fix Abstract methods should be pure interface (just ... or pass). Put shared logic in non-abstract methods (the template method pattern). Concrete shared behaviour goes in the base; only variability is abstract.
Pitfall Using ABC where Protocol is more appropriate

If you want to accept sklearn models and custom models in a pipeline, using ABC forces you to make sklearn inherit from your ABC — which you cannot do without monkey-patching.

Fix Use Protocol for external/third-party code you cannot inherit from (sklearn, PyTorch, XGBoost). Use ABC for your own internal class hierarchies where you control all implementations and want the TypeError safety net.

The template method defines the skeleton of an algorithm in a base class using non-abstract methods that call abstract methods. Subclasses fill in the variabilities (load(), preprocess()) while the orchestration (load_and_process()) is shared. In DS/AI this is ideal for data loaders (different sources, same processing order), evaluators (different metrics, same reporting logic), and experiment runners (different models, same train/validate/log loop). It avoids duplicating orchestration code across every implementation.
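The same pattern applied to evaluators might look like the sketch below. BaseEvaluator and compute_metrics() follow the names mentioned earlier in this section; report(), AccuracyEvaluator, and ErrorRateEvaluator are assumptions made for the illustration.

```python
from abc import ABC, abstractmethod

class BaseEvaluator(ABC):
    @abstractmethod
    def compute_metrics(self, y_true: list[int], y_pred: list[int]) -> dict[str, float]:
        """The only part that varies between evaluators."""
        ...

    def report(self, y_true: list[int], y_pred: list[int]) -> str:
        """Template method: shared validation and formatting."""
        if len(y_true) != len(y_pred):
            raise ValueError("y_true and y_pred differ in length")
        metrics = self.compute_metrics(y_true, y_pred)
        return ", ".join(f"{k}={v:.3f}" for k, v in sorted(metrics.items()))

class AccuracyEvaluator(BaseEvaluator):
    def compute_metrics(self, y_true, y_pred):
        hits = sum(t == p for t, p in zip(y_true, y_pred))
        return {"accuracy": hits / len(y_true)}

class ErrorRateEvaluator(BaseEvaluator):
    def compute_metrics(self, y_true, y_pred):
        misses = sum(t != p for t, p in zip(y_true, y_pred))
        return {"error_rate": misses / len(y_true)}

y_true, y_pred = [1, 0, 1, 1], [1, 0, 0, 1]
acc_report = AccuracyEvaluator().report(y_true, y_pred)
err_report = ErrorRateEvaluator().report(y_true, y_pred)
print(acc_report, "|", err_report)
```

The length check and the formatting live in one place, so a new metric needs only a new compute_metrics() implementation.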

At instantiation time, when you call CsvLoader(...) (or instantiate any subclass of an ABC). If any @abstractmethod is not overridden in the subclass, Python raises a TypeError that names the class and the missing method, for example: Can't instantiate abstract class CsvLoader with abstract method load (the exact wording varies between Python versions). This is later than a Protocol mismatch reported by mypy, which surfaces at static-analysis time before the code runs, but earlier than an AttributeError for a missing method at the call site. Because the message names the missing method, it is immediately actionable.

ABC is closest to Java's abstract class: supports partial implementation (non-abstract methods), instance attributes, __init__, and multiple inheritance. Python has no separate interface construct — Protocol serves that role for structural typing. In Java, interface = pure abstract contract with no state; abstract class = partial implementation. In Python: ABC can be either — it is up to the author whether to include concrete methods. Best practice: if the ABC has no concrete methods, consider Protocol instead (more flexible, works with external code).

A RandomForestClassifier satisfies your custom Predictor protocol without changing a single line of sklearn code. That is duck typing with static checking — and it is the reason production DS/AI code annotated with Protocol is much safer to refactor than code that uses Any.
06

Iterators, Context Managers & Generator Pipelines

The iterator protocol powers every for loop, streaming data loader, and lazy pipeline in Python. Context managers guarantee cleanup — file handles, DB connections, GPU memory — even when exceptions occur.

Iterator Protocol — Custom Dataset Iterators
__iter__, __next__, StopIteration, BatchDataset iterator for ML training loops

Any object with __iter__ and __next__ is an iterator. __iter__ returns the iterator object (usually self). __next__ returns the next value and raises StopIteration when exhausted. Python's for loop calls these automatically. In DS/AI: write a BatchDataset iterator that yields mini-batches from a DataFrame, resets on each epoch, and reports its length for progress bars.

Python — BatchDataset iterator, for loop integration, epoch-safe reset
import math, pandas as pd

class BatchDataset:
    """Iterable dataset that yields mini-batches; resets on each epoch."""

    def __init__(self, df: pd.DataFrame, batch_size: int = 64):
        self._df         = df.reset_index(drop=True)
        self._batch_size = batch_size
        self._pos        = 0

    def __iter__(self) -> "BatchDataset":
        self._pos = 0          # reset position on every new for loop
        return self

    def __next__(self) -> pd.DataFrame:
        if self._pos >= len(self._df):
            raise StopIteration
        batch     = self._df.iloc[self._pos : self._pos + self._batch_size]
        self._pos += self._batch_size
        return batch

    def __len__(self) -> int:
        return math.ceil(len(self._df) / self._batch_size)

dataset = BatchDataset(train_df, batch_size=32)

# Works seamlessly with for, enumerate, tqdm
for i, batch in enumerate(dataset):
    loss = train_step(batch)
    if i % 10 == 0:
        print(f"batch {i}/{len(dataset)} loss={loss:.4f}")

# Multi-epoch training — __iter__ resets each time
for epoch in range(10):
    for batch in dataset:      # calls __iter__() → self._pos = 0
        train_step(batch)

# Can also use next() manually
it = iter(dataset)
first_batch  = next(it)
second_batch = next(it)

# ── Separating container (iterable) from iterator ─────────
class DataFolder:
    """Iterable container — creating multiple iterators is safe."""
    def __init__(self, items): self._items = items
    def __iter__(self):
        return iter(self._items)   # returns a NEW iterator each time
Pitfall Iterator that cannot be reset — exhausted after one pass

A generator function returns a one-shot iterator. Iterating twice: for x in gen: ... and then for x in gen: ... — the second loop processes nothing because the generator is exhausted.

Fix Implement __iter__ in a class that resets state (self._pos = 0) at the start of each iteration. Or wrap the generator in a function so each call returns a fresh generator: for x in make_gen(): ...
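The factory variant of the fix can be sketched as follows; make_rows() is a hypothetical row source standing in for any generator function.

```python
def make_rows():
    """Hypothetical row source; each call returns a fresh generator."""
    for i in range(3):
        yield {"id": i}

gen = make_rows()
first_pass  = [r["id"] for r in gen]
second_pass = [r["id"] for r in gen]           # same object: already exhausted

fresh_pass  = [r["id"] for r in make_rows()]   # new generator, full data again
print(first_pass, second_pass, fresh_pass)
```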
Pitfall Confusing an iterable (has __iter__) with an iterator (has __next__)

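A list is an iterable but not an iterator: calling next(my_list) raises TypeError: 'list' object is not an iterator. An iterator is itself iterable and additionally has __next__. The reverse mix-up, treating a one-shot iterator as a reusable container, produces an empty second pass instead of an error.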

Fix Distinguish: iterable = anything you can for-loop over. iterator = an iterable that remembers position and supports next(). A class can be both by having __iter__ return self and also implementing __next__.
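The distinction can be checked directly with the abstract classes in collections.abc. This is a small illustration using a plain list:

```python
from collections.abc import Iterable, Iterator

rows = [10, 20, 30]
it = iter(rows)                      # ask the iterable for an iterator

list_is_iterable = isinstance(rows, Iterable)
list_is_iterator = isinstance(rows, Iterator)
it_is_iterator   = isinstance(it, Iterator)
returns_self     = iter(it) is it    # an iterator's __iter__ returns itself

try:
    next(rows)                       # a list has no __next__
except TypeError as exc:
    next_on_list_error = str(exc)

first = next(it)                     # the iterator remembers its position
print(list_is_iterable, list_is_iterator, it_is_iterator, returns_self, first)
```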
Pitfall Raising StopIteration inside a generator function (Python 3.7+)

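Before PEP 479, a StopIteration raised inside a generator body silently ended the generator. PEP 479 (opt-in from Python 3.5 via from __future__ import generator_stop, the default since 3.7) converts it into a RuntimeError. Code that relied on the old behaviour now fails loudly on newer Python, including when the StopIteration leaks from an unguarded next() call on an empty iterator inside the generator.

Fix Use return to end a generator. Never raise StopIteration explicitly inside a generator function body, and guard inner next() calls with a default (next(it, None)) or a try/except.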
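A minimal sketch of both variants; the two function names are hypothetical and exist only to contrast the behaviours.

```python
def take_until_negative_bad(values):
    for v in values:
        if v < 0:
            raise StopIteration      # PEP 479 turns this into RuntimeError
        yield v

def take_until_negative(values):
    for v in values:
        if v < 0:
            return                   # ends the generator cleanly
        yield v

good = list(take_until_negative([3, 1, -1, 7]))

try:
    list(take_until_negative_bad([3, 1, -1, 7]))
    bad_error = None
except RuntimeError as exc:
    bad_error = type(exc).__name__

print(good, bad_error)
```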

An iterable has __iter__, so it can produce iterators. Lists, tuples, and DataFrames are iterables. An iterator has both __iter__ (returns self) and __next__ (returns the next value or raises StopIteration); generators and generator expressions are iterators. iter(obj) calls obj.__iter__() to get an iterator; next(it) calls it.__next__(). Python's for loop calls iter() once to get the iterator, then next() in a loop until StopIteration. A key property: container iterables can be iterated multiple times (a list creates a new iterator each time); iterators are one-shot (exhausted after one pass).

Implement a class with __iter__ (resets position to 0 and returns self), __next__ (returns the next batch or raises StopIteration), and __len__ (for progress bars and scheduler step calculations). Reset position in __iter__ so each new for loop restarts from the beginning. This means you can write for epoch in range(10): for batch in dataset: — the inner for loop calls __iter__ each time, resetting position. Add shuffle=True logic in __iter__ to shuffle the index array before each epoch.
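One possible shape for the shuffle logic is sketched below. It departs from the description above in one respect, which is an assumption of this example: __iter__ is written as a generator function instead of returning self, so every for loop gets an independent iterator. ShuffledBatches, the seed parameter, and the use of a plain list of rows are all illustrative choices.

```python
import math
import random

class ShuffledBatches:
    """Sketch: iterable over a list of rows, reshuffled on every pass."""

    def __init__(self, rows, batch_size=2, shuffle=True, seed=0):
        self._rows = list(rows)
        self._batch_size = batch_size
        self._shuffle = shuffle
        self._rng = random.Random(seed)   # assumed: seeded for reproducibility

    def __iter__(self):
        order = list(range(len(self._rows)))
        if self._shuffle:
            self._rng.shuffle(order)      # new index order for each epoch
        for start in range(0, len(order), self._batch_size):
            idx = order[start:start + self._batch_size]
            yield [self._rows[i] for i in idx]

    def __len__(self):
        return math.ceil(len(self._rows) / self._batch_size)

ds = ShuffledBatches(range(5), batch_size=2)
epoch1 = [row for batch in ds for row in batch]
epoch2 = [row for batch in ds for row in batch]
print(len(ds), epoch1, epoch2)
```

Shuffling an index list rather than the rows themselves keeps the underlying data untouched between epochs.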

for x in obj: is syntactic sugar for: it = iter(obj); while True: try: x = next(it); except StopIteration: break; <body>. iter(obj) calls obj.__iter__(). next(it) calls it.__next__(). This protocol works for lists, generators, files (line-by-line), network sockets, database cursors, and any custom class — all by implementing the same two-method interface.

Generator Pipelines — yield & yield from
Composable lazy stages, yield from for delegation, streaming data pipelines with O(batch_size) memory

yield from delegates all yielded values from a sub-generator — enabling composable pipeline stages. Generator pipelines chain lazy stages: each stage pulls from the previous, yielding processed items one at a time. The entire pipeline processes a 100GB dataset with O(batch_size) memory because no stage materialises its full output. This pattern underpins production streaming ML data loaders.

Python — yield from, composable pipeline stages, streaming load→filter→clean→batch
from pathlib import Path
from typing import Iterator
import csv

# ── yield from: delegate to sub-generator ────────────────
def read_csv(path: Path) -> Iterator[dict]:
    with open(path, encoding="utf-8", newline="") as f:   # newline="" as the csv docs recommend
        yield from csv.DictReader(f)   # delegates all DictReader yields

def load_many(paths: list[Path]) -> Iterator[dict]:
    for path in paths:
        yield from read_csv(path)      # concatenate files lazily

# ── Composable pipeline stages ────────────────────────────
def filter_valid(rows: Iterator[dict]) -> Iterator[dict]:
    for row in rows:
        if row.get("text") and row.get("label"):
            yield row                  # only pass through rows with content

def normalise(rows: Iterator[dict]) -> Iterator[dict]:
    for row in rows:
        row["text"] = row["text"].lower().strip()
        yield row

def to_batches(rows: Iterator[dict], size: int = 64) -> Iterator[list[dict]]:
    batch: list[dict] = []
    for row in rows:
        batch.append(row)
        if len(batch) == size:
            yield batch
            batch = []         # new list: clear() would empty the batch just yielded
    if batch:
        yield batch            # last partial batch

# ── Wire stages together — O(batch_size) memory always ────
csv_files = list(Path("data").glob("**/*.csv"))   # all CSVs in tree

pipeline  = to_batches(
    normalise(
        filter_valid(
            load_many(csv_files)
        )
    ),
    size=256,
)

for batch in pipeline:
    features = extract_features(batch)
    model.partial_fit(features)        # online / incremental learning
Pitfall Materialising a stage with list() defeats lazy evaluation

rows = list(load_many(csv_files)) loads all rows into memory before passing to the next stage. For a 50M-row dataset, this causes OOM — exactly what the generator was designed to avoid.

Fix Pass generator objects directly between stages: to_batches(normalise(filter_valid(rows))). Only materialise at the very last stage when you actually need the full batch in memory (e.g., for NumPy operations).
Pitfall yield from on a non-generator expression

yield from [1, 2, 3] works (lists are iterable), but yield from compute_result() requires compute_result() to return an iterable — a function that returns a bare value will raise TypeError.

Fix yield from works on any iterable: lists, tuples, generators, files, range(). If you are unsure whether the expression returns an iterable, check with iter(expr) — if it does not raise, it is iterable.
Pitfall Not handling the final partial batch

A to_batches() implementation that only yields when batch is full silently drops the last rows if len(data) % batch_size != 0. Common with non-round dataset sizes.

Fix Always add the final yield after the loop: if batch: yield batch. This ensures every row is processed, including the remainder batch that is smaller than batch_size.
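An alternative sketch that cannot drop the remainder uses itertools.islice to cut batches. iter_batches is a name chosen for this example and is not part of the pipeline above.

```python
from itertools import islice

def iter_batches(rows, size):
    """Yield lists of up to `size` items; the last one may be shorter."""
    it = iter(rows)
    while True:
        batch = list(islice(it, size))   # a fresh list on every round
        if not batch:                    # source exhausted
            return
        yield batch

batches = list(iter_batches(range(7), 3))
print(batches)   # [[0, 1, 2], [3, 4, 5], [6]]
```

On Python 3.12 and later, itertools.batched offers the same behaviour out of the box, yielding tuples instead of lists.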

yield from iterates the sub-iterable and yields each of its values one by one. It delegates, which means: (1) no explicit for loop is needed; (2) send(), throw(), and close() are forwarded to the sub-generator; (3) the sub-generator's return value becomes the value of the yield from expression. For plain iteration, yield from x behaves like for item in x: yield item; the difference appears with two-way generator communication (send/throw) and return values, which the for loop does not handle.
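The return-value part is the least familiar, so here is a small sketch. count_valid() and pipeline() are hypothetical stages; the stats dict is just one way to surface the returned count.

```python
def count_valid(rows):
    """Sub-generator: yields valid rows, returns how many were dropped."""
    dropped = 0
    for row in rows:
        if row.get("text"):
            yield row
        else:
            dropped += 1
    return dropped                    # becomes the value of `yield from`

def pipeline(rows, stats):
    # Delegates iteration, then captures the sub-generator's return value.
    stats["dropped"] = yield from count_valid(rows)

stats = {}
rows = [{"text": "a"}, {"text": ""}, {"text": "b"}, {}]
kept = list(pipeline(rows, stats))
print(kept, stats)
```

A plain for loop over count_valid(rows) would yield the same rows but has no way to see the returned count.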

Each stage is a generator — it produces one item at a time, on demand, only when the next stage calls next(). No stage stores its full output: load_many() holds at most one open file handle; filter_valid() holds one row; normalise() holds one row; to_batches() holds batch_size rows. Python pulls data through the chain lazily: the for batch in pipeline loop calls next(to_batches), which calls next(normalise), which calls next(filter_valid), which calls next(load_many), which reads one row from disk. Memory usage is bounded by the batch size, not the dataset size.
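The pull order is easy to observe by recording what each stage does. In this sketch, source() and double() are hypothetical two-stage stand-ins for the longer pipeline above.

```python
trace = []

def source():
    for i in range(3):
        trace.append(f"read {i}")
        yield i

def double(items):
    for x in items:
        trace.append(f"double {x}")
        yield x * 2

results = []
for value in double(source()):
    trace.append(f"consume {value}")
    results.append(value)

# Each item travels through every stage before the next one is read.
print(trace)
```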

Generator pipeline: dataset does not fit in RAM; streaming/online processing; process-and-discard pattern; building a batched data loader for incremental learning. Pandas DataFrame: dataset fits in RAM and you need vectorised operations (groupby, merge, rolling window); you iterate multiple times; you need random access; you need Pandas' rich API for transformations. Rule: if the dataset is under a few GB and fits comfortably in memory, load into a DataFrame for the rich API. For everything larger, stream through generators.

Context Managers for Data & ML Code
@contextmanager for timers/DB connections, suppress for expected errors, GPU memory management

The with statement guarantees that cleanup code runs even if an exception occurs. In DS/AI: time preprocessing and inference blocks, manage database and vector-store connections, release GPU memory after inference, suppress expected errors like missing cache files. contextlib.contextmanager turns a generator function (with try/yield/finally) into a context manager without writing a class.

Python — @contextmanager timer, DB connection, GPU memory, suppress
import time
from contextlib import contextmanager, suppress
from pathlib import Path

# ── @contextmanager: code block timer ─────────────────────
@contextmanager
def timer(label: str):
    t0 = time.perf_counter()
    try:
        yield                  # block runs here
    finally:
        elapsed = time.perf_counter() - t0
        print(f"{label}: {elapsed:.3f}s")   # always prints, even on exception

with timer("preprocessing"):
    df = preprocess(raw_df)    # timed regardless of exceptions

with timer("batch inference"):
    preds = model.predict(X_test)

# ── suppress: silence expected exceptions ─────────────────
with suppress(FileNotFoundError):
    Path("cache/embeddings.npy").unlink()   # delete if exists, ignore if not

val = None                                  # default, in case the key is absent
with suppress(KeyError):
    val = config["optional_param"]          # overrides the default when present

# ── DB / vector store connection ──────────────────────────
@contextmanager
def get_connection(dsn: str):
    conn = connect(dsn)
    try:
        yield conn
    except Exception:
        conn.rollback()
        raise                  # re-raise — do not swallow errors
    else:
        conn.commit()          # only commits if no exception
    finally:
        conn.close()           # always closes

with get_connection(DB_URL) as conn:
    upsert_embeddings(conn, embeddings)

# ── GPU memory management ─────────────────────────────────
@contextmanager
def inference_mode(model_path: str):
    import torch
    model = torch.load(model_path, map_location="cuda")
    model.eval()
    try:
        with torch.no_grad():
            yield model
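    finally:
        del model                  # drops this function's reference only
        torch.cuda.empty_cache()   # returns unused cached blocks, even on exception

with inference_mode("best_model.pt") as model:
    preds = model(batch_tensor)
# `model` is still bound here; the weights stay on the GPU until that name is deleted too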
Pitfall Not using with for file I/O — resource leak

f = open("data.csv"); data = f.read() — if an exception occurs before f.close(), the file handle leaks. In a long-running server, this exhausts file descriptors.

Fix Always use with open(...) as f: — Python calls f.__exit__() which closes the handle even on exception. This applies to any resource with a context manager: database connections, model files, network sockets.
Pitfall Swallowing exceptions in a context manager __exit__

def __exit__(self, exc_type, exc_val, tb): return True — returning True suppresses the exception. A training error is silently swallowed, the loop continues, and the model is saved in a broken state.

Fix Return None (or False) from __exit__ to let exceptions propagate normally. Only suppress exceptions deliberately with contextlib.suppress() for specific, expected exception types. Never use a blanket except Exception: pass inside __exit__.
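The effect of the return value can be seen with two tiny classes. Swallow and Propagate are hypothetical names used only for this demonstration.

```python
class Swallow:
    def __enter__(self):
        return self
    def __exit__(self, exc_type, exc_val, tb):
        return True              # truthy: every exception is suppressed

class Propagate:
    def __enter__(self):
        return self
    def __exit__(self, exc_type, exc_val, tb):
        self.saw = exc_type      # may inspect or log the error ...
        return False             # ... but lets it propagate

with Swallow():
    raise ValueError("training diverged")
reached_after_swallow = True     # execution continues as if nothing happened

guard = Propagate()
try:
    with guard:
        raise ValueError("training diverged")
    propagated = False
except ValueError:
    propagated = True

print(reached_after_swallow, propagated, guard.saw)
```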
Pitfall Using suppress too broadly

with suppress(Exception): run_training() — any error in training is silently ignored: OOM, CUDA errors, assertion failures. You never know why training produced no output.

Fix suppress() should only be used for specific, expected, non-critical exceptions (FileNotFoundError for optional cache files, KeyError for optional config keys). Never suppress broad Exception or BaseException in code that performs important work.

Python calls the context manager's __exit__(exc_type, exc_val, traceback) with the exception details. If __exit__ returns a truthy value, the exception is suppressed and execution continues after the with block. If __exit__ returns False/None, the exception propagates normally. In @contextmanager, the code after yield (in the finally block) runs regardless — guaranteeing cleanup. This is why with open() safely closes files, and why @contextmanager with try/finally safely releases resources.

@contextlib.contextmanager turns a generator function into a context manager. The function must: (1) optionally set up resources; (2) yield exactly once (the yielded value is bound to the as target); (3) optionally clean up in a finally block. The code before yield is __enter__, the yield is the body of the with block, and finally is __exit__. This is cleaner than writing a class with __enter__ and __exit__ for simple cases. Use a class when the context manager needs its own attributes or multiple methods.
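For comparison, a class-based timer might look like the sketch below. It is one case where a class is convenient, because the measured time is kept as an attribute for later use. The Timer class and its elapsed attribute are assumptions of this example.

```python
import time

class Timer:
    """Class-based context manager that stores the elapsed time."""

    def __init__(self, label):
        self.label = label
        self.elapsed = None

    def __enter__(self):
        self._t0 = time.perf_counter()
        return self                      # bound to the `as` target

    def __exit__(self, exc_type, exc_val, tb):
        self.elapsed = time.perf_counter() - self._t0
        return False                     # never hide exceptions

with Timer("sum") as t:
    total = sum(range(1000))

# The elapsed time is recorded even when the block raises.
try:
    with Timer("failing") as t_fail:
        raise RuntimeError("boom")
except RuntimeError:
    pass

print(f"{t.label}: {t.elapsed:.6f}s, {t_fail.label}: {t_fail.elapsed:.6f}s")
```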

Wrap each stage in its own @contextmanager timer block: with timer("data_load"): ...; with timer("preprocessing"): ...; with timer("model_forward"): ... Each block reports its own elapsed time. For more structured profiling, use cProfile or py-spy for CPU profiling, or torch.profiler.profile() for GPU profiling. For production, emit timing metrics to your observability stack (Prometheus, Datadog, MLflow) inside the timer context manager rather than printing.

itertools for Data Manipulation
chain for combining datasets, islice for sampling, product for hyperparameter grids, groupby

itertools provides lazy, composable tools for working with sequences without materialising them. In DS/AI: chain.from_iterable for flattening nested feature lists, islice for sampling the first N rows from a large generator, product for building hyperparameter search grids without nested loops, groupby for category-level aggregation.

Python — chain, islice sampling, product for hyperparameter grid, groupby
from itertools import chain, groupby, islice, product
from pathlib import Path

# read_csv() is the generator defined in the pipeline example above

# ── chain: combine datasets lazily ───────────────────────
train_iter  = read_csv(Path("train.csv"))
augment_iter = read_csv(Path("augmented.csv"))
all_data    = chain(train_iter, augment_iter)   # single lazy iterator

# chain.from_iterable: flatten nested token lists
doc_tokens  = [["cat", "sat", "mat"], ["dog", "ran", "away"]]
all_tokens  = list(chain.from_iterable(doc_tokens))
# ['cat', 'sat', 'mat', 'dog', 'ran', 'away']

# ── islice: sample first N from a large generator ─────────
huge_rows   = read_csv(Path("100M_rows.csv"))    # lazy — nothing loaded yet
sample_500  = list(islice(huge_rows, 500))       # only 500 rows read from disk

# ── product: hyperparameter grid — no nested loops ────────
n_est_vals  = [100, 200, 500]
depth_vals  = [5, 10, 20]
lr_vals     = [0.01, 0.001, 0.0001]

for n_est, depth, lr in product(n_est_vals, depth_vals, lr_vals):
    # 3 × 3 × 3 = 27 combinations
    model  = train(n_estimators=n_est, max_depth=depth, lr=lr)
    result = evaluate(model)
    print(f"n={n_est} d={depth} lr={lr}: {result:.3f}")

# Similar in spirit to np.meshgrid, but yields tuples and works with any iterables
all_combos = list(product(n_est_vals, depth_vals))
# [(100, 5), (100, 10), (100, 20), (200, 5), ...]

# ── groupby: aggregate predictions by category ────────────
# IMPORTANT: input must be sorted by the key
records = sorted([
    {"category": "pos", "score": 0.91},
    {"category": "neg", "score": 0.23},
    {"category": "pos", "score": 0.85},
    {"category": "neg", "score": 0.31},
], key=lambda r: r["category"])

for cat, group in groupby(records, key=lambda r: r["category"]):
    scores = [r["score"] for r in group]
    print(f"{cat}: n={len(scores)} mean={sum(scores)/len(scores):.2f}")
Pitfall groupby does not group like SQL — input must be sorted first

itertools.groupby() groups consecutive elements with the same key. If the data is not sorted, "pos" records are split into multiple groups. groupby([pos, neg, pos, pos]) gives three groups, not two.

Fix Always sort by the grouping key before calling groupby: sorted(records, key=lambda r: r["category"]). For SQL-style grouping on unsorted data, use defaultdict(list) or Pandas groupby(), neither of which needs sorted input.
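A short sketch contrasting the two behaviours on unsorted input; the records are made-up (category, score) pairs.

```python
from collections import defaultdict
from itertools import groupby

records = [("pos", 0.91), ("neg", 0.23), ("pos", 0.85), ("pos", 0.80)]

# groupby on unsorted data: one group per consecutive run of equal keys
consecutive = [(k, len(list(g))) for k, g in groupby(records, key=lambda r: r[0])]

# defaultdict(list): SQL-style grouping, no sorting required
by_cat = defaultdict(list)
for cat, score in records:
    by_cat[cat].append(score)

print(consecutive)      # "pos" appears twice
print(dict(by_cat))     # "pos" appears once, with all three scores
```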
Pitfall Consuming the group iterator after moving to the next group

for key, group in groupby(data, key=fn): keys.append(key) — storing the key but processing group later. By the time you use group, groupby has advanced past it and the group is empty.

Fix Always consume the group iterator inside the loop body: for key, group in groupby(data, key=fn): items = list(group); process(items). Do not store the group object for later use.
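The following sketch shows what happens to groups that are stored and read later. Only the earlier groups are inspected here, since those are the ones the iteration has already moved past.

```python
from itertools import groupby

data = ["a", "a", "b", "c", "c"]

# Groups kept for later: groupby has moved on before they are read.
stored = [(k, g) for k, g in groupby(data)]
late = {k: list(g) for k, g in stored}

# Groups consumed inside the loop: every item is seen.
eager = {k: list(g) for k, g in groupby(data)}

print(late["a"], late["b"])   # both empty
print(eager)
```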
Pitfall islice on an exhausted generator returns empty silently

gen = read_csv("f.csv"); list(gen); sample = list(islice(gen, 100)) — sample is [] because gen was already exhausted. No error is raised.

Fix islice does not reset the generator. Create a fresh generator for each islice call, or use itertools.tee() if you need multiple independent iterators over the same data (with memory trade-offs).
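A minimal sketch of both the problem and the tee() option. rows() is a hypothetical stand-in for a file-reading generator.

```python
from itertools import islice, tee

def rows():
    yield from range(10)     # hypothetical stand-in for a row generator

gen = rows()
_ = list(gen)                              # first pass consumes everything
empty_sample = list(islice(gen, 3))        # nothing left, and no error

# tee() gives independent iterators over one underlying generator.
a, b = tee(rows(), 2)
head = list(islice(a, 3))                  # peek at the first three rows
full = list(b)                             # the other iterator still sees all ten

print(empty_sample, head, len(full))
```

tee() buffers every item that one iterator has consumed and the other has not, so memory grows with the distance between them; for large files, calling the generator function again is usually cheaper.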

product(n_estimators, max_depths, learning_rates) generates the Cartesian product (every combination) lazily, without nested loops: 3 × 3 × 3 values give 27 combinations from one expression. islice(product(...), N) takes only the first N combinations in iteration order, which tend to share the leading parameter values; for a random subset, materialise the grid and draw from it with random.sample. Combined with a parallel executor (joblib.Parallel, multiprocessing.Pool) you can evaluate combinations concurrently. For large grids, use RandomizedSearchCV (random sample) or Optuna (Bayesian optimisation) instead of exhaustive product.
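To make the difference between "the first N combinations" and "a random subset" of the grid concrete, here is a short sketch. The value lists mirror the example above; the seed is an arbitrary choice for reproducibility.

```python
import random
from itertools import islice, product

n_est_vals = [100, 200, 500]
depth_vals = [5, 10, 20]
lr_vals    = [0.01, 0.001, 0.0001]

# islice takes the FIRST five combinations, in product's iteration order.
first_five = list(islice(product(n_est_vals, depth_vals, lr_vals), 5))

# A random subset needs the full grid (27 tuples here, so cheap to build).
grid = list(product(n_est_vals, depth_vals, lr_vals))
rng = random.Random(42)                    # assumed seed
random_five = rng.sample(grid, 5)          # five distinct combinations

print(first_five)
print(random_five)
```

Because the rightmost argument of product varies fastest, the first five tuples never leave n_estimators=100, which is why islice alone is a poor way to explore a grid.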

islice(generator, N) pulls exactly N items from the generator and stops, so the rest of the file is never parsed. This takes O(N) time, and materialising the result with list() takes O(N) memory. Use it for quick sanity checks ("does the schema look right?") and for creating small debugging datasets. The first N rows are not a random sample: shuffling them only reorders the same rows. For a uniform random sample, use reservoir sampling over the whole stream; for stratified sampling, use a two-pass approach or one reservoir per stratum.
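Reservoir sampling draws a uniform random sample of k items from a stream of unknown length in a single pass with O(k) memory. The sketch below follows the classic "Algorithm R"; reservoir_sample and its seed parameter are names chosen for this example.

```python
import random

def reservoir_sample(rows, k, seed=None):
    """Uniform sample of up to k items from any iterable, in one pass."""
    rng = random.Random(seed)
    sample = []
    for i, row in enumerate(rows):
        if i < k:
            sample.append(row)           # fill the reservoir first
        else:
            j = rng.randint(0, i)        # inclusive on both ends
            if j < k:                    # happens with probability k / (i + 1)
                sample[j] = row          # replace a random slot
    return sample

picked = reservoir_sample(iter(range(10_000)), k=5, seed=0)
short  = reservoir_sample(iter(range(3)), k=5, seed=0)   # stream shorter than k
print(picked, short)
```

Unlike islice, this reads the entire stream, so it trades a full pass over the data for a sample that is not biased toward the start of the file.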

chain(iter1, iter2, iter3) concatenates multiple iterators passed as separate arguments. chain.from_iterable([[1,2],[3,4],[5,6]]) flattens one level of a nested iterable — equivalent to chain(*nested) but works on arbitrarily long or lazy outer iterables (no unpacking needed). In DS/AI: chain for combining a fixed list of dataset iterators; chain.from_iterable for flattening token lists, nested feature arrays, or any structure where the number of inner iterables is not known in advance.
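The "lazy outer iterable" case is where the two differ most. In this sketch, token_lists() is a hypothetical endless stream, which chain(*...) could never unpack.

```python
from itertools import chain, islice

def token_lists():
    """Hypothetical endless stream of tokenised documents."""
    n = 0
    while True:
        yield [f"doc{n}_a", f"doc{n}_b"]
        n += 1

# from_iterable pulls inner lists one at a time, so this stays lazy.
flat = chain.from_iterable(token_lists())
first_five = list(islice(flat, 5))

# chain() with a fixed, known set of iterables
fixed = list(chain([1, 2], (3,), range(4, 6)))

print(first_five, fixed)
```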

A generator pipeline passes data lazily through composable stages: load → filter → clean → batch, each a generator, with O(batch_size) memory regardless of dataset size. This pattern underpins many production streaming ML pipelines.
Error Handling & Testing 07–08
07

Exceptions & Runtime Validation

Good error design means failures are specific, traceable, and caught at the right layer. Runtime validation with Pydantic turns bad data into a clear error before it reaches your model.

Custom Exception Hierarchy
DataValidationError, ModelNotFittedError, chaining with raise X from Y

Custom exceptions make failure modes explicit and catchable at the right layer. The convention of a trailing underscore for fitted attributes (`model_`, `feature_names_`) means you can check `hasattr(self, "model_")` to raise a clear error before predict(). Exception chaining with `raise X from Y` preserves the root cause while providing context — the original traceback is always visible with `__cause__`.

Custom exceptions for DS/AI pipelines
class DataValidationError(ValueError):
    """Raised when input data fails schema or type checks."""
    def __init__(self, column: str, issue: str, n_bad: int = 0) -> None:
        self.column = column
        self.n_bad = n_bad
        msg = f"[{column}] {issue}"
        if n_bad:
            msg += f" ({n_bad} rows affected)"
        super().__init__(msg)

class ModelNotFittedError(RuntimeError):
    """Raised when predict() is called before fit()."""

class PipelineError(Exception):
    """Top-level pipeline failure — wraps the upstream cause."""

# Chaining: raise X from Y preserves root cause
try:
    validate_schema(df)
except DataValidationError as e:
    raise PipelineError("Ingestion stage failed") from e
# PipelineError.__cause__ → DataValidationError → full traceback

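# Raise early, catch narrow (a method on your estimator class;
# assumes numpy as np, numpy.typing as npt, and pandas as pd are imported)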
def predict(self, X: pd.DataFrame) -> npt.NDArray[np.float64]:
    if not hasattr(self, "model_"):          # trailing _ = fitted state
        raise ModelNotFittedError("Call fit() before predict()")
    missing = set(self.feature_names_) - set(X.columns)
    if missing:
        raise DataValidationError("features", f"Missing columns: {missing}")
    return self.model_.predict(X)
Pitfall Catching Exception too broadly

except Exception: pass swallows MemoryError, data errors, and real bugs alike. A bare except: goes further and also swallows KeyboardInterrupt and SystemExit.

Fix Catch the narrowest type you can recover from. Let everything else propagate. Only catch broadly at the top-level entry point where you log and exit.
Pitfall Raising generic ValueError/RuntimeError

Generic exceptions make it impossible to catch specific failures without string matching on error messages.

Fix Define a hierarchy: one base class per domain (DataError, ModelError) with specific subclasses. Callers can catch the base or the specific type.
Pitfall Losing the original exception

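raise NewError("msg") inside an except block chains only implicitly: Python stores the original in __context__ and prints "During handling of the above exception, another exception occurred", which reads like a bug in the handler rather than a deliberate translation. __cause__ stays None.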

Fix Use raise NewError("msg") from original_exc to chain explicitly, or raise NewError("msg") from None to suppress intentionally.

Reuse built-ins when the semantics match: TypeError for wrong type, ValueError for bad value, FileNotFoundError for missing files. Define custom exceptions when callers need to catch your failures specifically — e.g., DataValidationError lets a pipeline catch data failures without catching unrelated ValueErrors from third-party libraries.
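
A minimal sketch of a domain hierarchy, to show how callers can catch either the base class or a specific subclass. The names DataError, SchemaError, RangeError, check_row, and count_bad_rows are hypothetical and chosen only for illustration.

```python
class DataError(Exception):
    """Hypothetical base class for every data-related failure."""

class SchemaError(DataError):
    """A required column is missing."""

class RangeError(DataError):
    """A value falls outside its allowed range."""

def check_row(row: dict[str, float]) -> None:
    if "age" not in row:
        raise SchemaError("missing column: age")
    if not 0 <= row["age"] <= 120:
        raise RangeError(f"age out of range: {row['age']}")

def count_bad_rows(rows: list[dict[str, float]]) -> dict[str, int]:
    """Tally failures by type; unrelated exceptions still propagate."""
    counts = {"schema": 0, "other_data": 0}
    for row in rows:
        try:
            check_row(row)
        except SchemaError:          # specific subclass first
            counts["schema"] += 1
        except DataError:            # base class catches the rest of the domain
            counts["other_data"] += 1
    return counts

rows = [{"age": 34.0}, {"tenure": 12.0}, {"age": 250.0}]
print(count_bad_rows(rows))
```

Because both handlers name domain types, a ValueError raised by an unrelated library inside the loop would not be caught here.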

Attributes set during fit() — like self.model_, self.feature_names_, self.classes_ — get a trailing underscore by convention. This makes it easy to detect whether fit() has been called: hasattr(self, "model_") returns False before fitting and True after. Always raise ModelNotFittedError (not AttributeError) when this check fails.
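
The fitted-state check can be sketched with a toy estimator. MeanBaseline is a hypothetical class written for this illustration, not a scikit-learn API; it only demonstrates the trailing-underscore convention and the explicit error.

```python
class ModelNotFittedError(RuntimeError):
    """Raised when predict() is called before fit()."""

class MeanBaseline:
    """Hypothetical estimator that always predicts the training mean."""

    def fit(self, y: list[float]) -> "MeanBaseline":
        if not y:
            raise ValueError("y must not be empty")
        self.mean_ = sum(y) / len(y)     # trailing underscore: set only by fit()
        return self

    def predict(self, n: int) -> list[float]:
        if not hasattr(self, "mean_"):
            raise ModelNotFittedError("Call fit() before predict()")
        return [self.mean_] * n

model = MeanBaseline()
try:
    model.predict(2)
except ModelNotFittedError as exc:
    print(f"not fitted: {exc}")

print(model.fit([1.0, 2.0, 3.0]).predict(2))
```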

Pydantic for ML Config & Validation
BaseModel, field_validator, model_validator, frozen configs for reproducible training runs

Pydantic v2 is the de facto standard for ML config validation and API schemas. It converts raw dicts/JSON into typed Python objects with validation at the boundary — before bad values reach training loops or API handlers. Key patterns: `frozen=True` makes configs hashable (usable as dict keys for caching); `@field_validator` validates individual fields; `@model_validator(mode="after")` validates relationships between fields. `model_dump_json()` serializes configs as reproducible experiment artifacts.

Pydantic v2 for training configs and API schemas
from pydantic import BaseModel, Field, field_validator, model_validator
from typing import Literal

class TrainingConfig(BaseModel, frozen=True):
    model_type: Literal["xgb", "lgbm", "rf"]
    n_estimators: int   = Field(100,  ge=1,  le=5_000)
    max_depth:    int   = Field(6,    ge=1,  le=50)
    learning_rate: float = Field(0.1, gt=0.0, le=1.0)
    feature_cols: tuple[str, ...]   # tuple for hashability (frozen)
    target_col: str

    @field_validator("feature_cols")
    @classmethod
    def features_not_empty(cls, v: tuple[str, ...]) -> tuple[str, ...]:
        if not v:
            raise ValueError("feature_cols must not be empty")
        return v

    @model_validator(mode="after")
    def target_not_in_features(self) -> "TrainingConfig":
        if self.target_col in self.feature_cols:
            raise ValueError(
                f"target_col '{self.target_col}' must not be in feature_cols"
            )
        return self

# Load from YAML/env — validated at the boundary
cfg = TrainingConfig.model_validate({
    "model_type": "xgb",
    "feature_cols": ["age", "tenure", "spend"],
    "target_col": "churn",
})
# frozen=True → cfg is hashable → usable as cache key
results_cache: dict[TrainingConfig, float] = {}

# Serialize for experiment tracking (MLflow, W&B)
json_str = cfg.model_dump_json()        # deterministic, round-trips

# API response schema
class EmbeddingResponse(BaseModel):
    vector: list[float]
    model:  str
    tokens: int
Pitfall Using dataclasses for external data

dataclasses.__post_init__ validation only runs at construction — not when attributes are mutated. They also do not validate types from JSON/dict sources.

Fix Use Pydantic when data comes from outside your process (API requests, config files, environment variables). Use dataclasses for pure internal data structures where you control construction.
Pitfall Pydantic v1 vs v2 API differences

Many tutorials still show .dict() and @validator — these are Pydantic v1. v2 uses .model_dump() and @field_validator.

Fix Check your installed version: python -c "import pydantic; print(pydantic.__version__)". In v2: model_dump(), model_dump_json(), model_validate(), @field_validator, @model_validator.
Pitfall Mutable defaults in Pydantic models

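feature_cols: list[str] = [] is safe in Pydantic, which copies a mutable default for each instance, but the same habit breaks elsewhere: a plain dataclass rejects it with ValueError, and a mutable default argument on a function is shared across calls.

Fix Use Field(default_factory=list) so the intent is explicit and the pattern carries over to dataclasses, or prefer tuple for immutable sequences. frozen=True blocks attribute reassignment after construction, but it does not make a list field itself immutable.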

frozen=True makes Pydantic models immutable and hashable — you can use them as dict keys or set members. This is ideal for experiment configs (cache by config → result) and any key you want to use in a lookup. The trade-off: you cannot update fields in place; use model.model_copy(update={"n_estimators": 200}) instead.
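
A short sketch of the cache-by-config pattern, assuming Pydantic v2 is installed. RunConfig and the scores in the dictionary are hypothetical placeholders, not results from any real run.

```python
from pydantic import BaseModel, Field, ValidationError

class RunConfig(BaseModel, frozen=True):
    """Hypothetical config; every field is hashable, so the model is too."""
    model_type: str
    n_estimators: int = Field(100, ge=1)
    feature_cols: tuple[str, ...] = ()

base = RunConfig(model_type="xgb", feature_cols=("age", "tenure"))
bigger = base.model_copy(update={"n_estimators": 200})   # new object, base untouched

# Placeholder scores keyed by config
results: dict[RunConfig, float] = {base: 0.81, bigger: 0.83}

# An equal config built elsewhere finds the same cache entry
same_as_base = RunConfig(model_type="xgb", feature_cols=("age", "tenure"))
print(results[same_as_base])
print(base.n_estimators, bigger.n_estimators)

try:
    base.n_estimators = 500          # frozen models reject assignment
except ValidationError:
    print("assignment rejected")
```

One caveat worth keeping in mind: model_copy(update=...) does not re-run validation on the updated values, so for untrusted input it is safer to build a new instance through model_validate.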

FastAPI uses Pydantic models directly for request body parsing and response serialization. Define a PredictRequest(BaseModel) with your feature fields, and FastAPI validates the JSON, returns 422 with detailed errors on failure, and generates an OpenAPI schema automatically. This means your ML endpoint gets type checking, auto-docs, and validation for free.

Context-Aware Error Handling
Narrow except, structured logging in handlers, ExceptionGroup for concurrent pipeline failures

Three rules for error handling in DS/AI code: (1) catch the narrowest exception you can recover from; (2) log with context at the point of handling, not at the point of raising; (3) use `suppress()` only for genuinely optional operations. Python 3.11 introduced `ExceptionGroup` and `except*` for cases where multiple concurrent tasks each raise — common in async data loading and LLM batch inference.

Structured error handling for data pipelines
import logging
from contextlib import suppress

import pandas as pd
import pyarrow as pa

logger = logging.getLogger(__name__)

def load_and_validate(path: str) -> pd.DataFrame:
    try:
        df = pd.read_parquet(path)
    except FileNotFoundError as exc:
        raise DataValidationError("path", f"File not found: {path}") from exc
    except pa.ArrowInvalid as exc:           # Parquet-specific parse error
        raise DataValidationError("schema", "Parquet schema mismatch") from exc
    except Exception as exc:                 # catch-all only as last resort
        logger.exception("Unexpected error loading %s", path)
        raise PipelineError(f"Failed to load {path}") from exc

    if df.empty:
        raise DataValidationError("rows", "File is empty", n_bad=0)
    return df

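# Python 3.11: ExceptionGroup for concurrent failures
# asyncio.gather() re-raises only the first exception; asyncio.TaskGroup is
# what collects task failures into an ExceptionGroup. It cancels the remaining
# tasks once one fails, so the group holds the failures raised up to that point.
import asyncio

async def load_all(paths: list[str]) -> list[pd.DataFrame]:
    try:
        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(async_load(p)) for p in paths]
    except* DataValidationError as eg:       # except* matches group members
        failed = [str(e) for e in eg.exceptions]
        logger.error("%d files failed validation: %s", len(failed), failed)
        raise
    return [t.result() for t in tasks]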

# suppress: only for truly optional cleanup operations
with suppress(FileNotFoundError):
    cache_path.unlink()                      # clean up stale cache; ignore if absent

# Structured logging in handlers (not in raisers)
try:
    result = model.predict(features)
except DataValidationError as e:
    logger.error(
        "Validation failed",
        extra={"column": e.column, "n_bad": e.n_bad, "path": path}
    )
    result = fallback_prediction()           # recover with a fallback value
Pitfall Logging inside the raise path

Logging at the point of raising means every intermediate handler also logs, producing duplicate entries for the same error.

Fix Log at the point of handling (where you decide what to do). Raisers just raise — they do not log. The handler closest to recovery is the one that should log.
Pitfall Using suppress() too broadly

suppress(Exception) is equivalent to except Exception: pass, so it hides real bugs.

Fix Only suppress specific, expected exceptions for genuinely optional operations. If you find yourself suppressing broad exceptions, the code needs restructuring.
Pitfall except* available only in Python 3.11+

ExceptionGroup and except* syntax cause SyntaxError on Python 3.10 and earlier.

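Fix A sys.version_info check cannot guard except* in the same file, because the whole module fails to parse on 3.10. Keep except* code in a separate module that is imported only on 3.11+, or use asyncio.gather(return_exceptions=True) and inspect the results manually for Python < 3.11 compatibility.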

Catch only when you can take a meaningful recovery action: return a default, retry with backoff, switch to a fallback, or convert to a domain-specific exception. If you cannot do any of these, let it propagate. Catching an exception only to re-raise it unchanged is usually pointless; if you do need to (for example, to log at the boundary), use a bare raise, which re-raises the original exception with its traceback intact.
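
One of the recovery actions listed above, retry with backoff, can be sketched as follows. TransientError, call_with_backoff, and flaky are hypothetical names for this illustration; real code would catch whatever retryable exception its client library raises.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")

class TransientError(Exception):
    """Hypothetical error type that is safe to retry."""

def call_with_backoff(fn: Callable[[], T], max_attempts: int = 3,
                      base_delay: float = 0.01) -> T:
    """Retry fn on TransientError with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except TransientError:
            if attempt == max_attempts:
                raise                          # bare raise keeps the original traceback
            time.sleep(base_delay * 2 ** (attempt - 1))
    raise AssertionError("unreachable when max_attempts >= 1")

calls = {"n": 0}

def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise TransientError("try again")
    return "ok"

print(call_with_backoff(flaky), calls["n"])
```

Only TransientError is caught, so a ValueError from a genuine bug in fn propagates on the first attempt instead of being retried.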

When you run N LLM API calls inside an asyncio.TaskGroup (Python 3.11+) and some fail, the failures are collected into an ExceptionGroup. With except* RateLimitError as eg:, you get only the rate-limit failures in eg.exceptions, so you can retry those while other exception types propagate. Note that a TaskGroup cancels its remaining tasks as soon as one fails. Plain asyncio.gather() does not build a group: it re-raises only the first exception, and with return_exceptions=True it hands back a mixed results list that you must partition manually.
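
For comparison, the manual approach with return_exceptions=True can be sketched like this; it needs no except* syntax. RateLimitError here is a local stand-in class and fake_llm_call is a hypothetical coroutine, not a real provider client.

```python
import asyncio

class RateLimitError(Exception):
    """Stand-in for a provider's rate-limit exception (hypothetical)."""

async def fake_llm_call(prompt: str) -> str:
    await asyncio.sleep(0)
    if prompt.startswith("busy"):
        raise RateLimitError(prompt)
    return prompt.upper()

async def run_batch(prompts: list[str]) -> tuple[dict[str, str], list[str]]:
    # Results come back in input order; failures appear as exception objects.
    results = await asyncio.gather(
        *(fake_llm_call(p) for p in prompts), return_exceptions=True
    )
    done: dict[str, str] = {}
    retry: list[str] = []
    for prompt, res in zip(prompts, results):
        if isinstance(res, RateLimitError):
            retry.append(prompt)           # recoverable: queue for a retry
        elif isinstance(res, BaseException):
            raise res                      # anything else propagates
        else:
            done[prompt] = res
    return done, retry

done, retry = asyncio.run(run_batch(["hello", "busy-1", "world"]))
print(done, retry)
```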

Type Guards & Runtime Narrowing
TypeGuard, assert_never for exhaustiveness, cast(), match statement for result dispatch

Type narrowing is how static type checkers understand that after `if isinstance(x, np.ndarray)`, `x` is definitely an ndarray. `TypeGuard` lets you write custom narrowing functions. `assert_never` makes mypy enforce that a match/if chain is exhaustive — it fails at type-check time if you add a new union arm without handling it. These patterns are especially useful in ML systems where functions return `str | float | None` (predictions, model outputs) or `np.ndarray | pd.DataFrame` (array-or-dataframe APIs).

Type guards, exhaustiveness, and runtime narrowing
from typing import TypeGuard, assert_never, cast   # assert_never: Python 3.11+
import numpy as np
import numpy.typing as npt
import pandas as pd

# TypeGuard: custom narrowing function
def is_float64_array(arr: object) -> TypeGuard[npt.NDArray[np.float64]]:
    """Narrows object → float64 NDArray — validates before consuming embeddings."""
    return (
        isinstance(arr, np.ndarray)
        and arr.dtype == np.float64
        and arr.ndim == 2
    )

embeddings = model_registry.get_embeddings()
if is_float64_array(embeddings):
    # mypy knows: embeddings is NDArray[np.float64] here
    similarity = embeddings @ embeddings.T

# assert_never: exhaustiveness checking
ModelOutput = str | float | None

def to_float(result: ModelOutput) -> float:
    match result:
        case float(v):
            return v
        case str(s):
            return float(s)
        case None:
            return 0.0
        case _:
            assert_never(result)    # mypy error if ModelOutput gains a new arm

# cast(): tell mypy what you verified manually (no runtime check happens)
raw = config_registry["embedding_model"]          # type: object
model = cast(EmbeddingModel, raw)                 # safe only if the type was verified earlier

# isinstance narrowing — works automatically with mypy
def process(data: pd.DataFrame | np.ndarray) -> pd.DataFrame:
    if isinstance(data, np.ndarray):
        # mypy knows: data is np.ndarray here
        return pd.DataFrame(data)
    # mypy knows: data is pd.DataFrame here
    return data
Pitfall Overusing cast()

cast() is a lie to mypy — it performs no runtime check and can mask real type mismatches.

Fix Only use cast() after you have already verified the type with isinstance() or a TypeGuard function. Prefer isinstance() narrowing, which is both runtime-safe and understood by mypy without any cast.
Pitfall match statement not narrowing as expected

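case str(): is an isinstance() check, so it matches subclasses too. The surprises run the other way: case int(): also matches True and False because bool subclasses int, case float(): does not match an int, and case str: without parentheses is a capture pattern that matches anything and rebinds the name str (it is a SyntaxError unless it is the last case).

Fix Always write class patterns with parentheses, use case str(s): when you need the matched value bound to a name, and put narrower classes first (case bool(): before case int():).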
Pitfall assert_never() raises at runtime too

assert_never(x) raises AssertionError if called at runtime — it is not only a mypy check.

Fix This is usually what you want — a runtime guard for the "impossible" case. If you hit it in production, it means a new union arm was added without updating the handler.

Use plain isinstance() for narrowing to standard types — mypy understands it directly. Use TypeGuard when your narrowing function checks multiple conditions (dtype + ndim + shape) that isinstance() alone cannot express. TypeGuard tells mypy "if this function returns True, the argument has this type."

ML systems frequently evolve their result types — a new model returns a dict where old models returned a float. assert_never() in your dispatch function causes mypy to flag every handler that does not cover the new type, before deployment. At runtime, if somehow an uncovered type reaches the handler, AssertionError fires immediately rather than silently producing wrong output.

Raise early and narrow: the further an exception travels from its source, the harder it is to diagnose. Custom exception types document failure modes as clearly as types document APIs.
08

Testing Data-Intensive Code

Testing ML code is harder than testing pure functions: outputs are probabilistic, data shapes matter, and external APIs need mocking. pytest fixtures and hypothesis make it manageable.

pytest Foundations for DS/AI
conftest.py fixtures with scope, pytest.approx for float metrics, tmp_path for model artifacts

pytest fixtures with scope control how often expensive objects are created. `scope="session"` creates a fixture once for the entire test run, which suits a 1,000-row synthetic DataFrame or a fitted sklearn Pipeline. `scope="function"` (the default) rebuilds the fixture for every test, which is right for mutable objects that tests might modify. `pytest.approx()` handles floating-point comparisons correctly: it checks that values agree within a relative tolerance (1e-6 by default) or an absolute tolerance (1e-12 by default), which is what you need for model probabilities, metric values, and normalized outputs.

conftest.py and pytest patterns for ML code
# conftest.py — shared fixtures across the test suite
import pytest
import pandas as pd
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression

@pytest.fixture(scope="session")
def sample_df() -> pd.DataFrame:
    """Synthetic DataFrame built once per test session."""
    rng = np.random.default_rng(42)
    return pd.DataFrame({
        "age":    rng.integers(18, 80, size=1_000),
        "tenure": rng.integers(0, 120, size=1_000),
        "spend":  rng.exponential(scale=200, size=1_000),
        "churn":  rng.integers(0, 2, size=1_000),
    })

@pytest.fixture(scope="session")
def fitted_pipeline(sample_df: pd.DataFrame) -> Pipeline:
    pipe = Pipeline([("scaler", StandardScaler()), ("clf", LogisticRegression())])
    X = sample_df[["age", "tenure", "spend"]]
    return pipe.fit(X, sample_df["churn"])

# test_model.py
def test_predict_returns_binary(fitted_pipeline, sample_df):
    X = sample_df[["age", "tenure", "spend"]]
    preds = fitted_pipeline.predict(X)
    assert set(preds).issubset({0, 1})

def test_predict_proba_sums_to_one(fitted_pipeline, sample_df):
    X = sample_df[["age", "tenure", "spend"]]
    proba = fitted_pipeline.predict_proba(X)
    # pytest.approx handles float: checks within abs=1e-6
    assert proba.sum(axis=1) == pytest.approx(np.ones(len(X)), abs=1e-6)

def test_model_roundtrips_via_joblib(fitted_pipeline, sample_df, tmp_path):
    import joblib
    path = tmp_path / "model.joblib"
    joblib.dump(fitted_pipeline, path)
    loaded = joblib.load(path)
    X = sample_df[["age", "tenure", "spend"]]
    np.testing.assert_array_equal(loaded.predict(X), fitted_pipeline.predict(X))
Pitfall session-scoped fixtures with mutable state

If a test modifies a session-scoped DataFrame (df["new_col"] = ...), subsequent tests see the mutation and fail non-deterministically.

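Fix Use scope="function" for fixtures that tests mutate. Returning df.copy() from the session-scoped fixture itself does not help, because the fixture body runs only once and every test receives the same object. Instead, keep the expensive build session-scoped and add a function-scoped fixture that returns sample_df.copy(), so each test gets its own copy.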
Pitfall Using == for float comparisons in tests

assert 0.1 + 0.2 == 0.3 fails due to IEEE 754. So does assert auc_score == 0.85 even when the model is correct.

Fix Always use pytest.approx() for floats: assert score == pytest.approx(0.85, abs=0.01). For NumPy arrays, use np.testing.assert_allclose(result, expected, rtol=1e-5).
Pitfall Placing fixtures in test files instead of conftest.py

Fixtures defined in a test file are only available in that file. Duplicate fixtures across files drift apart.

Fix Put shared fixtures (sample DataFrames, fitted models, database connections) in conftest.py at the appropriate directory level. pytest auto-discovers conftest.py files.

scope="session" if the model is only read by tests (predict/transform). If any test modifies the model or the pipeline steps, use scope="function". Fitting a large model in every test function is expensive — session scope is a common optimization for read-only fixtures.
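
A common way to combine both scopes is to keep the expensive build session-scoped and hand tests a per-test copy. The sketch below assumes a hypothetical build_sample_df helper and fixture names chosen for illustration.

```python
import pandas as pd
import pytest

def build_sample_df() -> pd.DataFrame:
    """Hypothetical stand-in for an expensive dataset build."""
    return pd.DataFrame({"age": [25, 40, 61], "churn": [0, 1, 0]})

@pytest.fixture(scope="session")
def sample_df() -> pd.DataFrame:
    return build_sample_df()            # runs once per test session

@pytest.fixture                         # default scope="function"
def mutable_df(sample_df: pd.DataFrame) -> pd.DataFrame:
    return sample_df.copy()             # fresh copy for every test

def test_can_add_column(mutable_df: pd.DataFrame) -> None:
    mutable_df["age_sq"] = mutable_df["age"] ** 2
    assert "age_sq" in mutable_df.columns

def test_shared_frame_is_untouched(sample_df: pd.DataFrame) -> None:
    assert list(sample_df.columns) == ["age", "churn"]
```

Tests that only read request sample_df; tests that write request mutable_df, so the shared frame never changes regardless of test order.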

Use pytest.raises() as a context manager: with pytest.raises(DataValidationError) as exc_info: / validate(bad_df). After the block, inspect exc_info.value.column or str(exc_info.value) to assert on the exception content. Add match= for a regex check on the message: pytest.raises(ValueError, match="feature_cols must not be empty").
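
Put together, that pattern might look like the sketch below. The DataValidationError here is a trimmed stand-in for the class defined earlier, and validate_age is a hypothetical validator.

```python
import pytest

class DataValidationError(ValueError):
    """Minimal stand-in for the custom exception used earlier."""
    def __init__(self, column: str, issue: str) -> None:
        self.column = column
        super().__init__(f"[{column}] {issue}")

def validate_age(ages: list[int]) -> None:
    """Hypothetical validator: rejects negative ages."""
    if any(a < 0 for a in ages):
        raise DataValidationError("age", "negative values")

def test_validate_age_rejects_negatives() -> None:
    # match= is a regex searched in str(exception), so brackets are escaped
    with pytest.raises(DataValidationError, match=r"\[age\] negative") as exc_info:
        validate_age([31, -4])
    assert exc_info.value.column == "age"
    assert "negative" in str(exc_info.value)

def test_validate_age_accepts_valid_input() -> None:
    validate_age([31, 4])               # must not raise
```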

Parametrize & Property-Based Testing
@pytest.mark.parametrize for input variants, hypothesis arrays for adversarial edge cases

`@pytest.mark.parametrize` runs one test function with many input combinations — far cleaner than copy-pasting test functions. It is ideal for testing feature transformers across different input shapes, model types, or config values. `hypothesis` generates test inputs automatically, including adversarial edge cases you would not think to write: empty arrays, all-NaN columns, extreme values, duplicate rows. `hypothesis.extra.numpy.arrays()` generates NumPy arrays with controlled dtype and shape strategies.

parametrize and hypothesis for ML code
import pytest
import numpy as np
import numpy.typing as npt
from hypothesis import given, settings
from hypothesis import strategies as st
from hypothesis.extra.numpy import arrays

# parametrize: multiple input shapes, one test function
@pytest.mark.parametrize("n_rows,n_cols", [
    (100, 5),
    (1_000, 50),
    (1, 10),        # single-row edge case
    (500, 1),       # single-feature edge case
])
def test_feature_engineer_preserves_rows(n_rows: int, n_cols: int) -> None:
    X = np.random.rand(n_rows, n_cols)
    result = feature_engineer(X)
    assert result.shape[0] == n_rows

# parametrize over model types
@pytest.mark.parametrize("model_type", ["xgb", "lgbm", "rf"])
def test_training_config_valid_model_types(model_type: str) -> None:
    cfg = TrainingConfig(
        model_type=model_type,
        feature_cols=("age", "tenure"),
        target_col="churn",
    )
    assert cfg.model_type == model_type

# hypothesis: generates adversarial edge cases automatically
@given(
    arrays(
        dtype=np.float64,
        shape=st.tuples(st.integers(1, 200), st.just(5)),
        elements=st.floats(min_value=-1e6, max_value=1e6, allow_nan=False),
    )
)
@settings(max_examples=300)
def test_normalise_output_in_range(arr: npt.NDArray[np.float64]) -> None:
    result = min_max_normalise(arr)
    assert float(result.min()) >= 0.0 - 1e-9
    assert float(result.max()) <= 1.0 + 1e-9

# hypothesis finds the minimal failing example automatically
@given(st.lists(st.text(min_size=1), min_size=1, max_size=100))
def test_tokeniser_never_returns_empty_tokens(texts: list[str]) -> None:
    tokens = batch_tokenise(texts)
    assert len(tokens) == len(texts)
    assert all(isinstance(t, list) for t in tokens)
Pitfall hypothesis database and flaky tests

hypothesis stores previously failing examples in a local database and replays them on subsequent runs. If the test environment changes (different random seed, different data), replayed examples can appear flaky.

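Fix Hypothesis is configured through settings profiles rather than pyproject.toml. Register a CI profile in conftest.py, for example settings.register_profile("ci", database=None, derandomize=True), and select it with settings.load_profile("ci") or pytest --hypothesis-profile=ci. database=None disables replay of stored examples; derandomize=True makes the generated examples deterministic.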
Pitfall Too few parametrize cases

Testing only (100, 5) misses single-row (1, N) and single-feature (N, 1) edge cases that often expose off-by-one errors in reshape calls.

Fix Always include boundary cases: n=1, n=max, empty (where valid). For shape-sensitive code, test (1, N), (N, 1), and (1, 1) explicitly.
Pitfall hypothesis with allow_nan=True breaking downstream asserts

NaN values in generated arrays cause most arithmetic to return NaN, which makes assert result.min() >= 0.0 always fail.

Fix Use elements=st.floats(allow_nan=False, allow_infinity=False) for arrays that will pass through normalization or statistical functions. Test NaN handling separately with explicit NaN inputs.

Use parametrize when you know exactly which cases matter and want to document them explicitly. Use hypothesis when you want to discover edge cases you have not thought of — adversarial inputs, boundary conditions, rare value combinations. hypothesis is especially valuable for data transformation functions where the output contract (values in [0, 1], shape preserved, no NaNs) should hold for any valid input.
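
To make the output contract concrete, here is one possible min_max_normalise together with a contract check over a few hand-picked edge cases. The implementation is an assumption for illustration (the function is referenced but not defined in this section), and the constant-column rule is one design choice among several.

```python
import numpy as np
import numpy.typing as npt

def min_max_normalise(arr: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]:
    """Scale each column to [0, 1]; constant columns map to 0.0."""
    lo = arr.min(axis=0)
    span = arr.max(axis=0) - lo
    safe_span = np.where(span == 0.0, 1.0, span)    # avoid 0/0 on constant columns
    return (arr - lo) / safe_span

def check_contract(arr: npt.NDArray[np.float64]) -> None:
    """The properties that should hold for any valid input."""
    out = min_max_normalise(arr)
    assert out.shape == arr.shape
    assert not np.isnan(out).any()
    assert out.min() >= 0.0 and out.max() <= 1.0

for case in (
    np.array([[1.0, 5.0], [3.0, 5.0]]),     # second column is constant
    np.array([[7.0]]),                      # single row, single column
    np.array([[-1e6, 0.0], [1e6, 1e-9]]),   # extreme magnitudes
):
    check_contract(case)

print(min_max_normalise(np.array([[1.0, 5.0], [3.0, 5.0]])))
```

These three cases are the parametrize style: explicit and documented. A hypothesis test would call the same check_contract on generated arrays and could surface inputs that are missing from this list.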

When hypothesis finds a failing input, it automatically shrinks it to the smallest/simplest example that still fails — e.g., an array of 47 rows becomes an array of 1 row. This minimization makes the bug obvious. The minimal failing example is stored in the hypothesis database and replayed on every subsequent run until the bug is fixed.

Mocking LLM & External APIs
AsyncMock for async LLM clients, MagicMock(spec=) to catch API drift, side_effect for retry testing

Testing code that calls LLM APIs (OpenAI, Anthropic, Cohere) requires mocking: real calls are slow, expensive, and non-deterministic. The key tools are `unittest.mock.patch` for replacing objects at the call site, `AsyncMock` for async clients, and `MagicMock(spec=SomeClass)` to catch attribute-name typos against the actual API shape. `side_effect` lets a mock raise exceptions or return different values on successive calls — essential for testing retry logic and error handling.

Mocking async LLM calls, S3, and retry logic
from unittest.mock import AsyncMock, MagicMock, patch, call
import pytest

# Mock async OpenAI embeddings call
@pytest.mark.asyncio
async def test_embedder_batches_correctly() -> None:
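    # Return one embedding per input text, so the final short batch stays short.
    # Assumes the embedder calls create(input=[...], model=...) with keywords.
    def fake_create(*, input, **kwargs):
        return MagicMock(data=[MagicMock(embedding=[0.1] * 1_536) for _ in input])

    with patch("myapp.embedder.AsyncOpenAI") as mock_cls:
        mock_client = mock_cls.return_value
        mock_client.embeddings.create = AsyncMock(side_effect=fake_create)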

        embedder = OpenAIEmbedder(client=mock_client, batch_size=10)
        result = await embedder.embed(["text"] * 25)     # 25 docs → 3 batches

    assert mock_client.embeddings.create.call_count == 3
    assert len(result) == 25

# MagicMock(spec=) catches attribute typos against the real API
mock_s3 = MagicMock(spec=boto3.client("s3"))
mock_s3.upload_file("local.parquet", "bucket", "key.parquet")   # OK
# mock_s3.uplod_file(...)                                        # → AttributeError

# side_effect: simulate rate-limit then success (retry testing)
@pytest.mark.asyncio
async def test_llm_client_retries_on_rate_limit() -> None:
    from openai import RateLimitError
    attempts = 0

    async def flaky_create(*args, **kwargs):
        nonlocal attempts
        attempts += 1
        if attempts < 3:
            raise RateLimitError("rate limited", response=MagicMock(), body={})
        return MagicMock(choices=[MagicMock(message=MagicMock(content="answer"))])

    with patch.object(llm_client.chat.completions, "create", side_effect=flaky_create):
        result = await call_with_retry(llm_client, prompt="hello")

    assert result == "answer"
    assert attempts == 3         # failed twice, succeeded on third

# Sequence of return values
mock_fn = MagicMock(side_effect=[10, 20, ValueError("done")])
mock_fn()   # → 10
mock_fn()   # → 20
mock_fn()   # → raises ValueError
Pitfall Patching the wrong namespace

patch("openai.AsyncOpenAI") rebinds the name inside the openai package only. If your code does from openai import AsyncOpenAI, your module already holds its own reference to the real class, and the patch has no effect on it.

Fix Always patch where the name is looked up: patch("myapp.embedder.AsyncOpenAI") patches the reference in your module. The rule: patch the path your code uses, not where the object is defined.
Pitfall Using MagicMock for async methods without AsyncMock

A MagicMock() is not awaitable. Calling await mock_client.embeddings.create(...) raises TypeError: object MagicMock cannot be used in an await expression.

Fix Use AsyncMock for any attribute that will be awaited: mock_client.embeddings.create = AsyncMock(return_value=...). Or use AsyncMock() as the top-level mock and its async methods auto-configure.
Pitfall Not asserting call arguments

A test that mocks an API but only checks the return value does not verify that the right data was sent to the API.

Fix Assert on mock_fn.call_args or mock_fn.call_args_list: mock_client.embeddings.create.assert_called_with(input=["text"] * 10, model="text-embedding-3-small").

Prefer mocking at the client/transport level (the OpenAI client) rather than your own wrapper functions. This tests that your wrapper correctly calls the underlying API with the right arguments. Mocking your own functions tests nothing meaningful — it just makes the test pass by replacing the code under test.

Use side_effect with the appropriate exception, for example AsyncMock(side_effect=openai.APITimeoutError(request=MagicMock())); in the openai v1 SDK this exception is constructed from the originating request rather than a message string. Then assert that your function either retries, returns a fallback, or raises an appropriate domain exception. Test both the retry path (side_effect=[TimeoutError, TimeoutError, good_response]) and the give-up path (side_effect=[TimeoutError] * max_retries).
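
Both paths can be exercised with the standard library alone. In this sketch call_with_retry is a hypothetical wrapper and the built-in TimeoutError stands in for a provider-specific timeout exception.

```python
import asyncio
from unittest.mock import AsyncMock

async def call_with_retry(create, prompt: str, max_retries: int = 3) -> str:
    """Hypothetical wrapper: retry on TimeoutError, re-raise after max_retries."""
    for attempt in range(max_retries):
        try:
            return await create(prompt=prompt)
        except TimeoutError:
            if attempt == max_retries - 1:
                raise
    raise AssertionError("unreachable when max_retries >= 1")

async def main() -> None:
    # Retry path: two timeouts, then a good response.
    create = AsyncMock(side_effect=[TimeoutError, TimeoutError, "answer"])
    assert await call_with_retry(create, "hello") == "answer"
    assert create.await_count == 3
    create.assert_awaited_with(prompt="hello")    # verify what was sent

    # Give-up path: every attempt times out.
    create = AsyncMock(side_effect=[TimeoutError] * 3)
    try:
        await call_with_retry(create, "hello")
    except TimeoutError:
        print("gave up after", create.await_count, "attempts")

asyncio.run(main())
```

Passing the mocked create callable in directly keeps the example free of patching; in a real test the same AsyncMock would be attached to the client object that the wrapper uses.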

Testing ML Pipelines End-to-End
assert_frame_equal for DataFrame outputs, pipeline contract tests, integration test patterns

ML pipelines have implicit contracts: shapes are preserved, column names are deterministic, dtypes do not change unexpectedly, NaNs are not introduced. `pandas.testing.assert_frame_equal` verifies all of these in one call with configurable tolerance. Integration tests — tests that run the full pipeline from raw data to prediction — catch wiring bugs that unit tests miss. They are slower but essential for validating that preprocessing → feature engineering → model form a consistent chain.

DataFrame contract tests and pipeline integration tests
from pathlib import Path

import pandas as pd
import numpy as np
from pandas.testing import assert_frame_equal, assert_series_equal
from sklearn.pipeline import Pipeline
import pytest

# Contract test: feature pipeline output schema
def test_feature_pipeline_output_schema(sample_df: pd.DataFrame) -> None:
    result = feature_pipeline(sample_df)

    expected_cols = {"age_norm", "tenure_log", "spend_per_day"}
    assert set(result.columns) == expected_cols, (
        f"Missing: {expected_cols - set(result.columns)}, "
        f"Extra: {set(result.columns) - expected_cols}"
    )
    assert result.dtypes["age_norm"] == np.float64
    assert result.isna().sum().sum() == 0       # pipeline must not introduce NaNs
    assert len(result) == len(sample_df)        # rows preserved

# assert_frame_equal: element-wise with float tolerance
def test_normalise_transform(sample_df: pd.DataFrame) -> None:
    result = log_normalise(sample_df[["spend"]].copy())
    expected = pd.DataFrame(
        {"spend": np.log1p(sample_df["spend"])},
        index=sample_df.index,
    )
    assert_frame_equal(
        result, expected,
        check_exact=False,
        atol=1e-9,          # absolute tolerance for float comparison
        check_dtype=True,   # dtype must match exactly
        check_names=True,   # column names must match
    )

# Integration test: raw CSV → prediction
def test_full_pipeline_end_to_end(tmp_path: Path) -> None:
    # Write a minimal test CSV
    csv_path = tmp_path / "test.csv"
    df = pd.DataFrame({"age": [30, 45], "tenure": [12, 60], "spend": [150.0, 800.0]})
    df.to_csv(csv_path, index=False)

    predictions = run_pipeline(str(csv_path))   # full pipeline under test

    assert len(predictions) == 2
    assert all(0.0 <= p <= 1.0 for p in predictions)   # probability scores

# Determinism test: repeated predictions on the same input must be identical
def test_pipeline_output_is_deterministic(fitted_pipeline, sample_df):
    X = sample_df[["age", "tenure", "spend"]]
    run1 = fitted_pipeline.predict_proba(X)
    run2 = fitted_pipeline.predict_proba(X)
    np.testing.assert_array_equal(run1, run2)   # exact equality for deterministic model
Pitfall Not testing dtype preservation

A pipeline that silently converts int64 to float64 or object causes subtle downstream bugs: with object columns, sklearn estimators either fail with cryptic conversion errors or coerce the values without warning.

Fix Always assert result.dtypes against expected dtypes in feature pipeline tests. Use assert_frame_equal(check_dtype=True) which is the default.
Pitfall Integration tests that depend on external services

An integration test that calls a real database or S3 bucket makes CI non-deterministic and slow.

Fix Use pytest marks: @pytest.mark.integration and run integration tests separately (pytest -m integration). Mock external services in unit tests; only hit real services in dedicated integration test runs.
Pitfall Testing the pipeline with the same data used for fitting

Evaluating fitted_pipeline.predict() on sample_df (the training data) tests memorization, not generalization.

Fix Hold out a test split using a different random seed, or generate a separate test DataFrame in a separate fixture. The training fixture and evaluation fixture should not share data.

assert_frame_equal is DataFrame-aware: it checks column names, dtypes, index alignment, and values — all in one call with informative error messages that show which columns differ. np.testing.assert_allclose only checks numeric values with relative/absolute tolerance. Use assert_frame_equal for DataFrame outputs, np.testing.assert_allclose or np.testing.assert_array_equal for raw NumPy arrays.
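
The difference shows up as soon as a dtype drifts while the values stay the same. A small sketch with made-up data:

```python
import numpy as np
import pandas as pd
from pandas.testing import assert_frame_equal

expected = pd.DataFrame({"spend": np.array([1.0, 2.0], dtype=np.float64)})
as_float32 = expected.astype(np.float32)          # same values, different dtype

# Values agree, so the NumPy check passes and the dtype drift goes unnoticed.
np.testing.assert_allclose(as_float32.to_numpy(), expected.to_numpy(), rtol=1e-6)

# The DataFrame-aware check catches it (check_dtype=True is the default).
dtype_mismatch_caught = False
try:
    assert_frame_equal(as_float32, expected, check_exact=False, atol=1e-9)
except AssertionError:
    dtype_mismatch_caught = True
print("dtype mismatch caught:", dtype_mismatch_caught)

# Small float noise within tolerance is still accepted.
noisy = expected + 1e-12
assert_frame_equal(noisy, expected, check_exact=False, atol=1e-9)
```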

Test each stage in isolation (unit tests with mock inputs), then test the full chain (integration test). Unit tests catch logic errors in individual transformers; integration tests catch interface mismatches between stages. A good integration test checks the output contract (shape, dtype, range, no NaNs) rather than exact values — exact values make tests brittle as the model or data evolves.

pytest.approx() is not optional for ML — floating-point arithmetic means 0.333... != 1/3 in a way that breaks naive assertEqual. Testing that probabilities sum to 1.0 without tolerance will fail on legitimate code.
Scientific Python 09–10
09

NumPy — Numerical Computing

NumPy is the foundation of all numerical Python. Understanding arrays, broadcasting, and vectorization separates Data Scientists from script writers.

ndarray & dtypes
shape, strides, memory layout, dtype selection, view vs copy

NumPy arrays are fixed-type, contiguous memory blocks. Each array has a dtype (float64, int32, bool), shape (tuple of dimensions), and strides (bytes to advance in each dimension). dtype choice directly affects memory and speed: float32 halves memory vs float64 with acceptable precision loss for many ML tasks. Strides determine memory layout: C-order (row-major, default) is optimal for row-wise access; Fortran-order for column-wise. A slice returns a view sharing the same memory — modifying it modifies the original. .copy() forces an independent copy.

NumPy — dtypes, shapes, strides, views vs copies
import numpy as np

# dtype selection — memory footprint matters at scale
f64 = np.array([1.0, 2.0, 3.0])                       # float64 default: 24 bytes
f32 = np.array([1.0, 2.0, 3.0], dtype=np.float32)     # float32: 12 bytes
print(f64.dtype, f64.nbytes)  # float64 24
print(f32.dtype, f32.nbytes)  # float32 12

# shape and strides
arr = np.arange(12).reshape(3, 4)
print(arr.shape)    # (3, 4)
print(arr.strides)  # (32, 8): 32 bytes/row, 8 bytes/element for int64 (arange's default on most platforms)

# C-order vs F-order
c = np.array([[1, 2], [3, 4]], order='C')  # row-major (default)
f = np.asfortranarray(c)                   # column-major
print(c.flags['C_CONTIGUOUS'])   # True
print(f.flags['F_CONTIGUOUS'])   # True

# View vs copy — critical to understand
arr = np.arange(10, dtype=np.float64)
view = arr[2:7]      # slice → view (shares memory)
view[0] = 99.0       # modifies arr[2] in place!
print(arr[2])        # 99.0

copy = arr[2:7].copy()  # independent copy
copy[0] = 0.0           # does NOT affect arr

# astype returns a copy by default (copy=True)
arr_int = np.array([1, 2, 3], dtype=np.int32)
arr_f64 = arr_int.astype(np.float64)   # new array, no shared memory

# np.asarray: avoid copy if already the right dtype
def to_float64(x):
    return np.asarray(x, dtype=np.float64)  # copies only if needed

# Structured dtypes — lightweight typed records
dt = np.dtype([('age', np.int32), ('score', np.float64)])
records = np.array([(25, 0.91), (30, 0.85)], dtype=dt)
print(records['age'])    # [25 30]
print(records['score'])  # [0.91 0.85]

# Memory layout check
X = np.random.randn(1000, 512)
print(X.flags['C_CONTIGUOUS'])  # True — row-wise ops are cache-friendly
Pitfall Assuming a slice creates an independent copy

arr[2:5][0] = 99 modifies the original array because slices return views. This causes silent data corruption in preprocessing pipelines that modify slices of a shared array.

Fix Use arr[2:5].copy() whenever you need an independent copy. Use np.may_share_memory(a, b) to check whether two arrays share underlying data.
Pitfall Using float64 for large embedding matrices by default

A 1M × 768 float64 embedding matrix requires about 6.1 GB of RAM. float32 gives practically the same cosine similarities and dot-product rankings in most models, at half the memory.

Fix Cast to float32 after loading: embeddings = embeddings.astype(np.float32). Halves memory with negligible precision loss for retrieval and similarity tasks.
Pitfall Transposing a large array changes strides without copying

arr.T returns a view with swapped strides — it is no longer C-contiguous. Operations on a non-contiguous array are slower because they access non-adjacent memory locations.

Fix After transpose, use np.ascontiguousarray(arr.T) if many sequential operations follow. Profile first — the copy cost may be less than the penalty of non-contiguous access in a hot loop.

A view shares the same underlying memory buffer as the original array. Modifying a view modifies the original. Slices always return views; fancy indexing (integer array index) always returns a copy. Check with np.shares_memory(a, b) or arr.base is not None. In preprocessing pipelines, always use .copy() when you modify a subset to avoid corrupting shared training data.
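The view/copy rules above can be checked directly. This is a small sketch on a toy array:

```python
import numpy as np

arr = np.arange(10, dtype=np.float64)

sliced = arr[2:7]              # basic slice: a view
fancy = arr[[2, 3, 4]]         # integer-array index: a copy
masked = arr[arr > 5]          # boolean mask: also a copy

slice_shares = np.shares_memory(arr, sliced)
fancy_shares = np.shares_memory(arr, fancy)
mask_shares = np.shares_memory(arr, masked)

sliced[0] = -1.0   # writes through to arr[2]
fancy[0] = -2.0    # leaves arr untouched
```

np.shares_memory is exact but can be slow for complicated stride patterns; np.may_share_memory is a cheaper bounds check that can report false positives.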

Choose float32 when: (1) storing large embedding matrices — halves RAM; (2) feeding data to a neural network — most models train in float32 anyway; (3) computing cosine similarity — relative magnitudes are preserved. Keep float64 for: statistical computations where small rounding differences accumulate (e.g., computing covariance matrices), metric evaluation code, and financial calculations. Rule: float32 for storage and inference, float64 for statistical analysis.
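To get a feel for the trade-off, the sketch below compares memory use and cosine similarities for the same random matrix in both dtypes. The matrix size and the helper cosine_to_first are illustrative assumptions, and real embeddings may behave differently.

```python
import numpy as np

rng = np.random.default_rng(0)
emb64 = rng.normal(size=(1000, 64))          # float64 by default
emb32 = emb64.astype(np.float32)

def cosine_to_first(m: np.ndarray) -> np.ndarray:
    """Cosine similarity of every row to row 0."""
    unit = m / np.linalg.norm(m, axis=1, keepdims=True)
    return unit @ unit[0]

sims64 = cosine_to_first(emb64)
sims32 = cosine_to_first(emb32)

memory_ratio = emb64.nbytes / emb32.nbytes            # 2.0
max_abs_diff = float(np.abs(sims64 - sims32).max())   # on the order of float32 rounding
```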

NumPy operations are fastest on C-contiguous (row-major) arrays because CPU caches load adjacent memory bytes together. For row-wise operations (mean per row), C-order is optimal. For column-wise operations, F-order is optimal. A transposed C-order array is F-contiguous — column operations are cache-friendly. Non-contiguous arrays cause cache misses. Use np.ascontiguousarray() to force a copy into C-order when performing many sequential row-wise ops on a non-contiguous array.
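The contiguity flags make the layout visible. A short sketch on a small float64 array:

```python
import numpy as np

X = np.arange(12, dtype=np.float64).reshape(3, 4)   # C-order, strides (32, 8)
Xt = X.T                                            # view with swapped strides (8, 32)

flags = {
    "X_c": X.flags["C_CONTIGUOUS"],      # True
    "Xt_c": Xt.flags["C_CONTIGUOUS"],    # False
    "Xt_f": Xt.flags["F_CONTIGUOUS"],    # True
}
Xt_rowmajor = np.ascontiguousarray(Xt)   # copies the data into C-order
```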

Broadcasting
Rules, mental model, keepdims, newaxis, pairwise distance, softmax

Broadcasting allows operations on arrays with different shapes without materialising intermediate copies. NumPy aligns shapes right-to-left: a dimension of size 1 "stretches" to match the other; incompatible non-1 dimensions raise ValueError. np.newaxis (equivalent to None) inserts a size-1 dimension for explicit control. Broadcasting enables vectorised operations like row-wise normalisation, pairwise distance matrices, and numerically stable softmax — all without Python loops.

NumPy — broadcasting rules, keepdims, newaxis, pairwise distance, softmax
import numpy as np

# Basic: (3,) broadcasts over (4, 3)
matrix = np.arange(12).reshape(4, 3).astype(float)   # (4, 3)
bias   = np.array([10.0, 20.0, 30.0])                 # (3,)
result = matrix + bias                                  # each row += bias → (4, 3)

# Row-wise z-score — keepdims preserves the reduction axis
X = np.random.randn(1000, 512)
mu  = X.mean(axis=1, keepdims=True)   # (1000, 1) — not (1000,)
std = X.std(axis=1,  keepdims=True)   # (1000, 1)
Z   = (X - mu) / std                   # (1000, 512) — broadcast (1000,1)

# Without keepdims, mu shape is (1000,) which won't broadcast against (1000, 512)

# np.newaxis: control dimensions explicitly
col_vec = np.array([1, 2, 3])[:, np.newaxis]   # (3, 1)
row_vec = np.array([10, 20, 30])               # (3,)
outer   = col_vec * row_vec                    # (3, 3) — outer product

# Pairwise L2 distance: (N, D) vs (M, D) → (N, M) without a loop
def pairwise_l2(a: np.ndarray, b: np.ndarray) -> np.ndarray:
    # a: (N, D), b: (M, D)
    diff = a[:, np.newaxis, :] - b[np.newaxis, :, :]  # (N, M, D)
    return np.sqrt((diff ** 2).sum(axis=-1))            # (N, M)

A = np.random.randn(100, 128)
B = np.random.randn(50, 128)
dists = pairwise_l2(A, B)   # (100, 50) — zero Python loops

# Numerically stable softmax (subtract max per row)
logits = np.array([[1.0, 2.0, 3.0], [4.0, 1.0, 0.5]])
shifted = logits - logits.max(axis=1, keepdims=True)  # (2, 3) - (2, 1)
exp     = np.exp(shifted)
probs   = exp / exp.sum(axis=1, keepdims=True)         # (2, 3)
print(probs.sum(axis=1))   # [1.0, 1.0] — correct

# Shape error diagnosis — always print shapes
try:
    np.array([1, 2, 3]) + np.array([1, 2])
except ValueError as e:
    print(e)   # operands could not be broadcast together with shapes (3,) (2,)
Pitfall Forgetting keepdims=True after a reduction

X.mean(axis=1) returns shape (N,), not (N, 1). NumPy aligns shapes right-to-left, so (N,) is matched against the D dimension of (N, D), not the N dimension. When N != D this raises ValueError; when N == D it broadcasts silently and subtracts the wrong mean from every element.

Fix Always use keepdims=True when you intend to broadcast the result back: X - X.mean(axis=1, keepdims=True). Check output shapes with print(result.shape) after each step during development.
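The dangerous case is a square array, where the missing keepdims does not raise. A small sketch:

```python
import numpy as np

X = np.arange(9, dtype=np.float64).reshape(3, 3)   # N == D == 3

wrong = X - X.mean(axis=1)                  # (3, 3) - (3,): means are applied along columns
right = X - X.mean(axis=1, keepdims=True)   # (3, 3) - (3, 1): one mean per row

# Non-square case: the same mistake fails loudly
try:
    np.ones((4, 3)) - np.ones((4, 3)).mean(axis=1)
    raised = False
except ValueError:
    raised = True
```

After correct centring every row of right has mean 0; the rows of wrong do not.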
Pitfall Large intermediate arrays from broadcasting cause OOM

pairwise_l2 with a[:, np.newaxis, :] - b[np.newaxis, :, :] creates an (N, M, D) intermediate array. For N = M = 10,000 and D = 768 in float64, that is about 614 GB, far more RAM than a typical machine has.

Fix For large pairwise distances, use scipy.spatial.distance.cdist which is memory-efficient, or compute in chunks. For cosine similarity, use a @ b.T / (norms_a * norms_b.T) which avoids the (N, M, D) expansion.
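One way to avoid the (N, M, D) intermediate in plain NumPy is the expansion ||a - b||² = ||a||² + ||b||² - 2 a·b, optionally processed in row chunks. The function names and the chunk size below are assumptions for this sketch, not part of the original text.

```python
import numpy as np

def pairwise_l2_matmul(a: np.ndarray, b: np.ndarray) -> np.ndarray:
    """(N, D) and (M, D) -> (N, M) distances using only (N, M)-sized temporaries."""
    a_sq = (a ** 2).sum(axis=1)[:, np.newaxis]   # (N, 1)
    b_sq = (b ** 2).sum(axis=1)[np.newaxis, :]   # (1, M)
    sq = a_sq + b_sq - 2.0 * (a @ b.T)           # (N, M)
    np.maximum(sq, 0.0, out=sq)                  # clip tiny negatives caused by rounding
    return np.sqrt(sq)

def pairwise_l2_chunked(a: np.ndarray, b: np.ndarray, chunk: int = 256) -> np.ndarray:
    """Same result, but only chunk rows of a are processed at a time."""
    out = np.empty((a.shape[0], b.shape[0]))
    for start in range(0, a.shape[0], chunk):
        out[start:start + chunk] = pairwise_l2_matmul(a[start:start + chunk], b)
    return out

rng = np.random.default_rng(0)
A = rng.normal(size=(100, 128))
B = rng.normal(size=(50, 128))
dists = pairwise_l2_chunked(A, B, chunk=32)      # (100, 50)
```

The expansion trades some accuracy for memory: distances between nearly identical vectors suffer from cancellation, so the diagonal of a self-distance matrix may come out as small positive numbers rather than exact zeros.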
Pitfall Scalar vs 0-d array difference in broadcasting

np.array(5) is a 0-dimensional array, not a scalar. It broadcasts like a scalar but .shape is () — this breaks code that checks ndim or assumes scalars are Python ints.

Fix Use arr.item(), float(arr), or int(arr) to convert 0-d arrays to Python scalars when needed. Detect 0-d arrays with isinstance(x, np.ndarray) and x.ndim == 0; np.isscalar(np.array(5)) returns False, so it does not detect them.
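A quick sketch of how a 0-d array differs from a Python scalar:

```python
import numpy as np

zero_d = np.array(5)

info = {
    "shape": zero_d.shape,                # ()
    "ndim": zero_d.ndim,                  # 0
    "isscalar": np.isscalar(zero_d),      # False: np.isscalar does not accept 0-d arrays
    "isscalar_py": np.isscalar(5),        # True for a plain Python int
}
as_python = zero_d.item()                     # plain Python int
broadcasted = zero_d + np.array([1, 2, 3])    # behaves like a scalar in arithmetic
```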

Align shapes right-to-left: (1000, 1, 512) and (1, 100, 512). Dim -1: 512 == 512 — no stretch. Dim -2: 1 vs 100 — the 1 stretches to 100. Dim -3: 1000 vs 1 — the 1 stretches to 1000. Result: (1000, 100, 512). This pattern computes pairwise differences between 1000 and 100 vectors of dimension 512 without a Python loop.

np.exp(1000) overflows to inf; exp(-1000) underflows to 0. Subtracting the row max shifts all logits into (-inf, 0], so exp(logit - max) is in (0, 1]. The softmax value is unchanged because the max cancels: exp(x - max) / sum(exp(x - max)) == exp(x) / sum(exp(x)). This numerical stability trick is standard in every production softmax implementation.
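The effect is easy to reproduce. In the sketch below, softmax_naive and softmax_stable are illustrative names, and the logits are chosen so that the naive version overflows:

```python
import numpy as np

def softmax_naive(logits: np.ndarray) -> np.ndarray:
    e = np.exp(logits)
    return e / e.sum(axis=1, keepdims=True)

def softmax_stable(logits: np.ndarray) -> np.ndarray:
    shifted = logits - logits.max(axis=1, keepdims=True)   # largest entry becomes 0
    e = np.exp(shifted)
    return e / e.sum(axis=1, keepdims=True)

big = np.array([[1000.0, 1001.0, 1002.0]])
with np.errstate(over="ignore", invalid="ignore"):
    naive = softmax_naive(big)      # exp overflows to inf, and inf / inf gives nan
stable = softmax_stable(big)        # finite, rows sum to 1

small = np.array([[1.0, 2.0, 3.0]])  # same logits shifted by a constant
```

Because softmax is invariant to adding a constant per row, stable equals the softmax of small.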

Normalise each row to unit length first: X_norm = X / np.linalg.norm(X, axis=1, keepdims=True). Then cosine_similarity = X_norm @ X_norm.T, an (N, N) matrix of dot products, which equals cosine similarities for unit vectors. This avoids the OOM (N, N, D) intermediate from naive broadcasting. For N=10,000 and D=768, the result is a 400 MB (N, N) float32 matrix (800 MB in float64); compute in blocks if that is too large.
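Computing in blocks could look like the following sketch. The generator name cosine_blocks, the block size, and the nearest-neighbour use case are assumptions chosen for illustration.

```python
import numpy as np

def cosine_blocks(X: np.ndarray, block: int = 1024):
    """Yield (start, sims), where sims is a (block, N) slice of the cosine matrix."""
    norms = np.linalg.norm(X, axis=1, keepdims=True)
    X_norm = X / np.maximum(norms, 1e-12)         # guard against zero vectors
    for start in range(0, X.shape[0], block):
        yield start, X_norm[start:start + block] @ X_norm.T

rng = np.random.default_rng(0)
X = rng.normal(size=(50, 8))

# Example use: nearest neighbour (excluding self) for every row, one block at a time
nearest = np.empty(X.shape[0], dtype=np.int64)
for start, sims in cosine_blocks(X, block=16):
    rows = np.arange(sims.shape[0])
    sims[rows, start + rows] = -np.inf            # mask self-similarity
    nearest[start:start + sims.shape[0]] = sims.argmax(axis=1)
```

Only one (block, N) slice is alive at a time, so peak memory scales with block × N instead of N × N.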

Vectorization & Indexing
ufuncs, boolean masking, fancy indexing, np.where, argsort, argpartition

Vectorization replaces Python loops with C-level ufunc calls: np.exp, np.log, np.sqrt operate element-wise at C speed. Boolean masking selects elements by condition. Fancy indexing uses an integer array to select arbitrary elements and always returns a copy. np.where(cond, x, y) is the vectorised ternary. np.argsort returns the indices that would sort the array; np.argpartition returns the indices of the top-k elements in O(n) without sorting everything.

NumPy — ufuncs, masking, fancy indexing, where, argsort, argpartition
import numpy as np

# Ufuncs: C-speed element-wise ops — no Python loop
scores = np.array([0.92, 0.45, 0.78, 0.31, 0.87])
log_odds = np.log(scores / (1 - scores))   # logit transform, vectorised

# Boolean masking — returns 1D array of matching elements
high    = scores[scores > 0.75]            # [0.92, 0.78, 0.87]
indices = np.where(scores > 0.75)[0]       # [0, 2, 4] — indices, not values

# np.where(cond, x, y): vectorised if-else
clamped = np.where(scores > 0.9, 0.9, scores)   # cap at 0.9
labels  = np.where(scores > 0.5, "pos", "neg")  # string labels

# Fancy indexing — select arbitrary elements (always a copy)
order     = np.array([2, 0, 4, 1, 3])
reordered = scores[order]                  # [0.78, 0.92, 0.87, 0.45, 0.31]

# 2D fancy indexing: select specific rows
X     = np.random.randn(1000, 128)
idx   = np.array([0, 5, 99, 500])
batch = X[idx]                             # (4, 128) — selected rows

# argsort: indices that would sort the array (reversed here for descending order)
ranks = np.argsort(scores)[::-1]          # descending: [0, 4, 2, 1, 3]
top3_vals = scores[ranks[:3]]             # [0.92, 0.87, 0.78]

# argpartition: O(n) top-k — faster than full sort when k << n
k    = 3
part = np.argpartition(scores, -k)[-k:]  # indices of top-3 (unordered)
top3 = scores[part][np.argsort(scores[part])[::-1]]  # sort only k items

# Vectorised vs apply_along_axis
X = np.random.randn(10_000, 512)

# SLOW: apply_along_axis is a Python-level loop
# result = np.apply_along_axis(lambda row: (row - row.mean()) / row.std(), 1, X)

# FAST: broadcast operations — all in C
mu  = X.mean(axis=1, keepdims=True)
std = X.std(axis=1, keepdims=True)
result = (X - mu) / std   # same result, typically far faster (time both with %timeit)
Pitfall Fancy indexing modifying the result does not modify the original

X[[0,1,2]] = 0.0 does work (fancy indexing on the left side of assignment). But result = X[[0,1,2]]; result[:] = 0.0 does not affect X — the assignment modifies the copy.

Fix Use X[[0,1,2]] = 0.0 for in-place modification via fancy indexing. On the right side of an assignment, fancy indexing always creates a copy.
Pitfall Boolean indexing with a 2D mask returns a flattened 1D array

X[X[:, 0] > 0] selects rows where the first column is positive — result shape is (k, D) which is correct. But X[X > 0] applies a 2D boolean mask and returns a 1D array of all matching elements.

Fix For row selection, build a 1D boolean mask on a single column or all-row criterion: mask = (X > 0).all(axis=1); X[mask]. For element selection, a 2D mask is intentional — be explicit about which you need.
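The shapes make the distinction concrete. A sketch on a small hand-written array:

```python
import numpy as np

X = np.array([[ 1.0, -2.0],
              [ 3.0,  4.0],
              [-5.0,  6.0]])

row_mask = X[:, 0] > 0          # shape (3,): one flag per row
rows = X[row_mask]              # shape (2, 2): whole rows are kept

elem_mask = X > 0               # shape (3, 2): one flag per element
elems = X[elem_mask]            # shape (4,): matching elements, flattened

all_pos_rows = X[(X > 0).all(axis=1)]   # shape (1, 2): rows where every entry is positive
```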
Pitfall np.apply_along_axis for numeric operations is far slower than broadcasting

apply_along_axis is implemented as a Python-level loop over rows/columns. For a (10000, 512) array it calls the function 10,000 times from Python.

Fix Replace with vectorised broadcast: (X - X.mean(axis=1, keepdims=True)) / X.std(axis=1, keepdims=True). Reserve apply_along_axis only for functions that genuinely cannot be vectorised.

Boolean masking (X[X > 0]) selects and returns matching elements as a 1D array whose length depends on how many elements match. np.where(cond, x, y) is a ternary that returns an array with the broadcast shape of cond, x, and y: where cond is True it takes from x, where False it takes from y. Use boolean masking when you want to extract a subset. Use np.where when you want to replace or conditionally choose between two values element-wise.

argpartition uses the introselect algorithm (a variant of quickselect). It partially sorts the array so that the k-th element is in its final sorted position, elements before it are no larger, and elements after it are no smaller, all in O(n) time. With kth=-k the top-k indices are in positions [-k:], but in arbitrary order. To rank them, sort only those k indices, which is O(k log k), not O(n log n). Total: O(n + k log k), dramatically faster than full argsort for small k.
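Wrapped as a helper, the two-step pattern might look like this. The function name top_k is an assumption, and the sketch requires 1 <= k <= len(scores).

```python
import numpy as np

def top_k(scores: np.ndarray, k: int) -> np.ndarray:
    """Indices of the k largest scores, ordered from largest to smallest."""
    part = np.argpartition(scores, -k)[-k:]          # O(n): top-k indices, arbitrary order
    return part[np.argsort(scores[part])[::-1]]      # O(k log k): order just those k

rng = np.random.default_rng(0)
scores = rng.random(10_000)
idx = top_k(scores, k=5)
```

For distinct scores the result matches np.argsort(scores)[::-1][:5]; with ties, the order among equal scores is not guaranteed.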

np.where(cond, x, y) returns a new array — no in-place mutation. Use it to conditionally build a new array. Fancy indexing assignment (arr[mask] = value) modifies in place — no new array created. Use it to replace specific elements of an existing array. For a preprocessing step that clips outliers: arr = np.where(arr > 99, 99, arr) is functional (new array); arr[arr > 99] = 99 is in-place (modifies arr).

Linear Algebra
@, dot, linalg.norm, solve vs inv, eigh for PCA, einsum for tensor ops

NumPy's linalg module provides matrix operations essential for ML: @ (matmul) for matrix-matrix products, np.linalg.norm for vector/matrix norms, np.linalg.solve for systems of equations (faster and more stable than computing inv), np.linalg.eigh for symmetric matrices (PCA covariance decomposition). np.einsum is the Swiss Army knife — it expresses any tensor contraction in index notation and avoids intermediate arrays.

NumPy — matmul @, norm, solve, eigh for PCA, einsum
import numpy as np

# @ operator: matrix-vector and matrix-matrix products
W = np.random.randn(512, 256)   # weight matrix
x = np.random.randn(512)        # input vector

y = W.T @ x                          # matrix-vector → (256,)
Y = np.random.randn(100, 512) @ W    # batch → (100, 256)

# norms: L1, L2, Frobenius
v = np.array([3.0, 4.0])
print(np.linalg.norm(v))          # L2 = 5.0
print(np.linalg.norm(v, ord=1))   # L1 = 7.0

# Batch row norms — vectorised
X = np.random.randn(1000, 128)
row_norms = np.linalg.norm(X, axis=1)          # (1000,)
X_unit    = X / row_norms[:, np.newaxis]        # unit vectors

# Cosine similarity between two vectors
def cosine(a, b):
    return (a @ b) / (np.linalg.norm(a) * np.linalg.norm(b))

# Solve Ax = b — NEVER use inv for this
A = np.array([[3.0, 1.0], [1.0, 2.0]])
b = np.array([9.0, 8.0])
x = np.linalg.solve(A, b)   # [2.0, 3.0] — numerically stable
# AVOID: x = np.linalg.inv(A) @ b  — slower, less stable

# PCA via eigendecomposition of covariance
X = np.random.randn(500, 50)
cov = np.cov(X.T)                              # (50, 50) — symmetric
eigenvalues, eigenvectors = np.linalg.eigh(cov)  # eigh for symmetric
idx = np.argsort(eigenvalues)[::-1]            # descending by variance
components = eigenvectors[:, idx]              # (50, 50) — sorted PCs

n_components = 10
X_centered = X - X.mean(axis=0)                      # centre first, as np.cov does internally
X_pca = X_centered @ components[:, :n_components]    # (500, 10) projected scores

# einsum: versatile tensor contraction
A = np.random.randn(32, 128)
B = np.random.randn(32, 128)

dots   = np.einsum('ij,ij->i', A, B)    # per-row dot product → (32,)
outer  = np.einsum('i,j->ij', A[0], B[0])  # outer product → (128, 128)
matmul = np.einsum('ik,jk->ij', A, B)   # A @ B.T → (32, 32)

# Attention scores: (batch, heads, seq, d_k) — typical transformer op
Q = np.random.randn(2, 8, 64, 64)   # (batch, heads, seq, d_k)
K = np.random.randn(2, 8, 64, 64)
scores = np.einsum('bhqd,bhkd->bhqk', Q, K)  # (2, 8, 64, 64)
Pitfall Using np.linalg.inv to solve a linear system

x = inv(A) @ b is mathematically correct but numerically less stable and slower than np.linalg.solve(A, b). inv compounds floating-point errors; solve uses LU decomposition directly.

Fix Always use np.linalg.solve(A, b) instead of inv(A) @ b. For multiple right-hand sides: np.linalg.solve(A, B) where B is a matrix. For least-squares: np.linalg.lstsq(A, b, rcond=None).
Pitfall Using np.linalg.eig instead of eigh for symmetric matrices

eig handles general (non-symmetric) matrices and can return complex eigenvalues in no guaranteed order. For a covariance matrix (symmetric positive semi-definite) the eigenvalues are always real, so eig does unnecessary work and, if rounding makes the matrix slightly asymmetric, may return small imaginary parts.

Fix Use np.linalg.eigh for symmetric/Hermitian matrices: it is faster, guarantees real eigenvalues, and returns them in ascending order. Use eig only for non-symmetric matrices.
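The properties that eigh provides can be verified on a small covariance matrix. This is a sketch with random data:

```python
import numpy as np

rng = np.random.default_rng(0)
X = rng.normal(size=(200, 5))
cov = np.cov(X.T)                               # (5, 5), symmetric

vals, vecs = np.linalg.eigh(cov)                # real eigenvalues, ascending
order = np.argsort(vals)[::-1]                  # largest variance first
vals, vecs = vals[order], vecs[:, order]

reconstructed = vecs @ np.diag(vals) @ vecs.T   # V diag(lambda) V^T recovers cov
```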
Pitfall np.dot behaviour differs for 1D vs 2D inputs

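np.dot(a, b) is matrix multiply for 2D arrays and the inner product (a scalar) for 1D arrays. With mixed inputs the 1D operand is treated as a column or row vector as needed: np.dot(W, x) with W of shape (m, n) and x of shape (n,) returns (m,), while np.dot(x, W) needs x of shape (m,) and returns (n,). For arrays with more than two dimensions, np.dot does not treat leading axes as batch axes the way @ does: shapes (2, 3, 4) and (2, 4, 5) give (2, 3, 2, 5) with np.dot but (2, 3, 5) with @.

Fix Use @ for matrix operations: it treats leading dimensions as batch dimensions and rejects scalar operands, so shape mistakes surface early. np.dot is a legacy API with overloaded behaviour. Reserve np.dot only for explicit 1D dot products.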
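The places where np.dot and @ actually diverge are stacked (3D and higher) arrays and scalar operands. A short sketch with arrays of ones:

```python
import numpy as np

a = np.ones((2, 3, 4))
b = np.ones((2, 4, 5))

matmul_shape = (a @ b).shape        # (2, 3, 5): the leading axis is a batch axis
dot_shape = np.dot(a, b).shape      # (2, 3, 2, 5): every batch of a meets every batch of b

W = np.ones((3, 4))
left = np.dot(W, np.ones(4)).shape    # (3,)
right = np.dot(np.ones(3), W).shape   # (4,)

try:
    W @ 2.0                         # @ rejects scalar operands
    scalar_allowed = True
except (ValueError, TypeError):
    scalar_allowed = False
scaled = np.dot(W, 2.0)             # np.dot quietly falls back to element-wise scaling
```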

solve(A, b) computes the solution directly via LU decomposition in O(n³). inv(A) @ b is also O(n³) but does roughly three times the arithmetic: computing inv(A) amounts to solving against all n columns of the identity, followed by a matrix-vector multiply. More importantly, forming the inverse explicitly adds rounding error: for ill-conditioned matrices, small perturbations in A produce large errors in inv(A). solve is both faster and more numerically stable. Rule: if you need Ax = b, use solve. Only explicitly compute inv when you need A⁻¹ itself (e.g., Woodbury identity).
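The sketch below exercises the three calls on a well-conditioned random system, where solve and inv agree. The matrices are synthetic, and the stability gap only shows up on ill-conditioned problems.

```python
import numpy as np

rng = np.random.default_rng(0)
A = rng.normal(size=(50, 50)) + 50 * np.eye(50)   # well-conditioned by construction
b = rng.normal(size=50)
B = rng.normal(size=(50, 3))                      # three right-hand sides

x_solve = np.linalg.solve(A, b)
x_inv = np.linalg.inv(A) @ b                      # same answer here, more work
X_multi = np.linalg.solve(A, B)                   # (50, 3): one factorisation, three solves

# Over-determined system: least squares instead of solve
T = rng.normal(size=(100, 4))
w_true = np.array([1.0, -2.0, 0.5, 3.0])
w_hat, *_ = np.linalg.lstsq(T, T @ w_true, rcond=None)
```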

einsum is preferred when: (1) the operation cannot be expressed as a single matmul, e.g., per-batch inner products, outer products, or multi-head attention scores; (2) you want to avoid explicitly reshaping tensors; (3) a contraction over three or more operands benefits from an optimised contraction order (pass optimize=True to np.einsum, or use the opt_einsum library). matmul/@ is preferred for standard matrix products because it is more readable and NumPy can delegate to optimised BLAS routines. In practice: use @ for clarity, einsum for anything requiring index manipulation.
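Both spellings give the same numbers, so the choice is about readability. A sketch with small random tensors:

```python
import numpy as np

rng = np.random.default_rng(0)
Q = rng.normal(size=(2, 8, 16, 32))   # (batch, heads, seq, d_k)
K = rng.normal(size=(2, 8, 16, 32))

scores_einsum = np.einsum("bhqd,bhkd->bhqk", Q, K)
scores_matmul = Q @ K.transpose(0, 1, 3, 2)          # swap the last two axes of K

A = rng.normal(size=(32, 128))
B = rng.normal(size=(32, 128))
row_dots_einsum = np.einsum("ij,ij->i", A, B)        # no (32, 128) temporary
row_dots_plain = (A * B).sum(axis=1)                 # builds a (32, 128) temporary first
```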

Compute the L2 norm per row and divide: norms = np.linalg.norm(X, axis=1, keepdims=True); X_norm = X / norms. Add a small epsilon to avoid division by zero for zero vectors: norms = np.maximum(norms, 1e-12). This is O(n×d), one pass over the data. Alternatively, sklearn.preprocessing.normalize(X, norm="l2") does the same thing. After normalisation, cosine similarity reduces to dot product: X_norm @ X_norm.T.
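Put together as a function, the recipe is short. The name l2_normalize and the default epsilon are assumptions for this sketch.

```python
import numpy as np

def l2_normalize(X: np.ndarray, eps: float = 1e-12) -> np.ndarray:
    """Scale each row to unit L2 norm; all-zero rows stay zero instead of becoming nan."""
    norms = np.linalg.norm(X, axis=1, keepdims=True)
    return X / np.maximum(norms, eps)

X = np.array([[3.0, 4.0],
              [0.0, 0.0],      # zero vector: would give nan without the epsilon
              [1.0, 0.0]])
X_norm = l2_normalize(X)
cos = X_norm @ X_norm.T        # cosine similarities; row and column 1 are all zeros
```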

A Python loop over a NumPy array is ~100× slower than a vectorized operation. If you are looping, you are probably doing it wrong.
10

Pandas — Data Analysis

Pandas is the lingua franca of data work. Knowing the difference between loc/iloc, understanding GroupBy, and avoiding SettingWithCopyWarning separates experts from beginners.

Series & DataFrame
loc, iloc, at, iat, boolean indexing, SettingWithCopyWarning, dtype inspection

Series is a 1D labelled array; DataFrame is a 2D table of named columns. loc uses label-based indexing (inclusive on both ends), iloc uses integer-position indexing (exclusive on right end). at/iat are O(1) single-element access without the overhead of loc/iloc. The SettingWithCopyWarning fires when you attempt to modify a slice — always use df.loc[mask, col] for in-place assignment, or .copy() for an independent subset.

Pandas — loc/iloc, at/iat, boolean indexing, SettingWithCopyWarning
import pandas as pd
import numpy as np

df = pd.DataFrame({
    'age':   [25, 30, 35, 40],
    'score': [0.91, 0.75, 0.88, 0.62],
    'label': ['pos', 'neg', 'pos', 'neg'],
}, index=['a', 'b', 'c', 'd'])

# loc: label-based, both ends inclusive
print(df.loc['b':'c', 'age':'score'])    # rows b,c — columns age,score

# iloc: position-based, right end exclusive
print(df.iloc[1:3, 0:2])               # rows 1-2, cols 0-1 (same result)

# at / iat: single-element, O(1) — no full loc overhead
print(df.at['b', 'age'])    # 30  — label lookup
print(df.iat[1, 0])         # 30  — position lookup

# Boolean indexing
high   = df[df['score'] > 0.8]
pos_hi = df[(df['label'] == 'pos') & (df['score'] > 0.8)]

# SettingWithCopyWarning — BAD pattern
subset = df[df['label'] == 'pos']   # may be a view OR copy — ambiguous
# subset['score'] = 0.0             # emits SettingWithCopyWarning (a warning, not an exception); do not do this

# CORRECT: modify via .loc on the original DataFrame
df.loc[df['label'] == 'pos', 'score'] = 0.99

# CORRECT: .copy() when you need an independent subset to modify
subset = df[df['label'] == 'pos'].copy()
subset['score'] = 0.0               # safe — only affects the copy

# dtype inspection and casting
print(df.dtypes)
num_cols = df.select_dtypes(include='number').columns.tolist()

# Nullable integer type — keeps int without float promotion on NaN
s = pd.Series([1, 2, None], dtype=pd.Int64Dtype())
print(s.isna())    # [False, False, True]
print(s.dtype)     # Int64 — not float64

# pd.NA vs np.nan
pd.isna(pd.NA)     # True
pd.isna(np.nan)    # True  — pd.isna handles both uniformly
Pitfall Chained indexing: df[cond][col] = val silently fails

df[df["score"] > 0.5]["label"] = "high" triggers SettingWithCopyWarning and may not modify df at all — Pandas may create a temporary copy for the first index, and the assignment goes into the copy.

Fix Always use df.loc[condition, column] = value for conditional assignment: df.loc[df["score"] > 0.5, "label"] = "high". This is unambiguous and always modifies df in place.
Pitfall Using loc with integer index when the index is not a RangeIndex

df.loc[0] on a DataFrame whose index is ["a","b","c"] raises KeyError — loc uses label lookup, not position. After a reset_index() the integer index works, but this surprises users.

Fix Use iloc[0] for position-based access regardless of index type. Use loc[label] only when you know the index contains that label. After operations that reset the index, always check df.index before using loc with integers.
Pitfall Modifying a column with at[] in a loop is slow

for i in range(len(df)): df.at[i, "col"] = f(i) — even though at[] is O(1) per call, calling it n times from Python is n Python→C transitions and is much slower than a vectorised operation.

Fix Compute the entire column at once: df["col"] = df["existing"].apply(f) or, better, a vectorised expression like df["col"] = np.log1p(df["income"]). Use at/iat only for truly isolated single-cell updates.

Whether an indexing operation returns a view or a copy depends on the operation and the data layout, and it is not always predictable from the code alone. When you chain an assignment on such a result (df[cond][col] = val), the write can land in a temporary copy and leave df unchanged, which is what the warning flags. Fix: always use df.loc[cond, col] = val for conditional assignment. Use .copy() when you intentionally want a separate modifiable subset. Silencing the warning with pd.options.mode.chained_assignment = None hides a real problem.

at[] is O(1) for a single scalar — it skips the full index-alignment logic of loc[]. Use at[] when you need to read or write a single cell many times (e.g., accumulating results in a result DataFrame inside a loop). In practice, most loops should be replaced with vectorised operations — but for genuinely unavoidable single-cell updates, at[] is the right tool.

df.loc[0] performs label lookup: if the label 0 is not in the index, it raises KeyError. df.iloc[0] always returns the first row by position, regardless of index labels. If df.index = ["a","b","c"], iloc[0] returns row "a"; loc[0] raises KeyError. After a reset_index(), integer labels exist and loc[0] works, but this is fragile because labels stop matching positions as soon as rows are filtered or reordered. Always prefer iloc for positional access to avoid confusion between position and label.
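The difference between label and position shows up as soon as rows are reordered. A sketch on a three-row frame:

```python
import pandas as pd

df = pd.DataFrame({"age": [25, 30, 35]}, index=["a", "b", "c"])

first_by_position = df.iloc[0]["age"]      # position 0, whatever the labels are
first_by_label = df.loc["a", "age"]        # label "a"

try:
    df.loc[0]                              # there is no label 0 in the index
    loc_raised = False
except KeyError:
    loc_raised = True

reset = df.reset_index(drop=True)          # labels are now 0, 1, 2
shuffled = reset.sort_values("age", ascending=False)
loc0 = shuffled.loc[0, "age"]              # 25: label 0 travelled with its row
iloc0 = shuffled.iloc[0]["age"]            # 35: first row after sorting
```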

GroupBy & Aggregation
groupby, agg with named columns, transform for group features, filter, apply pitfalls

groupby splits a DataFrame by key columns, applies a function to each group, and combines results. agg with keyword arguments produces named columns cleanly. transform returns a same-shape result — use it to compute per-group features that broadcast back to each original row (e.g., "score relative to group mean"). filter removes entire groups by predicate. apply is the escape hatch for complex multi-column logic but falls back to a slow Python loop — always prefer agg/transform for standard aggregations.

Pandas — groupby agg, transform, filter, named aggregation, apply
import pandas as pd
import numpy as np

df = pd.DataFrame({
    'model':   ['rf','rf','xgb','xgb','lgbm','lgbm'],
    'split':   ['train','val','train','val','train','val'],
    'auc':     [0.92, 0.88, 0.95, 0.91, 0.93, 0.90],
    'f1':      [0.87, 0.84, 0.91, 0.87, 0.89, 0.86],
    'fit_sec': [10,   0,    45,   0,    12,   0],
})

# Named aggregation — explicit, no column renaming step
result = df.groupby('model').agg(
    mean_auc  = ('auc', 'mean'),
    best_auc  = ('auc', 'max'),
    total_fit = ('fit_sec', 'sum'),
).reset_index()

# Multiple functions on one column
stats = df.groupby('model')['auc'].agg(['mean', 'std', 'min', 'max'])

# transform: per-group stats broadcast back to original rows
df['auc_dev']     = df.groupby('model')['auc'].transform(lambda x: x - x.mean())
df['auc_rank']    = df.groupby('model')['auc'].transform('rank', ascending=False)
df['group_count'] = df.groupby('model')['auc'].transform('count')

# filter: keep only groups whose mean AUC > threshold
good = df.groupby('model').filter(lambda g: g['auc'].mean() > 0.90)

# Multi-key groupby with unstack → wide format
pivot = df.groupby(['model', 'split'])['auc'].mean().unstack()
# columns are train/val values — each model is a row

# apply: flexible but slow — only for complex multi-column logic
def eval_summary(group: pd.DataFrame) -> pd.Series:
    return pd.Series({
        'n':        len(group),
        'mean_auc': group['auc'].mean(),
        'auc_f1_ratio': group['auc'].mean() / group['f1'].mean(),
    })

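# pandas 2.2+ warns that the grouping column is passed to the function; include_groups=False opts out
summary = df.groupby('model').apply(eval_summary).reset_index()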

# value_counts for quick label distribution
df['model'].value_counts(normalize=True, dropna=False)
Pitfall Using apply for simple aggregations instead of agg

df.groupby("model").apply(lambda g: g["auc"].mean()) is correct but invokes the function as a Python callback for every group — much slower than df.groupby("model")["auc"].agg("mean") which delegates to a C-level Cython path.

Fix Use agg("mean"), agg("sum"), agg(["min","max"]) for standard aggregations. Use transform for broadcast-back group statistics. Reserve apply for multi-column operations with no agg equivalent.
Pitfall Forgetting reset_index() after groupby leaves the keys in the index

result = df.groupby("model")["auc"].mean() returns a Series whose index holds the model names (a MultiIndex when grouping by several keys), not a RangeIndex. "model" is then no longer a column, so downstream code that expects a "model" column (for example result["model"]) raises KeyError.

Fix Chain .reset_index() to make the groupby keys into regular columns: df.groupby("model")["auc"].mean().reset_index(). Name the aggregated column simultaneously: .agg(mean_auc=("auc","mean")).reset_index().
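The sketch below compares the three shapes of result on a four-row frame. as_index=False is an alternative to reset_index() that the text above does not mention.

```python
import pandas as pd

df = pd.DataFrame({
    "model": ["rf", "rf", "xgb", "xgb"],
    "auc":   [0.92, 0.88, 0.95, 0.91],
})

as_series = df.groupby("model")["auc"].mean()              # keys live in the index
flat = df.groupby("model")["auc"].mean().reset_index()     # keys become a regular column
named = df.groupby("model", as_index=False).agg(mean_auc=("auc", "mean"))
```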
Pitfall transform returning variable-length result raises ValueError

df.groupby("model")["auc"].transform(lambda x: x.nlargest(2)): whenever a group has more than two rows, nlargest(2) returns fewer elements than the group size, so transform cannot broadcast the result back to the original shape.

Fix transform functions must return the same number of elements as the group (for Series transform) or the same shape (for DataFrame transform). For variable-length results, use apply instead.

agg reduces each group to a scalar (or small summary) — the output has fewer rows than the input. transform applies a function and returns the same shape as the input — each row gets its group statistic. apply is the general case — it can return any shape, including different-length results per group (use with care). Rule: if you need a summary table, use agg. If you need to add a per-group feature back to the original DataFrame (e.g., "revenue / group total"), use transform. If neither fits, use apply.

Use transform: df["auc_frac"] = df["auc"] / df.groupby("model")["auc"].transform("max"). transform("max") broadcasts the group maximum back to each row. The division is then element-wise. This pattern generalises to any per-group normalisation: (x - group_min) / (group_max - group_min) for min-max scaling within groups.
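Both normalisations from the answer above, as a sketch on a toy frame with made-up AUC values:

```python
import pandas as pd

df = pd.DataFrame({
    "model": ["rf", "rf", "rf", "xgb", "xgb", "xgb"],
    "auc":   [0.80, 0.90, 1.00, 0.50, 0.70, 0.90],
})

grp = df.groupby("model")["auc"]
df["auc_frac"] = df["auc"] / grp.transform("max")           # fraction of the group maximum

g_min, g_max = grp.transform("min"), grp.transform("max")
df["auc_minmax"] = (df["auc"] - g_min) / (g_max - g_min)    # 0 at group min, 1 at group max
```

A group whose values are all identical has g_max - g_min == 0, so auc_minmax becomes NaN there; handle that case explicitly if it can occur.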

groupby.apply is slow because pandas must call the Python function once per group, which adds interpreter overhead proportional to the number of groups; functions that return different-length results per group add further cost when the pieces are combined. With include_groups=False (Pandas 2.2+), the group key columns are excluded from the DataFrame passed to the function. For large DataFrames with many groups, apply can be 100× slower than equivalent agg/transform. Profile with %timeit to confirm before committing to apply.

Merge, Join & Concat
merge how types, validate, indicator, merge_asof, concat axis, key deduplication

merge joins by column values (SQL-style JOIN); join aligns by index; concat stacks along an axis. The how parameter controls which rows survive: inner keeps only matched rows, left keeps all left rows, outer keeps all rows from both with NaN for mismatches. validate catches cardinality bugs before they silently produce wrong row counts. indicator=True adds a _merge column for data quality audits. merge_asof handles nearest-key joins on time-series data.

Pandas — merge with validate/indicator, concat, merge_asof
import pandas as pd
import numpy as np

users  = pd.DataFrame({'user_id': [1, 2, 3], 'name': ['Alice','Bob','Carol']})
orders = pd.DataFrame({'user_id': [1, 1, 2, 4], 'amount': [100, 200, 150, 300]})

# Inner join — only matched rows
inner = users.merge(orders, on='user_id', how='inner')
# user_id 3 (Carol) and 4 (unknown order) are dropped

# Left join — all users, NaN for users without orders
left = users.merge(orders, on='user_id', how='left')

# validate: fail early if cardinality assumption is wrong
users.merge(orders, on='user_id', how='left', validate='one_to_many')
# Raises MergeError if user_id is duplicated in users

# indicator: audit which rows matched
merged = users.merge(orders, on='user_id', how='outer', indicator=True)
left_only  = merged[merged['_merge'] == 'left_only']    # users without orders
right_only = merged[merged['_merge'] == 'right_only']   # orders without users

# merge on multiple keys
df1 = pd.DataFrame({'date': ['2024-01','2024-02'], 'store': ['A','A'], 'sales': [100, 120]})
df2 = pd.DataFrame({'date': ['2024-01','2024-02'], 'store': ['A','A'], 'cost': [40, 50]})
joined = df1.merge(df2, on=['date','store'], how='inner')

# concat: stack DataFrames vertically
# (in practice the parts often come from files, e.g. one pd.read_parquet call per partition)
parts   = [orders.iloc[:2], orders.iloc[2:]]
full_df = pd.concat(parts, ignore_index=True)   # reset to RangeIndex

# concat horizontal: add columns side-by-side (rows are aligned on the index)
X = pd.concat([df1[['sales']], df2[['cost']]], axis=1)

# merge_asof: nearest-key join for time-series (must be sorted)
events = pd.DataFrame({'time': pd.to_datetime(['09:00','09:05','09:10']),
                        'price': [100, 102, 101]})
trades = pd.DataFrame({'time': pd.to_datetime(['09:02','09:07']), 'qty': [10, 20]})

matched = pd.merge_asof(
    trades.sort_values('time'),
    events.sort_values('time'),
    on='time',
    direction='backward'    # match nearest event at or before trade time
)
Pitfall Many-to-many merge silently multiplies rows

Merging two DataFrames where the key has duplicates in both produces a cartesian product for each key value. If "user_id" appears 3 times in df1 and 4 times in df2, you get 12 rows for that user_id — silently.

Fix Add validate="one_to_one" or "one_to_many" or "many_to_one" to catch this before it propagates. Use df.duplicated(subset=["key"]) to check for duplicate keys before merging.
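The row multiplication and the validate guard can be reproduced with two tiny hypothetical frames:

```python
import pandas as pd

left = pd.DataFrame({"user_id": [1, 1, 1], "event": ["a", "b", "c"]})
right = pd.DataFrame({"user_id": [1, 1, 1, 1], "amount": [10, 20, 30, 40]})

silent = left.merge(right, on="user_id")            # 3 x 4 = 12 rows, no warning

try:
    left.merge(right, on="user_id", validate="one_to_many")
    caught = False
except pd.errors.MergeError:
    caught = True                                   # left key is not unique

dup_keys = left[left.duplicated(subset=["user_id"], keep=False)]   # rows to inspect
```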
Pitfall pd.concat with mismatched column names fills missing with NaN silently

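Concatenating DataFrames from different data sources where the same field is called "user_id" in one and "UserID" in another produces two separate columns, each NaN for every row that came from the other source.

Fix Standardise column names before concat. df.rename(columns=str.lower) fixes case-only differences, but "UserID" becomes "userid", not "user_id", so differing naming conventions need an explicit mapping such as df.rename(columns={"UserID": "user_id"}). pd.concat(dfs, join="inner") keeps only columns present in all DataFrames, which drops mismatched columns rather than reconciling them, so compare the column sets first.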
Pitfall merge on string columns fails silently due to whitespace differences

"New York" vs "New York " (trailing space): the keys do not match, so a left or outer merge fills the other side with NaN and an inner merge drops the row. No error is raised.

Fix Strip and normalise key columns before merging: df["city"] = df["city"].str.strip().str.lower(). Apply the same normalisation to both DataFrames. Inspect unmatched rows with indicator=True after the merge.
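
A small sketch of that normalisation, assuming two hypothetical frames (stores, weather) whose city keys differ only by whitespace and casing. The helper name normalise_key is made up for this example.

```python
import pandas as pd

# Hypothetical frames: one key has a trailing space, the other differs in case
stores = pd.DataFrame({"city": ["New York ", "paris"], "store_id": [1, 2]})
weather = pd.DataFrame({"city": ["New York", "Paris"], "temp_c": [12, 15]})

def normalise_key(s: pd.Series) -> pd.Series:
    """Strip surrounding whitespace and lower-case a string key column."""
    return s.str.strip().str.lower()

# Raw merge: neither key matches, and nothing is raised
raw = stores.merge(weather, on="city", how="left", indicator=True)
unmatched_before = int((raw["_merge"] == "left_only").sum())

# Apply the same normalisation to both sides, then merge and audit again
stores_n = stores.assign(city=normalise_key(stores["city"]))
weather_n = weather.assign(city=normalise_key(weather["city"]))
clean = stores_n.merge(weather_n, on="city", how="left", indicator=True)
unmatched_after = int((clean["_merge"] == "left_only").sum())

print(unmatched_before, unmatched_after)
```

Keeping the normalisation in one function makes it harder to apply it to only one side of the join by accident.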

"one_to_many" asserts that the left key is unique and the right key may have duplicates, a typical foreign-key join (one user, many orders). If the left key has duplicates, pandas raises MergeError stating that the merge keys are not unique in the left dataset; the message does not list the offending values, so follow up with users[users.duplicated("user_id", keep=False)] to find them. Use it whenever you expect the left DataFrame to be a "dimension" table (users, products) and the right to be a "fact" table (orders, events). It is cheap insurance against row-multiplication bugs that silently inflate training set size.

Add indicator=True to the merge — it appends a _merge column with values "left_only", "right_only", or "both". Filter merged[merged["_merge"] == "left_only"] to see rows from the left DataFrame with no match. This is the standard data quality audit for joins. For large DataFrames, check len(merged[merged["_merge"] != "both"]) against your expectations before proceeding with the pipeline.

merge joins on column values (by default). join aligns by index — df1.join(df2) merges on the index of both DataFrames. You can also join on a column by passing on= to join. merge is more explicit and flexible — it handles any column combination, any how type, and validate. join is a convenience wrapper for index-based alignment. In practice, merge is preferred for clarity, especially in complex multi-key joins. join is useful when DataFrames are naturally aligned by index (e.g., after a groupby + set_index).
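
To make the comparison concrete, here is a minimal sketch with two hypothetical frames (users, spend) showing the same left join written both ways.

```python
import pandas as pd

users = pd.DataFrame({"user_id": [1, 2, 3], "name": ["Ann", "Bob", "Cy"]})
spend = pd.DataFrame({"user_id": [1, 2], "total": [50.0, 20.0]})

# merge: joins on column values
via_merge = users.merge(spend, on="user_id", how="left")

# join: aligns on the index, so the key has to be moved into the index first
via_join = (
    users.set_index("user_id")
    .join(spend.set_index("user_id"), how="left")
    .reset_index()
)

print(via_merge)
```

Both produce the same three rows, with NaN in total for the user without spend; the join version only pays off when the frames are already indexed by the key.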

Performance & Memory
category dtype, chunked reading, query/eval, pipe, vectorize vs apply, Polars/DuckDB

Three practical Pandas performance levers: (1) category dtype for low-cardinality string columns — reduces memory 10-100× and speeds up groupby; (2) chunked reading for files larger than RAM — pd.read_csv(chunksize=N) yields DataFrames; (3) query() and eval() for large-DataFrame filter/compute using numexpr, avoiding Python-level intermediate arrays. For truly large data, Polars (Rust, lazy, multi-threaded) and DuckDB (SQL over Parquet) are the modern alternatives.

Pandas — category dtype, chunksize, query, eval, pipe, memory audit
import pandas as pd
import numpy as np

# ── category dtype: 10-100× memory reduction ─────────────────
N = 400_000
df = pd.DataFrame({'city': np.random.choice(['London','New York','Paris'], size=N)})
print(df.memory_usage(deep=True)['city'])   # ~26 MB as object

df['city'] = df['city'].astype('category')
print(df.memory_usage(deep=True)['city'])   # ~0.4 MB as category
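# groupby on category is faster too; observed=True skips unused categories
# and avoids the FutureWarning newer pandas versions emit for the default
df.groupby('city', observed=True).size()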

# Safe category conversion that handles unseen values at inference
train_cats = pd.Categorical(df['city'])     # fit on training data
# At inference — unknown city becomes NaN, not an error
df_test = pd.DataFrame({'city': ['Tokyo']})
df_test['city'] = pd.Categorical(df_test['city'], categories=train_cats.categories)

# ── chunked reading for files larger than RAM ─────────────────
chunks = []
for chunk in pd.read_csv('large.csv', chunksize=100_000):
    filtered = chunk[chunk['score'] > 0.5]
    chunks.append(filtered)
result = pd.concat(chunks, ignore_index=True)

# ── query() — compiled expression, uses numexpr if installed ──
df_big = pd.DataFrame({'age': np.random.randint(18, 80, 1_000_000),
                        'score': np.random.rand(1_000_000)})

# Slower: boolean indexing creates three temporary arrays
r = df_big[(df_big['age'] > 30) & (df_big['score'] > 0.7)]

# Faster: query parses and evaluates in one compiled pass
r = df_big.query('age > 30 and score > 0.7')

# eval(): compute column without intermediate array
df_big.eval('adjusted = score * (age / 50.0)', inplace=True)

# ── pipe() for readable method chains ─────────────────────────
def clip_outliers(df, col, q_lo=0.01, q_hi=0.99):
    lo, hi = df[col].quantile([q_lo, q_hi])
    return df.assign(**{col: df[col].clip(lo, hi)})

result = (
    df_big
    .query('age >= 18')
    .pipe(clip_outliers, 'score')
    .assign(log_score=lambda d: np.log1p(d['score']))
    .groupby('age')['log_score'].mean()
)

# ── Memory audit ──────────────────────────────────────────────
print(df_big.dtypes)
print(df_big.memory_usage(deep=True).sum() / 1e6, "MB")
df_big.info(memory_usage='deep')
Pitfall apply(lambda) for numeric column transforms — 100× slower than vectorised ops

df["col"].apply(lambda x: x * 2) invokes the lambda once per row in Python. For a 1M-row column, that is 1M Python function calls vs one C-level NumPy multiplication.

Fix Use vectorised ops: df["col"] * 2 or np.log1p(df["col"]). Reserve apply() for string processing or complex custom logic with no vectorised equivalent. Always ask: "does NumPy/Pandas have a built-in for this?" before writing apply.
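
A quick way to see the gap on your own machine is to time both forms. This is a rough sketch on a synthetic Series; the exact ratio depends on hardware and pandas version, so no timings are quoted here.

```python
import timeit

import numpy as np
import pandas as pd

s = pd.Series(np.arange(100_000, dtype="float64"))

via_apply = s.apply(lambda x: x * 2)   # one Python-level call per element
via_vector = s * 2                     # one vectorised multiplication

# Time five repetitions of each form
t_apply = timeit.timeit(lambda: s.apply(lambda x: x * 2), number=5)
t_vector = timeit.timeit(lambda: s * 2, number=5)
print(f"apply: {t_apply:.4f}s  vectorised: {t_vector:.4f}s")
```

The two results are identical, so switching to the vectorised form is a pure performance change.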
Pitfall category dtype breaks on unseen values at inference

After training, the category column knows only training categories. At inference, assigning a new value (e.g., a new city) to the categorical column raises an error (a TypeError in recent pandas versions) telling you to set the categories first.

Fix Create the categorical with a fixed set of categories: pd.Categorical(series, categories=KNOWN_CATS). Unseen values become NaN and can be handled with fillna() rather than crashing the pipeline.
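
The fix can be sketched like this. KNOWN_CATS is a hypothetical training vocabulary and "__unknown__" is an illustrative placeholder; note that fillna() on a categorical only accepts values that are already categories, so the placeholder is added first.

```python
import pandas as pd

KNOWN_CATS = ["London", "New York", "Paris"]   # hypothetical training vocabulary

incoming = pd.Series(["Paris", "Tokyo", "London"])
cat = pd.Series(pd.Categorical(incoming, categories=KNOWN_CATS))

# "Tokyo" is outside KNOWN_CATS, so it becomes NaN instead of raising
n_unseen = int(cat.isna().sum())

# The placeholder must be a category before fillna() can use it
filled = cat.cat.add_categories(["__unknown__"]).fillna("__unknown__")
print(filled.tolist())
```

Whether unseen values should become a placeholder category, be dropped, or be routed to a fallback model is a modelling decision; the point is that the pipeline keeps running and the count of unseen values is observable.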
Pitfall Loading a 10GB CSV with pd.read_csv() without chunking

read_csv on a large file loads all rows into a single DataFrame, requiring the full file in RAM. On a machine with 16GB RAM, a 10GB CSV uses all available memory and triggers swapping or OOM.

Fix Use pd.read_csv(path, chunksize=100_000) for an iterator of DataFrames. For analytical queries, DuckDB is often the best answer: duckdb.sql("SELECT * FROM 'file.csv' WHERE label=1") processes the file without loading it into memory.

Category dtype is most effective when the column has low cardinality: few unique values relative to total rows. A string column with 5 unique cities in 1M rows stores 1M small integer codes (one byte each as int8) plus the 5 strings once, instead of 1M Python string objects. That is roughly a 98% saving, in line with the 26 MB to 0.4 MB drop in the example above. For a column where every row is unique (user IDs, UUIDs), category offers no benefit and adds overhead from the category mapping. Rule of thumb: if unique values / total rows < 0.05, category is almost always worth it.
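
The rule of thumb can be checked directly. This sketch uses a synthetic city column (forced to object dtype so the comparison is against plain Python strings) and reports the cardinality ratio alongside the measured memory.

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
cities = pd.Series(
    rng.choice(["London", "New York", "Paris"], size=100_000)
).astype(object)

ratio = cities.nunique() / len(cities)          # unique values / total rows
as_cat = cities.astype("category")

mem_object = cities.memory_usage(deep=True)
mem_category = as_cat.memory_usage(deep=True)
print(f"ratio={ratio:.5f}  object={mem_object} B  category={mem_category} B")
print(as_cat.cat.codes.dtype)                   # integer codes, not pointers
```

With three categories the codes fit in int8, which is where most of the saving comes from.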

Boolean indexing (df[mask1 & mask2]) creates three temporary arrays: the result of mask1, the result of mask2, and their boolean AND. query() with numexpr installed parses the expression and evaluates it in a single compiled C pass with no intermediate arrays — significantly less memory and fewer cache misses for large DataFrames. For small DataFrames (< 10,000 rows), the overhead of expression parsing makes query() slower. For large DataFrames (> 100,000 rows), query() is typically 2-5× faster.
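
Whatever the speed difference on a given machine, the two forms select the same rows. A minimal sketch on synthetic data (column names follow the earlier example):

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
df = pd.DataFrame({
    "age": rng.integers(18, 80, 10_000),
    "score": rng.random(10_000),
})

# Boolean indexing: two masks plus their AND are materialised as arrays
by_mask = df[(df["age"] > 30) & (df["score"] > 0.7)]

# query(): the same filter as one expression string
by_query = df.query("age > 30 and score > 0.7")

print(len(by_mask), len(by_query))
```

query() falls back to a pure-Python engine when numexpr is not installed, so the result is the same either way and only the performance profile changes.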

Three main alternatives: (1) Polars: Rust-backed, lazy evaluation, multi-threaded by default, not limited by the GIL, with an API similar to Pandas plus a lazy frame (scan_csv → filter → collect). Often 5-50× faster on multi-core machines. (2) DuckDB: SQL engine that queries Parquet/CSV/Arrow directly with vectorised execution; integrates with Pandas via df = duckdb.sql("...").df(). (3) Dask: parallelises Pandas-style operations across cores or a cluster; its DataFrame mirrors a large subset of the Pandas API and evaluates lazily until .compute(). Use Polars for local large-file processing, DuckDB for analytical queries on files, Dask for distributed ETL on a cluster.

Most Pandas performance problems come from using .apply() when a vectorized operation exists, or from creating unnecessary copies.
Concurrency & Performance 11–12
11

Async Python & Concurrency

asyncio for I/O-bound work, multiprocessing for CPU-bound work, threading for legacy libraries. Knowing which to reach for is the interview question.

asyncio Fundamentals
coroutines, tasks, gather, TaskGroup, event loop, run_in_executor

asyncio implements cooperative multitasking in a single thread. A coroutine (async def) suspends at each await, allowing other coroutines to run. The event loop runs one coroutine at a time — there is no parallelism, only concurrency. asyncio.gather() runs multiple coroutines concurrently by scheduling all as Tasks. TaskGroup (Python 3.11+) cancels all sibling tasks if any fails — structured cancellation. asyncio.run() is the standard entry point for running async code from synchronous code.

asyncio — coroutines, gather, TaskGroup, Semaphore, run_in_executor
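import asyncio
import os
import httpx
import pandas as pd

# Assumes the API key is exported as the OPENAI_API_KEY environment variable
OPENAI_KEY = os.environ.get("OPENAI_API_KEY", "")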

# Coroutine: defined with async def, runs when awaited
async def fetch_embedding(client: httpx.AsyncClient, text: str) -> list[float]:
    resp = await client.post(
        "https://api.openai.com/v1/embeddings",
        json={"input": text, "model": "text-embedding-3-small"},
        headers={"Authorization": f"Bearer {OPENAI_KEY}"},
    )
    return resp.json()["data"][0]["embedding"]

# Sequential: 10 calls × 500ms = 5 000ms
async def embed_sequential(texts: list[str]) -> list[list[float]]:
    async with httpx.AsyncClient() as client:
        return [await fetch_embedding(client, t) for t in texts]

# Concurrent: all calls fire simultaneously → ~500ms total
async def embed_concurrent(texts: list[str]) -> list[list[float]]:
    async with httpx.AsyncClient() as client:
        return await asyncio.gather(
            *[fetch_embedding(client, t) for t in texts]
        )

# TaskGroup (Python 3.11+): structured cancellation on first failure
async def embed_with_group(texts: list[str]) -> list[list[float]]:
    results: list[asyncio.Task] = []
    async with httpx.AsyncClient() as client:
        async with asyncio.TaskGroup() as tg:    # cancels all on any failure
            for text in texts:
                results.append(tg.create_task(fetch_embedding(client, text)))
    return [t.result() for t in results]

# Semaphore: cap concurrent API calls
async def embed_rate_limited(texts: list[str], max_concurrent: int = 5):
    sem = asyncio.Semaphore(max_concurrent)
    async with httpx.AsyncClient() as client:    # one shared client, so connections are reused
        async def bounded(text: str):
            async with sem:                      # suspends while max_concurrent calls are in flight
                return await fetch_embedding(client, text)
        return await asyncio.gather(*[bounded(t) for t in texts])

# run_in_executor: offload blocking calls to a thread pool
async def load_and_embed(path: str) -> list[float]:
    loop = asyncio.get_running_loop()            # preferred over get_event_loop() inside a coroutine
    df   = await loop.run_in_executor(None, pd.read_csv, path)  # blocking I/O → thread
    text = df["text"].iloc[0]
    async with httpx.AsyncClient() as client:
        return await fetch_embedding(client, text)

# Entry point
if __name__ == "__main__":
    texts = ["RAG retrieval", "LLM prompts", "embeddings"]
    embeddings = asyncio.run(embed_concurrent(texts))
Pitfall Calling a blocking function inside a coroutine freezes the event loop

Marking a function async def is not enough: calling requests.get() or time.sleep() (rather than asyncio.sleep()) inside a coroutine blocks the entire event loop. All other coroutines are frozen until the call returns.

Fix Use async libraries (httpx, aiohttp, aiofiles) for I/O. For blocking code you cannot replace, use await loop.run_in_executor(None, blocking_fn, *args), or await asyncio.to_thread(blocking_fn, *args) on Python 3.9+, to run it in a thread pool without blocking the loop.
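
The effect is easy to observe with a heartbeat task. In this sketch, blocking_io is a stand-in for any synchronous library call, and the helper names (heartbeat, run) are made up for the illustration; it measures the largest gap between heartbeat ticks with and without offloading.

```python
import asyncio
import time

def blocking_io(seconds: float) -> str:
    """Stand-in for a sync library call such as requests.get()."""
    time.sleep(seconds)
    return "done"

async def heartbeat(ticks: list, n: int, interval: float) -> None:
    """Record a timestamp every `interval` seconds; stalls if the loop is blocked."""
    for _ in range(n):
        ticks.append(time.perf_counter())
        await asyncio.sleep(interval)

async def run(offload: bool) -> float:
    """Return the largest gap between consecutive heartbeat ticks."""
    ticks: list = []
    hb = asyncio.create_task(heartbeat(ticks, n=5, interval=0.02))
    await asyncio.sleep(0)                       # let the heartbeat start
    if offload:
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, blocking_io, 0.2)   # runs in a worker thread
    else:
        blocking_io(0.2)                         # blocks the whole event loop
    await hb
    return max(b - a for a, b in zip(ticks, ticks[1:]))

gap_blocked = asyncio.run(run(offload=False))
gap_offloaded = asyncio.run(run(offload=True))
print(f"blocked: {gap_blocked:.3f}s  offloaded: {gap_offloaded:.3f}s")
```

When the call runs on the loop thread, the heartbeat misses its schedule for the full duration of the call; offloaded, it keeps ticking at roughly its interval.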
Pitfall asyncio.gather() does not cancel remaining tasks on the first failure by default

If one task raises an exception, gather() with return_exceptions=False re-raises immediately but the other tasks continue running in the background — resource leak.

Fix Use asyncio.TaskGroup (Python 3.11+) which cancels all sibling tasks on any failure. For Python < 3.11, use asyncio.wait() with FIRST_EXCEPTION and then explicitly cancel remaining tasks.
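
For Python versions without TaskGroup, the asyncio.wait() approach might look like the sketch below. gather_or_cancel, ok and boom are hypothetical names for this example; it is one possible shape, not a drop-in replacement for TaskGroup semantics.

```python
import asyncio

async def ok(delay: float, log: list) -> str:
    try:
        await asyncio.sleep(delay)
        log.append("finished")
        return "ok"
    except asyncio.CancelledError:
        log.append("cancelled")
        raise

async def boom() -> None:
    await asyncio.sleep(0.01)
    raise RuntimeError("upstream failed")

async def gather_or_cancel(coros) -> list:
    """Run coroutines concurrently; on the first exception cancel the rest."""
    tasks = [asyncio.create_task(c) for c in coros]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    for t in pending:
        t.cancel()
    if pending:
        # Let the cancellations finish before reporting the failure
        await asyncio.gather(*pending, return_exceptions=True)
    for t in done:
        if t.exception() is not None:
            raise t.exception()
    return [t.result() for t in tasks]

async def main() -> list:
    log: list = []
    try:
        await gather_or_cancel([ok(0.5, log), boom()])
    except RuntimeError:
        log.append("error surfaced")
    return log

log = asyncio.run(main())
print(log)
```

The slow task is cancelled as soon as the failing one raises, so nothing keeps running in the background after the error reaches the caller.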
Pitfall Creating coroutines without awaiting them — they never run

fetch_embedding(client, text) creates a coroutine object but does not schedule it. Without await or asyncio.create_task(), the coroutine is never executed. Python emits RuntimeWarning: coroutine 'fetch_embedding' was never awaited when the unused object is garbage-collected, which is easy to miss in logs.

Fix Always await coroutines or wrap them in a Task: asyncio.create_task(fetch_embedding(client, text)). Use asyncio.gather() to schedule multiple coroutines concurrently.

A coroutine object (result of calling async def fn()) is not yet scheduled; it is just a paused function. A Task wraps a coroutine and schedules it on the event loop: asyncio.create_task(coro) returns a Task that starts running as soon as the current coroutine yields control (at its next await). asyncio.gather() accepts both and wraps plain coroutines in Tasks internally. Key: if you need to start multiple coroutines and wait for all, use gather. If you need to fire-and-forget (start without waiting), create a Task explicitly and keep a reference to it, because the event loop holds only a weak reference and an unreferenced Task can be garbage-collected before it finishes.
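
The difference in scheduling shows up in the order of side effects. A minimal sketch, with a hypothetical work() coroutine that logs when it actually starts:

```python
import asyncio

async def work(log: list, name: str) -> str:
    log.append(f"{name} started")
    await asyncio.sleep(0.01)
    return name

async def main() -> list:
    log: list = []

    coro = work(log, "coro")                        # coroutine object: nothing has run yet
    task = asyncio.create_task(work(log, "task"))   # scheduled on the event loop
    log.append("after create")

    await asyncio.sleep(0)                          # yield once: the task gets to start
    log.append("after yield")

    await coro                                      # the bare coroutine only runs now
    await task
    return log

log = asyncio.run(main())
print(log)
# ['after create', 'task started', 'after yield', 'coro started']
```

The Task starts at the first point where main() yields, while the bare coroutine does nothing until it is awaited.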

gather: available in every supported Python version, returns results in input order, configurable error handling (return_exceptions=True to collect all results including errors). TaskGroup (Python 3.11+): structured concurrency that cancels all sibling tasks if any raises and prevents background task leaks; it is modelled on the nursery pattern from Trio. Rule: use TaskGroup for new Python 3.11+ code, because it prevents partially-completed state when one task fails. Use gather when you need compatibility with Python versions before 3.11 or want return_exceptions=True behaviour.

asyncio uses non-blocking I/O via the OS event notification system (epoll on Linux, kqueue on macOS). When a coroutine awaits an HTTP response, the event loop registers the socket with the OS and suspends the coroutine — no thread blocks. When the OS signals that data is ready, the event loop resumes the coroutine. One thread manages all sockets via a single select/poll/epoll call that watches all pending sockets simultaneously. Memory per coroutine is a few KB (vs 1 MB per OS thread), enabling true thousands-of-connections concurrency.

asyncio Primitives
Queue, Semaphore, Lock, Event — producer-consumer, rate limiting, signalling

asyncio provides synchronisation primitives analogous to threading but cooperative — they yield to the event loop while waiting rather than blocking a thread. Queue enables bounded producer-consumer pipelines with backpressure. Semaphore limits concurrent API calls to avoid rate limits. Lock protects shared state (caches, counters). Event signals between coroutines (model loaded, batch ready). These primitives are the building blocks of production async ML data pipelines.

asyncio — Queue, Semaphore, Lock, Event for ML pipelines
import asyncio
from asyncio import Queue, Semaphore, Lock, Event

# ── Queue: async producer-consumer with backpressure ─────────
async def producer(queue: Queue, texts: list[str], n_consumers: int) -> None:
    for text in texts:
        await queue.put(text)        # suspends if queue is full: backpressure
    for _ in range(n_consumers):
        await queue.put(None)        # one sentinel per consumer, or the extras never exit

async def consumer(queue: Queue, results: list) -> None:
    while True:
        item = await queue.get()     # suspends until an item is available
        try:
            if item is None:
                break
            results.append(await embed(item))   # embed(): your async embedding call
        finally:
            queue.task_done()        # mark every get() as done, even on error

async def pipeline(texts: list[str], n_consumers: int = 2) -> list:
    queue:   Queue = asyncio.Queue(maxsize=20)   # bounded: producer suspends when full
    results: list  = []
    await asyncio.gather(
        producer(queue, texts, n_consumers),
        *[consumer(queue, results) for _ in range(n_consumers)],
    )
    return results                   # completion order, not input order

# ── Semaphore: cap concurrent LLM API calls ───────────────────
RATE_LIMIT = Semaphore(5)   # at most 5 concurrent requests at a time

async def call_llm(prompt: str) -> str:
    async with RATE_LIMIT:          # blocks if 5 already running
        return await _make_api_call(prompt)

async def batch_inference(prompts: list[str]) -> list[str]:
    return await asyncio.gather(*[call_llm(p) for p in prompts])
    # asyncio schedules all but Semaphore ensures max 5 run concurrently

# ── Lock: protect a shared in-memory cache ───────────────────
_cache: dict[str, list[float]] = {}
_lock  = Lock()

async def get_or_embed(text: str) -> list[float]:
    async with _lock:
        if text in _cache:
            return _cache[text]
    embedding = await embed(text)      # compute outside lock — don't hold lock during I/O
    async with _lock:
        _cache[text] = embedding
    return embedding

# ── Event: one-time signal (model loaded, data ready) ─────────
model_ready = Event()

async def load_model() -> None:
    global model
    model = await async_load_weights("model.pt")   # async_load_weights: your async loader
    model_ready.set()               # wake every coroutine waiting on the event

async def serve(prompt: str) -> str:
    await model_ready.wait()         # suspends until model_ready.set() is called
    return await model.predict(prompt)
Pitfall Holding a Lock while awaiting I/O — serialises all consumers

async with _lock: result = await embed(text) holds the lock for the entire embedding call. Every other coroutine that needs the lock blocks for the duration of the network round-trip, eliminating concurrency.

Fix Check the cache inside the lock, release the lock, do the I/O, then re-acquire the lock to write. Double-checked locking pattern: async with lock: check; compute outside; async with lock: write.
Pitfall Unbounded Queue allows unbounded memory growth

asyncio.Queue() with no maxsize — if the producer runs faster than consumers, the queue accumulates unboundedly. For a producer reading a 10M-row file, all rows end up in memory.

Fix Always set maxsize: asyncio.Queue(maxsize=N). With a bounded queue, await queue.put() suspends the producer when the queue is full, creating natural backpressure. Choose maxsize based on memory budget and batch size.
Pitfall Not calling queue.task_done() causes queue.join() to hang

If queue.task_done() is never called after processing, queue.join() (which waits for all items to be marked done) blocks forever.

Fix Call queue.task_done() exactly once for each item retrieved via queue.get(). Use try/finally to ensure it is called even if processing raises an exception.
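
A sketch of that try/finally pattern combined with queue.join(). The worker logic (doubling integers, rejecting negatives) is invented purely to have one item that fails.

```python
import asyncio

async def worker(queue: asyncio.Queue, results: list) -> None:
    while True:
        item = await queue.get()
        try:
            if item < 0:
                raise ValueError(f"bad item: {item}")
            results.append(item * 2)
        except ValueError:
            pass                      # a real pipeline would log or dead-letter the item
        finally:
            queue.task_done()         # exactly once per get(), success or failure

async def main(items: list) -> list:
    queue: asyncio.Queue = asyncio.Queue(maxsize=4)
    results: list = []
    workers = [asyncio.create_task(worker(queue, results)) for _ in range(2)]
    for item in items:
        await queue.put(item)
    await queue.join()                # returns once every item is marked done
    for w in workers:
        w.cancel()                    # workers loop forever; stop them explicitly
    await asyncio.gather(*workers, return_exceptions=True)
    return results

results = asyncio.run(main([1, -1, 2, 3]))
print(sorted(results))
```

Because task_done() sits in finally, the failing item still counts as processed and join() returns instead of hanging.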

Semaphore(N) maintains a counter initialised to N. Entering async with sem: decrements the counter if it is above 0; if it is already 0, the coroutine suspends until another coroutine releases. When any coroutine exits the async with block, the counter increments and one waiting coroutine is resumed. For 100 concurrent prompts with Semaphore(5), at most 5 requests are in flight at any moment, spreading load over time. This keeps the API from seeing a burst of 100 simultaneous requests, which makes 429 rate-limit errors less likely. Note that a semaphore caps concurrency, not requests per second.
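
The cap can be verified by tracking how many calls are in flight. In this sketch fake_request stands in for the API call and the state dict is just instrumentation for the example.

```python
import asyncio

async def fake_request(sem: asyncio.Semaphore, state: dict) -> None:
    async with sem:
        state["in_flight"] += 1
        state["peak"] = max(state["peak"], state["in_flight"])
        await asyncio.sleep(0.01)         # stand-in for the network round-trip
        state["in_flight"] -= 1

async def main(n_requests: int, limit: int) -> int:
    sem = asyncio.Semaphore(limit)
    state = {"in_flight": 0, "peak": 0}
    await asyncio.gather(*[fake_request(sem, state) for _ in range(n_requests)])
    return state["peak"]

peak = asyncio.run(main(n_requests=100, limit=5))
print(peak)  # 5
```

All 100 coroutines are scheduled at once, but the peak number inside the async with block never exceeds the limit.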

A plain list has no await semantics: a consumer has to poll it in a loop, which either busy-waits (burning CPU) or sleeps between checks (adding latency to every item). asyncio.Queue.put() suspends if the queue is full and resumes when space is available; get() suspends if empty and resumes when an item arrives, all without a polling loop. Queue also has maxsize for backpressure and task_done/join for completion tracking. It is safe to share between coroutines on one event loop, but it is not thread-safe; use queue.Queue across threads.

Event: one-time broadcast signal — a flag is set once and all waiters are woken simultaneously. Ideal for "model loaded", "data ready", "shutdown requested" — binary state. Condition: combines a Lock with a multi-condition wait/notify mechanism. Waiters can specify a condition predicate; notify() wakes one waiter, notify_all() wakes all. Use Condition when you need guarded waiting with a complex predicate (e.g., "wait until queue has at least N items"). For simple on/off signals, Event is always simpler.
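
The "wait until at least N items" case can be sketched with Condition.wait_for(). The collector and feeder names and the batch size are illustrative only.

```python
import asyncio

async def collector(cond: asyncio.Condition, buffer: list, batch_size: int) -> list:
    async with cond:
        # Releases the lock while waiting; re-checks the predicate on every notify
        await cond.wait_for(lambda: len(buffer) >= batch_size)
        batch = buffer[:batch_size]
        del buffer[:batch_size]
        return batch

async def feeder(cond: asyncio.Condition, buffer: list, items: list) -> None:
    for item in items:
        async with cond:
            buffer.append(item)
            cond.notify_all()         # wake waiters so they can re-check the predicate
        await asyncio.sleep(0)        # give other coroutines a turn

async def main():
    cond = asyncio.Condition()
    buffer: list = []
    batch, _ = await asyncio.gather(
        collector(cond, buffer, 3),
        feeder(cond, buffer, [0, 1, 2, 3, 4]),
    )
    return batch, buffer

batch, leftover = asyncio.run(main())
print(batch, leftover)  # [0, 1, 2] [3, 4]
```

An Event could not express this directly, because the waiter needs to re-evaluate a predicate over shared state each time it is woken.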

threading
ThreadPoolExecutor, as_completed, Lock, run_in_executor, thread-local storage

Python threads share a process's memory but the GIL prevents true parallel CPU execution in CPython. Threading is appropriate for I/O-bound work with legacy sync libraries (requests, psycopg2), for running blocking code alongside asyncio via run_in_executor, and for any work where C extensions release the GIL. ThreadPoolExecutor from concurrent.futures provides a clean pool API with Future objects. Lock prevents race conditions on shared mutable state.

threading — ThreadPoolExecutor, as_completed, Lock, run_in_executor, thread-local
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
import pandas as pd   # used by async_pipeline below

# ThreadPoolExecutor: clean pool API
def call_api_sync(url: str) -> dict:
    import requests
    return requests.get(url, timeout=10).json()

urls = [f"https://api.example.com/record/{i}" for i in range(50)]

# Submit all tasks; collect as they complete (not in submission order)
with ThreadPoolExecutor(max_workers=10) as pool:
    futures = {pool.submit(call_api_sync, url): url for url in urls}
    for future in as_completed(futures):
        url = futures[future]
        try:
            data = future.result()
            process(data)
        except Exception as e:
            print(f"Failed: {url} — {e}")

# map: simpler when order matters and all must succeed
with ThreadPoolExecutor(max_workers=10) as pool:
    results = list(pool.map(call_api_sync, urls, timeout=30))  # preserves order

# Lock: protect shared mutable state
counter = 0
lock    = threading.Lock()

def increment(n: int) -> None:
    global counter
    for _ in range(n):
        with lock:          # only one thread modifies at a time
            counter += 1

threads = [threading.Thread(target=increment, args=(10_000,)) for _ in range(10)]
for t in threads: t.start()
for t in threads: t.join()
print(counter)   # 100_000 — correct; without lock: unpredictable

# run_in_executor: offload blocking work inside asyncio
import asyncio

async def async_pipeline(paths: list[str]) -> list:
    loop = asyncio.get_running_loop()
    # pd.read_csv is blocking: run each read in the default thread pool so the
    # event loop stays responsive, and gather the futures so the reads overlap
    futures = [loop.run_in_executor(None, pd.read_csv, path) for path in paths]
    return await asyncio.gather(*futures)

# Thread-local storage: per-thread DB connections
_local = threading.local()

def get_connection():
    """Each thread gets its own DB connection — no sharing needed."""
    if not hasattr(_local, 'conn'):
        _local.conn = create_db_connection()
    return _local.conn
Pitfall Using ThreadPoolExecutor for CPU-bound Python code expecting speedup

sum([x**2 for x in range(10_000_000)]) with 4 threads runs no faster than 1 thread — the GIL ensures only one thread executes Python bytecode at a time. The overhead of thread management may even make it slower.

Fix Use ProcessPoolExecutor for CPU-bound pure Python code. Use ThreadPoolExecutor only for I/O-bound work or C-extension code that releases the GIL (NumPy linalg, Pandas IO, database drivers).
Pitfall Daemon threads are killed silently when the main thread exits

threading.Thread(target=fn, daemon=True) — if the main script finishes, daemon threads are terminated immediately without cleanup. Work in progress is lost silently.

Fix Use daemon=True only for background monitoring threads where abrupt termination is acceptable. For work that must complete, use non-daemon threads (default) or a ThreadPoolExecutor which waits for all futures on __exit__.
Pitfall Shared mutable state without a Lock causes race conditions

counter += 1 is not atomic: it compiles to separate load, add, and store bytecode operations. Two threads can read the same value, both increment, and write back the same result, which is a lost update.

Fix Protect all shared mutable state with threading.Lock() or use thread-safe data structures (queue.Queue). For simple counters, let each thread accumulate into its own local variable (or a threading.local() attribute) and return the partial count, then sum the partials in the main thread.
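
Both options from the fix can be sketched side by side. The counting task (even numbers in a range) is invented for the example; the shared dict stands in for any shared mutable state.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

def count_evens(numbers: range) -> int:
    """Each call keeps a private local counter: no shared state, no lock."""
    local_count = 0
    for n in numbers:
        if n % 2 == 0:
            local_count += 1
    return local_count

chunks = [range(i, i + 25_000) for i in range(0, 100_000, 25_000)]

# Option 1: combine per-thread partial results at the end
with ThreadPoolExecutor(max_workers=4) as pool:
    total_partial = sum(pool.map(count_evens, chunks))

# Option 2: a shared counter guarded by a lock (one update per chunk)
shared = {"total": 0}
lock = threading.Lock()

def count_into_shared(numbers: range) -> None:
    partial = count_evens(numbers)
    with lock:                        # the read-modify-write happens under the lock
        shared["total"] += partial

threads = [threading.Thread(target=count_into_shared, args=(c,)) for c in chunks]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(total_partial, shared["total"])  # 50000 50000
```

Option 1 avoids locking entirely; option 2 keeps the lock but takes it once per chunk rather than once per increment, which keeps contention low.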

CPython's GIL (Global Interpreter Lock) ensures only one thread executes Python bytecode at a time. Even with 8 threads on 8 cores, each thread must wait for the GIL before executing any Python instruction. The GIL is released during blocking I/O and by C extensions that choose to release it, and a running thread is asked to give it up after a switch interval (5 ms by default, see sys.getswitchinterval()). For pure Python loops, threading adds synchronisation overhead without enabling parallelism, and you can even see slowdowns vs a single thread. Solution: multiprocessing for CPU-bound Python, or NumPy/Cython for computations that release the GIL.

Use asyncio when: the library has an async client (httpx, aiohttp, openai async), you need maximum concurrency (thousands of connections), or you are already in an async codebase. Use ThreadPoolExecutor when: you must use a sync library (requests, legacy SOAP clients), the library is not thread-safe but you can manage with connection-per-thread (thread-local), or you are integrating with sync code and cannot add async. In a FastAPI or asyncio application, use run_in_executor to offload sync blocking calls to a thread pool without blocking the event loop.

A race condition occurs when the result of a computation depends on the interleaving of operations from multiple threads. Example: two threads both read counter=5, both compute 5+1=6, both write 6 — net result is 6 not 7. Prevention: (1) threading.Lock — wraps the read-modify-write in a critical section; (2) thread-local storage — each thread has its own copy, combined at the end; (3) immutable shared data — pure functions operating on values passed as arguments, no shared mutable state; (4) queue.Queue — thread-safe producer-consumer without explicit locking.

multiprocessing
ProcessPoolExecutor, Pool.starmap, shared memory for NumPy, spawn vs fork

multiprocessing spawns separate Python interpreter processes — each with its own GIL, heap, and memory space. This enables true CPU parallelism for compute-heavy tasks: feature engineering on large DataFrames, hyperparameter search, batch inference. ProcessPoolExecutor from concurrent.futures is the modern high-level API. For large NumPy arrays, shared_memory (Python 3.8+) enables zero-copy data access across processes without pickling.

multiprocessing — ProcessPoolExecutor, starmap, SharedMemory, spawn guard
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import shared_memory
import numpy as np

# Worker functions live at module top level: under spawn (Windows, macOS) each
# worker re-imports this module and must be able to find them by name.
def softmax_chunk(chunk: np.ndarray) -> np.ndarray:
    exp = np.exp(chunk - chunk.max(axis=1, keepdims=True))
    return exp / exp.sum(axis=1, keepdims=True)

def cross_val(n_est: int, depth: int, X, y) -> float:
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import cross_val_score
    clf = RandomForestClassifier(n_estimators=n_est, max_depth=depth, n_jobs=1)
    return cross_val_score(clf, X, y, cv=3).mean()

def worker(shm_name: str, shape, dtype):
    existing = shared_memory.SharedMemory(name=shm_name)
    try:
        view = np.ndarray(shape, dtype=dtype, buffer=existing.buf)
        return view.mean(axis=1)      # reads shared memory in place; returns a new small array
    finally:
        existing.close()              # every process closes its own handle

# Pool creation and use go inside the guard (required under spawn)
if __name__ == "__main__":

    # ProcessPoolExecutor: clean API, preferred over mp.Pool for new code
    X      = np.random.randn(10_000, 512)
    chunks = np.array_split(X, 8)              # split for 8 workers

    with ProcessPoolExecutor(max_workers=8) as pool:
        results = list(pool.map(softmax_chunk, chunks))
    output = np.vstack(results)                # (10_000, 512)

    # starmap: multiple arguments per call
    # X_train, y_train: your training arrays, assumed to be loaded already
    configs = [(100, 5), (200, 10), (500, 20)]
    with mp.Pool(processes=3) as pool:
        scores = pool.starmap(cross_val, [(n, d, X_train, y_train) for n, d in configs])

    # SharedMemory: zero-copy large array sharing (Python 3.8+)
    arr = np.random.randn(100_000, 512).astype(np.float32)
    shm = shared_memory.SharedMemory(create=True, size=arr.nbytes)
    try:
        shared = np.ndarray(arr.shape, dtype=arr.dtype, buffer=shm.buf)
        np.copyto(shared, arr)             # copy once into shared memory

        with ProcessPoolExecutor(max_workers=4) as pool:
            futs = [pool.submit(worker, shm.name, arr.shape, arr.dtype) for _ in range(4)]
            results = [f.result() for f in futs]
    finally:
        shm.close()
        shm.unlink()   # always clean up: a leaked segment outlives the process
Pitfall Passing large DataFrames or arrays as arguments — pickle overhead

pool.map(fn, [big_df] * 8) pickles big_df eight times and sends it through a pipe to each worker process. For a 1GB DataFrame, that is 8GB of serialisation and IPC — slower than sequential.

Fix Use shared_memory for NumPy arrays (zero-copy). For DataFrames, save to Parquet first and pass the file path to workers. Or use initializer= to set up data in each worker at pool creation time.
Pitfall Missing if __name__ == "__main__" guard on Windows/macOS (spawn)

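On Windows and macOS, multiprocessing uses the spawn start method: each worker starts a fresh interpreter and re-imports the main module. Without the guard, that import re-executes the top-level pool creation inside every worker. Current Python versions abort this with a RuntimeError about starting a new process before the current one has finished bootstrapping, rather than spawning workers without end.

Fix Always wrap pool creation and usage in if __name__ == "__main__":, and define worker functions at module top level so a re-imported module can find them. On Linux, fork has been the default start method (workers inherit the parent process state), with Python 3.14 moving the default to forkserver. spawn is safer; use it explicitly: mp.set_start_method("spawn").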
Pitfall Not calling shm.unlink() leaks shared memory across process restarts

On POSIX systems SharedMemory is backed by a named shared-memory object (visible under /dev/shm on Linux). If the process crashes without calling shm.unlink(), the segment can persist until it is explicitly unlinked or the OS reboots.

Fix Call shm.close() in every process that opened the block, and shm.unlink() exactly once, in the creating process. Put both in a finally block or a context manager wrapper so cleanup runs even on exceptions.
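
One possible shape for the context manager wrapper mentioned in the fix. shared_array is a hypothetical helper, not part of the standard library; to stay self-contained the example re-attaches to the block in the same process, where a real worker would do so in a child process.

```python
from contextlib import contextmanager
from multiprocessing import shared_memory

import numpy as np

@contextmanager
def shared_array(source: np.ndarray):
    """Copy `source` into a new shared block; always close and unlink on exit."""
    shm = shared_memory.SharedMemory(create=True, size=source.nbytes)
    try:
        view = np.ndarray(source.shape, dtype=source.dtype, buffer=shm.buf)
        np.copyto(view, source)
        yield shm.name, view          # do not use `view` after the with-block ends
    finally:
        shm.close()
        shm.unlink()                  # only the creating process unlinks

data = np.arange(12, dtype=np.float32).reshape(3, 4)

with shared_array(data) as (name, view):
    # A worker would attach by name in another process; here we attach in-process
    attached = shared_memory.SharedMemory(name=name)
    try:
        same = np.ndarray(data.shape, dtype=data.dtype, buffer=attached.buf)
        row_means = same.mean(axis=1)     # new array, independent of the shared block
        del same
    finally:
        attached.close()                  # every opener closes its own handle

print(row_means.tolist())  # [1.5, 5.5, 9.5]
```

The wrapper guarantees the segment is released even if the body raises; the trade-off is that any array view onto the block becomes invalid once the with-block exits.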

Pool is the older API from the multiprocessing module — provides map, starmap, apply_async, imap, imap_unordered. ProcessPoolExecutor from concurrent.futures is the modern API — provides submit() returning Future objects, map() with an iterator interface, and integrates with asyncio via loop.run_in_executor(). ProcessPoolExecutor is preferred for new code: it has cleaner error propagation (exceptions are re-raised on future.result()), better integration with the concurrent.futures ecosystem, and simpler context manager semantics.

Use multiprocessing.shared_memory (Python 3.8+): create a SharedMemory block of size arr.nbytes, copy the array into it once, and pass the shm.name string (not the array) to worker processes. Each worker opens the named shared memory block and wraps it in a NumPy ndarray using the buffer protocol — zero-copy access to the same physical memory. Always close the handle in each process and unlink in the owning process when done. For read-only data, this pattern is ideal. For writable data, add synchronisation with a multiprocessing.Lock.

asyncio when: the bottleneck is I/O — waiting for API responses, database queries, file reads; one thread is sufficient; you need thousands of concurrent operations with low memory overhead. multiprocessing when: the bottleneck is CPU — NumPy operations, model inference, feature engineering; you need to bypass the GIL for Python-level computation; each task is independent and can be chunked. In practice: use asyncio for the LLM call layer (concurrent API requests), multiprocessing for the feature engineering layer (CPU-bound transforms), and threading for legacy sync I/O libraries.

asyncio is single-threaded cooperative multitasking — one task runs at a time, voluntarily yielding at each await. It scales to thousands of concurrent I/O operations without threads.
12

Performance, Memory & Profiling

Measure first, optimise second. Python has many levers — the GIL, __slots__, generators, Cython, and compiled extensions — but only profiling tells you which one matters.

Profiling Tools
cProfile workflow, timeit for micro-benchmarks, line_profiler, memory_profiler, py-spy

Profile before optimising: intuition about Python bottlenecks is usually wrong. The workflow: cProfile identifies which functions consume the most cumulative time (it hooks every function call, so it adds a small overhead per call). line_profiler (@profile decorator) shows time per line within a known bottleneck. memory_profiler tracks memory growth line by line. py-spy is a sampling profiler that attaches to a running process without code changes, which makes it suitable for production profiling. timeit measures small snippets precisely for benchmarking optimisations.

cProfile, timeit, line_profiler, memory_profiler, py-spy
import cProfile, pstats, io
import timeit
import numpy as np
import pandas as pd

# ── cProfile: find the hot function ──────────────────────────
def profile(fn, *args, n_shown=20, **kwargs):
    pr = cProfile.Profile()
    pr.enable()
    result = fn(*args, **kwargs)
    pr.disable()
    s  = io.StringIO()
    ps = pstats.Stats(pr, stream=s).sort_stats('cumulative')
    ps.print_stats(n_shown)
    print(s.getvalue())
    return result

profile(train_model, X_train, y_train)

# CLI: python -m cProfile -s cumtime train.py | head -30

# ── timeit: precise micro-benchmarks ─────────────────────────
data = np.random.randn(10_000)

N_RUNS = 500

loop_ms = timeit.timeit(
    '[x**2 for x in data]',
    globals={'data': data}, number=N_RUNS
) / N_RUNS * 1000   # total seconds -> ms per call

numpy_ms = timeit.timeit(
    'data**2',
    globals={'data': data}, number=N_RUNS
) / N_RUNS * 1000

print(f"List comp: {loop_ms:.2f}ms  NumPy: {numpy_ms:.2f}ms")

# ── line_profiler: line-level timing ─────────────────────────
# pip install line_profiler
# Decorate the function you already know is slow (from cProfile)
# from line_profiler import profile  (or use @profile with kernprof)
#
# @profile
def feature_extraction(df):
    df = df.copy()
    df['log_income'] = np.log1p(df['income'])   # line-level time shown
    df['age_sq']     = df['age'] ** 2
    df['interact']   = df['age'] * df['income']
    return df
# Run: kernprof -l -v script.py

# ── memory_profiler: memory growth per line ───────────────────
# pip install memory_profiler
# from memory_profiler import profile
#
# @profile
def load_and_process(path: str) -> np.ndarray:
    df      = pd.read_csv(path)         # peak shown per line
    df      = df.dropna()
    features= df.select_dtypes('number').values
    return features.astype(np.float32)  # peak after cast
# Run: python -m memory_profiler script.py

# ── py-spy: attach to a running process without code changes ──
# sudo py-spy top --pid <PID>            # live top-like view
# sudo py-spy record -o prof.svg --pid <PID> --duration 30   # flamegraph

# ── tracemalloc: Python-level allocation tracking ─────────────
import tracemalloc

tracemalloc.start()
result = run_feature_pipeline(df)
current, peak = tracemalloc.get_traced_memory()
print(f"Peak: {peak / 1e6:.1f} MB")
tracemalloc.stop()
Pitfall Optimising before profiling — fixing the wrong bottleneck

A developer rewrites a loop in Cython, reducing its time from 0.1s to 0.01s. The overall pipeline still takes about 45s because the real bottleneck was a database query that accounts for 44.9s of the run.

Fix Always run cProfile first. Sort by "cumulative" time to see the full call tree. Fix only the top-1 bottleneck, measure again, repeat. The fix must be verified to actually change the overall pipeline time.
Pitfall timeit disables garbage collection by default — skews memory-heavy benchmarks

timeit.timeit() calls gc.disable() before the benchmark to prevent GC pauses from affecting timing. If the code being benchmarked allocates many objects, GC work is deferred — the benchmark looks faster than it really is in production.

Fix Pass setup="import gc; gc.enable()" to timeit if your benchmark exercises allocation-heavy code: timeit.timeit(..., setup="import gc; gc.enable()").
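A minimal sketch of that comparison, assuming an allocation-heavy statement chosen purely for illustration. The absolute numbers depend on the machine, so none are shown.

```python
import timeit

# Allocation-heavy statement: creates many container objects per run
stmt = "[[] for _ in range(10_000)]"

# Default behaviour: timeit switches the garbage collector off while timing
gc_off = timeit.timeit(stmt, number=50)

# Re-enable the collector in the setup string to include GC work in the timing
gc_on = timeit.timeit(stmt, setup="import gc; gc.enable()", number=50)

print(f"GC disabled: {gc_off:.4f}s  GC enabled: {gc_on:.4f}s")
```

Run both variants several times before drawing conclusions; the gap is workload-dependent and can be small.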
Pitfall cProfile overhead distorts hot-path timing

cProfile adds a Python function call hook — every function call incurs overhead. For functions called millions of times (e.g., a per-token operation), cProfile may report 10× the true time.

Fix Use cProfile to identify the bottleneck function (the ranking is usually reliable even when absolute times are inflated, though functions made of many tiny calls are inflated the most). Use timeit for accurate before/after comparison of a candidate fix. Use py-spy (statistical sampling) for production profiling without changing or instrumenting your code.

Run cProfile: python -m cProfile -s cumulative train.py | head -30. Sort by cumulative time to see which functions account for the most wall-clock time including their callees. Alternatively, wrap with profile() programmatically. The top entry after the script itself is the first bottleneck to investigate. Once you know the function, use line_profiler (kernprof -l -v) to see which lines inside it are responsible.

cProfile is a deterministic profiler: it hooks every function call and records exact call counts and per-function time. Overhead: often adds 10-100% to execution time, and considerably more for code dominated by many tiny function calls. It cannot attach to an already-running process. py-spy is a statistical (sampling) profiler: it periodically samples the call stack of a running process (100 samples/second by default) without modifying it. Near-zero overhead (~1%). Can attach to a live production process by PID. Trade-off: cProfile gives exact counts and call graphs; py-spy gives approximate hot paths safe enough for production.

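Three options: (1) memory_profiler @profile decorator: shows RSS growth per line; run with python -m memory_profiler script.py. (2) tracemalloc: tracks allocations made through Python's memory allocators rather than RSS: start() before, get_traced_memory() after. (3) For a quick top-level measurement on Unix: import resource; print(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss). Note that ru_maxrss is the peak RSS of the process so far (kilobytes on Linux, bytes on macOS), so it only changes when a new high-water mark is reached. For NumPy/Pandas, tracemalloc attributes memory to the line that allocated it, which RSS cannot do, but it does not see memory that C libraries allocate outside Python's allocators, so check RSS as well when the two disagree.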
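For option (2), a minimal sketch that compares two tracemalloc snapshots to find which source lines allocated the most. build_rows is a hypothetical workload used only for illustration.

```python
import tracemalloc

def build_rows(n: int) -> list:
    """Hypothetical workload: many small dicts, like un-vectorised feature rows."""
    return [{"id": i, "score": i * 0.5} for i in range(n)]

tracemalloc.start()
before = tracemalloc.take_snapshot()
rows = build_rows(50_000)
after = tracemalloc.take_snapshot()
current, peak = tracemalloc.get_traced_memory()   # bytes traced since start()
tracemalloc.stop()

# Largest allocation differences between the snapshots, grouped by source line
top = after.compare_to(before, "lineno")[:3]
for stat in top:
    print(stat)
print(f"current: {current / 1e6:.1f} MB  peak: {peak / 1e6:.1f} MB")
```

Grouping by "lineno" points at the allocating line; "filename" or "traceback" are the other grouping keys when a coarser or deeper view is needed.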

GIL & Bypassing It
What the GIL is, I/O vs CPU bound, NumPy releases the GIL, multiprocessing, Cython

The Global Interpreter Lock (GIL) is a mutex in CPython that ensures only one thread executes Python bytecode at a time. It protects the interpreter's internal state from concurrent modification but also prevents true CPU parallelism within threads. C extensions (NumPy, Pandas, sklearn) can and do release the GIL during computation, which is why heavy NumPy matrix operations can run in parallel across threads. To bypass the GIL for Python code: use multiprocessing, move hot loops into compiled code that releases the GIL (Cython, C extensions, or C libraries called through ctypes), or use Python 3.13's experimental free-threaded build.

GIL — threading vs multiprocessing, NumPy GIL release, Cython
import threading, time
import numpy as np
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

N = 10_000_000

# ── Pure Python CPU work: threading gives NO speedup (GIL) ───
def count_up(n):
    total = 0
    for i in range(n): total += i
    return total

t0 = time.perf_counter()
count_up(N); count_up(N)
seq_time = time.perf_counter() - t0

t0 = time.perf_counter()
with ThreadPoolExecutor(max_workers=2) as pool:
    list(pool.map(count_up, [N, N]))
thread_time = time.perf_counter() - t0

print(f"Sequential: {seq_time:.2f}s  Threaded: {thread_time:.2f}s")
# Both ~same — GIL prevents parallel execution

# ── Multiprocessing: true parallelism (separate GIL per process)
# On Windows/macOS (spawn start method) run this under `if __name__ == "__main__":`
t0 = time.perf_counter()
with ProcessPoolExecutor(max_workers=2) as pool:
    list(pool.map(count_up, [N, N]))
mp_time = time.perf_counter() - t0
print(f"Multiprocessing: {mp_time:.2f}s")  # ~2× faster

# ── NumPy releases the GIL: threading DOES help ──────────────
def numpy_svd(matrix):
    return np.linalg.svd(matrix)    # LAPACK call releases GIL

matrices = [np.random.randn(500, 500) for _ in range(4)]

t0 = time.perf_counter()
[numpy_svd(m) for m in matrices]
numpy_seq = time.perf_counter() - t0

t0 = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(numpy_svd, matrices))
numpy_threaded = time.perf_counter() - t0

print(f"NumPy sequential: {numpy_seq:.2f}s  Threaded: {numpy_threaded:.2f}s")
# Often faster: the GIL is released, so LAPACK calls overlap
# (the gain shrinks if the BLAS library already uses all cores internally)

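# ── Cython: typed code can run without the GIL ───────────────
# Save as fast_sum.pyx, compile with: cythonize -i fast_sum.pyx
#
# cdef double _sum(double[:] arr) nogil:    # nogil: safe to call without the GIL
#     cdef double total = 0.0
#     cdef Py_ssize_t i
#     for i in range(arr.shape[0]):
#         total += arr[i]
#     return total
#
# def fast_sum(double[:] arr):              # Python-callable wrapper
#     cdef double result
#     with nogil:                           # the GIL is actually released here
#         result = _sum(arr)
#     return result
#
# Python 3.13 experimental free-threaded build:
# Build CPython with --disable-gil for truly parallel Python threads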
Pitfall Expecting ThreadPoolExecutor to speed up a NumPy apply_along_axis loop

apply_along_axis calls a Python function per row — Python-level execution. Threading adds overhead without CPU parallelism because each Python callback requires the GIL.

Fix Replace apply_along_axis with broadcast operations (which run in C, releasing the GIL automatically). If the operation truly cannot be vectorised, use ProcessPoolExecutor or rewrite the critical function in Cython/Numba.
Pitfall Assuming the GIL protects all data structures from concurrent access

The GIL makes individual operations on built-in types (a single list.append, a single dict assignment) effectively atomic, but it does not protect compound operations. counter += 1, a check-then-insert on a dict, or reading a list's length and then indexing it each span several bytecodes, and the interpreter can switch threads between them, so updates get lost or invariants break in rare, hard-to-reproduce cases.

Fix Use threading.Lock for any shared mutable state, even simple lists and dicts, when correctness depends on a sequence of operations completing together. The GIL is not a substitute for explicit synchronisation.
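A minimal sketch of the fix, with a hypothetical Counter class and hammer helper: the read-modify-write in increment is wrapped in a Lock, so no update can be lost regardless of how the threads interleave.

```python
import threading

class Counter:
    """Hypothetical shared counter guarded by an explicit lock."""
    def __init__(self) -> None:
        self.value = 0
        self._lock = threading.Lock()

    def increment(self) -> None:
        with self._lock:          # read, add, and write happen as one unit
            self.value += 1

def hammer(counter: Counter, n: int) -> None:
    for _ in range(n):
        counter.increment()

counter = Counter()
threads = [threading.Thread(target=hammer, args=(counter, 50_000)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter.value)
```

The lock costs a little per call; for hot counters, accumulating per thread and merging once at the end is a common alternative.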
Pitfall Free-threaded Python 3.13 breaks thread-safety assumptions

Code that relied on the GIL for implicit thread-safety may have data races in free-threaded Python, where the GIL is disabled. Built-in containers keep per-object locking in the free-threaded build, so a single list.append stays safe, but compound updates that happened to work under the GIL, and C extensions written on the assumption that the GIL serialises access, can break.

Fix Add explicit Lock protection around all shared mutable state now, even if "it works" under the current GIL. Code that is explicitly thread-safe will work correctly under both GIL and no-GIL builds.

In a pure-Python loop the running thread holds the GIL while it executes bytecode and gives it up only when another thread requests it and the switch interval has elapsed (5 ms by default in CPython 3.2+, see sys.getswitchinterval(); older versions switched every 100 bytecodes). With two threads, one runs while the other waits for the GIL: they alternate rather than run simultaneously. The GIL hand-off adds cost on top of the lack of parallelism. Fix: use NumPy vectorised operations (C code, with the GIL released during most heavy operations), or use multiprocessing (separate processes with separate GILs).

I/O-bound: the thread spends most of its time waiting for external events (disk, network, database). During I/O waits, the GIL is released and other threads run freely. Threading works well here. CPU-bound: the thread is actively computing Python bytecodes (for loops, arithmetic). The GIL is rarely released, so threads cannot execute in parallel. Use multiprocessing or C extensions. Quick test: run the same batch of tasks on one thread and then on several. If wall-clock time drops roughly in proportion to the thread count, the work is I/O-bound; if it stays flat or gets worse, it is CPU-bound.
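A rough way to classify a workload is to time the same batch on one thread and on several. The sketch below uses two stand-in tasks: io_task uses time.sleep to mimic a network wait and cpu_task is a pure-Python loop. Timings vary by machine, so none are quoted.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def io_task(_: int) -> None:
    time.sleep(0.1)                   # stand-in for a network or disk wait

def cpu_task(_: int) -> int:
    total = 0
    for i in range(200_000):          # pure-Python arithmetic holds the GIL
        total += i
    return total

def timed(fn, n_workers: int, n_tasks: int = 4) -> float:
    """Wall-clock seconds to run n_tasks calls of fn on n_workers threads."""
    t0 = time.perf_counter()
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        list(pool.map(fn, range(n_tasks)))
    return time.perf_counter() - t0

io_seq, io_par = timed(io_task, 1), timed(io_task, 4)
cpu_seq, cpu_par = timed(cpu_task, 1), timed(cpu_task, 4)

print(f"I/O : 1 thread {io_seq:.2f}s  4 threads {io_par:.2f}s")
print(f"CPU : 1 thread {cpu_seq:.2f}s  4 threads {cpu_par:.2f}s")
```

On a standard (GIL) CPython build the I/O timings should drop with more threads while the CPU timings stay roughly flat.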

NumPy operations like np.linalg.svd and np.dot delegate to BLAS/LAPACK libraries (np.fft uses NumPy's own compiled FFT code instead). Around the BLAS/LAPACK calls, NumPy releases the GIL before entering the routine and reacquires it when the routine returns. During the C-level computation, other Python threads (or other NumPy calls) can run freely. OpenBLAS and MKL also use internal multi-threading for large matrix operations, so a single np.dot call on a large matrix can already use several CPU cores, depending on how the library is configured. This is why threading works for NumPy-heavy workloads even though it does not help for pure Python loops.

__slots__ & Memory
__dict__ overhead, slots savings at scale, @dataclass(slots=True), tracemalloc

By default, Python objects store instance attributes in a per-instance __dict__, which adds on the order of a hundred bytes or more per object depending on the CPython version. __slots__ declares a fixed set of attributes, replacing __dict__ with a compact C-level array. This can save a large share of per-object memory (50-80% is often quoted; recent CPython versions have narrowed the gap, so measure on your own interpreter). It matters when creating millions of lightweight objects: prediction records, feature rows, tree nodes. Python 3.10+ supports @dataclass(slots=True) for the slots benefit with dataclass ergonomics.

__slots__ memory savings, dataclass slots, tracemalloc, gc control
import sys, tracemalloc
from dataclasses import dataclass

# Without __slots__: per-instance __dict__ overhead
class Point:
    def __init__(self, x: float, y: float):
        self.x = x
        self.y = y

# With __slots__: compact attribute storage, no __dict__
class PointSlots:
    __slots__ = ('x', 'y')
    def __init__(self, x: float, y: float):
        self.x = x
        self.y = y

p1 = Point(1.0, 2.0)
p2 = PointSlots(1.0, 2.0)

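print(sys.getsizeof(p1))           # instance only; the __dict__ is a separate object
print(sys.getsizeof(p1.__dict__))  # per-instance dict overhead (varies by CPython version)
print(sys.getsizeof(p2))           # whole object: slot values live inside the instance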
print(hasattr(p1, '__dict__'))     # True
print(hasattr(p2, '__dict__'))     # False

# Measure savings at scale with tracemalloc
N = 1_000_000

tracemalloc.start()
pts_dict  = [Point(float(i), float(i)) for i in range(N)]
_, peak_d = tracemalloc.get_traced_memory()
tracemalloc.clear_traces()

pts_slots = [PointSlots(float(i), float(i)) for i in range(N)]
_, peak_s = tracemalloc.get_traced_memory()
tracemalloc.stop()

print(f"dict:  {peak_d / 1e6:.0f} MB")
print(f"slots: {peak_s / 1e6:.0f} MB")  # typically 50-70% less

# @dataclass with slots=True (Python 3.10+)
@dataclass(slots=True)
class Prediction:
    label:      str
    confidence: float
    latency_ms: float

# __slots__ with inheritance — must redeclare at every level
class Base:
    __slots__ = ('x',)

class Child(Base):
    __slots__ = ('y',)    # omitting this adds __dict__ back to Child

# Weak references: add __weakref__ explicitly if needed
class Cacheable:
    __slots__ = ('value', '__weakref__')   # required for weakref.ref(obj)

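# gc: pause the cyclic collector while building many acyclic objects (rare, but powerful)
import gc
gc.disable()              # cycles created meanwhile are not reclaimed until re-enabled
try:
    build_many_records()  # hypothetical bulk-construction step
finally:
    gc.enable()           # always restore, even if construction raises
    gc.collect()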
Pitfall Subclass without __slots__ re-introduces __dict__

class Child(BaseWithSlots): pass — Child has no __slots__ declaration, so Python gives it __dict__ anyway. The memory savings from the base class are lost for instances of Child.

Fix Declare __slots__ = () (empty tuple if no new attributes) in every subclass that extends a slotted base. This preserves the no-__dict__ property throughout the hierarchy.
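A minimal check of this behaviour, using hypothetical class names:

```python
class Base:
    __slots__ = ("x",)

class LeakyChild(Base):          # no __slots__: instances get a __dict__ again
    pass

class TightChild(Base):
    __slots__ = ()               # no new attributes, and still no __dict__

leaky, tight = LeakyChild(), TightChild()
leaky.anything = 1               # allowed only because __dict__ is back

print(hasattr(leaky, "__dict__"))   # True
print(hasattr(tight, "__dict__"))   # False
```

A quick hasattr(instance, "__dict__") assertion in a unit test is a cheap way to catch a subclass that silently drops the saving.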
Pitfall @dataclass with hand-written __slots__ breaks as soon as a field has a default

Adding __slots__ = (...) to the body of a @dataclass works only while no field has a default value. A default such as confidence: float = 0.0 becomes a class attribute with the same name as a slot, and class creation fails with ValueError: 'confidence' in __slots__ conflicts with class variable. The slot names also have to be kept in sync with the fields by hand.

Fix Use @dataclass(slots=True) (Python 3.10+): it generates a new class with proper __slots__ and handles defaults. For Python 3.9 and earlier, either write the slotted class by hand without @dataclass, or keep the hand-written __slots__ and give no field a default.
Pitfall Slotted objects cannot have dynamic attributes

obj.new_attr = "value" on a slotted object raises AttributeError — only declared slots are allowed. Code that dynamically attaches attributes (common in testing and monkey-patching) breaks.

Fix If you need occasional dynamic attributes, add "__dict__" to __slots__ explicitly — this preserves the dict but gives you the other slot benefits for the fixed attributes. Or add a data: dict slot to hold overflow attributes.
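A minimal sketch of both behaviours, with hypothetical class names Strict and Flexible:

```python
class Strict:
    __slots__ = ("value",)

class Flexible:
    __slots__ = ("value", "__dict__")   # fixed slot plus an opt-in dict

strict, flexible = Strict(), Flexible()
strict.value = flexible.value = 1.0

try:
    strict.debug_tag = "x"              # not a declared slot
    strict_allows_dynamic = True
except AttributeError:
    strict_allows_dynamic = False

flexible.debug_tag = "x"                # lands in the instance __dict__

print(strict_allows_dynamic)            # False
print(flexible.__dict__)                # {'debug_tag': 'x'}
```

The declared slot value stays out of __dict__, so the fixed attributes keep their compact storage while ad-hoc ones pay the dict cost.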

__slots__ is worth considering when: (1) you create millions of instances of the same class — prediction records, feature row objects, graph nodes; (2) memory is a constraint — embedding stores, in-memory caches; (3) attribute access speed matters — __slots__ attribute lookup skips the __dict__ hash lookup. Rule of thumb: if a class has fewer than 10 fixed attributes and you create more than 100,000 instances, __slots__ is worth the added verbosity. For normal business-logic classes with a handful of instances, __slots__ is premature optimisation.

__slots__ limitations: (1) no dynamic attributes: only declared slots are allowed; (2) no __dict__ by default: tools that inspect __dict__ (some ORMs, serialisers) break; (3) no weak references without __weakref__ in slots; (4) multiple inheritance with slots is complex: all bases must define __slots__ for the savings to apply, and at most one base may have non-empty __slots__, otherwise class creation raises TypeError (instance lay-out conflict); (5) pickling works out of the box with protocol 2 and later (the default in Python 3), but protocols 0 and 1 require __getstate__/__setstate__ for classes that have __slots__ and no __dict__. Mitigations: add __dict__ or __weakref__ to __slots__ explicitly when needed.
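The multiple-inheritance limitation in point (4) can be checked directly. A minimal sketch with hypothetical class names: two bases with non-empty __slots__ cannot be combined, while a base with empty __slots__ mixes in without trouble.

```python
class HasA:
    __slots__ = ("a",)

class HasB:
    __slots__ = ("b",)

class Mixin:
    __slots__ = ()                # empty slots: adds behaviour, no storage

try:
    class Broken(HasA, HasB):     # two non-empty slot layouts cannot be merged
        __slots__ = ()
    conflict = None
except TypeError as exc:
    conflict = str(exc)

class Works(HasA, Mixin):         # only one base contributes slot storage
    __slots__ = ("c",)

print(conflict)
print(hasattr(Works(), "__dict__"))   # False
```

Keeping mixins slot-free (empty tuple) is the usual way to combine them with a slotted data class.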

Python uses reference counting as its primary GC mechanism — objects are freed when their reference count reaches zero. The cyclic GC (gc module) handles reference cycles. __slots__ does not affect reference counting or cycle detection — slotted objects participate in both normally. However, slotted objects with no __dict__ are slightly cheaper for the GC to scan because there is no dict to traverse. Disabling the cyclic GC (gc.disable()) is only safe when you are certain no reference cycles exist — useful for acyclic data structures like trees in hot processing loops.
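The two mechanisms can be observed with weak references. This is a CPython-specific sketch with a hypothetical Node class: an unreferenced object is freed immediately by reference counting, while a two-object cycle survives until the cyclic collector runs.

```python
import gc
import weakref

class Node:
    def __init__(self) -> None:
        self.other = None

gc.disable()                       # keep the cyclic collector from running mid-demo
try:
    # Case 1: no cycle, so the refcount hits zero on del
    a = Node()
    ref_a = weakref.ref(a)
    del a
    freed_by_refcount = ref_a() is None

    # Case 2: b and c reference each other, so refcounts never reach zero
    b, c = Node(), Node()
    b.other, c.other = c, b
    ref_b = weakref.ref(b)
    del b, c
    alive_before_collect = ref_b() is not None

    gc.collect()                   # the cycle detector finds and frees the pair
    freed_after_collect = ref_b() is None
finally:
    gc.enable()

print(freed_by_refcount, alive_before_collect, freed_after_collect)
```

With the collector disabled, cycles like the second case accumulate until gc.collect() is called, which is why disabling it is only advisable for acyclic workloads.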

Caching Strategies
lru_cache, functools.cache, joblib.Memory, Redis with TTL, cached_property, cache invalidation

Four caching tiers for Python DS/AI systems: (1) in-process lru_cache for pure functions with hashable arguments — zero serialisation cost; (2) joblib.Memory for disk-level caching of expensive computations across process restarts; (3) Redis for cross-process and cross-machine shared caches — embedding lookups, feature stores; (4) functools.cached_property for per-instance computed attributes. Cache invalidation is the hard part: always include the model version and data version in cache keys.

lru_cache, joblib.Memory, Redis TTL cache, cached_property, invalidation
import functools
from functools import lru_cache, cache, cached_property
import numpy as np
import pandas as pd

# ── functools.cache: unlimited in-process memoisation ────────
@cache                          # equivalent to @lru_cache(maxsize=None)
def expensive(n: int) -> int:
    return sum(range(n))        # pure function — same input → same output

# ── lru_cache: bounded in-process memoisation ────────────────
@lru_cache(maxsize=10_000)      # evicts LRU when full
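def get_embedding(text: str) -> tuple[float, ...]:   # argument must be hashable (str is)
    vec = embedding_model.encode(text)   # embedding_model: assumed to be loaded elsewhere
    return tuple(vec)           # immutable, so callers cannot mutate the cached value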

print(get_embedding.cache_info())
# CacheInfo(hits=42, misses=100, maxsize=10000, currsize=100)
get_embedding.cache_clear()     # call when model version changes

# ── joblib.Memory: persistent disk cache ─────────────────────
from joblib import Memory

mem = Memory(location='.cache', verbose=0)

@mem.cache
def extract_features(data_path: str, feature_version: str) -> np.ndarray:
    """Cached by (data_path, feature_version) — bump version to invalidate."""
    df = pd.read_parquet(data_path)
    return compute_features(df)   # expensive operation, cached to disk

feats = extract_features('data/train.parquet', feature_version='v3')
# Second call: loads from .cache/ instantly

# ── Redis: cross-process, distributed cache ───────────────────
import redis, json, hashlib

r = redis.Redis(host='localhost', port=6379, db=0)

def get_cached_embedding(text: str, model_version: str) -> list[float]:
    # Include model_version in key — invalidates on model upgrade
    key = f"emb:{model_version}:{hashlib.md5(text.encode()).hexdigest()}"
    cached = r.get(key)
    if cached:
        return json.loads(cached)
    embedding = model.encode(text).tolist()
    r.setex(key, time=3600, value=json.dumps(embedding))  # TTL 1 hour
    return embedding

# Bulk invalidation when model changes
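def invalidate_model_cache(old_version: str) -> int:
    pattern = f"emb:{old_version}:*"
    deleted = 0
    # SCAN iterates incrementally; KEYS would block the server on a large keyspace
    for key in r.scan_iter(match=pattern, count=1000):
        deleted += r.delete(key)
    return deleted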

# ── cached_property: memoised instance attribute ──────────────
class Dataset:
    def __init__(self, path: str):
        self._path = path

    @cached_property
    def features(self) -> np.ndarray:
        return pd.read_parquet(self._path).values  # called once, stored in __dict__

    def reload(self) -> None:
        """Invalidate cached_property by deleting it from __dict__."""
        self.__dict__.pop('features', None)
Pitfall @lru_cache on instance methods causes memory leaks

When applied directly to an instance method, self is one of the cache key arguments. The cache holds a strong reference to self — the object is never garbage collected even when all other references are gone.

Fix Cache at the class level or module level with string/hashable arguments instead of self. Or use functools.cached_property for instance-level caching — it stores in __dict__ and is released when the instance is collected.
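The leak and the fix can be demonstrated with a weak reference. A minimal sketch with hypothetical classes Leaky and Safe:

```python
import gc
import weakref
from functools import lru_cache, cached_property

class Leaky:
    @lru_cache(maxsize=128)            # cache lives on the class; keys include self
    def score(self, x: int) -> int:
        return x * 2

class Safe:
    def __init__(self, x: int) -> None:
        self.x = x

    @cached_property                   # value stored in the instance __dict__
    def score(self) -> int:
        return self.x * 2

leaky = Leaky()
leaky.score(1)
leaky_ref = weakref.ref(leaky)
del leaky
gc.collect()
leaked = leaky_ref() is not None       # still alive: the cache key references it

Leaky.score.cache_clear()              # dropping the cache entry releases the object
gc.collect()
released_after_clear = leaky_ref() is None

safe = Safe(21)
_ = safe.score
safe_ref = weakref.ref(safe)
del safe
gc.collect()
safe_released = safe_ref() is None     # the cached value dies with the instance

print(leaked, released_after_clear, safe_released)
```

With a bounded maxsize the leaked instance is eventually evicted, but until then it, and everything it references, stays in memory.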
Pitfall joblib cache returns stale features after upgrading a preprocessing library

joblib.Memory caches by argument hash — it does not detect that sklearn version changed or that a dependency function changed. After a library upgrade, the cache may silently return features computed with the old code.

Fix Include a version parameter in every @mem.cache function: def extract_features(path, feature_version="v3"). Bump feature_version when any dependency changes. This forces re-computation for the new version while keeping old cache entries until explicitly deleted.
Pitfall Redis cache missing model version in key — stale embeddings after model update

Key f"emb:{text_hash}" — after deploying a new embedding model, all cached embeddings are from the old model but still hit the cache. The vector store serves mismatched embeddings silently.

Fix Always include the model version in the cache key: f"emb:{model_version}:{text_hash}". On model update, either flush all keys matching the old version pattern or bump the version string (cache miss is acceptable during warm-up).

@lru_cache wraps the function at class-definition time, not per-instance. When the method is called, self is the first argument — the cache stores (self, arg1, arg2, ...) as the key, holding a strong reference to self. The object can never be garbage collected as long as the cache exists. The fix: use @functools.cached_property for properties (stores in instance __dict__, collected with the instance), or cache at module level with explicit keys that do not reference self.

Version your cache keys explicitly: include a model_version string in every cache key. For lru_cache: call cache_clear() after loading the new model. For joblib.Memory: bump the version argument and optionally call mem.clear(). For Redis: use a key pattern like "emb:{version}:*" and delete the matching keys when deploying a new model (iterate with r.scan_iter(match=pattern) rather than r.keys(pattern), which blocks the server on a large keyspace). Never rely on TTL alone for model version invalidation: a 1-hour TTL means up to 1 hour of wrong results after every model update.

lru_cache: in-process, in-memory, no serialisation, so a cache hit costs little more than a dict lookup. Evicts LRU items when maxsize is reached. Lost on process restart. Arguments must be hashable. Best for: repeated calls within the same process run (embedding lookups, preprocessing). joblib.Memory: persistent disk-based cache that survives process restarts. Handles arbitrary arguments (numpy arrays, DataFrames) by hashing their content. Slower cache hits (disk read vs dict lookup). Best for: expensive computations that take minutes (feature engineering, large data transforms) that you want to skip on the next run.
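The eviction behaviour of lru_cache can be traced with cache_info(). A minimal sketch with a deliberately tiny maxsize and a hypothetical square function:

```python
from functools import lru_cache

@lru_cache(maxsize=2)
def square(n: int) -> int:
    return n * n

square(1)   # miss  -> cache holds {1}
square(2)   # miss  -> cache holds {1, 2}
square(1)   # hit   -> 1 becomes most recently used
square(3)   # miss  -> cache is full, evicts 2 (least recently used)
square(2)   # miss  -> 2 was evicted, recomputed; evicts 1

info = square.cache_info()
print(info)   # CacheInfo(hits=1, misses=4, maxsize=2, currsize=2)
```

A low hit count relative to misses in cache_info() is a sign that maxsize is too small for the working set.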

Premature optimisation is evil. Profile first with cProfile. Fix the one hotspot that matters. Test that the fix did not regress correctness.

End of Python reference

Now build something with it.

Theory compounds only when it meets production code.