Thursday, April 30, 2020

GCP Study Notes 8: step-by-step example to develop and deploy a model using BigQuery and Dataflow

We will walk through a step-by-step example of how to develop and deploy a model using BigQuery and Dataflow.

The more sophisticated your models are, the more friction you’ll face when it comes to production. Did you ever regret ensembling 5 different models when developing a customer churn classifier? Don’t worry, Apache Beam comes to the rescue.

Example code to back up a BigQuery table via a GCS bucket:
If an ETL job goes bad and you want to revert to yesterday’s data, you can simply do:
#================================================
CREATE OR REPLACE TABLE dataset.table_restored
AS 
SELECT *
FROM dataset.table
FOR SYSTEM_TIME AS OF 
  TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL -1 DAY)
# However, time travel is restricted to 7 days. For older backups,
# export the table schema, definition, and data to a GCS bucket:


bq show --schema dataset.table > schema.json          # table schema
bq --format=json show dataset.table > tbldef.json     # full table definition
bq extract --destination_format=AVRO \
           dataset.table gs://.../data_*.avro # AVRO files
           
bq load --source_format=AVRO \
    --time_partitioning_expiration ... \
    --time_partitioning_field ... \
    --time_partitioning_type ... \
    --clustering_fields ... \
    --schema ... \
    todataset.table_name \
    gs://.../data_*.avro  
    
# backup: If you do not need the backup to be in the form of files,
# a much simpler way to back up your BigQuery table is to use bq cp:
date=...
bq mk dataset_${date}
bq cp dataset.table dataset_${date}.table
# restore
bq cp dataset_20200301.table dataset_restore.table    
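# A minimal sketch of scripting the daily snapshot, assuming a YYYYMMDD
# dataset suffix (matching the 20200301 example above); table1/table2 are
# placeholders for your own table names:
date=$(date +%Y%m%d)
bq mk dataset_${date}
for tbl in table1 table2; do
  bq cp dataset.${tbl} dataset_${date}.${tbl}
done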

# move data from BigQuery to a GCS bucket:
bq query --format=csv --max_rows=999999 --use_legacy_sql=false \
  "select * from libarary1.data1 limit 20" > testing.csv
 
gsutil cp testing.csv 'gs://folder/hive_data/testing.csv'
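
# The bq query route above is capped by --max_rows; the same query-to-CSV-to-GCS
# flow can also be done in Python. A rough sketch using the google-cloud-bigquery
# and google-cloud-storage client libraries (the table and bucket names are the
# same placeholders as above):
from google.cloud import bigquery, storage

client = bigquery.Client()
df = client.query("select * from libarary1.data1 limit 20").to_dataframe()
df.to_csv('testing.csv', index=False)

# upload the CSV into the bucket (bucket/object path as in the gsutil example)
storage.Client().bucket('folder').blob('hive_data/testing.csv') \
    .upload_from_filename('testing.csv')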
 
#===========================================================
Why do we need to use Apache Beam via Google Cloud Dataflow? Before getting started with Apache Beam, let’s see what options you have for operationalizing your model.

1. The first option is creating a single virtual machine (VM) on the cloud for your computing tasks. Fair enough, but setting up and managing that VM would be a headache because it requires a lot of manual work (your cloud engineers will not like it!).

2. Cloud Dataproc eases the management burden and is another good option to consider, as it provides computing resources that only live for the duration of one run. However, you’ll need to spend some time converting your Python code into PySpark or Scala, not to mention that you may not be able to fully replicate what you did in Python with those languages.

3. Dataflow is likely to be a good option, since you can write Python 3 with the Apache Beam Python SDK to create a data pipeline that runs on the DataflowRunner backend. What does that mean? It means you can construct your pipeline in Python 3 and run your ML models built in Python directly, without converting them into Scala or PySpark.

Dataflow is a fully managed service that minimises latency, processing time, and cost through autoscaling worker resources. Apache Beam is an open-source, unified model that allows users to build a program by using one of the open-source Beam SDKs (Python is one of them) to define data processing pipelines. The pipeline is then translated by Beam Pipeline Runners to be executed by distributed processing backends, such as Google Cloud Dataflow.
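
To make the “same pipeline, different runner” idea concrete, here is a minimal sketch of a Beam pipeline in Python. It is a toy example, not the babyweight pipeline from the lab below: the project, region, and bucket values are placeholders, and switching runner='DirectRunner' to 'DataflowRunner' is all it takes to move from a local test run to Dataflow.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder options -- replace project/bucket with your own values.
options = PipelineOptions(
    runner='DirectRunner',             # change to 'DataflowRunner' to run on Dataflow
    project='my-project',              # required by the DataflowRunner
    region='us-central1',
    temp_location='gs://my-bucket/tmp',
)

with beam.Pipeline(options=options) as p:
    (p
     | 'read'  >> beam.Create(['7.5,True,33', '6.8,False,29'])
     | 'parse' >> beam.Map(lambda line: line.split(',')[0])   # keep the first column
     | 'print' >> beam.Map(print))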

#================================================
#1. Creating datasets for Machine Learning using Dataflow
# run the following twice in the notebook to avoid an oauth2client error
pip install --user apache-beam[gcp]
#Note:  You may ignore the following responses in the cell output above:
#ERROR (in Red text) related to: witwidget-gpu, fairing
#WARNING (in Yellow text) related to: hdfscli, hdfscli-avro, pbr, fastavro, gen_client
#Restart the kernel before proceeding further (On the Notebook menu - Kernel - Restart Kernel).

import apache_beam as beam
print(beam.__version__)

# change these to try this notebook out
BUCKET = 'qwiklabs-gcp-04-********-vcm'
PROJECT = 'qwiklabs-gcp-04-********'
REGION = 'us-central1'

import os
os.environ['BUCKET'] = BUCKET
os.environ['PROJECT'] = PROJECT
os.environ['REGION'] = REGION

%%bash
if ! gsutil ls | grep -q gs://${BUCKET}/; then
  gsutil mb -l ${REGION} gs://${BUCKET}
fi

# Create SQL query using natality data after the year 2000
query = """
SELECT
  weight_pounds,
  is_male,
  mother_age,
  plurality,
  gestation_weeks,
  FARM_FINGERPRINT(CONCAT(CAST(YEAR AS STRING), CAST(month AS STRING))) AS hashmonth
FROM
  publicdata.samples.natality
WHERE year > 2000
"""

# Call BigQuery and examine in dataframe
from google.cloud import bigquery
df = bigquery.Client().query(query + " LIMIT 100").to_dataframe()
df.head()
#===========================================================

Apache Beam is an open-source, unified model for defining both batch and streaming data-parallel processing pipelines. The Apache Beam programming model simplifies the mechanics of large-scale data processing and provides useful abstractions that insulate you from low-level details of distributed processing, such as coordinating individual workers and sharding datasets.
#================================================
import datetime, os

def to_csv(rowdict):
  # Pull columns from BQ and create a line
  import hashlib
  import copy
  CSV_COLUMNS = 'weight_pounds,is_male,mother_age,plurality,gestation_weeks'.split(',')

  # Create synthetic data where we assume that no ultrasound has been performed
  # and so we don't know sex of the baby. Let's assume that we can tell the difference
  # between single and multiple, but that the errors rates in determining exact number
  # is difficult in the absence of an ultrasound.
  no_ultrasound = copy.deepcopy(rowdict)
  w_ultrasound = copy.deepcopy(rowdict)

  no_ultrasound['is_male'] = 'Unknown'
  if rowdict['plurality'] > 1:
    no_ultrasound['plurality'] = 'Multiple(2+)'
  else:
    no_ultrasound['plurality'] = 'Single(1)'

  # Change the plurality column to strings
  w_ultrasound['plurality'] = ['Single(1)', 'Twins(2)', 'Triplets(3)', 'Quadruplets(4)', 'Quintuplets(5)'][rowdict['plurality'] - 1]

  # Write out two rows for each input row, one with ultrasound and one without
  for result in [no_ultrasound, w_ultrasound]:
    data = ','.join([str(result[k]) if k in result else 'None' for k in CSV_COLUMNS])
    key = hashlib.sha224(data.encode('utf-8')).hexdigest()  # hash the columns to form a key
    yield str('{},{}'.format(data, key))
  
def preprocess(in_test_mode):
  import shutil, os, subprocess
  job_name = 'preprocess-babyweight-features' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')

  if in_test_mode:
      print('Launching local job ... hang on')
      OUTPUT_DIR = './preproc'
      shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
      os.makedirs(OUTPUT_DIR)
  else:
      print('Launching Dataflow job {} ... hang on'.format(job_name))
      OUTPUT_DIR = 'gs://{0}/babyweight/preproc/'.format(BUCKET)
      try:
        subprocess.check_call('gsutil -m rm -r {}'.format(OUTPUT_DIR).split())
      except subprocess.CalledProcessError:
        pass  # nothing to delete on the first run

  options = {
      'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),
      'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),
      'job_name': job_name,
      'project': PROJECT,
      'region': REGION,
      'teardown_policy': 'TEARDOWN_ALWAYS',
      'no_save_main_session': True,
      'num_workers': 4,
      'max_num_workers': 5
  }
  opts = beam.pipeline.PipelineOptions(flags = [], **options)
  if in_test_mode:
      RUNNER = 'DirectRunner'
  else:
      RUNNER = 'DataflowRunner'
  p = beam.Pipeline(RUNNER, options = opts)
  query = """
SELECT
  weight_pounds,
  is_male,
  mother_age,
  plurality,
  gestation_weeks,
  FARM_FINGERPRINT(CONCAT(CAST(YEAR AS STRING), CAST(month AS STRING))) AS hashmonth
FROM
  publicdata.samples.natality
WHERE year > 2000
AND weight_pounds > 0
AND mother_age > 0
AND plurality > 0
AND gestation_weeks > 0
AND month > 0
    """

  if in_test_mode:
    query = query + ' LIMIT 100' 

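  # Split on the hash of year-month: MOD(hashmonth, 4) < 3 sends ~75% of the
  # months to train and = 3 sends ~25% to eval, so no month straddles both sets.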
  for step in ['train', 'eval']:
    if step == 'train':
      selquery = 'SELECT * FROM ({}) WHERE ABS(MOD(hashmonth, 4)) < 3'.format(query)
    else:
      selquery = 'SELECT * FROM ({}) WHERE ABS(MOD(hashmonth, 4)) = 3'.format(query)

    (p 
     | '{}_read'.format(step) >> beam.io.Read(beam.io.BigQuerySource(query = selquery, use_standard_sql = True))
     | '{}_csv'.format(step) >> beam.FlatMap(to_csv)
     | '{}_out'.format(step) >> beam.io.Write(beam.io.WriteToText(os.path.join(OUTPUT_DIR, '{}.csv'.format(step))))
    )

  job = p.run()
  if in_test_mode:
    job.wait_until_finish()
    print("Done!")
    
preprocess(in_test_mode = False)
#===========================================================
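
Once the Dataflow job finishes (you can monitor its progress on the Dataflow page of the GCP console), a quick sanity check is to list the sharded train/eval CSV files that WriteToText produced under the OUTPUT_DIR used above:

%%bash
gsutil ls gs://${BUCKET}/babyweight/preproc/*.csv*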
