Thursday, January 23, 2020

Python study notes 6: Spark SQL, Pyspark tutorial examples

Some commonly used code, run via the pyspark kernel in a notebook, to load a hive file into a python dataframe.
import datetime
import os
import math
import numpy as np
import pandas as pd
#import matplotlib.pyplot as plt
from import Pipeline
from import PipelineModel
from import RandomForestClassifier
from import IndexToString, StringIndexer, VectorIndexer, VectorAssembler
from import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import SQLContext, Row, HiveContext
from pyspark.sql.types import *
from import StringIndexer, VectorIndexer, VectorAssembler
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, length, countDistinct
from pyspark import StorageLevel
from random import randint
from datetime import datetime
#import pprint
#from IPython.display import display
import argparse

from pyspark.sql.functions import regexp_replace, regexp_extract, substring, isnull, isnan, 
from pyspark.sql.functions import upper, trim, levenshtein, udf,  datediff, to_date, concat,  instr
from pyspark.sql.functions import when, lit, col, abs, max, cos, sin, radians, atan2, sqrt, pow, count
from pyspark.sql.functions import levenshtein,unix_timestamp, spark_partition_id, monotonically_increasing_id
from pyspark.sql.functions as func 

import findspark
import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
import pandas as pd
sc = spark.sparkContext


#some example creating dataframe from pandas
data1 = {'PassengerId': {0: 1, 1: 2, 2: 3, 3: 4, 4: 5},
         'Name': {0: 'Owen', 1: 'Florence', 2: 'Laina', 3: 'Lily', 4: 'William'},
         'Sex': {0: 'male', 1: 'female', 2: 'female', 3: 'female', 4: 'male'},
         'Survived': {0: 0, 1: 1, 2: 1, 3: 1, 4: 0}}

# Pandas to Spark
df_pd = pd.DataFrame(data1, columns=data1.keys())
df_sp = spark_session.createDataFrame(df_pd)

# Spark to Pandas
df_pd = df.toPandas()

#save & read pyspark dataframe to gcs bucket: 
import pyspark
from pyspark.sql import SparkSession

