### How to set up and run
##### Create virtual environment
###### Windows
py -3 -m venv <name of environment>
###### Linux/macOS
python3 -m venv <name of environment>
##### Activate virtual environment
###### Windows
<name of environment>\Scripts\activate
###### Linux/macOS
. <name of environment>/bin/activate
##### Install required libraries
pip install -r requirements.txt
##### Run app locally
"User-Agent": 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36'
'Chrome/50.0.2661.102 Safari/537.36 '
import csv
import requests
from bs4 import BeautifulSoup
from config import HEADER
# HTML tags whose inner text is harvested by the scraper
TAGS = [f'h{level}' for level in range(1, 7)] + ['div', 'span', 'td', 'li', 'a']
# location of the CSV file that lists keywords and their weights
KEYWORDS_PATH = 'keywords.csv'
# module-wide keyword -> weight mapping; starts out empty
KEYWORDS = {}
# Debug helper: downloads `url` using the shared HEADER and parses the response
# body with BeautifulSoup's "html.parser".
# NOTE(review): the visible body looks truncated - `soup` is never used and the
# handle opened on "test.html" is neither written to nor closed here. Confirm the
# intended dump logic against the original file before relying on this helper.
def test_with_bs4(url):
response = requests.get(url, headers=HEADER)
soup = BeautifulSoup(response.text, "html.parser")
# "w+" creates (or truncates) test.html relative to the current working directory
file = open("test.html", "w+")
def load_keywords(path=None):
    """Load keyword weights from a CSV file into the global ``KEYWORDS`` mapping.

    The first row is treated as a header: it is printed and never parsed as a
    keyword. Every following non-empty row contributes
    ``row[0] -> int(row[1])``.

    Args:
        path: CSV file to read. Defaults to ``KEYWORDS_PATH``.

    Returns:
        The freshly loaded keyword -> weight dictionary, which is also bound to
        the module-level ``KEYWORDS`` name.

    Raises:
        OSError: if the file cannot be opened.
        ValueError: if a weight cannot be converted to ``int``.
        IndexError: if a non-empty data row has fewer than two columns.
    """
    # Without this declaration the assignment below would only create a local
    # variable and the module-level mapping would stay empty.
    global KEYWORDS
    if path is None:
        path = KEYWORDS_PATH

    loaded = {}
    line_count = 0
    # newline='' is the mode the csv module asks for when parsing files
    with open(path, newline='') as csv_file:
        csv_reader = csv.reader(csv_file, delimiter=',')
        header = next(csv_reader, None)
        if header is not None:
            print(f'Column names are {", ".join(header)}')
            line_count += 1
        for row in csv_reader:
            # blank lines come back as empty lists; skip them instead of crashing
            if not row:
                continue
            loaded[row[0]] = int(row[1])
            line_count += 1
    print(f'Processed {line_count} lines.')

    KEYWORDS = loaded
    return loaded
def scrapping_words(url):
    """Download *url* and count the words found inside the tags listed in ``TAGS``.

    The page is fetched with ``requests`` (sending ``HEADER``) and parsed with
    BeautifulSoup's ``html.parser``. For every element matching one of
    ``TAGS`` the inner text is lower-cased, reduced to letters, and split into
    words.

    Because each tag in ``TAGS`` is searched separately, an element is visited
    once for every listed tag it matches.

    Args:
        url: address of the web page to scrape.

    Returns:
        dict mapping each word to the number of times it was seen.
    """
    # init word dictionary
    words = dict()
    # download html page of the website
    response = requests.get(url, headers=HEADER)
    # parse with bs4
    soup = BeautifulSoup(response.text, "html.parser")
    for tag in TAGS:
        for row in soup.find_all(tag):
            text = row.get_text().lower()
            # Keep letters, drop punctuation/digits, and turn EVERY whitespace
            # character (newline, tab, ...) into a separator. Previously only
            # ' ' survived, so "foo\nbar" collapsed into the bogus word "foobar".
            cleaned = ''.join(
                ' ' if ch.isspace() else ch
                for ch in text
                if ch.isalpha() or ch.isspace()
            )
            # split() with no argument discards empty fragments for us
            for word in cleaned.split():
                words[word] = words.get(word, 0) + 1
    # return scrapped words from the given webpage
    return words
