Commit 996a2155 authored by dasunx

Medium article and dev.to article scrapers added

parent 4a46704a
......@@ -29,10 +29,38 @@ const AutomatedAnswerSchema = mongoose.Schema({
      type: String
    }
  ],
  blogs: [
  medium_articles: [
    {
      type: Schema.Types.ObjectId,
      ref: 'BlogArticle'
      title: String,
      pubDate: String,
      link: String,
      guid: String,
      author: String,
      thumbnail: String,
      description: String,
      content: String
    }
  ],
  dev_articles: [
    {
      title: String,
      pubDate: String,
      link: String,
      guid: String,
      author: String,
      thumbnail: String,
      description: String,
      content: String
    }
  ],
  medium_resources: [
    {
      type: String
    }
  ],
  dev_resources: [
    {
      type: String
    }
  ]
});
......
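For reference, a minimal sketch (written in Python to match the scraper code below) of the data that is expected to land in the new fields. The field names follow the schema above and mirror an rss2json feed item; the sample values are invented placeholders.

# Hypothetical example of an update payload for the new schema fields;
# field names come from the schema, values are made up for illustration.
sample_update = {
    "medium_articles": [
        {
            "title": "Django vs Flask",                      # placeholder
            "pubDate": "2021-01-15 10:00:00",
            "link": "https://medium.com/@user/django-vs-flask-abc123",
            "guid": "https://medium.com/p/abc123",
            "author": "user",
            "thumbnail": "https://cdn-images-1.medium.com/example.png",
            "description": "A short summary of the article",
            "content": "Full HTML content of the article",
        }
    ],
    "medium_resources": ["https://medium.com/@user/django-vs-flask-abc123"],
}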
const mongoose = require('mongoose');

const BlogArticleSchema = mongoose.Schema({
  automatedAnswer: {
    type: mongoose.Schema.Types.ObjectId,
    ref: 'AutomatedAnswer',
    required: true
  },
  blogName: {
    type: String,
    required: true
  },
  link: {
    type: String,
    required: true
  },
  content: {
    type: String
  }
});

module.exports = mongoose.model('BlogArticle', BlogArticleSchema);
from search_engine_parser import GoogleSearch
import re
import requests
import random


class DevTo:
    def __init__(self, title, tags):
        self.title = title
        self.tags = tags

    def getApiKey(self):
        """Return a random rss2json API key so requests are spread across keys"""
        api_keys = [
            "2rk1eg4sexdnp5umrwtwbtwd2insqvgzvejooqrn",
            "yit6ytfcs3ziawdgasfd3bgkbf4tef1m2nzdxvnz",
            "mpawymyrc6derrwmgodowfsaabtuoes4iiwintd7",
        ]
        return random.choice(api_keys)

    def google(self, query):
        """Run the query through the Google search engine and return the result links"""
        search_args = (query, 1)
        gsearch = GoogleSearch()
        gresults = gsearch.search(*search_args)
        return gresults["links"]

    def getValidUrls(self, links):
        """Filter the search results down to clean dev.to article URLs"""
        validUrls = []
        for i in links:
            if "dev.to" in i:
                match = re.match(r"^.*?\&sa=", i[29:])
                if match is None:
                    continue
                ur = match.group(0).replace("&sa=", "")
                validUrls.append(ur)
        return validUrls

    def getValidSets(self, validUrls):
        """Extract the username and article slug from each dev.to URL"""
        validSets = []
        for url in validUrls:
            try:
                vset = {}
                print(url)
                username = re.search(r"https://dev.to/([^/?]+)", url).group(1)
                tag = re.search(r"https://dev.to/([^/?]+)/([^/?]+)", url).group(2)
                vset["username"] = username
                vset["tag"] = tag
                validSets.append(vset)
            except Exception as e:
                print(e)
                continue
        return validSets

    def getBlogs(self, username, tag):
        """Fetch the user's dev.to RSS feed via rss2json and return the matching article"""
        blog = {}
        try:
            response = requests.get(
                f"https://api.rss2json.com/v1/api.json?rss_url=https%3A%2F%2Fdev.to%2Ffeed%2F{username}&api_key={self.getApiKey()}"
            )
            if response.status_code == 200:
                res = response.json()
                for item in res["items"]:
                    if tag in item["link"]:
                        blog = item
        except Exception as e:
            print(e)
        return blog

    def getDevArticles(self):
        """Search dev.to for the question title and return matching articles plus their URLs"""
        links = self.google(f"site:dev.to {self.title} after:2020-01-01")
        validUrls = self.getValidUrls(links)
        validSets = self.getValidSets(validUrls)
        blogs = []
        for validset in validSets:
            blog = self.getBlogs(validset["username"], validset["tag"])
            if blog:
                blogs.append(blog)
        return {"blogs": blogs, "resources": validUrls}
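A minimal usage sketch of the DevTo scraper, assuming the Google search and rss2json calls succeed; the title and tags below are placeholders.

# Hypothetical usage of the DevTo scraper defined above.
devto = DevTo("python django or flask for web development", ["python", "django"])
result = devto.getDevArticles()
print(len(result["blogs"]), "articles,", len(result["resources"]), "dev.to links")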
import requests
from requests_html import HTMLSession
from bs4 import BeautifulSoup
import json
from search_engine_parser import GoogleSearch
import re
from lxml import etree
import random


class Medium:
    def __init__(self, title, tags, description=""):
        # The question title doubles as the search query; tags and description
        # are kept for the keyword-based search helpers below.
        self.title = title
        self.qtitle = title
        self.tags = tags
        self.keywords = tags
        self.description = description
        self.urls = []
        self.session = HTMLSession()

    def searchArticles(self):
        """
        Search for articles using Google dorks.
        With Google dorks we can filter out search results from other websites.
        """
        html_page = requests.get(
            f"https://google.com/search?q=site%3Amedium.com+{self.qtitle}"
        )
        soup = BeautifulSoup(html_page.content, "html.parser")
        for link in soup.findAll("a"):
            if "https://medium.com" in link["href"]:
                self.urls.append(self.extractMediumURLS(link["href"]))
        if self.urls:
            self.viewArticle(self.urls[0])

    def extractMediumURLS(self, uriString):
        """
        Remove unwanted characters from the URL string and return the target URL
        """
        uriTrimmed = uriString[7:]
        uriTrimmed = re.match(r"^.*?\&sa=", uriTrimmed).group(0)
        return uriTrimmed.replace("&sa=", "")

    def viewArticle(self, url):
        """Render the article page and save its title and body to local HTML files"""
        html_page = self.session.get(url)
        html_page.html.render(timeout=20)
        # soup = BeautifulSoup(html_page.content, "html.parser")
        # dom = etree.HTML(str(soup))
        with open("medium.html", "wb") as med:
            med.write(html_page.content)
        with open("medium.html", encoding="utf8") as sf:
            soup = BeautifulSoup(sf, "html.parser")
        dom = etree.HTML(str(soup))
        # art = dom.xpath('//*[@class="a b c"]')[0]
        # print(etree.tostring(art))
        title = dom.xpath('//*[@class="ap aq ar as at ff av w"]/div/h1')[0].text
        article = dom.xpath('//*[@class="ap aq ar as at ff av w"]')[0]
        with open(f"article-{title.replace(' ', '')}.html", "wb") as artFile:
            artFile.write(etree.tostring(article))

    def getApiKey(self):
        """
        Return an API key for retrieving JSON data from rss2json
        """
        api_keys = [
            "2rk1eg4sexdnp5umrwtwbtwd2insqvgzvejooqrn",
            "yit6ytfcs3ziawdgasfd3bgkbf4tef1m2nzdxvnz",
            "mpawymyrc6derrwmgodowfsaabtuoes4iiwintd7",
        ]
        return random.choice(api_keys)

    def google(self, query):
        """
        Run the query through the Google search engine and return the result links
        """
        search_args = (query, 1)
        gsearch = GoogleSearch()
        gresults = gsearch.search(*search_args)
        return gresults["links"]

    def getValidUrls(self, links):
        """
        Validate and filter the URLs.
        Returns the URLs that contain medium.com as a list
        """
        validUrls = []
        for i in links:
            if "medium.com" in i:
                match = re.match(r"^.*?\&sa=", i[29:])
                if match is None:
                    continue
                ur = match.group(0).replace("&sa=", "")
                validUrls.append(ur)
        return validUrls

    def getValidSets(self, validUrls):
        """
        Extract usernames and article IDs from the article URLs.
        Pass a list of URLs => returns a list of objects containing username and article ID
        """
        validSets = []
        for url in validUrls:
            try:
                vset = {}
                print(url)
                username = re.search(r"https://medium.com/([^/?]+)", url).group(1)
                tag = re.search(r"https://medium.com/([^/?]+)/([^/?]+)", url).group(2)
                vset["username"] = username
                vset["tag"] = tag
                validSets.append(vset)
            except Exception as e:
                print(e)
                continue
        return validSets

    def getBlogs(self, username, tag):
        """
        Get the content of the article from the user's RSS feed via rss2json
        """
        blog = {}
        try:
            response = requests.get(
                f"https://api.rss2json.com/v1/api.json?rss_url=https%3A%2F%2Fmedium.com%2Ffeed%2F{username}&api_key={self.getApiKey()}"
            )
            if response.status_code == 200:
                res = response.json()
                for item in res["items"]:
                    if tag in item["link"]:
                        blog = item
        except Exception as e:
            print(e)
        return blog

    def getMediumArticles(self):
        """
        Return a list of articles and/or resources
        """
        links = self.google(f"site:medium.com {self.title} after:2020-01-01")
        validUrls = self.getValidUrls(links)
        validSets = self.getValidSets(validUrls)
        blogs = []
        for validset in validSets:
            blog = self.getBlogs(validset["username"], validset["tag"])
            if blog:
                blogs.append(blog)
        with open("ff.json", "w") as f:
            json.dump({"blogs": blogs, "resources": validUrls}, f)
        return {"blogs": blogs, "resources": validUrls}
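A matching usage sketch for the Medium scraper, again with placeholder title and tags; note that getMediumArticles also dumps its result to ff.json as a side effect.

# Hypothetical usage of the Medium scraper defined above.
medium = Medium("python django or flask for web development", ["python", "django"])
result = medium.getMediumArticles()
for blog in result["blogs"]:
    print(blog["title"], blog["link"])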
bson==0.5.10
beautifulsoup4==4.9.3
dnspython==2.1.0
lxml==4.6.1
......@@ -6,4 +7,5 @@ regex==2020.7.14
requests==2.24.0
requests-html==0.10.0
scipy==1.5.4
search-engine-parser==0.6.2
youtube-search-python==1.4.6
from youtube import Youtube
from Medium import Medium
from Dev import DevTo
from stof import STOF
import sys
from database import get_database


def saveAnswer(ans_id, stackoverflow, videos):
def saveAnswer(ans_id, stackoverflow, videos, medium_r, dev_r):
    db = get_database()
    try:
        from bson.objectid import ObjectId
......@@ -13,7 +14,26 @@ def saveAnswer(ans_id, stackoverflow, videos):
        automatedanswers = db["automatedanswers"]
        automatedanswers.update_one(
            {"_id": ObjectId(ans_id)},
            {"$set": {"youtube": videos, "stackoverflow": stackoverflow}},
            {
                "$set": {
                    "youtube": videos,
                    "stackoverflow": stackoverflow,
                    "medium_articles": medium_r["blogs"],
                    "dev_articles": dev_r["blogs"],
                    "medium_resources": medium_r["resources"],
                    "dev_resources": dev_r["resources"],
                }
            },
        )
        print(
            {
                "youtube": videos,
                "stackoverflow": stackoverflow,
                "medium_articles": medium_r["blogs"],
                "dev_articles": dev_r["blogs"],
                "medium_resources": medium_r["resources"],
                "dev_resources": dev_r["resources"],
            }
        )
    except NameError as err:
        print(err)
......@@ -23,19 +43,19 @@ if __name__ == "__main__":
    # title = input("Enter question title: ")
    title = sys.argv[1]  # "python django or flask for web development"
    tags = sys.argv[2]  # ["react"]
    AUTO_ANS_ID = sys.argv[3]  # "60d746076689344694ad9e30" #
    AUTO_ANS_ID = sys.argv[3]  # "60dc9a5f84692f001569d7ab"
    stack = STOF(title)
    ans = stack.searchQuestion()
    print(ans)
    # medium = Medium(title)
    # medium.searchArticles()
    # f = open("data.txt", "a")
    # f.write(f"updated {title} {tags} {AUTO_ANS_ID}\n")
    # f.close()
    medium = Medium(title, tags)
    medium_articles = medium.getMediumArticles()
    devto = DevTo(title, tags)
    dev_articles = devto.getDevArticles()
    youtube = Youtube(title, tags)
    videos = youtube.find_videos()
    saveAnswer(AUTO_ANS_ID, ans, videos)
    saveAnswer(AUTO_ANS_ID, ans, videos, medium_articles, dev_articles)
    print("WORKED")
    sys.stdout.flush()
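The entry point expects the question title, tags, and the automated answer's ObjectId as positional arguments. A hedged sketch of invoking it follows; the script filename and argument values are assumptions, not taken from the commit.

# Hypothetical invocation of the scraper entry point shown above.
import subprocess

subprocess.run(
    [
        "python",
        "main.py",                                        # assumed script name
        "python django or flask for web development",     # sys.argv[1]: question title
        "python,django",                                   # sys.argv[2]: tags
        "60dc9a5f84692f001569d7ab",                        # sys.argv[3]: automated answer _id
    ],
    check=True,
)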
......@@ -7,4 +7,5 @@ regex==2020.7.14
requests==2.24.0
requests-html==0.10.0
scipy==1.5.4
search-engine-parser==0.6.2
youtube-search-python==1.4.6