Thursday, July 29, 2021

GCP Study Notes 8-a: Example of using Google dataflow & Apache Beam

In this tutorial, you'll learn the basics of the Cloud Dataflow service by running a simple example pipeline using the Apache Beam Python SDK. This pipeline will show you the basics of reading a text file from Google Cloud Storage, counting the number of unique words in the file, and finally writing the word counts back to Google Cloud Storage.

What's Dataflow?
Dataflow is Google Cloud's fully managed service for running Apache Beam pipelines, and it autoscales the number of worker instances with the workload. To use Dataflow, enable the Cloud Dataflow API and open the Cloud Shell.

Dataflow runs jobs written using the Apache Beam SDK. To submit jobs to the Dataflow Service using Python, your development environment will require Python, the Google Cloud SDK, and the Apache Beam SDK for Python. Additionally, Cloud Dataflow uses pip3, Python's package manager, to manage SDK dependencies, and virtualenv to create isolated Python environments.

1. Install virtualenv:
pip3 install --upgrade virtualenv --user

2. Create a Python virtual environment:
python3 -m virtualenv env

3. Activate it:
source env/bin/activate

In order to write a Python Dataflow job, you first need to install the SDK from the Python Package Index (PyPI). When you run this command, pip3 will download and install the appropriate version of the Apache Beam SDK (the quotes keep the shell from expanding the brackets):
4. pip3 install --quiet 'apache-beam[gcp]'

5. To see the examples we are using, look inside the virtual environment's site-packages directory:
pwd
~/env/lib/python3.7/site-packages

# list all the folders with beam:
ls | grep beam

# the bundled example pipelines live under:
/home/username_id/env/lib/python3.7/site-packages/apache_beam/examples
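Alternatively, a short Python snippet (a sketch, run inside the activated virtualenv) confirms the SDK import and prints the examples directory without hardcoding the Python version in the path:

import os
import apache_beam as beam
# confirm the SDK is importable and locate the bundled example pipelines
print(beam.__version__)
print(os.path.join(os.path.dirname(beam.__file__), 'examples'))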

Set up a Cloud Storage bucket. Cloud Dataflow uses Cloud Storage buckets to store output data and cache your pipeline code. In Cloud Shell, use the gsutil mb command to create a Cloud Storage bucket.

gsutil mb gs://symbol-idap-poc-de4d

In Cloud Dataflow, data processing work is represented by a pipeline. A pipeline reads input data, performs transformations on that data, and then produces output data. A pipeline's transformations might include filtering, grouping, comparing, or joining data.

Use Python to launch your pipeline on the Cloud Dataflow service. The running pipeline is referred to as a job:

python3 -m apache_beam.examples.wordcount \
  --project symbol-idap-poc-pid \
  --runner DataflowRunner \
  --temp_location gs://symbol-idap-poc-gcs-userid/temp \
  --output gs://symbol-idap-poc-gcs-userid/results/output \
  --job_name dataflow-intro \
  --region us-central1

# or, specifying the input, controller service account, and subnetwork explicitly:
python -m apache_beam.examples.wordcount \
  --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output gs://symbol-idap-poc-gcs-username/output \
  --runner DataflowRunner \
  --project symbol-idap-poc-de4d \
  --region us-west1 \
  --job_name dataflow-intro \
  --service_account_email=tf-sybm-idap-poc@sybm-poc-pid.iam.gserviceaccount.com \
  --subnetwork=https://www.googleapis.com/compute/v1/projects/sybm-network-sbx-nid/regions/us-west1/subnetworks/sybm-app-us-w1-app-sbx-subnet \
  --temp_location gs://symbol-idap-poc-gcs-username/temp/

--runner is the specific execution engine to use to run your pipeline. The DataflowRunner uses the Dataflow Service as the execution engine.
--temp_location is the storage bucket Cloud Dataflow will use for the binaries and other data for running your pipeline. This location can be shared across multiple jobs.
--output is the bucket used by the WordCount example to store the job results.
--job_name is a user-given unique identifier. Only one job may execute with the same name.

You might get an error message similar to the following:
"I wrote a simple python program 'guessing_game.py'. When I try to run this code in the command prompt using "python -m guessing_game.py" the program runs fine but in the end it says Error while finding module specification for 'guessing_game.py' (ModuleNotFoundError: path attribute not found on 'guessing_game' while trying to find 'guessing_game.py'). And when I run the same program using "python -guessing_game.py" it runs fine and it doesn't show that message as well."

The solution: change test.py to test. The -m flag takes a module name, not a file path, and .py is not part of the module name, so either python test.py or python -m test works. Since the argument is a module name, you must not give a file extension (.py), and the name must be a valid Python module name.

python -m test \
  --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output gs://sybm-idap-poc-gcs-username/output \
  --runner DataflowRunner \
  --project sybm-idap-poc-decd \
  --region us-west1 \
  --service_account_email=tf-clgx-idap-poc@clgx-idap-poc-de4d.iam.gserviceaccount.com \
  --subnetwork=https://www.googleapis.com/compute/v1/projects/sybm-network-sbx-77c3/regions/us-west1/subnetworks/sybm-app-us-w1-app-sbx-subnet \
  --temp_location gs://sybm-idap-poc-gcs-username/temp/

Use the following gcloud command to view the Dataflow service agent role and the permissions it contains:
gcloud iam roles describe roles/dataflow.serviceAgent

Controller service account: pay attention to service accounts created by Terraform, for example:
service_account=tf-nameid-poc@nameid-poc-de4d.iam.gserviceaccount.com
Terraform Service Accounts Module: this module allows easy creation of one or more service accounts and grants them basic roles.

Compute Engine instances execute Apache Beam SDK operations in the cloud. These workers use your project’s controller service account to access your pipeline’s files and other resources. Dataflow also uses the controller service account to perform “metadata” operations, which don’t run on your local client or on Compute Engine workers. These operations perform tasks such as determining input sizes and accessing Cloud Storage files.

For the controller service account to be able to create, run, and examine a job, ensure that it has the roles/dataflow.admin and roles/dataflow.worker roles. In addition, the iam.serviceAccounts.actAs permission is required for your user account in order to impersonate the service account.
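For reference, the same settings can also be passed as pipeline options from Python instead of command-line flags. The sketch below uses placeholder project, bucket, service account, and subnetwork values; note that the Beam Python option is named service_account_email.

from apache_beam.options.pipeline_options import PipelineOptions

# all resource names below are placeholders for your own project
options = PipelineOptions(
    runner='DataflowRunner',
    project='your-project-id',
    region='us-west1',
    temp_location='gs://your-bucket/temp',
    # controller service account the Dataflow workers run as
    service_account_email='tf-nameid-poc@nameid-poc-de4d.iam.gserviceaccount.com',
    # full subnetwork URL, as in the command-line examples above
    subnetwork=('https://www.googleapis.com/compute/v1/projects/your-network-project'
                '/regions/us-west1/subnetworks/your-subnet'),
)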

Here is the example code for the program test.py:

import apache_beam as beam
import argparse
import logging
import os
from apache_beam.options.pipeline_options import PipelineOptions
from datetime import datetime

# change these to try this notebook out
BUCKET = 'symb-idap-poc-gcs-bid'
PROJECT = 'symb-idap-poc-de4d'
REGION = 'us-west1'
os.environ['BUCKET'] = BUCKET
os.environ['PROJECT'] = PROJECT
os.environ['REGION'] = REGION

def compute_fit1(row):
    # fit a gamma distribution to each county's array of values
    from scipy import stats
    import numpy as np
    durations = row['duration_array']
    ag, bg, cg = stats.gamma.fit(durations)
    if np.isfinite(ag) and np.isfinite(bg) and np.isfinite(cg):
        result = {}
        result['cntycd'] = str(row['cntycd'])
        result['ag'] = ag
        result['bg'] = bg
        result['cg'] = cg
        yield result

def run_job():
    job_name = 'dataflow-test-' + datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
    print("\n ******** start Time = ", datetime.now().strftime("%H:%M:%S"))
    print('\n Launching Dataflow job {} ... hang on'.format(job_name))
    # parser = argparse.ArgumentParser()
    # args, beam_args = parser.parse_known_args()

    # build query1 .. query11 (identical here; adjust each one as needed)
    for i in range(11):
        globals()["query" + str(i + 1)] = (
            " SELECT cntycd, count(*) as cnt, "
            " ARRAY_AGG(livsqftnbr) AS duration_array FROM `dataflow_test.data_new4` "
            " where cntycd is not null and livsqftnbr is not null "
            " GROUP BY cntycd having cnt > 1000 ")

    beam_options = PipelineOptions(
        runner='DataflowRunner',
        job_name=job_name,
        project=PROJECT,
        region=REGION,
        temp_location='gs://{}/temp'.format(BUCKET)  # needed for BigQuery I/O and job staging
    )
    # 'requirements_file': 'requirements.txt'
    with beam.Pipeline(options=beam_options) as p:
        # two independent branches of the same pipeline, one per query/output table
        (p
         | 'read_bq1' >> beam.io.ReadFromBigQuery(query=query1, use_standard_sql=True)
         | 'compute_fit1' >> beam.FlatMap(compute_fit1)
         | 'write_bq1' >> beam.io.gcp.bigquery.WriteToBigQuery(
               'dataflow_test.out1',
               schema='cntycd:STRING,ag:FLOAT64,bg:FLOAT64,cg:FLOAT64',
               write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
        (p
         | 'read_bq2' >> beam.io.ReadFromBigQuery(query=query2, use_standard_sql=True)
         | 'compute_fit2' >> beam.FlatMap(compute_fit1)
         | 'write_bq2' >> beam.io.gcp.bigquery.WriteToBigQuery(
               'dataflow_test.out2',
               schema='cntycd:STRING,ag:FLOAT64,bg:FLOAT64,cg:FLOAT64',
               write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))

run_job()
print("\n ******** end Time = ", datetime.now().strftime("%H:%M:%S"))
print('Done')

Difference between the FlatMap transform and the Map transform:
These transforms behave in Beam exactly the same way as in Spark (and in Scala).
A Map transform maps a PCollection of N elements into another PCollection of N elements.
A FlatMap transform maps a PCollection of N elements into N collections of zero or more elements, which are then flattened into a single PCollection.

As a simple example, the following happens:

beam.Create([1, 2, 3]) | beam.Map(lambda x: [x, 'any'])
# The result is a collection of THREE lists:
# [[1, 'any'], [2, 'any'], [3, 'any']]

Whereas:

beam.Create([1, 2, 3]) | beam.FlatMap(lambda x: [x, 'any'])
# The lists output by the lambda are then flattened into a
# collection of SIX single elements:
# [1, 'any', 2, 'any', 3, 'any']
# FlatMap requires that the function passed to it return an iterable;
# beam.Create([1, 2, 3]) | beam.FlatMap(lambda x: x) will raise an error.
# As always with a PCollection, the order is arbitrary,
# so the result could also be [1, 2, 3, 'any', 'any', 'any'].

# A Map transformation is a "one to one" mapping on each element of a list/collection:
# {"Amar", "Akabar", "Anthony"} -> {"Mr.Amar", "Mr.Akabar", "Mr.Anthony"}
# A FlatMap transformation is usually applied to a collection like a "list of lists";
# this collection gets flattened into a single list and the transformation/
# mapping is applied to each element of the "list of lists"/collection:
# {{"Amar", "Akabar"}, "Anthony"} -> {"Mr.Amar", "Mr.Akabar", "Mr.Anthony"}

What's the Difference Between Batch and Streaming Processing?
In simple words: Stream="Continuous data"/"Real time data", Batch='Window of data'.

A batch is a collection of data points that have been grouped together within a specific time interval. Another term often used for this is a window of data. Streaming processing deals with continuous data and is key to turning big data into fast data.

Batch processing is most often used when dealing with very large amounts of data, and/or when data sources are legacy systems that are not capable of delivering data in streams.

Use cases for batch processing: payroll, billing, orders from customers.
Use cases for stream processing: fraud detection, log monitoring, social media sentiment analysis.
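To make the "window of data" idea concrete, here is a minimal Beam sketch (bounded toy data with made-up keys and timestamps) that assigns elements to 60-second fixed windows and counts per key within each window:

import apache_beam as beam
from apache_beam.transforms import window

with beam.Pipeline() as p:
    (p
     | beam.Create([('user1', 10), ('user1', 70), ('user2', 15)])
     # use the second field as a fake event time (in seconds) for each element
     | 'AddTimestamps' >> beam.Map(lambda kv: window.TimestampedValue(kv, kv[1]))
     | 'FixedWindows' >> beam.WindowInto(window.FixedWindows(60))   # 60-second windows
     | 'CountPerKey' >> beam.combiners.Count.PerKey()
     | beam.Map(print))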

ParDo is useful for a variety of common data processing operations, including filtering a dataset: you can use ParDo to consider each element in a PCollection and either output that element to a new collection or discard it.
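For illustration, here is a minimal sketch of such a filter (toy data, hypothetical DoFn name): the DoFn yields an element to keep it and yields nothing to discard it.

import apache_beam as beam

class FilterEvenFn(beam.DoFn):
    # keep only even numbers; yielding nothing discards the element
    def process(self, element):
        if element % 2 == 0:
            yield element

with beam.Pipeline() as p:
    (p
     | beam.Create([1, 2, 3, 4, 5, 6])
     | beam.ParDo(FilterEvenFn())
     | beam.Map(print))   # prints 2, 4, 6 (order is arbitrary)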

The fundamental piece of every Beam program, a Pipeline contains the entire data processing task, from I/O to data transforms. An illustration of a 4-step pipeline is shown by the code snippet below; the three key elements (I/O transform, PCollection, and PTransform) are all wrapped inside the Pipeline.
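A minimal sketch of such a 4-step pipeline (the gs:// paths are placeholders): the I/O read produces a PCollection, two PTransforms process it, and a final I/O transform writes the result.

import apache_beam as beam

# 4 steps: read (I/O) -> split into words -> count -> write (I/O).
# Each step produces a new PCollection; the paths are placeholders.
with beam.Pipeline() as p:
    (p
     | 'Read' >> beam.io.ReadFromText('gs://your-bucket/input.txt')
     | 'Split' >> beam.FlatMap(lambda line: line.split())
     | 'Count' >> beam.combiners.Count.PerElement()
     | 'Write' >> beam.io.WriteToText('gs://your-bucket/word_counts'))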

How to use pandas in Apache Beam?
pandas is "supported", in the sense that you can use the pandas library the same way you'd be using it without Apache Beam, and the same way you can use any other library from your Beam pipeline as long as you specify the proper dependencies. It is also "supported" in the sense that it is bundled as a dependency by default so you don't have to specify it yourself. For example, you can write a DoFn that performs some computation using pandas for every element; a separate computation for each element, performed by Beam in parallel over all elements.
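For example, here is a minimal sketch (the element format and DoFn name are made up for illustration) of a DoFn that runs pandas independently on every element:

import apache_beam as beam

class PandasStatsFn(beam.DoFn):
    # hypothetical DoFn: each element is assumed to be a list of numbers;
    # pandas runs separately on every element, in parallel across elements
    def process(self, element):
        import pandas as pd   # import inside the DoFn so remote workers resolve the dependency
        s = pd.Series(element)
        yield {'mean': float(s.mean()), 'std': float(s.std())}

with beam.Pipeline() as p:
    (p
     | beam.Create([[1, 2, 3], [10, 20, 30, 40]])
     | beam.ParDo(PandasStatsFn())
     | beam.Map(print))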

It is not supported in the sense that Apache Beam currently provides no special integration with it, e.g. you can't use a PCollection as a pandas dataframe, or vice versa. A PCollection does not physically contain any data (this should be particularly clear for streaming pipelines) - it is just a placeholder node in Beam's execution plan.
