Thursday, January 23, 2020

Python study notes 6: Spark SQL, Pyspark tutorial examples

How do we debug in the pyspark program?
Due to the setup of parallel running in pyspark, it's not that easy to see the log output in the parallel process. For example, if you are using UDF GROUPED_MAP, we can not usually see the log output unless we specifically pull them via command line:
#you need to find out which application ID for your job: 
yarn logs -applicationId application_161921989****_#### > log_check1.txt
#it will output the log file to this text file: log_check1.txt
#then you can type the following to see the log file:  
vi log_check1.txt 
YARN (Yet Another Resource Negotiator) is a cluster management system. It has been part of Apache Hadoop since v2.0. With the help of YARN arbitrary applications can be executed on a Hadoop cluster.

Some commonly used code, via the pyspark kernel in a notebook, to load a Hive file into a Python dataframe.
import datetime
import os
import math
import numpy as np
import pandas as pd
#import matplotlib.pyplot as plt
from import Pipeline
from import PipelineModel
from import RandomForestClassifier
from import IndexToString, StringIndexer, VectorIndexer, VectorAssembler
from import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import SQLContext, Row, HiveContext
from pyspark.sql.types import *
from import StringIndexer, VectorIndexer, VectorAssembler
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, length, countDistinct
from pyspark import StorageLevel
from random import randint
from datetime import datetime
#import pprint
#from IPython.display import display
import argparse

from pyspark.sql.functions import regexp_replace, regexp_extract, substring, isnull, isnan
from pyspark.sql.functions import upper, trim, levenshtein, udf,  datediff, to_date, concat,  instr
from pyspark.sql.functions import when, lit, col, abs, max, cos, sin, radians, atan2, sqrt, pow, count
from pyspark.sql.functions import levenshtein,unix_timestamp, spark_partition_id, monotonically_increasing_id
import pyspark.sql.functions as func

import findspark
import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession

app_name = 'ML-other-data_{uc}'.format(uc=use_case)

spark = SparkSession.builder.master("yarn-client").appName(app_name) \
.config("spark.debug.maxToStringFields", "100").config('spark.sql.crossJoin.enabled','true') \
.config('spark.sql.codegen.wholeStage', 'false') \
.config('spark.sql.hive.manageFilesourcePartitions', 'false').enableHiveSupport().getOrCreate()

#note of the option: config('spark.sql.codegen.wholeStage', 'false') to avoid the error: 
#ERROR org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator: failed to compile:
#Compiling "GeneratedClass": Code of method "processNext()V" of class
#org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage20" grows beyond 64 KB


import pandas as pd
sc = spark.sparkContext


#some example creating dataframe from pandas
data1 = {'PassengerId': {0: 1, 1: 2, 2: 3, 3: 4, 4: 5},
         'Name': {0: 'Owen', 1: 'Florence', 2: 'Laina', 3: 'Lily', 4: 'William'},
         'Sex': {0: 'male', 1: 'female', 2: 'female', 3: 'female', 4: 'male'},
         'Survived': {0: 0, 1: 1, 2: 1, 3: 1, 4: 0}}

# Pandas to Spark
df_pd = pd.DataFrame(data1, columns=data1.keys())
df_sp = spark.createDataFrame(df_pd)

# Spark to Pandas
df_pd = df.toPandas()

#save & read pyspark dataframe to gcs bucket: 
import pyspark
from pyspark.sql import SparkSession

data =[("James ","","Smith","36636","M",3000),
              ("Michael ","Rose","","40288","M",4000),
              ("Robert ","","Williams","42114","M",4000),
              ("Maria ","Anne","Jones","39192","F",4000)]

#you can use spark SQL via temp view, easier to grab subset via sql

parkSQL = spark.sql("select * from ParquetTable where salary >= 4000 ")

#you can even create temp view from parquet file directly: 
spark.sql("CREATE TEMPORARY VIEW PERSON USING parquet OPTIONS (path \"/tmp/output/people.parquet\")")
spark.sql("SELECT * FROM PERSON").show()

spark.sql('''
    CREATE TABLE default.ak_test
    stored as parquet
    as select * from my_temp_table''')
#upload the temp table to hive

#pyspark drop 2 columns: 
join_df0 = join_df0.drop('iris_tax','iris_ao')
#no need for the [], unlike pandas drop

#pyspark drop duplicates: 
df1 ='name', 'height').drop_duplicates()

#summary describe stats:'col_var1').describe().show()'col_var1').summary().show()

