A coding guide to building a scalable end-to-end machine learning data pipeline using Daft for high-performance structured and image data processing

In this tutorial, we explore how to use Daft as a high-performance, Python-native data engine for building end-to-end analytical pipelines. We start by loading the real-world MNIST dataset, then progressively transform it with UDFs, feature engineering, aggregations, joins, and lazy execution. Along the way, we demonstrate how to seamlessly combine structured data processing, numerical computation, and machine learning. By the end, we are not just manipulating data; we are building a complete model-ready pipeline powered by Daft’s scalable execution engine.

!pip -q install daft pyarrow pandas numpy scikit-learn


import os
os.environ["DO_NOT_TRACK"] = "true"


import numpy as np
import pandas as pd
import daft
from daft import col


print("Daft version:", getattr(daft, "__version__", "unknown"))


URL = "https://github.com/Eventual-Inc/mnist-json/raw/master/mnist_handwritten_test.json.gz"


df = daft.read_json(URL)
print("\nSchema (sampled):")
print(df.schema())


print("\nPeek:")
df.show(5)

We install Daft and its supporting libraries directly into Google Colab to ensure a clean, reproducible environment. We disable telemetry via the DO_NOT_TRACK environment variable and print the installed version to confirm that everything is working correctly. By doing so, we establish a stable foundation on which to build our end-to-end data pipeline.

def to_28x28(pixels):
   arr = np.array(pixels, dtype=np.float32)
   if arr.size != 784:
       return None
   return arr.reshape(28, 28)


df2 = (
   df
   .with_column(
       "img_28x28",
       col("image").apply(to_28x28, return_dtype=daft.DataType.python())
   )
   .with_column(
       "pixel_mean",
       col("img_28x28").apply(lambda x: float(np.mean(x)) if x is not None else None,
                              return_dtype=daft.DataType.float32())
   )
   .with_column(
       "pixel_std",
       col("img_28x28").apply(lambda x: float(np.std(x)) if x is not None else None,
                              return_dtype=daft.DataType.float32())
   )
)


print("\nAfter reshaping + simple features:")
df2.select("label", "pixel_mean", "pixel_std").show(5)

We load the real-world MNIST JSON dataset directly from a remote URL using Daft’s native reader. We inspect the schema and preview the data to understand its structure and column types. This allows us to validate the dataset before applying transformations and feature engineering.
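As a quick sanity check outside of Daft, the reshape-and-null logic of the `to_28x28` helper above can be exercised on a synthetic pixel list. The 784-value ramp used here is a stand-in for illustration, not a real MNIST record:

```python
import numpy as np

# Standalone sketch of the to_28x28 helper: reshape a flat 784-value
# pixel list into a 28x28 array, returning None for malformed rows.
def to_28x28(pixels):
    arr = np.array(pixels, dtype=np.float32)
    if arr.size != 784:
        return None
    return arr.reshape(28, 28)

fake_pixels = list(range(784))      # synthetic stand-in for one flattened image
img = to_28x28(fake_pixels)
print(img.shape)                    # (28, 28)
print(float(img.mean()))            # 391.5 -- what the pixel_mean column would hold
print(to_28x28([1, 2, 3]))          # None -- wrong-sized rows become nulls, not errors
```

Returning None for malformed rows (rather than raising) lets Daft carry such rows as nulls, which the later lambdas for `pixel_mean` and `pixel_std` explicitly guard against.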

@daft.udf(return_dtype=daft.DataType.list(daft.DataType.float32()), batch_size=512)
def featurize(images_28x28):
   out = []
   for img in images_28x28.to_pylist():
       if img is None:
           out.append(None)
           continue
       img = np.asarray(img, dtype=np.float32)
       row_sums = img.sum(axis=1) / 255.0
       col_sums = img.sum(axis=0) / 255.0
       total = img.sum() + 1e-6
       ys, xs = np.indices(img.shape)
       cy = float((ys * img).sum() / total) / 28.0
       cx = float((xs * img).sum() / total) / 28.0
       vec = np.concatenate((row_sums, col_sums, np.array((cy, cx, img.mean()/255.0, img.std()/255.0), dtype=np.float32)))
       out.append(vec.astype(np.float32).tolist())
   return out


df3 = df2.with_column("features", featurize(col("img_28x28")))


print("\nFeature column created (list(float)):")
df3.select("label", "features").show(2)

We reshape the raw pixel arrays into structured 28×28 images using row-wise UDFs. We compute statistical features, such as the mean and standard deviation, to enrich the dataset. By applying these transformations, we convert the raw image data into a structured, model-friendly representation.
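To see exactly what the `featurize` UDF produces per image, here is the same arithmetic in plain NumPy on a single synthetic image. The one-bright-pixel image is an assumption chosen so the centroid is easy to predict:

```python
import numpy as np

# Plain-NumPy sketch of featurize()'s per-image output: 28 normalized
# row sums, 28 normalized column sums, a mass centroid (cy, cx), and
# two global intensity stats -- 60 features per image in total.
img = np.zeros((28, 28), dtype=np.float32)
img[14, 14] = 255.0                     # single bright pixel near the center

row_sums = img.sum(axis=1) / 255.0
col_sums = img.sum(axis=0) / 255.0
total = img.sum() + 1e-6                # epsilon avoids division by zero on blank images
ys, xs = np.indices(img.shape)
cy = float((ys * img).sum() / total) / 28.0
cx = float((xs * img).sum() / total) / 28.0
vec = np.concatenate((row_sums, col_sums,
                      np.array((cy, cx, img.mean() / 255.0, img.std() / 255.0),
                               dtype=np.float32)))

print(len(vec))                          # 60 features per image
print(round(cy, 3), round(cx, 3))        # both ~0.5: the centroid sits mid-image
```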

label_stats = (
   df3.groupby("label")
      .agg(
          col("label").count().alias("n"),
          col("pixel_mean").mean().alias("mean_pixel_mean"),
          col("pixel_std").mean().alias("mean_pixel_std"),
      )
      .sort("label")
)


print("\nLabel distribution + summary stats:")
label_stats.show(10)


df4 = df3.join(label_stats, on="label", how="left")


print("\nJoined label stats back onto each row:")
df4.select("label", "n", "mean_pixel_mean", "mean_pixel_std").show(5)

We apply a batch UDF to extract rich feature vectors from the reshaped images. We then perform a group-by aggregation per label and join the summary statistics back onto every row for contextual enrichment. This demonstrates how we combine scalable computation with relational analytics within Daft.
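For readers more familiar with pandas, the aggregate-then-join pattern above maps directly onto `groupby`/`agg` followed by a left `merge`. This is a minimal sketch with toy label/pixel data, not the real MNIST frame:

```python
import pandas as pd

# The same aggregate-then-join pattern, sketched in pandas:
# compute per-label stats, then attach them back to every row.
df = pd.DataFrame({
    "label": [0, 0, 1, 1, 1],
    "pixel_mean": [30.0, 34.0, 20.0, 22.0, 24.0],
})

label_stats = (df.groupby("label")
                 .agg(n=("label", "size"),                 # row count per label
                      mean_pixel_mean=("pixel_mean", "mean"))
                 .reset_index())

joined = df.merge(label_stats, on="label", how="left")
print(joined.to_string(index=False))
```

The key difference is that pandas executes each step eagerly, whereas Daft keeps the groupby and join lazy until a `show`/`collect` materializes them.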

small = df4.select("label", "features").collect().to_pandas()


small = small.dropna(subset=("label", "features")).reset_index(drop=True)


X = np.vstack(small["features"].apply(np.array).values).astype(np.float32)
y = small["label"].astype(int).values


from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, classification_report


# Hold out a test split before fitting (split parameters are a conventional choice)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

clf = LogisticRegression(max_iter=1000)
clf.fit(X_train, y_train)


pred = clf.predict(X_test)
acc = accuracy_score(y_test, pred)


print("\nBaseline accuracy (feature-engineered LogisticRegression):", round(acc, 4))
print("\nClassification report:")
print(classification_report(y_test, pred, digits=4))


out_df = df4.select("label", "features", "pixel_mean", "pixel_std", "n")
out_path = "/content/daft_mnist_features.parquet"
out_df.write_parquet(out_path)


print("\nWrote parquet to:", out_path)


df_back = daft.read_parquet(out_path)
print("\nRead-back check:")
df_back.show(3)

We convert the selected columns to pandas and train a baseline logistic regression model. We evaluate its performance to validate the usefulness of our engineered features. Additionally, we persist the processed dataset in Parquet format, completing our end-to-end pipeline from raw data ingestion to production-ready storage.

In this tutorial, we created a production-style data workflow using Daft, moving from raw JSON ingestion to feature engineering, aggregation, model training, and Parquet persistence. We demonstrated how to integrate advanced UDF logic, perform efficient groupby and join operations, and materialize the results for downstream machine learning within a clean, scalable framework. Through this process, we saw how Daft enables us to handle complex transformations while remaining Pythonic and efficient. We completed the work with a reusable, end-to-end pipeline that demonstrates how we can combine modern data engineering and machine learning workflows in a unified environment.




Michael Sutter is a data science professional and holds a Master of Science in Data Science from the University of Padova. With a solid foundation in statistical analysis, machine learning, and data engineering, Michael excels in transforming complex datasets into actionable insights.