def calculate_score(words):
    """Return the keyword-weighted score of a word-frequency mapping.

    Each word that also appears in the global ``KEYWORDS`` contributes its
    keyword weight multiplied by its frequency; all other words are ignored.

    Args:
        words: mapping of word -> occurrence count.

    Returns:
        The summed score (0 when no word matches a keyword).
    """
    return sum(
        KEYWORDS[word] * frequency
        for word, frequency in words.items()
        if word in KEYWORDS
    )
if __name__ == "__main__":
    # manual run: scrape the page at the given URL and print its keyword score
    print(f"Score = {calculate_score(scrapping_words(''))}")
    # (a POSITIVE / NEGATIVE verdict based on score > 0 was sketched here
    #  but is currently disabled)
web: gunicorn app:app
from flask import Flask, jsonify
from flask_cors import CORS
from flask_apscheduler import APScheduler
from model import schedule_model_training, is_training, CURRENCIES
from web_scrapping import get_sentiment
# Flask application serving the forecasting API
app = Flask(__name__)
# allow cross-origin requests from any origin on the /crypto-currency/ routes
cors = CORS(app, resources={r"/crypto-currency/*": {"origins": "*"}})
app.config['CORS_HEADERS'] = 'Content-Type'
scheduler = APScheduler()
# schedule model re-training once per hour (3600 seconds);
# this line had lost its comment marker, which made the module unparsable
scheduler.add_job(id='Scheduled Task', func=schedule_model_training, trigger="interval", seconds=3600)
@app.route('/crypto-currency', methods=['GET'])
def index():
    """Liveness endpoint: respond with a static, centred HTML banner."""
    banner = "<div align='center'><h2>Crypto Currency Forecasting Sever is Active</h2></div>"
    return banner
@app.route("/crypto-currency/predict", methods=['GET'])
def predict():
    """Return the latest forecasts for every enabled currency that has data.

    While ``is_training()`` reports that models are being trained, a
    placeholder payload with ``"code": 100`` is returned instead. Otherwise the
    payload carries, per currency, the ``price``, ``volume`` and
    ``market_cap`` entries stored in ``CURRENCIES``.

    Returns:
        ``(response, 200)`` - the JSON response (with a permissive
        ``Access-Control-Allow-Origin`` header) and the HTTP status.
    """
    # The brackets closing each literal and the ``else`` branch were missing
    # from this block; they are restored here so that the function parses.
    if is_training():
        response = jsonify({
            "message": "all forecasting models are training now!",
            "code": 100,
        })
    else:
        data = {
            currency: {
                "price": info["price"],
                "volume": info["volume"],
                "market_cap": info["market_cap"],
            }
            for currency, info in CURRENCIES.items()
            if info["enable"] and info["available_data"]
        }
        response = jsonify({
            "code": 200,
            "message": "Success",
            "data": data,
        })
    response.headers.add('Access-Control-Allow-Origin', '*')
    return response, 200
@app.route("/crypto-currency/sentiment", methods=['GET'])
def sentiment():
    """Return the current sentiment value as JSON.

    The value is whatever ``get_sentiment()`` reports at request time.

    Returns:
        ``(response, 200)`` - the JSON response (with a permissive
        ``Access-Control-Allow-Origin`` header) and the HTTP status.
    """
    # the closing "})" of this call was missing; restored so the block parses
    response = jsonify({
        "code": 200,
        "message": "Success",
        "sentiment": get_sentiment(),
    })
    response.headers.add('Access-Control-Allow-Origin', '*')
    return response, 200