data =[("James ","","Smith","36636","M",3000),
              ("Michael ","Rose","","40288","M",4000),
              ("Robert ","","Williams","42114","M",4000),
              ("Maria ","Anne","Jones","39192","F",4000),

#you can use spark SQL via temp view, easier to grab subset via sql

parkSQL = spark.sql("select * from ParquetTable where salary >= 4000 ")

#you can even create temp view from parquet file directly: 
OPTIONS (path \"/tmp/output/people.parquet\")")
spark.sql("SELECT * FROM PERSON").show()

    CREATE TABLE default.ak_test
    stored as parquet
    as select * from my_temp_table'''
#upload the temp table to hive

#pyspark drop 2 columns: 
join_df0 = join_df0.drop('iris_tax','iris_ao')
#no need to wrap the column names in [], unlike pandas

#pyspark drop duplicates: 
df1=df(['name', 'height']).drop_duplicates(inplace=True)  

#create a new column
df2=df2.withColumn('AgeTimesFare', df2.Age*df2.Fare)

df = df.withColumn('col3', 
                    func.concat(sf.col('col1'),sf.lit('_'), sf.col('col2')))
df = df.withColumn('col3',func.coalesce(col("avm1"),col("avm2"),col("avm3")))
#when / otherwise statement, isin the list condition:
df = df.withColumn('new_var',when(col("var1).isin('q4','q5'),col("var2") ).otherwise(col("var3")))

df.withColumn("var1",when(df.var0 == 1, 'one').when(df.var0 == 2, 'two').otherwise('other'))"*",when(df.var0 == 1, 'one').when(df.var0 == 2, 'two').otherwise('other').alias('var1'))

from pyspark.sql.functions import expr"*",expr("CASE WHEN var0 == 1 THEN  'one' WHEN var0 == 2 THEN  'two' ELSE 'other' END AS var1"))
df.withColumn("var1",expr("CASE WHEN var0 == 1 THEN  'one' WHEN var0 == 2 THEN  'two' ELSE 'other' END AS var1"))
df.selectExpr("*","CASE WHEN var0 == 1 THEN  'one' WHEN var0 == 2 THEN  'two' ELSE 'other' END AS var1")

#subset a dataframe['PassengerId', 'Name']).show()
df1.filter(df1.Sex == 'female').show()

#get a random sample
df.sample(False, 0.1, seed=0).limit(1)
df.rdd.takeSample(True, 1000, seed=123) #takeSample(withReplacement, num, seed=None) 
#takeSample works on rdd, not on the pyspark dataframe directly.

#get the null value columns
from pyspark.sql.functions import  isnull, isnotnull,least, greatest, coalesce, col, lit,when,round
df.where(col("a").isNotNull()) #returns all records with dt_mvmt as None/Null

#simple stats summary
avg_cols = ['Age', 'Fare']

#convert the dataframe to a GroupedData object with groupby()
gdf2 = df2.groupby('Pclass')

avg_cols = ['Age', 'Fare']

#to get multiple stats functions on different columns:
gdf2.agg({'*': 'count', 'Age': 'avg', 'Fare':'sum'}).show()

#To rename the columns count(1), avg(Age) etc, use toDF().
    .agg({'*': 'count', 'Age': 'avg', 'Fare':'sum'})
    .toDF('Pclass', 'counts', 'average_age', 'total_fare')

#sort dataframe in spark:
df2.sort('Fare', ascending=False).show()

#join two dataframe in spark:
df1.join(df2, ['joinkey_var']).show() #default is inner join. 
df1.join(df2, ['joinkey_var'],how='left').show() 
#specify df1 as a left join df2 as b, df1 as left one

df1.join(df2, [df1.var1== df2.var1a, df1.var2== df2.var2a],how='left')'id1','cntycd','pclid')
       .join(df2, [df1.id1==df2.id1a],how='left').show()     

#use alias to change column names first, then do the join."puid").alias("puid_noavm"),col("pcnt").alias("pcnt_noavm"))"puid").alias("puid_avm"),col("pcnt").alias("pcnt_avm"))

df1.join(df2, df1.PassengerId <= df2.PassengerId).show() # Note the duplicate col names


#in python via value_counts(drop=False):

#some example of summary statistics over loop
for check1 in ('bad_1','bad_2' ,'bad_3' ,'bad_4a','bad_5a','bad_6a','bad_7','bad_8'):
t1b=temp0.pivot(index='type', columns='bad', values='cnt')
t1a=temp0.pivot(index='type', columns='bad', values='pcnt')

#get substring from a column in pyspark dataframe: 
#1. use withColumn & substring(str, pos, len) function:
from pyspark.sql.functions import substring
data = [(1,"20200828"),(2,"20180525")]
df.withColumn('year', substring('date', 1,4))\
    .withColumn('month', substring('date', 5,2))\
    .withColumn('day', substring('date', 7,2))

#2. use select & substring'date', substring('date', 1,4).alias('year'), \
                  substring('date', 5,2).alias('month'), \
                  substring('date', 7,2).alias('day'))
#3. use col & substr()
df3=df.withColumn('year', col('date').substr(1, 4))\
  .withColumn('month',col('date').substr(5, 2))\
  .withColumn('day', col('date').substr(7, 2))
Use bigquery to download/upload data from/to pyspark:
from util.BigqueryHandler import BigqueryHandler

spark = sparkSession.getOrCreate()

project_id = 'project_id_name'
tmp_gcs_bucket = 'bucket_name'
bq = BigqueryHandler(project_id=project_id, spark_session=spark, tmp_gcs_bucket=tmp_bucket, sa=sa_file)

# whatever dataset you want
dataset = 'database1'

# whatever var name and table you want - returns a spark dataframe
data1 = bq.read_bq_table_spark(dataset=data_schema1, table='table_name1')

# data1 is your spark dataframe
bq.write_bq_table_spark(df=data1, dataset=data_schema2, table='table_name2')

#Save the previous code as a .py file, then in the command line run:
spark-submit --py-files --jars spark-bigquery-with-dependencies_2.11-0.17.2.jar

#if it requires special service account permission:
spark-submit --py-files --jars spark-bigquery-with-dependencies_2.11-0.17.2.jar --files piq_service.json

#here is another direct way to load/save data:
df ='bigquery')\
          .option('table', '{pid}:{ds}.{tbl}'.format(pid=project_id, ds=dataset, tbl=table))\
          .option('maxParallelism', '56')\
          .option('viewMaterializationDataset', 'database1')\
          .option('viewsEnabled', 'True')\
     #    .option('parentProject', project_id)\
     #    .option('credentialsFile', sa_file)\

#Reading from views is disabled by default. In order to enable it, 
#either set the viewsEnabled option when reading the specific view (.option("viewsEnabled", "true")) 
#or set it globally by calling spark.conf.set("viewsEnabled", "true").

#you might get error message for the no permission to bigquery.create when you access the views from the tables. 
#BigQuery views are not materialized by default, which means that the connector needs to materialize them before it can read them.
#so you need to add the code: .option('viewMaterializationDataset', 'database1')\

#By default, the materialized views are created in the same project and dataset. 
#Those can be configured by the optional viewMaterializationProject and viewMaterializationDataset options, respectively. 
#These options can also be globally set by calling spark.conf.set(...) before reading the views.

df1=spark.sql(''' select var1,var2, case when ... from df where ...''')
#define a function to load data.
def read_bq_table_spark(dataset, table):        
        df ='bigquery')\
                    .option('table', '{pid}:{ds}.{tbl}'.format(pid=project_id, ds=dataset, tbl=table))\
                    .option('maxParallelism', '56')\
                    .option('viewsEnabled', 'True')\
                    .option('viewMaterializationDataset', 'stg_property_panoramiq')\
                    .option('parentProject', self.project_id)\
        return df

#define a function to save data back to bigquery.
def write_bq_table_spark(df, dataset, table):

            .option('parentProject', self.project_id)\
            .option('table', '{pid}:{ds}.{tbl}'.format(pid=self.project_id, ds=dataset, tbl=table))\
            .option('TemporaryGcsBucket', tmp_gcs_bucket)\
#note we are trying to upload a pyspark dataframe to bigquery, 
#if you have a hive data, you might need to load the hive table first: 

temp=spark.sql('''select * from {hive_schema}.{table_name} '''.format(hive_schema=schema1,table_name='table1'))
bq.write_bq_table_spark(df=temp, dataset=schema1, table='table1')

1. In case you get an error message like: pyspark java.lang.OutOfMemoryError: Java heap space, most likely you are downloading hive data that is too big; you might want to break it into smaller pieces and download it piece by piece.

Note that the java.lang.OutOfMemoryError: GC overhead limit exceeded error, is only thrown when 2% of the memory is freed after several GC cycles. This means that the small amount of heap the GC is able to clean will likely be quickly filled again, forcing the GC to restart the cleaning process again. This forms a vicious cycle where the CPU is 100% busy with GC and no actual work can be done. End users of the application face extreme slowdowns – operations which normally complete in milliseconds take minutes to finish.

So the “java.lang.OutOfMemoryError: GC overhead limit exceeded” message is a pretty nice example of a fail fast principle in action.
2. In case you get the error message: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext, most likely it is the consequence of some error which happened earlier — say you ran some session previously and it somehow did not finish successfully. In that case, you can add the code: spark.stop(), and restart the kernel. Also, sometimes you might not have enough memory; add some config like:
--conf spark.cleaner.periodicGC.interval=60 
--conf spark.cleaner.referenceTracking.blocking=false
Question: How do we rename the columns of a Pyspark dataframe? We can't do what we usually do for a pandas dataframe — simply using df.columns=(...) to rename the columns does not work that way. Here are several solutions; the easiest one is via df1.schema.names:

for pyspark column rename: 
df1.schema.names = ['_'.join(col) for col in df1.schema.names]
#don't use df1.columns = df1.columns + '_year5', 
#you will get error message: TypeError: can only concatenate list (not "str") to list

#be careful: only change schema.names, it will not change df1.columns!
#double check df1.columns after you run the schema.names change! No change!

#read a csv file into pyspark dataframe
b1 ="com.databricks.spark.csv").option("header","true").option("delimiter","|").load('gs://folder/****_extract_20190524.csv')

oldColumns = b1.schema.names
newColumns = [w.replace('something_usually_the_file_name.','') for w in b1.columns]

df = reduce(lambda data, idx: data.withColumnRenamed(oldColumns[idx], newColumns[idx]), xrange(len(oldColumns)), data)
here is another solution by using selectExpr():

data = sqlContext.createDataFrame([("Alberto", 2), ("Dakota", 2)], 
                                  ["Name", "askdaosdka"])

# Output
#|   Name|askdaosdka|
#|Alberto|         2|
#| Dakota|         2|

# |-- Name: string (nullable = true)
# |-- askdaosdka: long (nullable = true)

df = data.selectExpr("Name as name", "askdaosdka as age")
Here are 2 other solutions, one by alias:

#Option 3. using alias, in Scala you can also use as.
from pyspark.sql.functions import col

data ="Name").alias("name"), col("askdaosdka").alias("age"))

# Output
#|   name|age|
#|Alberto|  2|
#| Dakota|  2|
Option 4. Using sqlContext.sql, which lets you use SQL queries on DataFrames registered as tables.

sqlContext.registerDataFrameAsTable(data, "myTable")
df2 = sqlContext.sql("SELECT Name AS name, askdaosdka as age from myTable")
spark.mllib contains the legacy API built on top of RDDs; it's now in maintenance mode, not in development mode. provides a higher-level API built on top of DataFrames for constructing ML pipelines.
After reaching feature parity (roughly estimated for Spark 2.3), the RDD-based API will be deprecated.
The RDD-based API/spark.Mllib is expected to be removed in Spark 3.0.

Spark (Apache Spark) is an in-memory computation engine, whereas MapReduce is an in-and-out disk computation engine. This is the main way Spark frees itself from the limitations of MapReduce.

RDD is short for resilient distributed dataset: it is a fundamental data structure of Spark. It is an immutable distributed collection of objects. Each dataset in RDD is divided into logical partitions, which may be computed on different nodes of the cluster. RDDs can contain any type of Python, Java, or Scala objects, including user-defined classes.

Spark is an open source processing engine for Big Data that brings together an impressive combination of speed, ease of use and advanced analytics.

Originally developed at the University of California, Berkeley's AMPLab. Spark enables applications in Hadoop clusters to run in-memory at up to 100x faster than MapReduce, while also delivering significant speed-ups when running purely on disk. Spark SQL provides an interface for users to query their data from Spark RDDs as well as other data sources such as Hive tables, parquet files and JSON files.

Spark's APIs in Python, Scala & Java make it easy to build parallel apps. Lastly, Spark provides strong support for streaming data and complex analytics where iterative calculations are used such as in machine learning and graph algorithms - this is where Spark shines brightest. Spark's versatility has led users to call it "the swiss army knife" of processing engine platforms as users can combine all of these capabilities in a single platform and workflow.

Spark SQL is a Spark module for structured data processing. One use of Spark SQL is to execute SQL queries. Spark SQL can also be used to read data from an existing Hive installation.

The entry point into all functionality in Spark is the SparkSession class. To create a basic SparkSession, just use SparkSession.builder. Most of the following code comes from the Spark repo.

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \

With a SparkSession, applications can create DataFrames from an existing RDD, from a Hive table, or from Spark data sources.

# spark is an existing SparkSession
df ="examples/src/main/resources/people.json")
# Displays the content of the DataFrame to stdout
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# +----+-------+

In Python it's possible to access a DataFrame's columns either by attribute (df.age) or by indexing (df['age']).
While the former is convenient for interactive data exploration,
users are highly encouraged to use the latter form, which is future proof and won't break with column names that are also attributes on the DataFrame class.

In PySpark, we recommend using "s3://" to access the EMR(Elastic MapReduce) file system(EMRFS) in EMR and "s3a://" to access S3A file system in other environments. Examples:

data_s3 ="libsvm").load("s3://some-bucket/some-prefix")
data_s3a ="libsvm").load("s3a://some-bucket/some-prefix")

Training and Hosting a K-Means Clustering model using SageMaker PySpark:
A KMeansSageMakerEstimator runs a training job using the Amazon SageMaker KMeans algorithm upon invocation of fit(), returning a SageMakerModel.

from pyspark import SparkContext, SparkConf
from sagemaker_pyspark import IAMRole, classpath_jars
from sagemaker_pyspark.algorithms import KMeansSageMakerEstimator

# Load the sagemaker_pyspark classpath. If you used --jars to submit your job
# there is no need to do this in code.
conf = (SparkConf()
        .set("spark.driver.extraClassPath", ":".join(classpath_jars())))

iam_role = "arn:aws:iam:0123456789012:role/MySageMakerRole"

region = "us-east-1"
training_data ="libsvm").option("numFeatures", "784")

test_data ="libsvm").option("numFeatures", "784")

kmeans_estimator = KMeansSageMakerEstimator(


kmeans_model =

transformed_data = kmeans_model.transform(test_data) 

 # spark, df are from the previous example
# Print the schema in a tree format
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)

# Select only the "name" column"name").show()
# +-------+
# |   name|
# +-------+
# |Michael|
# |   Andy|
# | Justin|
# +-------+

# Select everybody, but increment the age by 1['name'], df['age'] + 1).show()
# +-------+---------+
# |   name|(age + 1)|
# +-------+---------+
# |Michael|     null|
# |   Andy|       31|
# +-------+---------+

# Select people older than 21
df.filter(df['age'] > 21).show()
# +---+----+
# |age|name|
# +---+----+
# | 30|Andy|
# +---+----+

# Count people by age
# +----+-----+
# | age|count|
# +----+-----+
# |null|    1|
# |  30|    1|
# +----+-----+

The sql function on a SparkSession enables applications to run SQL queries programmatically and returns the result as a DataFrame.

# Register the DataFrame as a SQL temporary view

sqlDF = spark.sql("SELECT * FROM people")
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+    

Creating Datasets via Scala:

case class Person(name: String, age: Long)

// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+

// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS() + 1).collect() // Returns: Array(2, 3, 4)

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS =[Person]
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

Spark SQL can convert an RDD of Row objects to a DataFrame, inferring the datatypes. Rows are constructed by passing a list of key/value pairs as kwargs to the Row class.

from pyspark.sql import Row

sc = spark.sparkContext

neighbor_df.repartition(1).write.csv(path=csv_location, mode="append/overwrite", header="true")

# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = l: l.split(","))
people = p: Row(name=p[0], age=int(p[1])))

# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(people)

# SQL can be run over DataFrames that have been registered as a table.
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

# The results of SQL queries are Dataframe objects.
# rdd returns the content as an :class:'pyspark.RDD' of :class:'Row'.
teenNames = p: "Name: " +
for name in teenNames:
# Name: Justin
a DataFrame can be created programmatically with three steps:
1. Create an RDD of tuples or lists from the original RDD;
2. Create the schema represented by a StructType matching the structure of tuples or lists in the RDD created in the step 1.
3. Apply the schema to the RDD via createDataFrame method provided by SparkSession.

# Import data types
from pyspark.sql.types import *

sc = spark.sparkContext

# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = l: l.split(","))
# Each line is converted to a tuple.
people = p: (p[0], p[1].strip()))

# The schema is encoded in a string.
schemaString = "name age"

fields = [StructField(field_name, StringType(), True) 
 for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)

# Creates a temporary view using the DataFrame

# SQL can be run over DataFrames that have been registered as a table.
results = spark.sql("SELECT name FROM people")

No comments:

Post a Comment

Data Science Study Notes: reinforcement learning

Terminology: State vs Action vs Policy vs Reward vs State Transition. A policy function is a probability density function (PDF); policy network:...