Saturday, August 28, 2021

AWS Study Notes: AWS Glue Data

AWS Glue usage:
1. To build a data warehouse to organize, cleanse, validate, and format data.
2. To run serverless queries against your Amazon S3 data lake.
3. To create event-driven ETL (extract, transform, load) pipelines.
4. To understand your datasets.
#Step 1: Crawlers: a crawler connects to a data store, works through a prioritized list of classifiers to determine the schema of your data, and then creates metadata tables in the AWS Glue Data Catalog. For example, you can create a crawler that crawls the s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv file and places the resulting metadata into a database named payments in the Data Catalog.
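If you prefer to set this crawler up programmatically rather than in the console, a minimal boto3 sketch follows; the crawler name, region, and IAM role ARN are placeholders I have assumed, not values from the tutorial.

# Sketch: create and start the Medicare crawler with boto3 (name, region, and role ARN are placeholders)
import boto3

glue = boto3.client("glue", region_name="us-east-1")
glue.create_crawler(
    Name="medicare-crawler",
    Role="arn:aws:iam::123456789012:role/AWSGlueServiceRole",
    DatabaseName="payments",
    Targets={"S3Targets": [{"Path": "s3://awsglue-datasets/examples/medicare/"}]})
glue.start_crawler(Name="medicare-crawler")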


#Step 2: Add Boilerplate Script to the Development Endpoint Notebook 
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session  # needed so the plain Spark reads in Step 3 work outside a preconfigured notebook

#Step 3: Compare Different Schema Parsings 
medicare = (spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv"))
medicare.printSchema()

medicare_dynamicframe = glueContext.create_dynamic_frame.from_catalog(
       database = "payments",
       table_name = "medicare")
medicare_dynamicframe.printSchema()

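# resolveChoice with 'cast:long' forces the ambiguous 'provider id' column (read as both string and long) to long;
# values that cannot be cast become null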
medicare_res = medicare_dynamicframe.resolveChoice(specs = [('provider id','cast:long')])
medicare_res.printSchema()

medicare_res.toDF().where("`provider id` is NULL").show()
medicare_dataframe = medicare_res.toDF()
medicare_dataframe = medicare_dataframe.where("`provider id` is NOT NULL")
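
As a quick sanity check (not part of the original walkthrough), you can compare row counts before and after dropping the null provider ids:

# Sanity check: count rows with and without the null 'provider id' values
print(medicare_res.toDF().count())
print(medicare_dataframe.count())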

#Step 4: Map the Data and Use Apache Spark Lambda Functions 
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

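# chop_f drops the first character of each value, stripping the leading '$' from the dollar-amount strings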
chop_f = udf(lambda x: x[1:], StringType())
medicare_dataframe = (medicare_dataframe
    .withColumn("ACC", chop_f(medicare_dataframe["average covered charges"]))
    .withColumn("ATP", chop_f(medicare_dataframe["average total payments"]))
    .withColumn("AMP", chop_f(medicare_dataframe["average medicare payments"])))
medicare_dataframe.select(['ACC', 'ATP', 'AMP']).show()
from awsglue.dynamicframe import DynamicFrame
medicare_tmp_dyf = DynamicFrame.fromDF(medicare_dataframe, glueContext, "nested")
medicare_nest_dyf = medicare_tmp_dyf.apply_mapping([('drg definition', 'string', 'drg', 'string'),
                 ('provider id', 'long', 'provider.id', 'long'),
                 ('provider name', 'string', 'provider.name', 'string'),
                 ('provider city', 'string', 'provider.city', 'string'),
                 ('provider state', 'string', 'provider.state', 'string'),
                 ('provider zip code', 'long', 'provider.zip', 'long'),
                 ('hospital referral region description', 'string','rr', 'string'),
                 ('ACC', 'string', 'charges.covered', 'double'),
                 ('ATP', 'string', 'charges.total_pay', 'double'),
                 ('AMP', 'string', 'charges.medicare_pay', 'double')])
medicare_nest_dyf.printSchema()
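
The same '$' stripping could also be done with Spark's built-in functions instead of a Python UDF, which avoids the Python serialization overhead; a minimal sketch on the same column names:

# Alternative sketch: strip the leading '$' with regexp_replace instead of a UDF
from pyspark.sql.functions import regexp_replace
medicare_dataframe = medicare_dataframe.withColumn(
    "ACC", regexp_replace("average covered charges", r"^\$", ""))
# ...and likewise for "average total payments" and "average medicare payments"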

#Step 5: Write the Data to Apache Parquet 
glueContext.write_dynamic_frame.from_options(
       frame = medicare_nest_dyf,
       connection_type = "s3",
       connection_options = {"path": "s3://glue-sample-target/output-dir/medicare_parquet"},
       format = "parquet")
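To spot-check the output (not part of the original notes), the Parquet files can be read back with Spark using the same path:

# Optional check: read the Parquet output back and inspect the nested schema
medicare_parquet = spark.read.parquet("s3://glue-sample-target/output-dir/medicare_parquet")
medicare_parquet.printSchema()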
Data Catalog: the persistent metadata store that holds table definitions, job definitions, and other control information.
Classifier: determines the schema of your data.
Connection: the properties needed to connect to a data store, for example a JDBC URL and credentials.
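These Data Catalog objects can also be inspected programmatically; a small boto3 sketch (the database and table names come from the crawler step above, the call itself is standard boto3):

# Sketch: look up the table the crawler created in the payments database
import boto3
glue = boto3.client("glue")
table = glue.get_table(DatabaseName="payments", Name="medicare")
print(table["Table"]["StorageDescriptor"]["Columns"])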
