python - Instagram Scraper - 空数据框
问题描述
目前正在尝试运行此代码以进行主题标签抓取:
https://github.com/kitsamho/Instagram_Scraper_Graph/blob/master/InstagramScraper.ipynb
但它一直返回一个空数据框。
class InstagramScraper():
"""
Class that allows you to scrape the content of Instagram posts, either
a profile or a hashtag.
Initialised with the location of your Chromedriver location
"""
def __init__(self,driver_loc='C:/Users/gctia/Documents/Python Scripts/basf/chromedriver.exe'):
self.driver_loc = driver_loc
def multithreadCompile(self,thread_count,iteration_list,func):
"""
This function compiles the batched needed for mult-threadding
Args:
thread_count is the number of threads used for multi-threadding
iteration_list is the source list of urls to iterate over
func is the function to be used in the multi-thredding process
Returns:
The batches that have been allocated to be run using the specified
function
"""
jobs = [] #empty list for jobs
#distribute iteration list to batches and append to jobs list
batches = [i.tolist() for i in np.array_split(iteration_list,thread_count)]
for i in range(len(batches)):
jobs.append(threading.Thread(target=func,args=[batches[i]]))
return jobs
def multithreadExecute(self,jobs):
"""
This function executes the multi-threadding process
Args:
The batches that have been appended to a jobs list
Returns:
Nothing, merely executes the multi-threadding
"""
# Start the threads
for j in jobs:
print('execute working')
j.start()
# Ensure all of the threads have finished
for j in jobs:
j.join()
return
def getJson(self,url):
"""
This function exracts a JSON style dictionary from the html for any
given unique Instagram post
Args:
An Instagram post URL
Returns:
JSON dictionary ouput
"""
page = urlopen(url).read() #read url
data=BeautifulSoup(page, 'html.parser') #get a BeautifulSoup object
body = data.find('body') #find body element
script = body.find('script') #find script element
#some string formatting
raw = script.text.strip().replace('window._sharedData =', '').replace(';', '')
#load string
json_data=json.loads(raw)
return json_data #return JSON dictonary
def userDetails(self):
"""
Functions that capture log in details and logs user into Instagram
Args:
None needed
Returns:
Nothing
"""
#capture username
username = input('Enter username...')
#capture password
password = getpass.getpass('Enter password...')
self._password = password #retain password as attribute
self._username = username #retain user name as attribute
return
def openWebdriver(self):
"""
Launches Chrome webdriver
Args:
None needed
Returns:
driver
"""
#intiate driver
print("Launching driver...")
#retain current driver as attribute
driver = webdriver.Chrome(self.driver_loc)
return driver
def closeWebdriver(self,driver):
"""
Closes Chrome webdriver
Args:
webDriver
Returns:
Nothing
"""
driver.close()
return
def instagramLogin(self,driver):
"""
Logs in to Instagram
Args:
Current webdriver
Returns:
Current webdriver - logged into Instagram
"""
#base url
driver.get('https://www.instagram.com/accounts/login/?source=auth_switcher')
sleep(2) #wait
#log in
#username_field = driver.find_element_by_xpath('//*[@id="react-root"]/section/main/div/article/div/div[1]/div/form/div[2]/div/label/input')
username_field = WebDriverWait(driver, 10).until(EC.element_to_be_clickable((By.CSS_SELECTOR, "input[name='username']")))
#username_field.click() #click on username button
#send username
username_field.send_keys(self._username)
#locate element to click
#try:
# password_field = driver.find_element_by_xpath('//*[@id="react-root"]/section/main/div/article/div/div[1]/div/form/div[3]/div/label/input')
#except:
# password_field = driver.find_element_by_xpath('//*[@id="react-root"]/section/main/div/article/div/div[1]/div/form/div[4]/div/label/input')
password_field = WebDriverWait(driver, 10).until(EC.element_to_be_clickable((By.CSS_SELECTOR, "input[name='password']")))
#password_field.click()
password_field.send_keys(self._password)
sleep(2)
#find log in button
#login_button = driver.find_element_by_xpath('//*[@id="react-root"]/section/main/div/article/div/div[1]/div/form/div[4]')
login_button = WebDriverWait(driver, 2).until(EC.element_to_be_clickable((By.CSS_SELECTOR, "button[type='submit']"))).click()
#login_button.click()
sleep(3)
#locate floating window to click and close
#floating_window = driver.find_element_by_class_name('piCib')
#button = floating_window.find_element_by_class_name('mt3GC')
#not_now = button.find_element_by_xpath('/html/body/div[4]/div/div/div[3]/button[2]')
#not_now.click()
alert = WebDriverWait(driver, 15).until(EC.element_to_be_clickable((By.XPATH, '//button[contains(text(), "Not Now")]'))).click()
alert2 = WebDriverWait(driver, 15).until(EC.element_to_be_clickable((By.XPATH, '//button[contains(text(), "Not Now")]'))).click()
return driver
def setTarget(self):
"""
Function that sets either a profile or a hashtag as a target
Args:
None
Returns:
base url to scrape - either a hashtag page or a profile page
"""
#tou can choose either hashtag search or a profile to search
route = input('What do you want to scrape, profile posts or hashtags? (p/h)')
#if hashtags
if route == 'h':
#set hashtag
hashtag = input('Which hashtag do you want to scrape posts for: ')
self.target_label = '#'+hashtag #retain hashtag as attribute
tag_url = 'https://www.instagram.com/explore/tags/' #set base url
self._target = tag_url+hashtag #set url to scrape from
return self._target #return url to scrape from
else:
profile = input('What profile do you want to scrape posts for: ')
self.target_label = '@'+profile #retain profile as attribute
profile_url = 'https://www.instagram.com/' #set base url
self._target = profile_url+profile #set url to scrape from
return self._target #return url to scrape from
def scrapeLinks(self,url):
"""
Function that scrapes the links needed
Args:
target_url
Returns:
Nothing - but retains a list of urls to scrape
"""
#pass url as argument to Selenium webDriver, loads url
self.activedriver.get(url)
options = webdriver.ChromeOptions()
#start maximised
options.add_argument("--start-maximized")
#gets scroll height
last_height = self.activedriver.execute_script("return document.body.scrollHeight")
#initiate empty list for unique Instagram links
links = []
#some lines for user interactivity / selection of link target(n)
print("\n")
target = input("How many links do you want to scrape (minimum)?: ")
print("\n")
print("Staring Selenium scrape, please keep browser open.")
print("\n")
#this loops round until n links achieved or page has ended
while True:
source = self.activedriver.page_source
data= BeautifulSoup(source, 'html.parser')
body = data.find('body')
#script = body.find('span')
for link in body.findAll('a'):
if re.match("/p", link.get('href')):
links.append('https://www.instagram.com'+link.get('href'))
else:
continue
# Scroll down to bottom
self.activedriver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
# Wait to load page
time.sleep(2)
# Calculate new scroll height and compare with last scroll height
new_height = self.activedriver.execute_script("return document.body.scrollHeight")
#if no more content, scrape loop is terminated
if new_height == last_height:
break
last_height = new_height
#update on successful links scraped
print("Scraped ", len(links)," links, ", len(set(links)),' are unique')
#if n target met then while loop breaks
if len(set(links))>int(target):
break
#links are saved as an attribute for the class instance
self._links = list(unique_everseen(links))
#clear the screen and provide user feedback on performance
clear_output()
print("Finished scraping links. Maxed out at ", len(links)," links, of which ", len(self._links),' are unique.')
print("\n")
print("Unique links obtained. Closing driver")
print("\n")
# close driver
self.closeWebdriver(self.activedriver)
return
def postDate(self,data):
"""
Function that gets the date of post
Args:
JSON dictionary for post
Returns:
datetime of post
"""
return datetime.utcfromtimestamp(data['entry_data']['PostPage'][0]['graphql']['shortcode_media']['taken_at_timestamp']).strftime('%Y-%m-%d %H:%M:%S')
def postUser(self,data):
"""
Function that gets the username of the person who posted
Args:
JSON dictionary for post
Returns:
username
"""
return data['entry_data']['PostPage'][0]['graphql']['shortcode_media']['owner']['username']
def postVerifiedUser(self,data):
"""
Function gets the verified status of the user
Args:
JSON dictionary for post
Returns:
verified status
"""
return data['entry_data']['PostPage'][0]['graphql']['shortcode_media']['owner']['is_verified']
def postLikes(self,data):
"""
Function that gets the number of likes the post received
Args:
JSON dictionary for post
Returns:
number of likes
"""
return data['entry_data']['PostPage'][0]['graphql']['shortcode_media']['edge_media_preview_like']['count']
def postVerifiedTags(self,data):
"""
Function that gets the verified tags that a post contains
Args:
JSON dictionary for post
Returns:
the verified tags in the post
"""
tag_end_point = data['entry_data']['PostPage'][0]['graphql']['shortcode_media']['edge_media_to_tagged_user']['edges']
entities = []
verif = []
for i in range(len(tag_end_point)):
entities.append(tag_end_point[i]['node']['user']['full_name'])
verif.append(tag_end_point[i]['node']['user']['is_verified'])
df = pd.DataFrame({'Brand':entities,'Verified':verif})
df = df[df.Verified == True]
if len(list(df.Brand)) < 1:
return np.nan
else:
return list(df.Brand)
def postUnverifiedTags(self,data):
"""
Function that gets the unverified tags a post contains
Args:
JSON dictionary for post
Returns:
the unverified tags in the post
"""
tag_end_point = data['entry_data']['PostPage'][0]['graphql']['shortcode_media']['edge_media_to_tagged_user']['edges']
tags = [] #emoty list for entities
verif = [] #empty list for verified status
#loop through
for i in range(len(tag_end_point)):
#append entities
tags.append(tag_end_point[i]['node']['user']['full_name'])
#append verified status
verif.append(tag_end_point[i]['node']['user']['is_verified'])
#DataFrame of verified / unverified tags
df = pd.DataFrame({'Tag':tags,'Verified':verif})
#subset on unverified tags
df = df[df.Verified == False]
#if there are unverified tags then return NaN else return unverified tags
if len(list(df.Tag)) < 1:
return np.nan
else:
return ''.join(list(df.Tag))
def postComment(self,data):
return data['entry_data']['PostPage'][0]['graphql']['shortcode_media']['edge_media_to_caption']['edges'][0]['node']['text']
#get location of post
def postLocation(self,data):
"""
Function that gets the post location if available
Args:
JSON dictionary for post
Returns:
the posts location
"""
try:
if len(list(data['entry_data']['PostPage'][0]['graphql']['shortcode_media']['location']['name'])) > 0:
return data['entry_data']['PostPage'][0]['graphql']['shortcode_media']['location']['name']
except:
return np.nan
#get accessibility / image data
def postAccessibility(self,data):
"""
Function that gets the post accessibility data if available
Args:
JSON dictionary for post
Returns:
the accessibility data
"""
try:
try:
image = data['entry_data']['PostPage'][0]['graphql']['shortcode_media']['accessibility_caption'].replace('Image may contain: ','').replace(' and ',', ').replace('one or more ','')
return image
except:
image = data['entry_data']['PostPage'][0]['graphql']['shortcode_media']['edge_sidecar_to_children']['edges'][0]['node']['accessibility_caption'].replace('Image may contain: ','').replace(' and ',', ').replace('one or more ','')
return image
except:
return np.nan
#return original post link
def postLink(self,data):
return data
"""
The three main methods that combine all above
"""
#get user details, log in and initiate driver
def logIn(self):
self.userDetails()
driver = self.openWebdriver()
self.activedriver = self.instagramLogin(driver)
clear_output()
print('Successfully logged in..ready to scrape')
#get all the unique links
def getLinks(self):
return self.scrapeLinks(self.setTarget())
#extract data and return dataframe
def getData(self):
#create empty lists for posts and comments
post_date_l = []
post_user_l = []
post_verif_l = []
post_likes_l = []
post_tags_v_l =[]
post_tags_u_l = []
post_l = []
post_location_l = []
post_insta_classifier_l = []
post_link_l = []
self._listStack = [post_date_l,post_user_l,post_verif_l,post_likes_l,post_tags_v_l,
post_tags_u_l,post_l,post_location_l,post_insta_classifier_l,post_link_l]
self._functionStack = [ self.postDate,
self.postUser,
self.postVerifiedUser,
self.postLikes,
self.postVerifiedTags,
self.postUnverifiedTags,
self.postComment,
self.postLocation,
self.postAccessibility,
self.postLink]
def extractData(links=self._links):
#loops through and calls each data collection function on each link
for i in tqdm_notebook(range(len(links))):
try:
data = self.getJson(links[i])
for function in self._functionStack:
if function != self._functionStack[-1]:
try:
self._listStack[self._functionStack.index(function)].append(function(data))
except:
self._listStack[self._functionStack.index(function)].append(np.nan)
else:
self._listStack[-1].append(self._functionStack[-1](links[i]))
except:
pass
return
# execute html parsing fuction using multi threading
print("Attemping multi-threading...")
print("\n")
threads = int(input("How many threads?: "))
print("\n")
print("Executing...")
self.multithreadExecute(self.multithreadCompile(threads,self._links,extractData))
#set up intial data structure
df = pd.DataFrame({'searched_for':[self.target_label]*len(post_l),
'post_link' :post_link_l,
'post_date':post_date_l,
'post':post_l,
'user':post_user_l,
'user_verified_status': post_verif_l,
'post_likes':post_likes_l,
'post_verified_tags':post_tags_v_l,
'post_unverified_tags':post_tags_u_l,
'post_location':post_location_l,
'post_image':post_insta_classifier_l,
})
df.sort_values(by='post_date',ascending=False,inplace=True)
df.reset_index(drop=True,inplace=True) #reset index
self._df = df #retain final DataFrame as attribute
return df
我使用几乎相同的代码,对 instagramLogin 函数进行了一些小改动,以便能够登录(不确定这是否会影响抓取)。
解决方案
推荐阅读
- python - 如何使用 .loc 和 .contains 设置列值
- mysql - MySQL计算数据透视表上至少有一条记录的所有记录
- simulation - 如何修复 Castalia 中的“网络初始化期间 SensorManager 模块中的错误”
- reactjs - 使用 ReactDOM.render 时无法将道具作为 JSON 传递
- python - 同一周数 Python Pandas 过去 3 年的滚动平均值
- python - GPIO 事件未显示在 QTreeModel / QWidget / QMainWindow
- javascript - 从 Stripe API 检索发票时,如何缩小结果范围以仅从对象内的一个属性中检索数据?
- tensorflow - tensorflow中定义的faster_rcnn_resnet101在哪里(比如层在哪里)
- nginx - Kestrel vs IIS+Kestrel(反向代理) vs Nginx
- javascript - 通过调用 php 文件动态构建 JavaScript