Apache Beam — From Zero to Hero Pt. 1: Batch Pipelines

In the previous post I talked about how we can use Python Generators to create simple data pipelines. In this post I am going to introduce the Apache Beam framework for building production-grade data pipelines, and build a batch pipeline with it while explaining some of its main concepts.

Introduction

Apache Beam is an open source, unified model for defining both batch and streaming data-parallel processing pipelines.

With Apache Beam we can implement complex and scalable data pipelines in a concise, clear manner. Apache Beam has 3 SDKs: Python, Java and Go (we’re going to use Python here). Pipelines written with the framework can run on different platforms via different runners; some of the available runners are Spark, Flink and Google Cloud Dataflow. The default runner is the DirectRunner, which executes the pipeline locally and is intended for development and testing.

A typical structure of a pipeline looks something like this:

import argparse

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.options.pipeline_options import PipelineOptions

def run(argv=None, save_main_session=True):
    # Defining the pipeline's arguments
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        default='#SOME GCS PATH#',
        help='Input file pattern to process.')
    ...
    # Arguments the parser does not know are passed on to the runner
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)

    # Defining the pipeline's steps
    with beam.Pipeline(options=pipeline_options) as p:
        (p
         | 'ReadData' >> ReadFromText(known_args.input)
         # ... the rest of the pipeline's steps ...
         )

if __name__ == '__main__':
    run()
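
To make the argument handling in the skeleton concrete, here is a small sketch of how parse_known_args splits the command line: arguments that the parser declares end up in known_args, and everything else is returned untouched so it can be handed to PipelineOptions. It uses only the standard library; the split_args helper, the bucket name and the example flag values are made up for illustration.

```python
import argparse


def split_args(argv):
    """Separate our own arguments from the ones meant for the Beam runner."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', default='some-default-path')
    # Returns (namespace of known arguments, list of everything else)
    return parser.parse_known_args(argv)


known_args, pipeline_args = split_args([
    '--input', 'gs://example-bucket/books/**',   # declared above
    '--runner', 'DirectRunner',                  # unknown to our parser
    '--temp_location', 'gs://example-bucket/tmp',
])

print(known_args.input)
print(pipeline_args)
```

The second list is what would be passed to PipelineOptions, which is why runner-specific flags never need to be declared in our own parser.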

Now that we know what an Apache Beam pipeline is and roughly what it looks like, let’s get started on writing our pipeline.

The pipeline we’re going to implement is a basic wordcount pipeline with a twist. We’ll use 9 books, stored as text files, from 3 famous authors: Jules Verne, Jane Austen and Lewis Carroll. The pipeline is required to do the following:

  1. Read the books’ text files from Google Cloud Storage (GCS) while keeping the association from the text to its author — here we’ll learn about Connectors.
  2. Split the words in each line and filter out conjunctions — here we’ll learn about Bounded PCollections & ParDo Transform.
  3. Count appearances per author and word — here we’ll learn about some of the builtin transforms — Map & Aggregation Transforms.
  4. Write the results to BigQuery (here we’ll learn about the BigQuery Connector).

Our first step is to read the text files from the GCS bucket. We do that by using a Connector, so let’s first explain what Connectors are and what part they play in the pipeline.

Connectors

Connectors are Apache Beam’s components for reading data from and writing data to external systems such as GCS, BigQuery, Snowflake, Hadoop and many more. Info about all of the available connectors can be found here.

Typically, the first and last steps in the pipeline would use a connector — we need to get the initial data to the pipeline from somewhere, and we’d typically want to save the pipeline’s results somewhere.

We’ll be using the TextIO connector, which allows us to read data from local text files or from GCS paths. We need to read the text files from GCS together with the name of the file each line came from (so that we can connect the text to its author). The text files reside in a separate directory per author, so it will be fairly easy to extract the author’s name later on:

gs://blog-data-resources/books_txt_files/jane-austen/
gs://blog-data-resources/books_txt_files/jules-verne/
gs://blog-data-resources/books_txt_files/lewis-carroll/

We’ll use the ReadFromTextWithFilename transform to get the text and the filename together; it emits each line as a (filename, line) tuple:

import argparse

import apache_beam as beam
from apache_beam.io.textio import ReadFromTextWithFilename
from apache_beam.options.pipeline_options import PipelineOptions

def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        default='gs://blog-data-resources/books_txt_files/**',
        help='Input file pattern to process.')
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)

    with beam.Pipeline(options=pipeline_options) as p:
        (p
         | 'Read files' >> ReadFromTextWithFilename(known_args.input))

if __name__ == '__main__':
    run()

The transform returns an object called a PCollection, which is used throughout the framework. Before we move on to the pipeline’s next steps, we need to know what a PCollection is.

Bounded PCollections

A PCollection is an object that serves as pipeline data for Apache Beam operations. This is the object that Connectors (and the rest of the framework’s components) either receive, return or both. If we compare it to Generators from my previous post, a PCollection object serves a similar role to that of a Generator object in a data pipeline: data is being streamed into it and out of it.

We can create an initial PCollection from a Data Source with one of the available connectors, or create it manually — like this:

beam.Create([
    'To be, or not to be: that is the question: ',
    "Whether 'tis nobler in the mind to suffer ",
    'The slings and arrows of outrageous fortune, ',
    'Or to take arms against a sea of troubles, ',
])

A PCollection can be either bounded or unbounded in size. As a rule of thumb, bounded PCollections are used in batch pipelines and unbounded PCollections are used in streaming pipelines.
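
Sticking with the Generator comparison, the difference between bounded and unbounded data can be sketched in plain Python. This is only an analogy and not Beam code: the two source functions below are hypothetical stand-ins for a finite batch input and a never-ending stream.

```python
import itertools


def bounded_source():
    """A finite dataset: iteration ends on its own, like a batch input."""
    yield from ['line 1', 'line 2', 'line 3']


def unbounded_source():
    """An endless stream: there is never a 'last' element."""
    for n in itertools.count(1):
        yield f'event {n}'


# A bounded source can be consumed completely...
all_lines = list(bounded_source())

# ...while an unbounded one can only ever be consumed in slices.
first_events = list(itertools.islice(unbounded_source(), 3))

print(all_lines)
print(first_events)
```

Calling list() on the unbounded source would never return, which is roughly why streaming pipelines need extra concepts for deciding when a result is ready to be emitted.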

Our next step in the pipeline is to split the line into words. In order to do this, we first need to learn about Transforms — specifically about the ParDo Transform.

ParDo Transform

Transforms are the different operations in the pipeline. If we want to modify, aggregate or flatten the data in any way, it is done via a transform.

There are several transforms that come builtin in the framework (and we’ll cover 3 of them in the next section), but here we need to implement one ourselves. In order to build a custom transform you need to create a class that subclasses DoFn, like this:

class WordExtractingDoFn(beam.DoFn):
    def process(self, element):
        pass

Our ParDo transform needs to do 3 things:
1. Separate the words in the line.
2. Filter out the words that are conjunctions.
3. Extract the author’s name from the filename.

We split the words like this:

import re

class WordExtractingDoFn(beam.DoFn):
    def process(self, element):
        # Each element is a (filename, line) tuple from ReadFromTextWithFilename
        filename, line = element
        for word in re.findall(r"[\w']+", line, re.UNICODE):
            yield word

Pay attention here: we’re using the yield keyword we learned about in the previous post, so the process function is a generator. It does not finish its execution until all of the words in the line are yielded, and a single input line can produce many output elements.
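
To see the effect of yield in isolation, here is a minimal sketch that uses a plain function as a stand-in for the process method, so it runs without Beam. The split_words name and the sample filename are made up for illustration; the regular expression is the one from the DoFn.

```python
import re


def split_words(element):
    """Plain-function stand-in for WordExtractingDoFn.process."""
    filename, line = element
    for word in re.findall(r"[\w']+", line):
        yield word


element = ('books/hamlet.txt', "Whether 'tis nobler in the mind to suffer")

words = split_words(element)  # nothing has run yet: this is a generator
first = next(words)           # runs the body up to the first yield
rest = list(words)            # drains the remaining words

print(first)
print(rest)
```

One input element went in and eight words came out, which is the one-to-many behaviour that ParDo relies on.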

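We filter out conjunctions with NLTK’s English stopword list, which covers conjunctions along with other very common words (the corpus has to be downloaded once with nltk.download('stopwords')), like this: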

import re

from nltk.corpus import stopwords

class WordExtractingDoFn(beam.DoFn):
    def __init__(self):
        # NLTK's English stopword list is all lowercase
        self.stopwords = set(stopwords.words('english'))

    def process(self, element):
        filename, line = element
        for word in re.findall(r"[\w']+", line, re.UNICODE):
            # Compare in lowercase so that "The" is filtered like "the"
            if word.lower() not in self.stopwords:
                yield word

Now all we have to do is to extract the author’s name from the filename, and yield it with each word as a tuple:

class WordExtractingDoFn(beam.DoFn):
    def __init__(self):
        self.stopwords = set(stopwords.words('english'))

    def __extract_author_from_filename(self, filename):
        # The author is the name of the directory that contains the file
        pattern = re.compile(r'.*\/([\w\-]+)\/[\w\-]+\.txt')
        match = pattern.match(filename)
        if match is not None:
            return match.group(1)
        return None

    def process(self, element):
        filename, line = element
        author = self.__extract_author_from_filename(filename)
        for word in re.findall(r"[\w']+", line, re.UNICODE):
            # Compare in lowercase so that "The" is filtered like "the"
            if word.lower() not in self.stopwords:
                yield (author, word)
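
The filename pattern is easy to check on its own before running the whole pipeline. The sketch below applies the same regular expression outside of the DoFn; the extract_author helper and the two book filenames are hypothetical, only the bucket and directory names come from the listing earlier in the post.

```python
import re

# Same pattern as in WordExtractingDoFn, compiled once at module level
AUTHOR_PATTERN = re.compile(r'.*\/([\w\-]+)\/[\w\-]+\.txt')


def extract_author(filename):
    """Return the name of the directory holding the file, or None."""
    match = AUTHOR_PATTERN.match(filename)
    return match.group(1) if match is not None else None


matched = extract_author(
    'gs://blog-data-resources/books_txt_files/jane-austen/emma.txt')
unmatched = extract_author(
    'gs://blog-data-resources/books_txt_files/jane-austen/pride and prejudice.txt')

print(matched)
print(unmatched)
```

The second call returns None because the pattern only accepts letters, digits, underscores and hyphens in the file name, so a file with spaces in its name would end up with None as its author.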

This is how we add our transform to the pipeline:

with beam.Pipeline(options=pipeline_options) as p:
    (p
     | 'Read files' >> ReadFromTextWithFilename(known_args.input)
     | 'Split lines' >> beam.ParDo(WordExtractingDoFn()))

The next step in the pipeline is to count the occurrences of each word and author — for that, we’ll learn 3 of the builtin transforms.

Map & Aggregation Transforms

There are 2 transforms we can use to count our words’ appearances: GroupByKey and CombinePerKey. Either works here (the next step will need to be slightly different for each), but as a general rule we’d prefer CombinePerKey as it scales better, since it can partially combine the values on each worker before the data is shuffled; a detailed explanation of why can be found here.

Both transforms receive tuples of 2 values and treat them as key-value pairs. GroupByKey collects all of the values that share a key and leaves the aggregation to a later step, while CombinePerKey groups by the key and applies an aggregate function to the values in one go. In our case, the key consists of the author and the word, and the aggregate function should be sum. We can count the appearances of each (author, word) tuple by mapping each tuple to 1 and then summing the 1 values per tuple (this is the standard way to implement wordcount). It requires a preliminary step that uses the Map transform to pair each tuple with a 1 value.
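
Before wiring this into Beam, the "pair with 1, then sum per key" idea can be sketched in plain Python. This is not Beam code, just the same two steps done by hand on a handful of made-up (author, word) tuples.

```python
from collections import defaultdict

# Made-up output of the previous step: one (author, word) tuple per word
pairs = [
    ('lewis-carroll', 'Alice'),
    ('lewis-carroll', 'rabbit'),
    ('lewis-carroll', 'Alice'),
    ('jane-austen', 'Emma'),
]

# Step 1 (the Map step): pair every (author, word) key with the value 1
keyed = [(pair, 1) for pair in pairs]

# Step 2 (the combine step): sum the values that share a key
counts = defaultdict(int)
for key, value in keyed:
    counts[key] += value

print(dict(counts))
```

In the real pipeline the framework does the second step for us and can spread it across workers, but the result per key is the same sum.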

A Map transform applies a function (often a lambda) to each of its input elements and emits exactly one output element for it. Our pipeline with the 2 new steps looks like this:

(p
 | 'Read files' >> ReadFromTextWithFilename(known_args.input)
 | 'Split lines' >> beam.ParDo(WordExtractingDoFn())
 | 'Pair with 1' >> beam.Map(lambda x: ((x[0], x[1]), 1))
 | 'Sum per author & word' >> beam.CombinePerKey(sum))

We’re almost done — all we have to do now is to write the results to BigQuery using the BigQuery Connector.

BigQuery Connector

First, let’s add a parameter that specifies the destination table name in BigQuery:

parser = argparse.ArgumentParser()
parser.add_argument(
    '--input',
    dest='input',
    default='gs://blog-data-resources/books_txt_files/**',
    help='Input file pattern to process.')
parser.add_argument(
    '--table_spec',
    dest='table_spec',
    default='ilan-uzan-297514:tests.author_wordcount',
    help='Destination BigQuery table.')

known_args, pipeline_args = parser.parse_known_args(argv)

Now we need to define our schema for the destination table (you can learn more about how to define schemas in BigQuery and in Apache Beam in general here). Our schema is pretty simple:

table_schema = {
    'fields': [
        {'name': 'author', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'word', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'cnt', 'type': 'INTEGER', 'mode': 'NULLABLE'}
    ]
}

The WriteToBigQuery transform expects each element to be a dictionary that maps column names to values, so we first need to convert our elements from tuples to dictionaries; the easiest way is to use the Map transform again:

def to_json_row(element):
    key, cnt = element
    author, word = key
    return {"author": author, "word": word, "cnt": cnt}

(p
 | 'Read files' >> ReadFromTextWithFilename(known_args.input)
 | 'Split lines' >> beam.ParDo(WordExtractingDoFn())
 | 'Pair with 1' >> beam.Map(lambda x: ((x[0], x[1]), 1))
 | 'Sum per author & word' >> beam.CombinePerKey(sum)
 | 'Format records to JSON' >> beam.Map(to_json_row))
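
Since the dictionary keys have to line up with the column names in the schema, it can be worth checking the conversion on a single element before running the pipeline. A minimal sketch in plain Python, repeating the schema and the function from above; the sample element and its count are made up.

```python
table_schema = {
    'fields': [
        {'name': 'author', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'word', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'cnt', 'type': 'INTEGER', 'mode': 'NULLABLE'},
    ]
}


def to_json_row(element):
    """Flatten ((author, word), cnt) into a dict keyed by column name."""
    key, cnt = element
    author, word = key
    return {'author': author, 'word': word, 'cnt': cnt}


# Made-up element in the shape that CombinePerKey emits
row = to_json_row((('jules-verne', 'Nautilus'), 42))

# The row's keys should be exactly the schema's column names
schema_columns = {field['name'] for field in table_schema['fields']}
keys_match_schema = set(row) == schema_columns

print(row)
print(keys_match_schema)
```

A mismatch here (for example a key named count instead of cnt) would otherwise only surface when the load into BigQuery fails.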

Great, now let’s finally write to BigQuery! We choose to create the table if needed (according to the schema we defined)*** and to truncate the table before each load, so that we can run the pipeline multiple times without duplicating the results.

(p
 | 'Read files' >> ReadFromTextWithFilename(known_args.input)
 | 'Split lines' >> beam.ParDo(WordExtractingDoFn())
 | 'Pair with 1' >> beam.Map(lambda x: ((x[0], x[1]), 1))
 | 'Sum per author & word' >> beam.CombinePerKey(sum)
 | 'Format records to JSON' >> beam.Map(to_json_row)
 | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
     known_args.table_spec,
     schema=table_schema,
     write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
     create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))

*** We only have to define a schema if we want to use the CREATE_IF_NEEDED create disposition — it’s possible to not provide a schema if the table already exists, like this:

 | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
     known_args.table_spec,
     write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
     create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER))

And that’s it, our pipeline is finished. We run the pipeline like any other Python module:

$ python -m main --temp_location gs://tests_tmp/tmp

Because we’re loading data into BigQuery, we also need to specify the temp_location parameter: a GCS path where the files are staged before they are loaded into BigQuery.

After running the pipeline, the data will be visible in BigQuery.

Conclusion

All of the code in the post can be found here. I hope you enjoyed reading, familiarised yourself with the Apache Beam framework and got a sense of how to write batch pipelines with it. If something is unclear, you’re of course welcome to ask in the comments.

In the next post we’ll implement a streaming pipeline, and we’ll learn some new concepts like Unbounded PCollections, Windowing & Triggers.