In this tutorial, we design an end-to-end, production-style analytics and modeling pipeline using Vaex, operating efficiently on millions of rows without materializing intermediate results in memory. We generate a realistic, large-scale dataset, engineer rich behavioral and city-level features using lazy expressions and approximate statistics, and aggregate insights at scale. We then integrate Vaex with scikit-learn to train and evaluate predictive models, demonstrating how Vaex can serve as the backbone for high-performance exploratory analytics and machine-learning workflows.
!pip -q install "vaex==4.19.0" "vaex-core==4.19.0" "vaex-ml==0.19.0" "vaex-viz==0.6.0" "vaex-hdf5==0.15.0" "pyarrow>=14" "scikit-learn>=1.3"
import os, time, json, numpy as np, pandas as pd
import vaex
import vaex.ml
from vaex.ml.sklearn import Predictor
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score, average_precision_score
print("Python:", __import__("sys").version.split()[0])
print("vaex:", vaex.__version__)
print("numpy:", np.__version__)
print("pandas:", pd.__version__)
rng = np.random.default_rng(7)
n = 2_000_000
cities = np.array(("Montreal","Toronto","Vancouver","Calgary","Ottawa","Edmonton","Quebec City","Winnipeg"), dtype=object)
city = rng.choice(cities, size=n, replace=True, p=np.array((0.16,0.18,0.12,0.10,0.10,0.10,0.10,0.14)))
age = rng.integers(18, 75, size=n, endpoint=False).astype("int32")
tenure_m = rng.integers(0, 180, size=n, endpoint=False).astype("int32")
tx = rng.poisson(lam=22, size=n).astype("int32")
base_income = rng.lognormal(mean=10.6, sigma=0.45, size=n).astype("float64")
city_mult = pd.Series({"Montreal":0.92,"Toronto":1.05,"Vancouver":1.10,"Calgary":1.02,"Ottawa":1.00,"Edmonton":0.98,"Quebec City":0.88,"Winnipeg":0.90}).reindex(city).to_numpy()
income = (base_income * city_mult * (1.0 + 0.004*(age-35)) * (1.0 + 0.0025*np.minimum(tenure_m,120))).astype("float64")
income = np.clip(income, 18_000, 420_000)
noise = rng.normal(0, 1, size=n).astype("float64")
score_latent = (
    0.55*np.log1p(income/1000.0)
    + 0.28*np.log1p(tx)
    + 0.18*np.sqrt(np.maximum(tenure_m,0)/12.0 + 1e-9)
    - 0.012*(age-40)
    + 0.22*(city == "Vancouver").astype("float64")
    + 0.15*(city == "Toronto").astype("float64")
    + 0.10*(city == "Ottawa").astype("float64")
    + 0.65*noise
)
p = 1.0/(1.0 + np.exp(-(score_latent - np.quantile(score_latent, 0.70))))
target = (rng.random(n) < p).astype("int8")
df = vaex.from_arrays(city=city, age=age, tenure_m=tenure_m, tx=tx, income=income, target=target)
# Each assignment below defines a virtual column: an expression, not a new array
df["income_k"] = df.income / 1000.0
df["tenure_y"] = df.tenure_m / 12.0
df["log_income"] = df.income.log1p()
df["tx_per_year"] = df.tx / (df.tenure_y + 0.25)
df["value_score"] = (0.35*df.log_income + 0.20*df.tx_per_year + 0.10*df.tenure_y - 0.015*df.age)
df["is_new"] = df.tenure_m < 6
df["is_senior"] = df.age >= 60
print("\nRows:", len(df), "Columns:", len(df.get_column_names()))
print(df[["city","age","tenure_m","tx","income","income_k","value_score","target"]].head(5))
We create a large, realistic synthetic dataset and initialize a Vaex DataFrame so that we can work lazily on millions of rows. We engineer key numerical features as lazy expressions so that no intermediate data is materialized. We validate the setup by inspecting the schema, the row count, and a small sample of computed values.
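To make the idea of a lazy expression concrete, here is a minimal plain-Python sketch. It is not how Vaex is implemented; the `LazyColumn` class, its `evaluate` method, and the toy rows are hypothetical names introduced only for illustration. The point it shows is that defining a derived column records a formula, and values are produced chunk by chunk only when requested.

```python
from typing import Callable, Dict, Iterator, List


class LazyColumn:
    """Hypothetical stand-in for a virtual column: stores a recipe, not values."""

    def __init__(self, fn: Callable[[Dict[str, float]], float]):
        self.fn = fn

    def evaluate(self, rows: List[Dict[str, float]], chunk_size: int = 2) -> Iterator[List[float]]:
        # Compute one chunk at a time so the full result never has to exist at once.
        for start in range(0, len(rows), chunk_size):
            yield [self.fn(r) for r in rows[start:start + chunk_size]]


rows = [
    {"tenure_m": 6, "tx": 10},
    {"tenure_m": 24, "tx": 30},
    {"tenure_m": 0, "tx": 5},
]

# Defining the columns does no work yet; they only record the formulas.
tenure_y = LazyColumn(lambda r: r["tenure_m"] / 12.0)
tx_per_year = LazyColumn(lambda r: r["tx"] / (tenure_y.fn(r) + 0.25))

# Work happens here, in chunks of two rows.
chunks = list(tx_per_year.evaluate(rows))
flat = [v for chunk in chunks for v in chunk]
print(flat)
```

Note that `tx_per_year` is built on top of `tenure_y` without ever storing a `tenure_y` array, which mirrors how the derived columns in the tutorial chain on one another.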
encoder = vaex.ml.LabelEncoder(features=["city"])
df = encoder.fit_transform(df)
city_map = encoder.labels_["city"]
inv_city_map = {v:k for k,v in city_map.items()}
n_cities = len(city_map)
p95_income_k_by_city = df.percentile_approx("income_k", 95, binby="label_encoded_city", shape=n_cities, limits=(-0.5, n_cities-0.5))
p50_value_by_city = df.percentile_approx("value_score", 50, binby="label_encoded_city", shape=n_cities, limits=(-0.5, n_cities-0.5))
avg_income_k_by_city = df.mean("income_k", binby="label_encoded_city", shape=n_cities, limits=(-0.5, n_cities-0.5))
target_rate_by_city = df.mean("target", binby="label_encoded_city", shape=n_cities, limits=(-0.5, n_cities-0.5))
n_by_city = df.count(binby="label_encoded_city", shape=n_cities, limits=(-0.5, n_cities-0.5))
p95_income_k_by_city = np.asarray(p95_income_k_by_city).reshape(-1)
p50_value_by_city = np.asarray(p50_value_by_city).reshape(-1)
avg_income_k_by_city = np.asarray(avg_income_k_by_city).reshape(-1)
target_rate_by_city = np.asarray(target_rate_by_city).reshape(-1)
n_by_city = np.asarray(n_by_city).reshape(-1)
# Collect the per-city aggregates into a small pandas table
city_table = pd.DataFrame({
    "city_id": np.arange(n_cities),
    "city": [inv_city_map[i] for i in range(n_cities)],
    "n": n_by_city.astype("int64"),
    "avg_income_k": avg_income_k_by_city,
    "p95_income_k": p95_income_k_by_city,
    "median_value_score": p50_value_by_city,
    "target_rate": target_rate_by_city
}).sort_values(["target_rate","p95_income_k"], ascending=False)
print("\nCity summary:")
print(city_table.to_string(index=False))
# Join the city-level statistics back onto every row
df_city_features = vaex.from_pandas(city_table[["city","p95_income_k","avg_income_k","median_value_score","target_rate"]], copy_index=False)
df = df.join(df_city_features, on="city", rsuffix="_city")
df["income_vs_city_p95"] = df.income_k / (df.p95_income_k + 1e-9)
df["value_vs_city_median"] = df.value_score - df.median_value_score
We encode the categorical city column and compute scalable, approximate per-city statistics using binning-based operations. We assemble these aggregates into a city-level table and join them back onto the main dataset. We then derive relative features that compare each record to its city context.
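The binning trick above relies on placing one bin per integer city code: with `n_cities` bins and limits of `(-0.5, n_cities-0.5)`, each bin has width 1 and is centered on a code. The following plain-Python sketch illustrates that arithmetic and the join-back step on toy data. The helper `bin_index` and the sample values are assumptions made for illustration, not part of Vaex.

```python
def bin_index(code: int, n_bins: int, lo: float, hi: float) -> int:
    """Map a value to its bin, given n_bins equal-width bins spanning [lo, hi)."""
    width = (hi - lo) / n_bins
    return int((code - lo) // width)


codes = [0, 1, 1, 2, 0, 2, 2]          # label-encoded city per row (toy data)
income_k = [50.0, 80.0, 100.0, 60.0, 70.0, 90.0, 30.0]
n_cities = 3

# Accumulate per-bin sums and counts in a single pass over the rows.
sums = [0.0] * n_cities
counts = [0] * n_cities
for code, value in zip(codes, income_k):
    b = bin_index(code, n_cities, -0.5, n_cities - 0.5)
    sums[b] += value
    counts[b] += 1

means = [s / c for s, c in zip(sums, counts)]

# "Join" the city-level mean back to each row and derive a relative feature.
income_vs_city_mean = [v / means[c] for c, v in zip(codes, income_k)]
print(means)
print(income_vs_city_mean)
```

Because the aggregation only keeps one running sum and count per bin, its memory use depends on the number of cities rather than the number of rows.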
features_num = [
    "age","tenure_y","tx","income_k","log_income","tx_per_year","value_score",
    "p95_income_k","avg_income_k","median_value_score","target_rate",
    "income_vs_city_p95","value_vs_city_median"
]
scaler = vaex.ml.StandardScaler(features=features_num, with_mean=True, with_std=True, prefix="z_")
df = scaler.fit_transform(df)
features = ["z_"+f for f in features_num] + ["label_encoded_city"]
df_train, df_test = df.split_random([0.80, 0.20], random_state=42)
model = LogisticRegression(max_iter=250, solver="lbfgs", n_jobs=None)
vaex_model = Predictor(model=model, features=features, target="target", prediction_name="pred")
t0 = time.time()
vaex_model.fit(df=df_train)
fit_s = time.time() - t0
df_test = vaex_model.transform(df_test)
y_true = df_test["target"].to_numpy()
y_pred = df_test["pred"].to_numpy()
auc = roc_auc_score(y_true, y_pred)
ap = average_precision_score(y_true, y_pred)
print("\nModel:")
print("fit_seconds:", round(fit_s, 3))
print("test_auc:", round(float(auc), 4))
print("test_avg_precision:", round(float(ap), 4))
We standardize all numerical features using Vaex's ML utilities and build a consistent feature vector for modeling. We split the data into training and test sets without loading the entire dataset into memory. We train a logistic regression model using Vaex's scikit-learn wrapper and evaluate its predictive performance.
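One detail worth verifying at this step is whether the `pred` column holds probabilities or hard class labels, since that depends on which method of the underlying estimator the wrapper calls. Ranking metrics such as AUC are most informative on continuous scores. The sketch below is plain Python rather than Vaex or scikit-learn code; the `auc` helper and the toy data are illustrative assumptions. It computes AUC as the fraction of positive/negative pairs ranked correctly, counting ties as half.

```python
def auc(y_true, scores):
    """Pairwise AUC: share of (positive, negative) pairs where the positive scores higher."""
    pos = [s for y, s in zip(y_true, scores) if y == 1]
    neg = [s for y, s in zip(y_true, scores) if y == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5  # ties count as half a correct ranking
    return wins / (len(pos) * len(neg))


y_true = [0, 0, 1, 0, 1, 1]
proba = [0.10, 0.40, 0.35, 0.20, 0.80, 0.70]   # continuous scores (toy data)
labels = [1 if p >= 0.5 else 0 for p in proba]  # hard labels at a 0.5 threshold

auc_proba = auc(y_true, proba)
auc_labels = auc(y_true, labels)
print(round(auc_proba, 4), round(auc_labels, 4))
```

On this toy data the thresholded labels give a lower AUC than the underlying scores, because thresholding collapses many distinct rankings into ties.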
deciles = np.linspace(0, 1, 11)
cuts = np.quantile(y_pred, deciles)
cuts[0] = -np.inf
cuts[-1] = np.inf
bucket = np.digitize(y_pred, cuts[1:-1], right=True).astype("int32")
df_test_local = vaex.from_arrays(y_true=y_true.astype("int8"), y_pred=y_pred.astype("float64"), bucket=bucket)
lift = df_test_local.groupby(by="bucket", agg={"n": vaex.agg.count(), "rate": vaex.agg.mean("y_true"), "avg_pred": vaex.agg.mean("y_pred")}).sort("bucket")
lift_pd = lift.to_pandas_df()
baseline = float(df_test_local["y_true"].mean())
lift_pd["lift"] = lift_pd["rate"] / (baseline + 1e-12)
print("\nDecile lift table:")
print(lift_pd.to_string(index=False))
We analyze model behavior by dividing predictions into deciles and calculating lift metrics. We compute the overall base rate and compare it with the observed positive rate in each score bucket to assess ranking quality. We summarize the results in a compact lift table of the kind used in real-world model diagnostics.
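As a worked illustration of the lift calculation, here is a small plain-Python sketch. It is a simplification under stated assumptions: it uses rank-based equal-sized buckets instead of the quantile cut points used in the tutorial code, five buckets instead of ten, and a hypothetical `lift_table` helper with toy data.

```python
def lift_table(y_true, y_pred, n_buckets):
    """Sort rows by score, split into equal-sized buckets, and compare each bucket's rate to the baseline."""
    order = sorted(range(len(y_pred)), key=lambda i: y_pred[i])
    baseline = sum(y_true) / len(y_true)
    size = len(order) // n_buckets
    table = []
    for b in range(n_buckets):
        start = b * size
        # The last bucket absorbs any remainder rows.
        end = len(order) if b == n_buckets - 1 else start + size
        idx = order[start:end]
        rate = sum(y_true[i] for i in idx) / len(idx)
        table.append({"bucket": b, "n": len(idx), "rate": rate, "lift": rate / baseline})
    return table


y_pred = [0.05, 0.15, 0.25, 0.35, 0.45, 0.55, 0.65, 0.75, 0.85, 0.95]
y_true = [0, 0, 0, 0, 1, 0, 1, 0, 1, 1]

table = lift_table(y_true, y_pred, n_buckets=5)
for row in table:
    print(row)
```

A lift above 1 in the highest-score buckets and below 1 in the lowest ones indicates that the scores rank positives ahead of negatives.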
out_dir = "/content/vaex_artifacts"
os.makedirs(out_dir, exist_ok=True)
parquet_path = os.path.join(out_dir, "customers_vaex.parquet")
state_path = os.path.join(out_dir, "vaex_pipeline.json")
base_cols = ["city","label_encoded_city","age","tenure_m","tenure_y","tx","income","income_k","value_score","target"]
export_cols = base_cols + ["z_"+f for f in features_num]
df_export = df[export_cols].sample(n=500_000, random_state=1)
if os.path.exists(parquet_path):
    os.remove(parquet_path)
df_export.export_parquet(parquet_path)
# Persist everything needed to rebuild the preprocessing (lists, so the state is JSON-serializable)
pipeline_state = {
    "vaex_version": vaex.__version__,
    "encoder_labels": {k: {str(kk): int(vv) for kk,vv in v.items()} for k,v in encoder.labels_.items()},
    "scaler_mean": [float(x) for x in scaler.mean_],
    "scaler_std": [float(x) for x in scaler.std_],
    "features_num": features_num,
    "export_cols": export_cols,
}
with open(state_path, "w") as f:
    json.dump(pipeline_state, f)
df_reopen = vaex.open(parquet_path)
df_reopen["income_k"] = df_reopen.income / 1000.0
df_reopen["tenure_y"] = df_reopen.tenure_m / 12.0
df_reopen["log_income"] = df_reopen.income.log1p()
df_reopen["tx_per_year"] = df_reopen.tx / (df_reopen.tenure_y + 0.25)
df_reopen["value_score"] = (0.35*df_reopen.log_income + 0.20*df_reopen.tx_per_year + 0.10*df_reopen.tenure_y - 0.015*df_reopen.age)
df_city_features = vaex.from_pandas(city_table[["city","p95_income_k","avg_income_k","median_value_score","target_rate"]], copy_index=False)
df_reopen = df_reopen.join(df_city_features, on="city", rsuffix="_city")
df_reopen["income_vs_city_p95"] = df_reopen.income_k / (df_reopen.p95_income_k + 1e-9)
df_reopen["value_vs_city_median"] = df_reopen.value_score - df_reopen.median_value_score
with open(state_path, "r") as f:
    st = json.load(f)
labels_city = {k: int(v) for k,v in st["encoder_labels"]["city"].items()}
df_reopen["label_encoded_city"] = df_reopen.city.map(labels_city, default_value=-1)
# Re-apply the saved standardization; guard against a zero standard deviation
for i, feat in enumerate(st["features_num"]):
    mean_i = st["scaler_mean"][i]
    std_i = st["scaler_std"][i] if st["scaler_std"][i] != 0 else 1.0
    df_reopen["z_"+feat] = (df_reopen[feat] - mean_i) / std_i
df_reopen = vaex_model.transform(df_reopen)
print("\nArtifacts written:")
print(parquet_path)
print(state_path)
print("\nReopened parquet predictions (head):")
print(df_reopen[["city","income_k","value_score","pred","target"]].head(8))
print("\nDone.")
We export a large, feature-complete sample to Parquet and persist the full preprocessing state for reproducibility. We reload the data and deterministically reconstruct all engineered features from the saved metadata. We run inference on the reloaded dataset to confirm that the pipeline remains stable and deployable end-to-end.
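The reason saving the scaler statistics matters is that new data must be standardized with the training-time mean and standard deviation, not with values recomputed from the new data. The following plain-Python sketch shows that round trip through JSON. The file name, the `standardize` helper, and the toy values are hypothetical and exist only for illustration; the state keys mirror those used in the tutorial code.

```python
import json
import os
import tempfile

# "Fit" on training values: record the mean and population standard deviation.
train_values = [10.0, 20.0, 30.0, 40.0]
mean = sum(train_values) / len(train_values)
std = (sum((v - mean) ** 2 for v in train_values) / len(train_values)) ** 0.5

state = {"features_num": ["income_k"], "scaler_mean": [mean], "scaler_std": [std]}

# Persist the state, then reload it as a fresh process would.
path = os.path.join(tempfile.mkdtemp(), "pipeline_state.json")
with open(path, "w") as f:
    json.dump(state, f)
with open(path, "r") as f:
    st = json.load(f)


def standardize(row, st):
    """Apply the saved statistics to one row; a zero std falls back to 1.0."""
    out = {}
    for i, feat in enumerate(st["features_num"]):
        std_i = st["scaler_std"][i] if st["scaler_std"][i] != 0 else 1.0
        out["z_" + feat] = (row[feat] - st["scaler_mean"][i]) / std_i
    return out


z_at_mean = standardize({"income_k": 25.0}, st)
z_one_std = standardize({"income_k": 25.0 + std}, st)
print(z_at_mean, z_one_std)
```

A value equal to the training mean maps to zero and a value one training standard deviation above it maps to one, regardless of how the new data is distributed.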
In conclusion, we demonstrated how Vaex enables fast, memory-efficient data processing while supporting advanced feature engineering, aggregation, and model integration. We showed that approximate statistics, lazy computation, and out-of-core execution allow us to scale cleanly from analytics to deployment-ready artifacts. By exporting reproducible features and persisting the full pipeline state, we closed the loop from raw data to inference, demonstrating how Vaex fits naturally into modern big-data Python workflows.