if __name__ == "__main__":
"User-Agent": 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36'
'Chrome/50.0.2661.102 Safari/537.36 '
import os
import ssl
from urllib.request import Request, urlopen
import certifi
from model_training.pp_market_cap import pp_market_cap
from model_training.pp_price import pp_price
from model_training.pp_volume import pp_volume
from web_scrapping import start_web_scrapping, set_sentiment
# directory prefix (with trailing path separator) for downloaded CSV files
DATABASE_DIR = "database" + os.sep
# forecasts at or above this value are flagged as "exceeded"
THRESHOLD = 1_000_000
"BTC_USD": {
"url": "",
"available_data": False,
"path": None,
"enable": True,
"price": {
"today": 0,
"tomorrow": 0,
"score": 0,
"exceeded": False
"volume": {
"today": 0,
"tomorrow": 0,
"score": 0,
"exceeded": False
"market_cap": {
"today": 0,
"tomorrow": 0,
"score": 0,
"exceeded": False
"ETH_USD": {
"url": "",
"available_data": False,
"path": None,
"enable": True,
"price": {
"today": 0,
"tomorrow": 0,
"score": 0,
"exceeded": False
"volume": {
"today": 0,
"tomorrow": 0,
"score": 0,
"exceeded": False
"market_cap": {
"today": 0,
"tomorrow": 0,
"score": 0,
"exceeded": False
"url": "",
"available_data": False,
"path": None,
"enable": True,
"price": {
"today": 0,
"tomorrow": 0,
"score": 0,
"exceeded": False
"volume": {
"today": 0,
"tomorrow": 0,
"score": 0,
"exceeded": False
"market_cap": {
"today": 0,
"tomorrow": 0,
"score": 0,
"exceeded": False
def download_data_sources():
    """Download the CSV data source of every currency into ``DATABASE_DIR``.

    All entries of ``CURRENCIES`` are first reset (``available_data`` False,
    ``path`` None). Each currency's ``url`` is then fetched, decoded as UTF-8
    and written to ``<DATABASE_DIR><currency>.csv``; on success the entry's
    ``available_data`` and ``path`` fields are updated.

    NOTE(review): several lines of this function were missing or garbled (the
    URL argument of ``Request``, the read/decode of the response, the write of
    the CSV file). They are reconstructed from the surrounding names - confirm
    against the original. Network and file errors propagate to the caller.
    """
    # forget the outcome of any earlier run before downloading again
    for info in CURRENCIES.values():
        info["available_data"] = False
        info["path"] = None

    for currency, info in CURRENCIES.items():
        request = Request(
            info["url"],
            headers={'User-Agent': 'Mozilla/5.0'}
        )
        print(f"download data source for {currency}")
        # certifi supplies the CA bundle used to verify the TLS connection
        with urlopen(request, context=ssl.create_default_context(cafile=certifi.where())) as file:
            downloaded_file = file.read().decode('utf-8')

        target_path = f'{DATABASE_DIR}{currency}.csv'
        # context manager guarantees the handle is flushed and closed;
        # encoding matches the decode step above
        with open(target_path, "w+", encoding='utf-8') as csv_file:
            csv_file.write(downloaded_file)

        info["available_data"] = True
        info["path"] = target_path
        print(f"successfully downloaded data source for {currency}")
def is_data_sources_configured():
    """Tell whether every enabled currency has its data available.

    Returns:
        False as soon as one entry of ``CURRENCIES`` is enabled but has no
        available data, True otherwise (including when there are no entries).
    """
    missing = (
        info["enable"] and not info["available_data"]
        for info in CURRENCIES.values()
    )
    return not any(missing)
def set_training(_flag):
    """Record whether model training is currently in progress.

    Args:
        _flag: value to store in the module-level ``TRAINING`` flag.
    """
    # without the declaration the assignment would only bind a local name
    global TRAINING
    TRAINING = _flag


def is_training():
    """Return the flag last stored by ``set_training``.

    Returns:
        The stored flag, or False when ``set_training`` has not been called and
        no module-level ``TRAINING`` exists yet.
    """
    # the body of this function was missing; reading through globals() avoids
    # a NameError if TRAINING has never been assigned
    return globals().get("TRAINING", False)
def _store_forecast(metric, today, predicted):
    """Write today's value, the forecast, its score and threshold flag into *metric*."""
    metric["today"] = today
    metric["tomorrow"] = predicted
    # score = relative change * 10, floored at 0; a zero baseline has no
    # defined relative change, so it scores 0 instead of dividing by zero
    score = ((predicted - today) / today) * 10 if today else 0
    if score < 0:
        score = 0
    metric["score"] = score
    # bool() keeps the stored flag a plain Python bool
    metric["exceeded"] = bool(predicted >= THRESHOLD)


def schedule_model_training():
    """Refresh the data sources and recompute the forecasts in ``CURRENCIES``.

    Up to three download attempts are made, stopping early once
    ``is_data_sources_configured()`` is satisfied. Every enabled currency with
    available data is then run through ``pp_price``, ``pp_volume`` and
    ``pp_market_cap``; for each metric the entry stores today's value, the
    forecast, a score (relative change times 10, never negative) and whether
    the forecast reaches ``THRESHOLD``. Finally the sentiment is set to
    ``'Not Available'``.

    NOTE(review): the retry loop originally announced a download but never
    called ``download_data_sources()`` nor left the loop on success; both are
    restored here. Nothing visible toggles ``set_training`` around this job or
    calls ``start_web_scrapping`` - confirm whether such calls were lost.
    """
    print("start model training")
    retry_count = 0
    while retry_count < 3:
        print("downloading data sources")
        retry_count += 1
        print(f"attempting - {retry_count}")
        download_data_sources()
        if is_data_sources_configured():
            print("data sources successfully downloaded")
            break

    # model training
    for info in CURRENCIES.values():
        if not (info["enable"] and info["available_data"]):
            continue
        file_path = info["path"]
        today_price, pred_price = pp_price(file_path)
        today_volume, pred_volume = pp_volume(file_path)
        today_market_cap, pred_market_cap = pp_market_cap(file_path)

        _store_forecast(info["price"], today_price, pred_price)
        _store_forecast(info["volume"], today_volume, pred_volume)
        _store_forecast(info["market_cap"], today_market_cap, pred_market_cap)

    print("end model training")
    set_sentiment('Not Available')
import statsmodels.api as sm
import warnings
def model_training(training_data, scaler):
    """Fit an ARIMA(5, 1, 0) model on *training_data* and forecast one step.

    Args:
        training_data: iterable of (scaled) observations, oldest first.
        scaler: object whose ``inverse_transform`` maps scaled values back to
            the original units.

    Returns:
        Tuple ``(last_observed, predicted)``: the final observation and the
        one-step forecast, both passed through ``scaler.inverse_transform``.
    """
    history = list(training_data)
    model = sm.tsa.arima.ARIMA(history, order=(5, 1, 0))
    # the right-hand side of this assignment was missing; fitting the model is
    # what yields the results object that forecast() is called on below
    model_fit = model.fit()
    output = model_fit.forecast()
    # inverse_transform works on 2-D input, hence the [[value]] wrapping
    last_observed = scaler.inverse_transform([[history[-1]]])[0][0]
    predicted = scaler.inverse_transform([[output[0]]])[0][0]
    return last_observed, predicted
import pandas as pd
from sklearn.preprocessing import StandardScaler
from model_training.helper import model_training
def pp_market_cap(file_path):
    """Pre-process the market-cap series of a CSV file and forecast it.

    Reads the CSV, discards the ``total_volume`` and ``price`` columns, fills
    missing ``market_cap`` values with 0, normalises ``snapped_at`` to its date
    part, standardises ``market_cap`` and hands the series to
    ``model_training``.

    Args:
        file_path: path of the CSV file to read.

    Returns:
        Whatever ``model_training`` returns for the standardised series and
        the fitted scaler.
    """
    # `squeeze=True` was dropped: read_csv no longer accepts it since
    # pandas 2.0, and it had no effect on this multi-column frame anyway
    df = pd.read_csv(file_path, delimiter=',', parse_dates=True)
    df.drop(['total_volume', 'price'], axis=1, inplace=True)
    df['market_cap'] = df['market_cap'].fillna(0)
    # keep only the date portion that precedes the first space
    df['snapped_at'] = df['snapped_at'].apply(lambda x: x.split(' ')[0].strip())
    # `infer_datetime_format` is deprecated in pandas 2.0 and no longer needed
    df['snapped_at'] = pd.to_datetime(df['snapped_at'])
    scaler = StandardScaler()
    df[['market_cap']] = scaler.fit_transform(df[['market_cap']])
    training_data = df['market_cap'].values
    return model_training(training_data, scaler)
import pandas as pd
from sklearn.preprocessing import StandardScaler
from model_training.helper import model_training
def pp_price(file_path):
    """Pre-process the price series of a CSV file and forecast it.

    Reads the CSV, discards the ``total_volume`` and ``market_cap`` columns,
    fills missing ``price`` values with 0, normalises ``snapped_at`` to its
    date part, standardises ``price`` and hands the series to
    ``model_training``.

    Args:
        file_path: path of the CSV file to read.

    Returns:
        Whatever ``model_training`` returns for the standardised series and
        the fitted scaler.
    """
    # `squeeze=True` was dropped: read_csv no longer accepts it since
    # pandas 2.0, and it had no effect on this multi-column frame anyway
    df = pd.read_csv(file_path, delimiter=',', parse_dates=True)
    df.drop(['total_volume', 'market_cap'], axis=1, inplace=True)
    df['price'] = df['price'].fillna(0)
    # keep only the date portion that precedes the first space
    df['snapped_at'] = df['snapped_at'].apply(lambda x: x.split(' ')[0].strip())
    # `infer_datetime_format` is deprecated in pandas 2.0 and no longer needed
    df['snapped_at'] = pd.to_datetime(df['snapped_at'])
    scaler = StandardScaler()
    df[['price']] = scaler.fit_transform(df[['price']])
    training_data = df['price'].values
    return model_training(training_data, scaler)
import pandas as pd
from sklearn.preprocessing import StandardScaler
from model_training.helper import model_training
def pp_volume(file_path):
    """Pre-process the traded-volume series of a CSV file and forecast it.

    Reads the CSV, discards the ``price`` and ``market_cap`` columns, fills
    missing ``total_volume`` values with 0, normalises ``snapped_at`` to its
    date part, standardises ``total_volume`` and hands the series to
    ``model_training``.

    Args:
        file_path: path of the CSV file to read.

    Returns:
        Whatever ``model_training`` returns for the standardised series and
        the fitted scaler.
    """
    # `squeeze=True` was dropped: read_csv no longer accepts it since
    # pandas 2.0, and it had no effect on this multi-column frame anyway
    df = pd.read_csv(file_path, delimiter=',', parse_dates=True)
    df.drop(['price', 'market_cap'], axis=1, inplace=True)
    df['total_volume'] = df['total_volume'].fillna(0)
    # keep only the date portion that precedes the first space
    df['snapped_at'] = df['snapped_at'].apply(lambda x: x.split(' ')[0].strip())
    # `infer_datetime_format` is deprecated in pandas 2.0 and no longer needed
    df['snapped_at'] = pd.to_datetime(df['snapped_at'])
    scaler = StandardScaler()
    df[['total_volume']] = scaler.fit_transform(df[['total_volume']])
    training_data = df['total_volume'].values
    return model_training(training_data, scaler)
