Commit ee6f9195 authored by sashika sewwandi's avatar sashika sewwandi

Upload New File

parent bb95304d
import sys
import traceback
import snowflake.connector
class MICROSERVICES:
def connect_to_snowflake(self):
# Connect to snowflake database
try:
self.con = snowflake.connector.connect(
user='sash95',
password='Sash@123##',
account='bj40190.ap-southeast-1',
role='ACCOUNTADMIN',
warehouse='COMPUTE_WH'
)
except Exception as e:
print(e)
sys.exit(1)
# Create DDL Statements
try:
data_warehouse = 'USE warehouse {WH}'.format(WH='COMPUTE_WH')
self.con.cursor().execute(data_warehouse)
database = 'USE database {db}'.format(db='FinalYearResearch')
self.con.cursor().execute(database)
schema = 'USE schema {schema}'.format(schema='Microservices')
self.con.cursor().execute(schema)
self.table = """
create or replace table "FINALYEARRESEARCH"."MICROSERVICES"."Microservice_Data" as
select t.$1 as id, t.$2 as conversation_id,t.$3 as created_at, t.$4 as date,t.$5 as time, t.$6 as timezone,t.$7 as user_id
from @MICROSERVICES_DATA_STAGE(file_format => 'csv_with_header') t;
"""
self.file_format = '''
create or replace file format csv_with_header
TYPE = CSV
COMPRESSION = AUTO
FIELD_DELIMITER = ','
SKIP_HEADER = 1
TIMESTAMP_FORMAT = AUTO
TRIM_SPACE = TRUE
FIELD_OPTIONALLY_ENCLOSED_BY = '"'
ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE
VALIDATE_UTF8 = TRUE
ENCODING = 'WINDOWS1252'
;'''
self.create_stage = '''
CREATE OR REPLACE STAGE Microservices_data_stage URL='s3://microservicesdb/Twitter/'
CREDENTIALS=(AWS_KEY_ID='AKIAZV24JW4ZIG3RWJS2' AWS_SECRET_KEY='PsSVN+gb52UyMcryPSaQ7sBcjiPecFWtWRdsYTw0')
FILE_FORMAT = csv_with_header;
'''
self.con.cursor().execute(self.file_format)
self.con.cursor().execute(self.create_stage)
self.con.cursor().execute(self.table)
except snowflake.connector.ProgrammingError as e:
print(e)
sys.exit()
except KeyError:
print(traceback.format_exc())
sys.exit()
except Exception as e:
print(e)
sys.exit()
else:
print("successfully created the table")
def load_data(self):
try:
self.con.cursor().execute(''' copy into "FINALYEARRESEARCH"."MICROSERVICES"."Microservice_Data"
from (select $1, $2,$3, $4,$5, $6,$7 from @MICROSERVICES_DATA_STAGE)
file_format = (format_name = csv_with_header)
on_error = 'skip_file';
''')
except Exception as e:
print(e)
sys.exit()
else:
print('data successfully loaded')
if __name__ == '__main__':
new_file = MICROSERVICES()
new_file.connect_to_snowflake()
new_file.load_data()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment