A coding implementation to create an integrated Apache Beam pipeline performing batch and stream processing with event-time windowing using DirectRunner


In this tutorial, we demonstrate how to build a single Apache Beam pipeline that works seamlessly in both batch and stream-like modes using the DirectRunner. We generate synthetic, event-time-aware data and apply fixed windowing with triggers and allowed lateness, which lets us show how Apache Beam handles both on-time and late events consistently. By simply switching the input source, we keep the core aggregation logic the same, which helps us clearly understand how Beam’s event-time model, windows, and panes behave without relying on external streaming infrastructure.

!pip -q install -U "grpcio>=1.71.2" "grpcio-status>=1.71.2"
!pip -q install -U apache-beam crcmod


import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.trigger import AfterWatermark, AfterProcessingTime, AccumulationMode
from apache_beam.testing.test_stream import TestStream
import json
from datetime import datetime, timezone

We install the required dependencies and pin compatible versions so that Apache Beam imports cleanly. We import the core Beam API along with the windowing, trigger, and TestStream utilities we need later in the pipeline, and we bring in standard Python modules for time handling and JSON formatting.

MODE = "stream"  # "batch" or "stream"
WINDOW_SIZE_SECS = 60  # fixed-window length in seconds
ALLOWED_LATENESS_SECS = 120  # how long after the watermark late data is still accepted


def make_event(user_id, event_type, amount, event_time_epoch_s):
   return {"user_id": user_id, "event_type": event_type, "amount": float(amount), "event_time": int(event_time_epoch_s)}


base = datetime.now(timezone.utc).replace(microsecond=0)
t0 = int(base.timestamp())


BATCH_EVENTS = (
   make_event("u1", "purchase", 20, t0 + 5),
   make_event("u1", "purchase", 15, t0 + 20),
   make_event("u2", "purchase",  8, t0 + 35),
   make_event("u1", "refund",   -5, t0 + 62),
   make_event("u2", "purchase", 12, t0 + 70),
   make_event("u3", "purchase",  9, t0 + 75),
   make_event("u2", "purchase",  3, t0 + 50),
)

We define the global configuration that controls window size, allowed lateness, and execution mode. We create synthetic events with explicit event-time timestamps so that the windowing behavior is deterministic and easy to reason about. The small dataset intentionally includes an out-of-order, late-arriving event (the u2 event at t0 + 50, listed after t0 + 75) so that we can observe Beam’s event-time semantics.
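To make the out-of-order arrival concrete, we can compute which fixed window each event lands in. This is a minimal sketch of the window math itself (our own illustration, not a Beam API call); note that Beam's FixedWindows align to the Unix epoch, so the boundaries generally do not coincide with t0:

def window_bounds(event_time_s, size_s=WINDOW_SIZE_SECS):
    # Fixed windows partition event time into [k*size, (k+1)*size) intervals
    # aligned to the epoch, mirroring FixedWindows' default behavior.
    start = (event_time_s // size_s) * size_s
    return start, start + size_s


for e in BATCH_EVENTS:
    ws, we = window_bounds(e["event_time"])
    print(e["user_id"], e["event_type"], "-> window", ws, "to", we)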

def format_joined_record(kv):
    user_id, d = kv
    # CoGroupByKey yields {"count": [...], "sum_amount": [...]}; unpack safely.
    return {
        "user_id": user_id,
        "count": int(d["count"][0]) if d["count"] else 0,
        "sum_amount": float(d["sum_amount"][0]) if d["sum_amount"] else 0.0,
    }


class WindowedUserAgg(beam.PTransform):
    def expand(self, pcoll):
        # Attach each event's own event-time timestamp, then window and key.
        stamped = pcoll | beam.Map(lambda e: beam.window.TimestampedValue(e, e["event_time"]))
        windowed = stamped | beam.WindowInto(
            FixedWindows(WINDOW_SIZE_SECS),
            allowed_lateness=ALLOWED_LATENESS_SECS,
            trigger=AfterWatermark(
                early=AfterProcessingTime(10),
                late=AfterProcessingTime(10),
            ),
            accumulation_mode=AccumulationMode.ACCUMULATING,
        )
        keyed = windowed | beam.Map(lambda e: (e["user_id"], e["amount"]))
        counts = keyed | beam.combiners.Count.PerKey()
        sums = keyed | beam.CombinePerKey(sum)
        return (
            {"count": counts, "sum_amount": sums}
            | beam.CoGroupByKey()
            | beam.Map(format_joined_record)
        )

We construct a reusable Beam PTransform that encapsulates all of the windowed aggregation logic. We apply fixed windows, triggers, and an accumulation mode, then key events by user and compute per-user counts and totals. We keep this transform independent of the data source, so the same logic applies to both batch and streaming inputs.
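Because the transform is source-agnostic, we can also unit-test it in isolation with Beam's testing utilities. A minimal sketch, assuming two epoch-aligned purchases that fall into the same 60-second window (the user id t1 and the amounts are hypothetical); on the DirectRunner in batch mode this typically produces a single final pane:

from apache_beam.testing.util import assert_that, equal_to

test_events = [
    make_event("t1", "purchase", 10, 0),  # both timestamps sit in [0, 60)
    make_event("t1", "purchase", 5, 30),
]
with beam.Pipeline() as p:
    result = p | beam.Create(test_events) | WindowedUserAgg()
    assert_that(result, equal_to([{"user_id": "t1", "count": 2, "sum_amount": 15.0}]))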

class AddWindowInfo(beam.DoFn):
   def process(self, element, window=beam.DoFn.WindowParam, pane_info=beam.DoFn.PaneInfoParam):
       ws = float(window.start)
       we = float(window.end)
       yield {
           **element,
           "window_start_utc": datetime.fromtimestamp(ws, tz=timezone.utc).strftime("%H:%M:%S"),
           "window_end_utc": datetime.fromtimestamp(we, tz=timezone.utc).strftime("%H:%M:%S"),
           "pane_timing": str(pane_info.timing),
           "pane_is_first": pane_info.is_first,
           "pane_is_last": pane_info.is_last,
       }


def build_test_stream():
   return (
       TestStream()
       .advance_watermark_to(t0)
       .add_elements((
           beam.window.TimestampedValue(make_event("u1", "purchase", 20, t0 + 5), t0 + 5),
           beam.window.TimestampedValue(make_event("u1", "purchase", 15, t0 + 20), t0 + 20),
           beam.window.TimestampedValue(make_event("u2", "purchase", 8, t0 + 35), t0 + 35),
       ))
       .advance_processing_time(5)
       .advance_watermark_to(t0 + 61)
       .add_elements((
           beam.window.TimestampedValue(make_event("u1", "refund", -5, t0 + 62), t0 + 62),
           beam.window.TimestampedValue(make_event("u2", "purchase", 12, t0 + 70), t0 + 70),
           beam.window.TimestampedValue(make_event("u3", "purchase", 9, t0 + 75), t0 + 75),
       ))
       .advance_processing_time(5)
       .add_elements((
           beam.window.TimestampedValue(make_event("u2", "purchase", 3, t0 + 50), t0 + 50),
       ))
       .advance_watermark_to(t0 + 121)
       .advance_watermark_to_infinity()
   )

We enrich each aggregated record with window and pane metadata so we can see exactly when and why results are emitted, converting Beam’s internal timestamps to human-readable UTC times. We also define a TestStream that simulates real streaming behavior using watermark advances, processing-time advances, and late data.
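The pane_timing value we stringify above comes from Beam's PaneInfoTiming constants. As a quick reference, a small sketch mapping those values to readable labels, assuming the enum lives in apache_beam.utils.windowed_value as in recent Beam releases:

from apache_beam.utils.windowed_value import PaneInfoTiming

# EARLY fires before the watermark passes the window end, ON_TIME when it
# passes, LATE afterwards (within the allowed lateness).
TIMING_LABELS = {
    PaneInfoTiming.EARLY: "EARLY",
    PaneInfoTiming.ON_TIME: "ON_TIME",
    PaneInfoTiming.LATE: "LATE",
    PaneInfoTiming.UNKNOWN: "UNKNOWN",
}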

def run_batch():
   with beam.Pipeline(options=PipelineOptions(())) as p:
       (
           p
           | beam.Create(BATCH_EVENTS)
           | WindowedUserAgg()
           | beam.ParDo(AddWindowInfo())
           | beam.Map(json.dumps)
           | beam.Map(print)
       )


def run_stream():
   opts = PipelineOptions(())
   opts.view_as(StandardOptions).streaming = True
   with beam.Pipeline(options=opts) as p:
       (
           p
           | build_test_stream()
           | WindowedUserAgg()
           | beam.ParDo(AddWindowInfo())
           | beam.Map(json.dumps)
           | beam.Map(print)
       )


run_stream() if MODE == "stream" else run_batch()

We wire everything together into executable batch and stream-like pipelines. We toggle between modes by changing a single flag, reusing the same aggregation transform. We run the pipeline and print windowed results directly, making it easy to inspect the execution flow and output.
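Outside a notebook, the same toggle is usually exposed as a command-line flag rather than a hard-coded constant. A hypothetical wrapper (the --mode flag is our own addition, not part of the tutorial code):

import argparse

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--mode", choices=["batch", "stream"], default="stream")
    args = parser.parse_args()
    run_stream() if args.mode == "stream" else run_batch()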

In conclusion, we demonstrated that the same Beam pipeline can process both bounded batch data and unbounded, stream-like data while preserving identical windowing and aggregation semantics. We saw how watermarks, triggers, and accumulation modes determine when results are emitted and how late data updates previously computed windows. This grounding in Beam’s unified model provides a solid foundation for later scaling the same design to real streaming runners and production environments.
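As a pointer toward that scaling step, switching runners is mostly a matter of pipeline options; a hedged sketch (the FlinkRunner flags and the master address are illustrative and depend on your environment):

# Illustrative only: the same pipeline code pointed at a Flink cluster
# instead of the in-process DirectRunner.
opts = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_master=localhost:8081",  # hypothetical cluster address
    "--streaming",
])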




Michael Sutter is a data science professional and holds a Master of Science in Data Science from the University of Padova. With a solid foundation in statistical analysis, machine learning, and data engineering, Michael excels in transforming complex datasets into actionable insights.