#create a new column
df2=df2.withColumn('AgeTimesFare', df2.Age*df2.Fare)

df = df.withColumn('col3', 
                    func.concat(sf.col('col1'),sf.lit('_'), sf.col('col2')))
df = df.withColumn('col3',func.coalesce(col("avm1"),col("avm2"),col("avm3")))
#when / otherwise statement, isin the list condition:
df = df.withColumn('new_var', when(col("var1").isin('q4','q5'), col("var2")))

df = df.withColumn("var1", when(df.var0 == 1, 'one')
                           .when(df.var0 == 2, 'two')
                           .otherwise(lit(None))) #filled with NULL otherwise
df ="*", when(df.var0 == 1, 'one')
                    .when(df.var0 == 2, 'two')
                    .otherwise(lit(None)).alias("var1"))

from pyspark.sql.functions import expr"*", expr("CASE WHEN var0 == 1 THEN 'one' WHEN var0 == 2 THEN 'two' ELSE 'other' END AS var1"))
df.withColumn("var1", expr("CASE WHEN var0 == 1 THEN 'one' WHEN var0 == 2 THEN 'two' ELSE 'other' END AS var1"))
df.selectExpr("*", "CASE WHEN var0 == 1 THEN 'one' WHEN var0 == 2 THEN 'two' ELSE 'other' END AS var1")

#subset a dataframe['PassengerId', 'Name']).show()
df1.filter(df1.Sex == 'female').show()

#get a random sample
df.sample(False, 0.1, seed=0).limit(1)
df.rdd.takeSample(True, 1000, seed=123) #takeSample(withReplacement, num, seed=None) 
#takeSample works on rdd, not on the pyspark dataframe directly.

#get the null value columns
from pyspark.sql.functions import  isnull, isnotnull,least, greatest, coalesce, col, lit,when,round
df.where(col("a").isNotNull()) #returns all records where column a is not null

#simple stats summary
avg_cols = ['Age', 'Fare']

#convert the dataframe to a GroupedData object with groupby()
gdf2 = df2.groupby('Pclass')

avg_cols = ['Age', 'Fare']

#to get multiple stats functions on different columns:
gdf2.agg({'*': 'count', 'Age': 'avg', 'Fare':'sum'}).show()

#To rename the columns count(1), avg(Age) etc, use toDF().
      .agg({'*': 'count', 'Age': 'avg', 'Fare':'sum'})
      .toDF('Pclass', 'counts', 'average_age', 'total_fare')
      .show())

#sort dataframe in spark:
df2.sort('Fare', ascending=False).show()

#join two dataframe in spark:
df1.join(df2, ['joinkey_var']).show() #default is inner join. 
df1.join(df2, ['joinkey_var'],how='left').show() 
#specify df1 as a left join df2 as b, df1 as left one

df1.join(df2, [df1.var1 == df2.var1a, df1.var2 == df2.var2a], how='left')
('id1','cntycd','pclid')
     .join(df2, [df1.id1 == df2.id1a], how='left').show())

#use alias to change column names first, then do the join."puid").alias("puid_noavm"), col("pcnt").alias("pcnt_noavm"))"puid").alias("puid_avm"), col("pcnt").alias("pcnt_avm"))

df1.join(df2, df1.PassengerId <= df2.PassengerId).show() # Note the duplicate col names


#in python via value_counts(drop=False):

#some example of summary statistics over loop
for check1 in ('bad_1','bad_2' ,'bad_3' ,'bad_4a','bad_5a','bad_6a','bad_7','bad_8'):
    t1b = temp0.pivot(index='type', columns='bad', values='cnt')
    t1a = temp0.pivot(index='type', columns='bad', values='pcnt')

#get substring from a column in pyspark dataframe: 
#1. use WithColumn & substring(str, pos, len) fuction: 
from pyspark.sql.functions import substring
data = [(1,"20200828"),(2,"20180525")]
df.withColumn('year', substring('date', 1,4))\
    .withColumn('month', substring('date', 5,2))\
    .withColumn('day', substring('date', 7,2))

#2. use select & substring'date', substring('date', 1,4).alias('year'), \
                  substring('date', 5,2).alias('month'), \
                  substring('date', 7,2).alias('day'))
#3. use col & substr()
df3=df.withColumn('year', col('date').substr(1, 4))\
  .withColumn('month',col('date').substr(5, 2))\
  .withColumn('day', col('date').substr(7, 2))
Running spark from the command line:

