GCP DataFlow-How To Create First Streaming Application

Kapil Jain
3 min readAug 7, 2021

--

Dataflow :
Google Cloud Dataflow is the data processing service, which supports large batch and real-time applications.
We can create a simple and complex data pipeline using a dataflow service.
In Dataflow we will create the pipeline and run whenever required means control the scheduling of service launches.
Dataflow having a predefine template as well for source and destination.

Distributed Programming Understanding :
It will be great if you priorly know any streaming framework like a spark and flink, it will be easy to relate or understand the underhood programming and technology.
Understanding distributed and parallel computing basic concepts will really help you to visualize things.

Apache Beam Programming Support :
The dataFlow is supporting the apache Beam API for developing the pipeline code.
The Apache Beam API provides the different programming language SDK like for java, python, etc.

PCollection :
The Apache beam is providing PCollection as an abstraction which provides an unbound collection of data. This PCollection is a collection of objects, which We use this PCollection for the transformation of input and output.

Transformation :
In Apache beam collection we can apply no. of transformation.
Explaining all transformations in quick time is difficult but still, I am explaining the basics below:

ParDo -> Similar like Map phase in map-reduce. It is applied to each record of PCollection.
GroupByKey -> Similar to Shuffle phase in MapReduce.
CoGroupByKey -> Create collection from multiple key or composite key.
Combine-> Combine the data of similar key in PCollection
Flatten->Merging multiple PCollection objects into Single logical objects.
Partition-> Segregate similar data together in PCollection.

Other Feature:
Windowing -> Fetching of record on the overlapping of the window after some interval or on append basis.

Sample Program
Streaming data based on new events or records arrived on Pubsub topics.

Data Streaming flow GCP DataFlow

From Pubsub(Distributed Queue) Topic -> DataFlowService(Apache Beam(Spark)) -> Bigquery(Dataset(table))

For the Below program, python SDK has required and also the google python API to access the GCP services.

import argparse
import logging
import os
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import window
PROJECT = "<PROJECT_NAME>"# Bigquery table Schema As per your need
table_schema = 'col1:STRING, col2:INT, col3:INT, col4:INT, col5:TIMESTAMP'
TOPIC = "<PUBSUB_TOPIC_NAME>"# For priniting the data for checking the data
def printer(data):
print(data)
return data
# Split the data like below code is for csv
# we can split and kind of data like json
class Split(beam.DoFn):
def process(self, element):
element = element.split(",")
return [{
'col1': element[0],
'col2': int(element[1]),
'col3': int(element[2]),
'col4': int(element[3]),
'col5': int(element[4]),
}]
def run(argv=None):
"""Build and run the pipeline."""
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())# PCollection and tranformation and Load data to big query table
(p
| 'ReadData From Pub Sub' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(printer)
| 'Parse CSV' >> beam.ParDo(Split())
| 'window' >> beam.WindowInto(window.FixedWindows(30))
| 'Write To BigQuery' >> beam.io.WriteToBigQuery('{0}:<Bigquery_dataset.table>'.format(PROJECT),
schema=table_schema,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "<service_key_path_for _local_sdk_run>.json"
logging.getLogger().setLevel(logging.INFO)
run()

For Code Execution :
Below are some required parameters to execute the dataflow pipeline.

python -m <Python_main_class_name> --runner=DataflowRunner --project="<GCP_project_name>" --region="us-central1" --temp_location="gs://<Location_Of_Bucket>" --staging_location="gs://<Location_Of_Bucket>/staging" --jobName="FirstDataflowApplication" --streaming

Questions?
If you have any questions, I will be happy to answer them. Please leave a comment for follow-up.
For more updates: Follow me Medium or Linkedin

Hope you enjoy learning and are ready to run your first streaming GCP dataflow application.. ❤

--

--

Kapil Jain
Kapil Jain

Written by Kapil Jain

Technophile | Backend Developer | Cloud Architect | Big Data Development and Services | Java Enthusiast | Transaction System | Data analysis | Data Engineer ❤

No responses yet