Monday, February 1, 2021

GCP Study notes: BigQuery API SQL examples

Use BigQuery to download/upload data from/to PySpark:
#===============================================================
from util.BigqueryHandler import BigqueryHandler

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

project_id = 'project_id_name'
tmp_gcs_bucket = 'bucket_name'
sa_file = 'bq_service.json'   # service account key (ship it with --files if needed)

bq = BigqueryHandler(project_id=project_id, spark_session=spark, tmp_gcs_bucket=tmp_gcs_bucket, sa=sa_file)

# whatever dataset you want
dataset = 'database1'

# whatever var name and table you want - returns a spark dataframe
data1 = bq.read_bq_table_spark(dataset=dataset, table='table_name1')

# WRITE BQ
# data1 is your spark dataframe
bq.write_bq_table_spark(df=data1, dataset=dataset, table='table_name2')

#Save the previous file as test1.py, then in the command line run:  
spark-submit --py-files util.zip --jars spark-bigquery-with-dependencies_2.11-0.17.2.jar test1.py

#if a special service account permission is required: 
spark-submit --py-files util.zip --jars spark-bigquery-with-dependencies_2.11-0.17.2.jar \
             --files bq_service.json test1.py
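#A minimal sketch of how the shipped key can then be referenced inside test1.py
#(assumes the JSON passed via --files is visible in the working directory; names are placeholders):
df_sa = spark.read.format('bigquery') \
            .option('parentProject', project_id) \
            .option('credentialsFile', 'bq_service.json') \
            .option('table', '{pid}:{ds}.{tbl}'.format(pid=project_id, ds='database1', tbl='table_name1')) \
            .load()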

#here is another direct way to load/save data:
table = 'table_name1'   # placeholder table name
df = (spark.read.format('bigquery')
          .option('table', '{pid}:{ds}.{tbl}'.format(pid=project_id, ds=dataset, tbl=table))
          .option('maxParallelism', '56')
          .option('viewMaterializationDataset', 'database1')
          .option('viewsEnabled', 'true')
          # .option('parentProject', project_id)
          # .option('credentialsFile', sa_file)
          .load())

#Reading from views is disabled by default. In order to enable it, 
#either set the viewsEnabled option when reading the specific view (.option("viewsEnabled", "true")) 
#or set it globally by calling spark.conf.set("viewsEnabled", "true").

#You might get a permission error (no permission to create tables in BigQuery) 
#    when you read from views rather than tables. 
#BigQuery views are not materialized by default, which means that 
#    the connector needs to materialize them before it can read them.
#So you need to add: .option('viewMaterializationDataset', 'database1')

#By default, the materialized views are created in the same project and dataset. 
#Those can be configured by: viewMaterializationProject and viewMaterializationDataset options. 
#These options can also be globally set by calling spark.conf.set(...) before reading the views.
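#A minimal sketch of the global settings described above (dataset name is a placeholder;
#the service account needs permission to create tables in that dataset):
spark.conf.set("viewsEnabled", "true")
spark.conf.set("viewMaterializationProject", project_id)
spark.conf.set("viewMaterializationDataset", "database1")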

df.createOrReplaceTempView("df_view")
df1 = spark.sql(''' select var1,var2, case when ... from df_view where ...''')
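#A concrete (hypothetical) version of the query above -- var1/var2/var3 are placeholder columns:
df1 = spark.sql('''
    select var1,
           var2,
           case when var3 > 0 then 'positive' else 'non-positive' end as var3_flag
    from df_view
    where var1 is not null
''')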
#===============================================================
#Define a function to load data.
def read_bq_table_spark(dataset, table):
    df = spark.read.format('bigquery')\
                .option('table', '{pid}:{ds}.{tbl}'.format(pid=project_id, ds=dataset, tbl=table))\
                .option('maxParallelism', '56')\
                .option('viewsEnabled', 'true')\
                .option('viewMaterializationDataset', 'stg_property_panoramiq')\
                .option('parentProject', project_id)\
                .option('credentialsFile', sa_file)\
                .load()
    return df
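#Example usage of the function above (dataset/table names are placeholders):
df_props = read_bq_table_spark(dataset='database1', table='table_name1')
df_props.printSchema()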
        
#To use SQL to select a subset through the BigQuery connector, you need a newer version of the connector.
#Run the following from the cluster command line: 
pyspark --jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.20.0.jar

#If you are using an older version such as 0.17.3, you might get this error message: 
#Caused by: java.io.InvalidClassException: com.google.cloud.spark.bigquery.SparkBigQueryConfig; 
#local class incompatible: stream classdesc serialVersionUID = -3988734315685039601
#In that case, download the newer version of the jar from the GitHub releases page and upload it to the server. 

spark.conf.set("viewsEnabled","true")
#spark.conf.set("viewMaterializationDataset", 'stg_property_panoramiq')
#spark.conf.set("materializationDataset", 'stg_property_panoramiq')

sql = """
  SELECT tag, COUNT(*) c
  FROM (
    SELECT SPLIT(tags, '|') tags
    FROM `bigquery-public-data.stackoverflow.posts_questions` a
    WHERE EXTRACT(YEAR FROM creation_date)>=2014
  ), UNNEST(tags) tag
  GROUP BY 1
  ORDER BY 2 DESC
  LIMIT 10
  """
df = spark.read.format("bigquery")\
        .option('maxParallelism', '56')\
        .option('viewsEnabled', 'True')\
        .option('viewMaterializationDataset', 'stg_property_panoramiq')\
        .option('parentProject', project_id)\
        .load(sql)

df.show()

# second option is to use the query option like this:

df = spark.read.format("bigquery").option("query", sql).load()
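#A sketch of the full pattern for the query option: it also relies on viewsEnabled
#plus a materialization dataset being set (dataset name reused from above as a placeholder):
spark.conf.set("viewsEnabled", "true")
spark.conf.set("materializationDataset", "stg_property_panoramiq")
df_q = spark.read.format("bigquery").option("query", sql).load()
df_q.show()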
#-----------------------------------------------------------


#Define a function to save data back to BigQuery.
def write_bq_table_spark(df, dataset, table):

    df.write.mode('overwrite')\
            .format('bigquery')\
            .option('parentProject', project_id)\
            .option('credentialsFile', sa_file)\
            .option('table', '{pid}:{ds}.{tbl}'.format(pid=project_id, ds=dataset, tbl=table))\
            .option('temporaryGcsBucket', tmp_gcs_bucket)\
            .save()
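#Example usage of the function above (data1 is the dataframe read earlier; names are placeholders):
write_bq_table_spark(df=data1, dataset='database1', table='table_name2')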
 
#Note: we are uploading a PySpark dataframe to BigQuery. 
#If your data is in a Hive table, load the Hive table into a dataframe first: 

temp=spark.sql('''select * from {hive_schema}.{table_name} '''.format(hive_schema=schema1,table_name='table1'))
bq.write_bq_table_spark(df=temp, dataset=schema1, table='table1')

If you are using GCP BigQuery to prepare the dataset, here are some SQL examples:

#Cumulative summation example in BigQuery SQL: 
#1. group by each layer and also count the total (the table created below).
#2. cumulative summation for each layer (the running_sum query further below).

drop table if exists `prodjectid1.schema1.data_OCSD1`;
create table `prodjectid1.schema1.data_OCSD1` as
SELECT a.cntycd,number_of_stories,b.cnt_cnty,count(*) as cnt  FROM `prodjectid1.schema1.data_OCSD0` as a 

left join ( SELECT cntycd,count(*) as cnt_cnty  FROM `prodjectid1.schema1.data_OCSD0`
           where  living_area_all_buildings<3000 group by cntycd ) as b

on a.cntycd=b.cntycd
where  living_area_all_buildings<3000
group by a.cntycd,number_of_stories,b.cnt_cnty
order by a.cntycd,number_of_stories,cnt desc;

--#############################################
#WITH statement (CTE) used to join against another query: 
with b as (select situscensid,count(*) as cnt from `project_id1.Dataschema1.table_name1` as a 
where   storiesnbr=2 and zip5='21213' and year5=1915
group by 1)

select a.situscensid,b.cnt,count(*) as cnt1  from `project_id1.Dataschema1.table_name1` as a 

inner join b  on a.situscensid=b.situscensid

where  sumnbrbdrm =2 and zip5='68516' and sumnbrbath=4
group by 1,2
order by 3 desc
limit 20;

--#############################################
--transform rows to columns (pivot) in BigQuery: 
CALL fhoffa.x.pivot(
  'bigquery-public-data.iowa_liquor_sales.sales' # source table
  , 'fh-bigquery.temp.test' # destination table
  , ['date'] # row_ids
  , 'store_number' # pivot_col_name
  , 'sale_dollars' # pivot_col_value
  , 30 # max_columns
  , 'SUM' # aggregation
  , 'LIMIT 10' # optional_limit
);


#Use cast to convert number_of_stories to string in the cumulative sum, otherwise you get an error message. 
SELECT a.cntycd,cnt_cnty,cast(number_of_stories as string) as stories,cnt,
SUM(cnt)
OVER
  (PARTITION BY cntycd,cnt_cnty
  ORDER BY number_of_stories ) AS running_sum
from `prodjectid1.schema1.data_OCSD1` as a;
#The PARTITION BY columns should stay the same across the cumulative sum. 

--check the column data types without clicking through the table in the UI: 
SELECT * FROM  `project_id1.Dataschema1.INFORMATION_SCHEMA.COLUMNS`
WHERE  table_name="table_name_c";

#Use a BigQuery SELECT statement with UNNEST for repeated RECORD fields: 
SELECT *
FROM `firebase-public-project.analytics_153293282.events_20180915`
WHERE event_name = "level_complete_quickplay"
LIMIT 10

#Without UNNEST you might get the error: Cannot access field key on a value with type ARRAY<STRUCT<...>>
SELECT event_name, param
FROM `firebase-public-project.analytics_153293282.events_20180915`,
UNNEST(event_params) AS param
WHERE event_name = "level_complete_quickplay"
AND param.key = "value"

SELECT event_name, param.value.int_value AS score
FROM `firebase-public-project.analytics_153293282.events_20180915`,
UNNEST(event_params) AS param
WHERE event_name = "level_complete_quickplay"
AND param.key = "value"

drop table if exists `project_id1.Dataschema1.table_name1`;
create table `project_id1.Dataschema1.table_name1`
as select distinct var1,var2
from `project_id1.Dataschema1.table_name_a` as a 
inner join `project_id1.Dataschema1.table_name_b`  as trans
on  cast(LPAD(cast(a.cntycd as string), 5, '0') as string)=trans.cntycd ;
--pad the string with leading zeros (LPAD) so the join keys match.  


select id1, cntylegaltxt 
from `prodjectid1.schemat1.michale_property` p 
inner join `prodjectid2.schemat2.sam_sdp_master_xref` x 
on x.datasource_pid1_value = p.cntycd and x.datapid2_value= p.pclid and x.datapid3_value =cast(p.pclseqnbr as string)  
and x.datasource_name='tax'
where 
REGEXP_CONTAINS(lower(p.cntylegaltxt), r'single family')   
-- REGEXP_CONTAINS(lower(p.cntylegaltxt), r'condo') or REGEXP_CONTAINS(lower(p.cntylegaltxt), r'condominium')
--where LENGTH (cntylegaltxt) > 5000
limit 50;
#REGEXP_MATCH is legacy SQL only and does not work in standard SQL; use REGEXP_CONTAINS instead.

Using different functions in BigQuery to convert epoch values to timestamps: 
SELECT 
timestamp_millis(effectivedt) as date_from_milliseconds, --use this one most of the time
timestamp_seconds(effectivedt) as date_from_seconds
FROM `c***-poc-****.df.seller` 
where effectivedt is not null LIMIT 10;