--example:load gcs bucket file
file1 = "gs://idap-dev-stg-process-engine/geo_test/stg_record_tractnbr_output"
1. In case you got some error message like: pyspark java.lang.OutOfMemoryError: Java heap space, most likely you are downloading Hive data that is too big; you might want to break it into smaller pieces and download it piece by piece.

Note that the java.lang.OutOfMemoryError: GC overhead limit exceeded error, is only thrown when 2% of the memory is freed after several GC cycles. This means that the small amount of heap the GC is able to clean will likely be quickly filled again, forcing the GC to restart the cleaning process again. This forms a vicious cycle where the CPU is 100% busy with GC and no actual work can be done. End users of the application face extreme slowdowns – operations which normally complete in milliseconds take minutes to finish.

So the “java.lang.OutOfMemoryError: GC overhead limit exceeded” message is a pretty nice example of a fail fast principle in action.
2. In case you got the error message: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext, most likely it is the consequence of some error which happened earlier — say you ran some session previously and somehow it was not able to finish successfully. In that case, you can add the code: spark.stop() and restart the kernel; also, sometimes you might not have enough memory, so add some configuration like:
--conf spark.cleaner.periodicGC.interval=60 
--conf spark.cleaner.referenceTracking.blocking=false
Question: How do we submit multiple jobs applications using one script in parallel?
Using the following code to run multiple job applications with one script, they will kick off at the same time, and you don't have to open two windows/screens to submit those jobs manually:

spark-submit --conf spark.cores.max=8 --py-files --jars spark-bigquery-with-dependencies_2.11-0.17.2.jar --files piq_service.json &
spark-submit --conf spark.cores.max=8 --py-files --jars spark-bigquery-with-dependencies_2.11-0.17.2.jar --files piq_service.json &
Note: --conf spark.cores.max=8 is the key! otherwise, the first process will take all the resource to finish first, then the 2nd one will start.

In case you submitted 17+ jobs in pyspark, you might realize you can only run 17 jobs in parallel; the tricky part is to set up: --conf spark.port.maxRetries=56, otherwise error messages like "WARN util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041." will stop you from processing more jobs.

spark-submit --conf spark.cores.max=8 --conf  spark.port.maxRetries=56 --py-files --jars spark-bigquery-with-dependencies_2.11-0.17.2.jar &
spark-submit --conf spark.cores.max=8 --conf  spark.port.maxRetries=56 --py-files --jars spark-bigquery-with-dependencies_2.11-0.17.2.jar &
spark-submit --conf spark.cores.max=8 --conf  spark.port.maxRetries=56 --py-files --jars spark-bigquery-with-dependencies_2.11-0.17.2.jar &
For this case, you can actually submit 56 jobs to run in parallel.

Question: How do you input parameter from the command line and run the scripts?

#will save as
import argparse  
parser = argparse.ArgumentParser()

parser.add_argument("-target_kbe", '--target_kbe', type=str, choices=['var1', 'var2'])
parser.add_argument("-model_nbr",'--model_nbr', nargs='?', const=1, type=int)  ##? or 0/1 parameters
parser.add_argument('-n', '--names_list', nargs='+', default=[])
args = parser.parse_args()
target_kbe = args.target_kbe

#the code script to submit in the command line: 
spark-submit --conf spark.cores.max=8  --jars spark-bigquery-with-dependencies_2.11-0.20.0.jar --py-files -target_kbe 'var1'  --model_nbr 0  -n lay1 layer2 tractnbr  stdsubdcd  zip9_avm  & 
spark-submit --conf spark.cores.max=8  --jars spark-bigquery-with-dependencies_2.11-0.20.0.jar --py-files -target_kbe 'var1'  --model_nbr 1  -n lay1 layer2 tractnbr  stdsubdcd  zip9_avm  & 
spark-submit --conf spark.cores.max=8  --jars spark-bigquery-with-dependencies_2.11-0.20.0.jar --py-files -target_kbe 'var2'  --model_nbr 0  -n lay1 layer2 tractnbr  stdsubdcd  zip9_avm  & 
spark-submit --conf spark.cores.max=8  --jars spark-bigquery-with-dependencies_2.11-0.20.0.jar --py-files -target_kbe 'var2'  --model_nbr 1  -n lay1 layer2 tractnbr  stdsubdcd  zip9_avm  & 

Question: How do we change the columns for the Pyspark dataframe? We can't do what we usually do for a Python dataframe — simply using df.columns=(...) to rename the columns does not work that way. Here are several solutions; the easiest one is via df1.schema.names:

