Sunday, February 16, 2020

GCP study notes 3: upload and download files from GCS in Python

#===========================================================
import pickle
import os
import pandas as pd
import numpy as np
import scipy as sp
import datetime
from datetime import date
from datetime import timedelta
import _pickle as cPickle

print(pd.__version__)
pd.set_option('mode.chained_assignment', None)
pd.set_option('display.max_columns', 500)  
pd.set_option('display.max_rows', 300)
pd.set_option('display.max_colwidth', 250)
pd.set_option('display.width', 125)

###################################
#load the parquet part-files saved by hive into a pandas dataframe
def import_hive(data0):
    #data0 is the hive table name; its part-files sit under hive_data/<data0>/
    import pyarrow.parquet as pq
    from io import BytesIO
    from google.cloud import storage

    prefix1 = 'hive_data/' + data0 + '/'
    storage_client = storage.Client()
    bucket = storage_client.get_bucket("e**-d**-data")
    blobs = storage_client.list_blobs(bucket, prefix=prefix1, delimiter=None)

    frames = []
    for blob in blobs:
        print(blob.name)
        content = blob.download_as_string()
        if len(content) >= 1:
            test1 = pq.read_table(BytesIO(content)).to_pandas()
            #print('the size of one file is ', test1.shape[0])
            frames.append(test1)

    #concatenate all part-files into one dataframe
    return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()

temp0=import_hive('your_hive_parquet_data')
temp0.head()
temp0.shape

#upload to gcs from the master node: 
###################################
from google.cloud import storage
storage_client = storage.Client()   
bucket = storage_client.get_bucket("ds-python-users")
blob_name = "folder/data_in/filename1"
blob = bucket.blob(blob_name)    
print(blob)
blob.upload_from_filename('filename1')
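
#alternatively, you can upload straight from memory without writing a local file first.
#a minimal sketch, assuming df is a pandas dataframe already in the session;
#the destination blob name below is made up just for illustration
blob2 = bucket.blob("folder/data_in/filename1_from_memory")
blob2.upload_from_string(df.to_csv(index=False), content_type="text/csv")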

#################################
#download a file from gcs bucket
#################################
from google.cloud import storage
storage_client = storage.Client()   
bucket = storage_client.get_bucket("ds-python-users")
blob_name = "folder/data_in/filename1"
blob = bucket.blob(blob_name)    
blob.download_to_filename('filename1')
print("done")

#or load the downloaded bytes straight into memory instead of writing a local file
from io import BytesIO
import pyarrow.parquet as pq
content = blob.download_as_string()
test1 = pd.read_csv(BytesIO(content))                    #if the blob is a csv/text file
#test1 = pq.read_table(BytesIO(content)).to_pandas()     #if the blob is a parquet file

#################################
#load a pickle file to python session
###################################
file1='filename1'    
file = open(file1, 'rb')
State_list=cPickle.load(file)
file.close() 
State_list.shape
State_list.tail()

State_list.columns=['state','puid_count']
list_state0=State_list.state
list_state0[0:5]

#================================================
#sample python code running in gcp, save and read csv: 

state1.to_csv(path_or_buf="gs://****-users/user/Record_*****/raw_output/temp_data_check_data.csv",index=False,header=True)

ca1 = pd.read_csv("gs://edg-***-users/****/test_CA",sep=',',delimiter=',',dtype=object,encoding = "ISO-8859-1")
#in case there is an error message "No module named 'gcsfs'", you can open a terminal and install it:
pip install --user gcsfs
#then restart the kernel to run the pd.read_csv()
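#after the restart, a quick sanity check that the module is now importable:
import gcsfs
print(gcsfs.__version__)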
#===========================================================

###############################################
#we can load a pickle file from the cluster/vm local disk,
#but not directly from the gcs bucket
#(an in-memory workaround is sketched after the next code block)
###############################################
file = open('file1', 'rb') #opening the file in read-binary (rb) 
dfa = pickle.load(file)
file.close()  
dfa.shape
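
#rough sketch of a workaround: pull the pickle out of the bucket into memory and
#unpickle the bytes directly, so nothing is written to local disk
#(the bucket name and blob path reuse the earlier examples, just for illustration)
from google.cloud import storage
storage_client = storage.Client()
bucket = storage_client.get_bucket("ds-python-users")
blob = bucket.blob("folder/data_in/file1")
dfa = pickle.loads(blob.download_as_string())
dfa.shape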

###############################
#break the data by each county
###############################
cwd = os.getcwd()
cwd1 = os.path.join(cwd,'datain/')
cwd2 = os.path.join(cwd,'dataout/')
cwd3 = os.path.join(cwd,'data_log/')
    
m=0    
for state1 in list_state2:
    m=m+1
    state_start = datetime.datetime.now()
    date_start=datetime.datetime.now().date()
    temp0=df0.loc[(df0.state==state1)]      
    directory1=cwd1+str(state1)
    directory2=cwd2+str(state1)
    #temp0.reset_index(drop=True, inplace=True)
    
    try:
       os.makedirs(directory1)
       os.makedirs(directory2)
    except FileExistsError:
    # directory already exists
       pass 

    list_cntycd0=(temp0.groupby(['cntycd'],as_index=False).agg({"puid": 'count'})
                       .sort_values(['puid'],ascending=True))
    list_cntycd0.tail()
    list_cntycd1=list_cntycd0[(list_cntycd0.puid>=50) 
      & (list_cntycd0.cntycd.notna()) & (list_cntycd0.cntycd!='nan') ]
    list_cntycd1.shape
    list_cntycd2=list_cntycd1.cntycd
    
    for cntycd1 in list_cntycd2:
        temp1=temp0.loc[(temp0.cntycd==cntycd1)]     
        file1=directory1+'/cnty_'+cntycd1    
        file = open(file1, 'wb')
        cPickle.dump(temp1, file, protocol=4)
        file.close()
    time_end = datetime.datetime.now()
    time_elapsed = time_end - state_start
    print("round", m, " for state:", state1, " elapsed:", time_elapsed,
          ', data size: ', temp0.shape[0], ' rows')
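
If the per-county files also need to live in GCS rather than only on the VM's local disk, the same upload pattern from earlier can be wrapped in a loop. A rough sketch, where the bucket name and destination prefix are placeholders:

#upload the per-county pickle files to a gcs bucket (bucket/prefix are placeholders)
from google.cloud import storage
storage_client = storage.Client()
bucket = storage_client.get_bucket("ds-python-users")
for state1 in list_state2:
    local_dir = cwd1 + str(state1)
    for fname in os.listdir(local_dir):
        blob = bucket.blob("folder/data_in/" + str(state1) + "/" + fname)
        blob.upload_from_filename(os.path.join(local_dir, fname))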
#===========================================================
A more roundabout way to load Hadoop Hive ORC data from a GCS bucket into a Python session: first re-save the ORC table in Hive as TEXTFILE or parquet, then read those files from Python:
###############################################
DROP TABLE IF EXISTS default.data_01_txt;
CREATE TABLE default.data_01_txt
row format delimited fields terminated by ','
stored as TEXTFILE
tblproperties("compress"="SNAPPY")
as select a.*
from data0 a 
where state='01'
;

DROP TABLE IF EXISTS default.data_01_parquet_snappy;
CREATE TABLE default.data_01_parquet_snappy
stored as PARQUET 
tblproperties("compress"="SNAPPY")
as select a.*
from data0 a 
where state='01'
;

DROP TABLE IF EXISTS default.data_01_parquet_zip;
CREATE TABLE default.data_01_parquet_zip
stored as PARQUET 
tblproperties("compress"="GZIP")
as select a.*
from data0 a 
where state='01'
;

#then we can read them directly from the python session:
from io import BytesIO        
from google.cloud import storage
storage_client = storage.Client()   
bucket = storage_client.get_bucket("bucket-data")
blob_name = "hive_data/data_01_txt/000000_0"
#note those are the partitioned files in the gcp bucket
#blob_name = "hive_data/data_01_txt/000001_0"
blob = bucket.blob(blob_name)    
print(blob)
content = blob.download_as_string()
test1 = pd.read_csv(BytesIO(content))
test1.head()
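
#note: hive CTAS textfile output has no header row, so pd.read_csv above will treat
#the first data row as column names; it is safer to read with header=None and set
#the column names yourself (the column list below is left as a placeholder)
test1 = pd.read_csv(BytesIO(content), header=None)
#test1.columns = [...]   #fill in the real column order from the hive table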

#2nd working way, using pyarrow.parquet directly:
import pyarrow.parquet as pq
from io import BytesIO        
from google.cloud import storage
storage_client = storage.Client()   
bucket = storage_client.get_bucket("bucket1-data")
blob_name = "hive_data/data_01_parquet_snappy/000002_0"
blob = bucket.blob(blob_name)    
print(blob)
content = blob.download_as_string()
test1 = pq.read_table(BytesIO(content))
#test1 = pq.read_table(BytesIO(content)).to_pandas()
test2 = test1.to_pandas()
test2.head()
test2.shape

#3rd way, using pandas directly (needs gcsfs installed):
bkt="gcs://bucket-data/hive_data/data_01_parquet_zip/000000_0"
df0=pd.read_parquet(bkt)
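#it may also work to point pd.read_parquet at the whole table directory instead of a
#single part-file, letting pyarrow pick up every partition in one call; treat this as
#an unverified sketch since it depends on the pandas/pyarrow/gcsfs versions installed
df_all = pd.read_parquet("gcs://bucket-data/hive_data/data_01_parquet_zip/")
df_all.shape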
#===========================================================

Now we have created those multiple partitioned files in Hive/Hadoop in parquet format. We didn't use the ORC (Optimized Row Columnar) format here because of trouble later when loading the data into a Python session: pyarrow.parquet can't read the ORC files, but it can read the parquet or textfile versions.
#===========================================================
import logging
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)   #set up a logger; the original notes assume one named log already exists

m=0
for state1 in State_list1[0:2]:
    m=m+1
    state_start = datetime.datetime.now()
    log.info('-----------Start grabbing data for state: '+state1)
    import pyarrow.parquet as pq
    from io import BytesIO        
    from google.cloud import storage
    storage_client = storage.Client()
    
    state0=pd.DataFrame()   #start with an empty frame for this state
    bucket = storage_client.get_bucket("bucket1-data")
    prefix1='hive_data/data_'+state1+'/'
    blobs = storage_client.list_blobs(bucket, prefix=prefix1, delimiter=None) 
    #blobs = storage_client.list_blobs(bucket, prefix='hive_data/data_01_parquet_snappy', delimiter=None) 
    #if you try to load a partitioned orc file this way, you will not get normal output
    frames = []
    for blob in blobs:
        #print(blob.name)
        content = blob.download_as_string()
        if len(content)>=1:
            test1 = pq.read_table(BytesIO(content)).to_pandas()
            #print('the size of one file is ', test1.shape[0])
            frames.append(test1)
    if frames:
        state0 = pd.concat(frames, ignore_index=True)

    state0.head()
    state0.shape
    state0.columns
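    #sketch: persist each state's combined frame back to the bucket as parquet at the
    #end of the loop body (the output path is a placeholder; like the to_csv example
    #earlier, this needs gcsfs installed)
    state0.to_parquet("gs://bucket1-data/python_out/data_" + state1 + ".parquet", index=False)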
#===========================================================


