Commit b90f08ef authored by sashika sewwandi

Upload New File

parent ee6f9195
from __future__ import print_function

import sys
import json
import traceback

import boto3
from boto3.s3.transfer import S3Transfer
import snowflake.connector
import twint


class Twitter:
    def __init__(self):
        # Placeholders; the real configuration is populated by metadata_setup()
        self.page_link = None
        self.zip_file_path = None
        self.download_url = None
        self.file = None
    def create_download_url(self):
        """Scrape tweets with twint and write them to a CSV file."""
        try:
            c = twint.Config()
            c.Search = self.key_word     # topic (twint expects a plain string here)
            c.Limit = self.data_size     # number of tweets to scrape
            c.Store_csv = True           # store tweets in a CSV file
            c.Output = self.output_file  # path to the CSV file
            twint.run.Search(c)
        except Exception as e:
            print("Can't scrape data for this search:", self.key_word)
            print(e)
            sys.exit(-1)
        else:
            print("Data extraction succeeded")
    def invoke(self):
        """Run the full pipeline: configure, scrape, upload to S3, load into Snowflake."""
        self.metadata_setup("twitter_config")
        self.create_download_url()
        self.s3_data_transfer(self.output_file)
        self.connect_to_snowflake()
        self.load_data()
    def metadata_setup(self, conf_file):
        """
        Set up metadata for the application.
        :param conf_file: name of the configuration profile to load
        :return: None (configuration is stored on instance attributes)
        """
        try:
            # NOTE: credentials are hardcoded here; in practice they would come
            # from a secrets store or an external configuration file.
            if conf_file == 'twitter_config':
                data = {
                    "key_word": "Avengers",
                    "access_token": "",
                    "data_size": 100,
                    "output_file": "Twitter/Avengers3.csv",
                    "snowflake": {
                        "Account": "wg33245.us-east-2",
                        "data_warehouse": "COMPUTE_WH",
                        "username": "sasika2023",
                        "password": "Sanjaya@#3171",
                        "role": "ACCOUNTADMIN",
                        "database": "FINALYEARRESEARCH",
                        "schema": "ORIGINAL",
                        "table_name": "ORIGINAL_DATA"
                    },
                    "aws": {
                        "aws_access_key_id": "AKIAZV24JW4ZIG3RWJS2",
                        "aws_secret_access_key": "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",
                        "bucket": "orginaldata"
                    }
                }
            else:
                data = {
                    "base_url": "http://www.values.com/inspirational-quotes",
                    "output_file": "quotes.csv",
                    "snowflake": {
                        "Account": "wg33245.us-east-2",
                        "data_warehouse": "COMPUTE_WH",
                        "username": "sasika2023",
                        "password": "Sanjaya@#3171",
                        "role": "ACCOUNTADMIN",
                        "database": "FINALYEARRESEARCH",
                        "schema": "ORIGINAL",
                        "table_name": "ORIGINAL_DATA"
                    },
                    "aws": {
                        "aws_access_key_id": "AKIAZV24JW4ZIG3RWJS2",
                        "aws_secret_access_key": "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",
                        "bucket": "orginaldata"
                    }
                }
            self.output_file = data["output_file"]
            # Twitter configuration
            if 'twitter' in conf_file:
                self.key_word = data["key_word"]
                self.access_token = data["access_token"]
                self.data_size = data["data_size"]
            elif 'quotes' in conf_file:
                self.base_url = data["base_url"]
            # Snowflake configuration
            self.Account = data["snowflake"]["Account"]
            self.data_warehouse = data["snowflake"]["data_warehouse"]
            self.username = data["snowflake"]["username"]
            self.password = data["snowflake"]["password"]
            self.role = data["snowflake"]["role"]
            self.database = data["snowflake"]["database"]
            self.schema = data["snowflake"]["schema"]
            self.table_name = data["snowflake"]["table_name"]
            # AWS configuration
            self.access_key = data["aws"]["aws_access_key_id"]
            self.secret_key = data["aws"]["aws_secret_access_key"]
            self.bucket = data["aws"]["bucket"]
        except Exception as e:
            print(e)
            sys.exit(-1)
        else:
            print("Metadata configuration succeeded")
    def s3_data_transfer(self, file):
        """
        Upload an extracted file to the AWS S3 bucket.
        :param file: path of the file to upload
        :return: None
        """
        try:
            client = boto3.client(
                's3',
                aws_access_key_id=self.access_key,
                aws_secret_access_key=self.secret_key,
            )
            transfer = S3Transfer(client)
            transfer.upload_file(file, self.bucket, file)
        except Exception as e:
            print(e)
            sys.exit(-1)
        else:
            print("Data successfully loaded to the S3 bucket")
    def connect_to_snowflake(self):
        """Open a Snowflake connection and create the file format, stage, and table."""
        # Connect to the Snowflake database using the configured credentials
        try:
            self.con = snowflake.connector.connect(
                user=self.username,
                password=self.password,
                account=self.Account,
                role=self.role,
                warehouse=self.data_warehouse
            )
        except Exception as e:
            print(e)
            sys.exit(1)
        # Create DDL statements
        try:
            self.con.cursor().execute('USE warehouse {WH}'.format(WH=self.data_warehouse))
            self.con.cursor().execute('USE database {db}'.format(db=self.database))
            self.con.cursor().execute('USE schema {schema}'.format(schema=self.schema))
            # $1..$7 address the staged CSV columns by position
            self.table = """
                create or replace table "FINALYEARRESEARCH"."ORIGINAL"."ORIGINAL_DATA" as
                select t.$1 as id, t.$2 as conversation_id, t.$3 as created_at, t.$4 as date,
                       t.$5 as time, t.$6 as timezone, t.$7 as user_id
                from @orginal_data_stage(file_format => 'csv_with_header') t;
            """
            self.file_format = '''
                create or replace file format csv_with_header
                TYPE = CSV
                COMPRESSION = AUTO
                FIELD_DELIMITER = ','
                SKIP_HEADER = 1
                TIMESTAMP_FORMAT = AUTO
                TRIM_SPACE = TRUE
                FIELD_OPTIONALLY_ENCLOSED_BY = '"'
                ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE
                VALIDATE_UTF8 = TRUE
                ENCODING = 'WINDOWS1252'
                ;'''
            self.create_stage = '''
                CREATE OR REPLACE STAGE orginal_data_stage URL='s3://{bucket}/Twitter/'
                CREDENTIALS=(AWS_KEY_ID='{key}' AWS_SECRET_KEY='{secret}')
                FILE_FORMAT = csv_with_header;
            '''.format(bucket=self.bucket, key=self.access_key, secret=self.secret_key)
            self.con.cursor().execute(self.file_format)
            self.con.cursor().execute(self.create_stage)
            self.con.cursor().execute(self.table)
        except snowflake.connector.ProgrammingError as e:
            print(e)
            sys.exit()
        except KeyError:
            print(traceback.format_exc())
            sys.exit()
        except Exception as e:
            print(e)
            sys.exit()
        else:
            print("Successfully created the table")
    def load_data(self):
        """Copy the staged CSV files into the target table."""
        try:
            self.con.cursor().execute('''
                copy into "FINALYEARRESEARCH"."ORIGINAL"."ORIGINAL_DATA"
                from (select $1, $2, $3, $4, $5, $6, $7 from @orginal_data_stage)
                file_format = (format_name = csv_with_header)
                on_error = 'skip_file';
            ''')
        except Exception as e:
            print(e)
            sys.exit()
        else:
            print("Data successfully loaded")
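    # A quick sanity check (sketch, not part of the pipeline): count the rows
    # that actually arrived in the target table.
    #
    #     cur = self.con.cursor()
    #     cur.execute('select count(*) from "FINALYEARRESEARCH"."ORIGINAL"."ORIGINAL_DATA"')
    #     print(cur.fetchone()[0], "rows loaded")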

if __name__ == '__main__':
    # Scrape Twitter data and load it into Snowflake via S3
    new_file = Twitter()
    new_file.invoke()
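# Assumed runtime requirements (not pinned in the repo):
#     pip install twint boto3 snowflake-connector-python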