for pyspark column rename: 
df1.schema.names = ['_'.join(col) for col in df1.schema.names]
#don't use df1.columns = df1.columns + '_year5', 
#you will get error message: TypeError: can only concatenate list (not "str") to list

#be careful: only change schema.names, it will not change df1.columns!
#double check df1.columns after you run the schema.names change! No change!

#read a csv file into pyspark dataframe
b1 ="com.databricks.spark.csv").option("header","true").option("delimiter","|").load('gs://folder/****_extract_20190524.csv')

oldColumns = b1.schema.names
newColumns = [w.replace('something_usually_the_file_name.','') for w in b1.columns]

df = reduce(lambda data, idx: data.withColumnRenamed(oldColumns[idx], newColumns[idx]), range(len(oldColumns)), b1)
here is another solution by using selectExpr():

data = sqlContext.createDataFrame([("Alberto", 2), ("Dakota", 2)], 
                                  ["Name", "askdaosdka"])

# Output
#|   Name|askdaosdka|
#|Alberto|         2|
#| Dakota|         2|

# |-- Name: string (nullable = true)
# |-- askdaosdka: long (nullable = true)

df = data.selectExpr("Name as name", "askdaosdka as age")
Here are 2 other solutions, one by alias:

#Option 3. using alias, in Scala you can also use as.
from pyspark.sql.functions import col

data ="Name").alias("name"), col("askdaosdka").alias("age"))

# Output
#|   name|age|
#|Alberto|  2|
#| Dakota|  2|
Option 4. Using sqlContext.sql, which lets you use SQL queries on DataFrames registered as tables.

sqlContext.registerDataFrameAsTable(data, "myTable")
df2 = sqlContext.sql("SELECT Name AS name, askdaosdka as age from myTable")
spark.mllib contains the legacy API built on top of RDDs; it's now in maintenance mode, not in dev mode. The package provides a higher-level API built on top of DataFrames for constructing ML pipelines.
After reaching feature parity (roughly estimated for Spark 2.3), the RDD-based API will be deprecated.
The RDD-based API (spark.mllib) is expected to be removed in Spark 3.0.

Spark (Apache Spark) is an in-memory computation engine, whereas MapReduce is an in-and-out disk computation engine; the main purpose of Spark is to free users from the limitations of MapReduce.

RDD is short for resilient distributed dataset: it is a fundamental data structure of Spark. It is an immutable distributed collection of objects. Each dataset in RDD is divided into logical partitions, which may be computed on different nodes of the cluster. RDDs can contain any type of Python, Java, or Scala objects, including user-defined classes.

Spark is an open source processing engine for Big Data that brings together an impressive combination of speed, ease of use and advanced analytics.

Originally developed at the University of California, Berkeley's AMPLab. Spark enables applications in Hadoop clusters to run in-memory at up to 100x faster than MapReduce, while also delivering significant speed-ups when running purely on disk. Spark SQL provides an interface for users to query their data from Spark RDDs as well as other data sources such as Hive tables, parquet files and JSON files.

Spark's APIs in Python, Scala & Java make it easy to build parallel apps. Lastly, Spark provides strong support for streaming data and complex analytics where iterative calculations are used such as in machine learning and graph algorithms - this is where Spark shines brightest. Spark's versatility has led users to call it "the swiss army knife" of processing engine platforms as users can combine all of these capabilities in a single platform and workflow.

Spark SQL is a Spark module for structured data processing. One use of Spark SQL is to execute SQL queries. Spark SQL can also be used to read data from an existing Hive installation.

The entry point into all functionality in Spark is the SparkSession class. To create a basic SparkSession, just use SparkSession.builder. Most of the following code comes from the Spark repo.

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

With a SparkSession, applications can create DataFrames from an existing RDD, from a Hive table, or from Spark data sources.

# spark is an existing SparkSession
df ="examples/src/main/resources/people.json")
# Displays the content of the DataFrame to stdout
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# +----+-------+

In Python it's possible to access a DataFrame's columns either by attribute (df.age) or by indexing (df['age']).
While the former is convenient for interactive data exploration,
users are highly encouraged to use the latter form, which is future proof and won't break with column names that are also attributes on the DataFrame class.

In PySpark, we recommend using "s3://" to access the EMR(Elastic MapReduce) file system(EMRFS) in EMR and "s3a://" to access S3A file system in other environments. Examples:

