Thursday, January 23, 2020

Python study notes 6: Spark SQL, Pyspark tutorial examples

Here is some example code that uses the PySpark kernel in a notebook to load a Hive table (stored as ORC files on HDFS) into a pandas DataFrame.
#===============================================================
import datetime
import os
import math
import numpy as np
import pandas as pd
#import matplotlib.pyplot as plt
from pyspark.ml import Pipeline
from pyspark.ml import PipelineModel
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import SQLContext, Row, HiveContext
from pyspark.sql.types import *
from pyspark.sql.functions import regexp_replace, regexp_extract, substring, upper, trim, levenshtein, udf, isnull, isnan, concat, datediff, to_date
from pyspark.sql.functions import when, lit, col, abs, unix_timestamp, spark_partition_id, monotonically_increasing_id,max
from pyspark.sql.functions import cos, sin, radians, atan2, sqrt, pow, levenshtein, instr, count
from pyspark.ml.feature import StringIndexer, VectorIndexer, VectorAssembler
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, length, countDistinct
from pyspark import StorageLevel
from random import randint
from datetime import datetime
#import pprint
#from IPython.display import display
import argparse

# Stop any SparkContext the notebook may already have, so we can recreate it with the custom config below
sc = SparkContext.getOrCreate()
sc.stop()

# Create new config
conf = (SparkConf()
   .setAppName('JK_record_confidence')
   .set('spark.yarn.dist.files','file:/usr/lib/spark/python/lib/pyspark.zip,file:/usr/lib/spark/python/lib/py4j-0.10.7-src.zip')           
   .setExecutorEnv('PYTHONPATH','pyspark.zip:py4j-src.zip')
   .set("spark.driver.maxResultSize", "8g")
   .set("spark.driver.memory", "10g")
   .set("spark.executor.memory", "20g")
   .set("spark.executor.cores", "4")
   .set("spark.shuffle.service.enabled", "true")
   .set("spark.dynamicAllocation.enabled", "false")
   .set("spark.executor.instances", "100") #80
   #.set("spark.dynamicAllocation.initialExecutors", "24")
   #.set("spark.dynamicAllocation.minExecutors", "2")
   .set("spark.submit.deployMode", "client")
   .set("spark.python.worker.memory", "512")
   .set("spark.default.parallelism", "64")
   .set("spark.yarn.executor.memoryOverhead", "4g")
   .setMaster("yarn"))

# Create new context
sc = SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()
start =  datetime.now()    
# create hivecontext
hc = HiveContext(sc)
filename="hdfs://dsda-cluster-m/user/hive/warehouse/jg_xref_state_cntycd_other2"
#filename="gs://dsda-dev-aux-hive-infrastructure-data/hive_data/jg_xref_state_cntycd_other2"
#filename="gs://dsda-dev-aux-hive-infrastructure-data/hive_data/jg_xref_state_cntycd_other2/000000_0.snappy.orc"
#data = orc.ORCFile(filename)

# findspark locates the Spark installation and adds pyspark to sys.path;
# it is only needed when pyspark isn't already importable (it is redundant here).
import findspark
from pyspark.sql import SparkSession

findspark.init()
#spark = SparkSession.builder.getOrCreate()
df_spark = spark.read.orc(filename)
df_spark.printSchema()
print(df_spark.count())
df_spark.limit(10).show()

df_pandas = df_spark.toPandas()
print(df_pandas.columns)
#print(df_pandas.irislandusecd_tax.value_counts())

import pickle
# open a file where you want to store the data; the with-block closes it automatically
with open('Golden_record_OC_data_1M', 'wb') as f:
    # dump the pandas DataFrame to that file
    pickle.dump(df_pandas, f)
end = datetime.now()
# round() does not accept a timedelta, so convert to seconds first
print(round((end - start).total_seconds() / 60, 2), "mins")
print('done')
#===============================================================

If you get an error message like "pyspark java.lang.OutOfMemoryError: Java heap space", you are most likely downloading too much Hive data at once; break it into smaller pieces and download them piece by piece.
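If you still need the full table in pandas, one workaround is to convert it in range-filtered chunks instead of a single toPandas() call. Below is a minimal sketch (not from the original notebook), assuming the table has a numeric key column you can filter on; the column name "row_id" and the helper name are hypothetical.

import pandas as pd
from pyspark.sql import functions as F

def to_pandas_in_chunks(sdf, key_col, n_chunks=10):
    # find the range of the key column, then pull one slice down at a time
    lo, hi = sdf.agg(F.min(key_col), F.max(key_col)).first()
    step = (hi - lo) / float(n_chunks)
    pieces = []
    for i in range(n_chunks):
        lower = lo + i * step
        if i < n_chunks - 1:
            cond = (F.col(key_col) >= lower) & (F.col(key_col) < lower + step)
        else:
            cond = (F.col(key_col) >= lower) & (F.col(key_col) <= hi)  # last chunk is inclusive
        pieces.append(sdf.filter(cond).toPandas())
    return pd.concat(pieces, ignore_index=True)

# hypothetical usage:
# df_pandas = to_pandas_in_chunks(df_spark, key_col="row_id", n_chunks=20)

Each chunk is a separate, smaller collect on the driver, which keeps any single result well under spark.driver.maxResultSize; you can also write each chunk out to disk instead of concatenating.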

Note that the java.lang.OutOfMemoryError: GC overhead limit exceeded error is only thrown when less than 2% of the heap is freed after several consecutive GC cycles. This means that the small amount of heap the GC is able to clean will likely be filled again quickly, forcing the GC to restart the cleaning process. This forms a vicious cycle where the CPU is 100% busy with GC and no actual work can be done. End users of the application face extreme slowdowns – operations which normally complete in milliseconds take minutes to finish.

So the “java.lang.OutOfMemoryError: GC overhead limit exceeded” message is a pretty nice example of the fail-fast principle in action.



Question:
spark.ml vs spark.mllib?


spark.mllib contains the legacy API built on top of RDDs; it is now in maintenance mode (bug fixes only), not in active development.
spark.ml provides a higher-level API built on top of DataFrames for constructing ML pipelines.
After reaching feature parity (roughly estimated for Spark 2.3), the RDD-based API will be deprecated.
The RDD-based API (spark.mllib) is expected to be removed in Spark 3.0.
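For contrast, here is a minimal sketch of the DataFrame-based spark.ml style, using the same estimators imported in the notebook above; the toy data and column names are made up for illustration.

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier

train_df = spark.createDataFrame(
    [("a", 1.0, 0.5), ("b", 0.0, 1.5), ("a", 2.0, 0.1), ("b", 0.3, 2.2)],
    ["label_str", "x1", "x2"])

indexer = StringIndexer(inputCol="label_str", outputCol="label")           # string label -> numeric index
assembler = VectorAssembler(inputCols=["x1", "x2"], outputCol="features")  # columns -> feature vector
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=10)

pipeline = Pipeline(stages=[indexer, assembler, rf])  # stages run in order on the DataFrame
model = pipeline.fit(train_df)                        # returns a PipelineModel
model.transform(train_df).select("label", "prediction").show()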



Question:
What's Spark? What's RDD? What's inside?


Answer:

Spark (Apache Spark) is an in-memory computation engine, whereas MapReduce is an in-and-out-of-disk computation engine that writes intermediate results to disk between stages; removing that disk round trip is the main limitation of MapReduce that Spark was built to overcome.
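A quick sketch of what "in-memory" buys you: cache an intermediate RDD once and reuse it across several actions without re-reading from disk (the HDFS path below is just a placeholder).

lines = sc.textFile("hdfs:///some/large/input.txt")          # placeholder path
words = lines.flatMap(lambda line: line.split()).cache()     # keep the result in memory

print(words.count())             # first action reads from disk and populates the cache
print(words.distinct().count())  # later actions reuse the in-memory partitions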




RDD is short for resilient distributed dataset: it is a fundamental data structure of Spark. It is an immutable distributed collection of objects. Each dataset in RDD is divided into logical partitions, which may be computed on different nodes of the cluster. RDDs can contain any type of Python, Java, or Scala objects, including user-defined classes.
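A tiny illustration of those pieces, assuming the SparkContext sc from the notebook above:

rdd = sc.parallelize(range(100), numSlices=4)   # an RDD with 4 logical partitions
print(rdd.getNumPartitions())                   # -> 4
squares = rdd.map(lambda x: x * x)              # transformation: builds a new (immutable) RDD lazily
print(squares.take(5))                          # action: [0, 1, 4, 9, 16]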



Spark is an open source processing engine for Big Data that brings together an impressive combination of speed, ease of use and advanced analytics.
It was originally developed at the University of California, Berkeley's AMPLab.
Spark enables applications in Hadoop clusters to run in memory up to 100x faster than MapReduce, while also delivering significant speed-ups when running purely on disk. Spark SQL provides an interface for users to query their data from Spark RDDs as well as other data sources such as Hive tables, Parquet files and JSON files.



Spark's APIs in Python, Scala and Java make it easy to build parallel apps. Lastly, Spark provides strong support for streaming data and for complex analytics with iterative calculations, such as machine learning and graph algorithms - this is where Spark shines brightest. Spark's versatility has led users to call it "the Swiss Army knife" of processing engines, as all of these capabilities can be combined in a single platform and workflow.



Spark SQL is a Spark module for structured data processing. One use of Spark SQL is to execute SQL queries. Spark SQL can also be used to read data from an existing Hive installation.
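For the Hive case, the SparkSession needs Hive support enabled. A minimal sketch (the database and table names below are placeholders):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("hive-example")
         .enableHiveSupport()       # connect to the existing Hive metastore
         .getOrCreate())

spark.sql("SHOW DATABASES").show()
hive_df = spark.sql("SELECT * FROM mydb.my_table LIMIT 10")   # placeholder table
hive_df.show()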





The entry point into all functionality in Spark is the SparkSession class. To create a basic SparkSession, just use SparkSession.builder:
Most of the following code comes from the Spark repository's examples.

   
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()


With a SparkSession, applications can create DataFrames from an existing RDD, from a Hive table, or from Spark data sources.





# spark is an existing SparkSession
df = spark.read.json("examples/src/main/resources/people.json")
# Displays the content of the DataFrame to stdout
df.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# +----+-------+



In Python it's possible to access a DataFrame's columns either by attribute (df.age) or by indexing (df['age']).
While the former is convenient for interactive data exploration,
users are highly encouraged to use the latter form, which is future proof and won't break with column names that are also attributes on the DataFrame class.
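A quick illustration with the people DataFrame from above:

# Both forms refer to the same column; the bracket form also works when the column
# name collides with an existing DataFrame attribute or method (e.g. a column called "count").
df.select(df.age).show()       # attribute access, handy for interactive exploration
df.select(df["age"]).show()    # indexing, the future-proof form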

In PySpark, we recommend using "s3://" to access the EMR (Elastic MapReduce) File System (EMRFS) on EMR, and "s3a://" to access the S3A file system in other environments. Examples:
data_s3 = spark.read.format("libsvm").load("s3://some-bucket/some-prefix")
data_s3a = spark.read.format("libsvm").load("s3a://some-bucket/some-prefix")
   

Training and Hosting a K-Means Clustering model using SageMaker PySpark:
A KMeansSageMakerEstimator runs a training job using the Amazon SageMaker KMeans algorithm upon invocation of fit(), returning a SageMakerModel.

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from sagemaker_pyspark import IAMRole, classpath_jars
from sagemaker_pyspark.algorithms import KMeansSageMakerEstimator

# Load the sagemaker_pyspark classpath. If you used --jars to submit your job
# there is no need to do this in code.
conf = (SparkConf()
        .set("spark.driver.extraClassPath", ":".join(classpath_jars())))
sc = SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()

iam_role = "arn:aws:iam:0123456789012:role/MySageMakerRole"

region = "us-east-1"
training_data = (spark.read.format("libsvm").option("numFeatures", "784")
    .load("s3a://sagemaker-sample-data-{}/spark/mnist/train/".format(region)))

test_data = (spark.read.format("libsvm").option("numFeatures", "784")
    .load("s3a://sagemaker-sample-data-{}/spark/mnist/test/".format(region)))

kmeans_estimator = KMeansSageMakerEstimator(
    trainingInstanceType="ml.m4.xlarge",
    trainingInstanceCount=1,
    endpointInstanceType="ml.m4.xlarge",
    endpointInitialInstanceCount=1,
    sagemakerRole=IAMRole(iam_role))

kmeans_estimator.setK(10)
kmeans_estimator.setFeatureDim(784)

kmeans_model = kmeans_estimator.fit(training_data)

transformed_data = kmeans_model.transform(test_data)
transformed_data.show() 





 # spark, df are from the previous example
# Print the schema in a tree format
df.printSchema()
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)

# Select only the "name" column
df.select("name").show()
# +-------+
# |   name|
# +-------+
# |Michael|
# |   Andy|
# | Justin|
# +-------+

# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()
# +-------+---------+
# |   name|(age + 1)|
# +-------+---------+
# |Michael|     null|
# |   Andy|       31|
# +-------+---------+

# Select people older than 21
df.filter(df['age'] > 21).show()
# +---+----+
# |age|name|
# +---+----+
# | 30|Andy|
# +---+----+

# Count people by age
df.groupBy("age").count().show()
# +----+-----+
# | age|count|
# +----+-----+
# |null|    1|
# |  30|    1|
# +----+-----+



The sql function on a SparkSession enables applications to run SQL queries programmatically and returns the result as a DataFrame.





# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+    




Creating Datasets via Scala:

case class Person(name: String, age: Long)

// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+

// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+




Spark SQL can convert an RDD of Row objects to a DataFrame, inferring the datatypes. Rows are constructed by passing a list of key/value pairs as kwargs to the Row class.


from pyspark.sql import Row

sc = spark.sparkContext

# (Side note: to write a DataFrame out as a single CSV file, repartition to 1 and
#  pick one save mode, e.g. mode="overwrite" or mode="append".)
# neighbor_df.repartition(1).write.csv(path=csv_location, mode="overwrite", header="true")

# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

# The results of SQL queries are DataFrame objects.
# The rdd attribute returns the content as a pyspark.RDD of Row objects.
teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
for name in teenNames:
    print(name)
# Name: Justin

A DataFrame can also be created programmatically, in three steps: 1. Create an RDD of tuples or lists from the original RDD; 2. Create the schema, represented by a StructType, matching the structure of the tuples or lists in the RDD created in step 1; 3. Apply the schema to the RDD via the createDataFrame method provided by SparkSession.

# Import data types
from pyspark.sql.types import *

sc = spark.sparkContext

# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple.
people = parts.map(lambda p: (p[0], p[1].strip()))

# The schema is encoded in a string.
schemaString = "name age"

fields = [StructField(field_name, StringType(), True) 
 for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)

# Creates a temporary view using the DataFrame
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
results = spark.sql("SELECT name FROM people")

results.show()

How do we rename the columns of a PySpark DataFrame? We can't do what we usually do with a pandas DataFrame, i.e. simply assigning df.columns = [...]; it doesn't work that way here. There are several solutions; the easiest one uses reduce() together with withColumnRenamed():

b1 = spark.read.format("com.databricks.spark.csv").option("header","true").option("delimiter","|").load('gs://folder/****_extract_20190524.csv')
b1.limit(5).toPandas()
b1.printSchema()
b1.columns

oldColumns = b1.schema.names
newColumns = [w.replace('something_usually_the_file_name.','') for w in b1.columns]

from functools import reduce  # reduce is not a builtin in Python 3

data = b1
df = reduce(lambda data, idx: data.withColumnRenamed(oldColumns[idx], newColumns[idx]),
            range(len(oldColumns)), data)
df.printSchema()
df.show()
Here is another solution, using selectExpr():

data = sqlContext.createDataFrame([("Alberto", 2), ("Dakota", 2)], 
                                  ["Name", "askdaosdka"])
data.show()
data.printSchema()

# Output
#+-------+----------+
#|   Name|askdaosdka|
#+-------+----------+
#|Alberto|         2|
#| Dakota|         2|
#+-------+----------+

#root
# |-- Name: string (nullable = true)
# |-- askdaosdka: long (nullable = true)

df = data.selectExpr("Name as name", "askdaosdka as age")
df.show()
df.printSchema()
Here are two other solutions. The first uses alias():

#Option 3. using alias, in Scala you can also use as.
from pyspark.sql.functions import col

data = data.select(col("Name").alias("name"), col("askdaosdka").alias("age"))
data.show()

# Output
#+-------+---+
#|   name|age|
#+-------+---+
#|Alberto|  2|
#| Dakota|  2|
#+-------+---+
Option 4. Using sqlContext.sql, which lets you use SQL queries on DataFrames registered as tables.

sqlContext.registerDataFrameAsTable(data, "myTable")
df2 = sqlContext.sql("SELECT Name AS name, askdaosdka as age from myTable")

df2.show()
# PySpark drop duplicates (this is the Spark API, not pandas, so there is no inplace argument):
df1 = df.dropDuplicates(['name', 'height'])
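For reference, a small sketch of how dropDuplicates() behaves with and without a column subset (toy data made up for illustration):

df = spark.createDataFrame(
    [("Alice", 5, 80), ("Alice", 5, 80), ("Alice", 10, 80)],
    ["name", "age", "height"])

df.dropDuplicates().show()                    # drops fully identical rows -> 2 rows left
df.dropDuplicates(["name", "height"]).show()  # dedupes on a subset of columns -> 1 row left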