data_s3 ="libsvm").load("s3://some-bucket/some-prefix")
data_s3a ="libsvm").load("s3a://some-bucket/some-prefix")

Training and Hosting a K-Means Clustering model using SageMaker PySpark:
A KMeansSageMakerEstimator runs a training job using the Amazon SageMaker KMeans algorithm upon invocation of fit(), returning a SageMakerModel.

from pyspark import SparkContext, SparkConf
from sagemaker_pyspark import IAMRole, classpath_jars
from sagemaker_pyspark.algorithms import KMeansSageMakerEstimator

# Load the sagemaker_pyspark classpath. If you used --jars to submit your job
# there is no need to do this in code.
conf = (SparkConf()
        .set("spark.driver.extraClassPath", ":".join(classpath_jars())))

iam_role = "arn:aws:iam:0123456789012:role/MySageMakerRole"

region = "us-east-1"
training_data ="libsvm").option("numFeatures", "784") \
    .load("s3a://sagemaker-sample-data-{}/spark/mnist/train/".format(region))

test_data ="libsvm").option("numFeatures", "784") \
    .load("s3a://sagemaker-sample-data-{}/spark/mnist/test/".format(region))

kmeans_estimator = KMeansSageMakerEstimator(
    sagemakerRole=IAMRole(iam_role),
    trainingInstanceType="ml.p2.xlarge",
    trainingInstanceCount=1,
    endpointInstanceType="ml.c4.xlarge",
    endpointInitialInstanceCount=1)

kmeans_model =

transformed_data = kmeans_model.transform(test_data) 

# spark, df are from the previous example
# Print the schema in a tree format
df.printSchema()
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)

# Select only the "name" column"name").show()
# +-------+
# |   name|
# +-------+
# |Michael|
# |   Andy|
# | Justin|
# +-------+

# Select everybody, but increment the age by 1['name'], df['age'] + 1).show()
# +-------+---------+
# |   name|(age + 1)|
# +-------+---------+
# |Michael|     null|
# |   Andy|       31|
# +-------+---------+

# Select people older than 21
df.filter(df['age'] > 21).show()
# +---+----+
# |age|name|
# +---+----+
# | 30|Andy|
# +---+----+

# Count people by age
df.groupBy("age").count().show()
# +----+-----+
# | age|count|
# +----+-----+
# |null|    1|
# |  30|    1|
# +----+-----+

The sql function on a SparkSession enables applications to run SQL queries programmatically and returns the result as a DataFrame.

# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
sqlDF = spark.sql("SELECT * FROM people")
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+    

Creating Datasets via Scala:

case class Person(name: String, age: Long)

// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+

// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS() + 1).collect() // Returns: Array(2, 3, 4)

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS =[Person]
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

Spark SQL can convert an RDD of Row objects to a DataFrame, inferring the datatypes. Rows are constructed by passing a list of key/value pairs as kwargs to the Row class.

from pyspark.sql import Row

sc = spark.sparkContext

neighbor_df.repartition(1).write.csv(path=csv_location, mode="append/overwrite", header="true")

# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = l: l.split(","))
people = p: Row(name=p[0], age=int(p[1])))

# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")
# SQL can be run over DataFrames that have been registered as a table.
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

# The results of SQL queries are Dataframe objects.
# rdd returns the content as an :class:'pyspark.RDD' of :class:'Row'.
teenNames = p: "Name: " +
for name in teenNames:
# Name: Justin
a DataFrame can be created programmatically with three steps:
1. Create an RDD of tuples or lists from the original RDD;
2. Create the schema represented by a StructType matching the structure of tuples or lists in the RDD created in the step 1.
3. Apply the schema to the RDD via createDataFrame method provided by SparkSession.

# Import data types
from pyspark.sql.types import *

sc = spark.sparkContext

# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = l: l.split(","))
# Each line is converted to a tuple.
people = p: (p[0], p[1].strip()))

# The schema is encoded in a string.
schemaString = "name age"

fields = [StructField(field_name, StringType(), True) 
 for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)

# Creates a temporary view using the DataFrame
schemaPeople.createOrReplaceTempView("people")
# SQL can be run over DataFrames that have been registered as a table.
results = spark.sql("SELECT name FROM people")

No comments:

Post a Comment

Data Science Study Notes: recommendation engine notes 1: Deep matrix factorization using Apache MXNet

Deep matrix factorization using Apache MXNet ( notes from Oreilly , github notebook ) Recommendation engines are widely used models th...