Preface: this is a simple tweet crawler. My apologies if it does not quite match what you need; suggestions for improvement are welcome.
Required tools: the Selenium driver for your browser, and a Twitter account.
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
import pymysql
from twitter.util import standardization_time
from twitter.util import standardization_cout
Note: standardization_time and standardization_cout are two helper functions of my own; both are given at the end of this post.
def crawl_daily_tweets(keywords, start_date, days):
    from datetime import datetime, timedelta
    # Split the keyword string into a list
    keyword_list = keywords.split(',')
    # Parse the start date string into a datetime object
    start = datetime.strptime(start_date, "%Y-%m-%d")
    for keyword in keyword_list:
        for i in range(days):
            day = start + timedelta(days=i)
            # Build one search URL per day
            day_str = day.strftime("%Y-%m-%d")  # back to string form
            next_day_str = (day + timedelta(days=1)).strftime("%Y-%m-%d")
            website_address = f"https://twitter.com/search?q={keyword.strip()}%20until%3A{next_day_str}%20since%3A{day_str}&src=typed_query"
            # Call the scraping function with the constructed URL
            accessing_web_pages(website_address, keyword)
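As a quick check of the URL construction, here is the shape of a generated search URL (the keyword and dates below are hypothetical, just for illustration):

# Hypothetical values, only to show the URL shape produced per day
keyword, day_str, next_day_str = "python", "2024-02-26", "2024-02-27"
print(f"https://twitter.com/search?q={keyword}%20until%3A{next_day_str}%20since%3A{day_str}&src=typed_query")
# -> https://twitter.com/search?q=python%20until%3A2024-02-27%20since%3A2024-02-26&src=typed_query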
Note: automated login is not recommended, as it is easily detected and can get the account banned. Neither of the two methods provided here uses automated login.
def accessing_web_pages(target_url, keyword):
    # Method 1: inject previously saved cookies (no interactive login)
    # Create a ChromeOptions instance
    options = Options()
    # Run headless
    options.add_argument('--headless')
    # Create the browser instance (pass the options so headless mode takes effect)
    driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
    # Visit the Twitter homepage first so the cookie domain can be set
    driver.get("https://twitter.com")
    # Cookies to add to the browser instance
    # (fill in your own session cookies here, e.g. 'auth_token': '...')
    cookies = {
    }
    # Add the cookies
    for key, value in cookies.items():
        cookie = {'name': key, 'value': value, 'domain': '.twitter.com'}
        driver.add_cookie(cookie)
    # Reload / navigate to the target page
    driver.get(target_url)
    try:
        # Wait until the "Accept all cookies" button appears
        accept_button = WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable((By.XPATH, "//span[contains(@class, 'css-1qaijid') and contains(text(), 'Accept all cookies')]"))
        )
        # Click the consent button
        accept_button.click()
    except Exception:
        pass
    # Scrape the data
    get_data(driver, keyword)
def accessing_web_pages(target_url, keyword):
    # Method 2: log in manually once, then reuse the session cookies
    # (this definition shadows Method 1 above; keep whichever variant you want to use)
    driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()))
    # Navigate to the login page
    driver.get("https://twitter.com/login")
    # Give yourself time to log in by hand
    input("Press Enter after you have logged in...")
    # After a successful login, save the cookies
    cookies = driver.get_cookies()
    # Re-apply the saved cookies
    for cookie in cookies:
        driver.add_cookie(cookie)
    # Visit the page that requires a logged-in session
    driver.get(target_url)
    try:
        # Wait until the "Accept all cookies" button appears
        accept_button = WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable((By.XPATH, "//span[contains(@class, 'css-1qaijid') and contains(text(), 'Accept all cookies')]"))
        )
        # Click the consent button
        accept_button.click()
    except Exception:
        pass
    # Scrape the data
    get_data(driver, keyword)
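If you would rather not log in by hand on every run, the cookies collected in Method 2 can be persisted between runs. A minimal sketch, assuming a local cookies.json file (the save_cookies/load_cookies helpers are my own and not part of the original code):

import json

def save_cookies(driver, path="cookies.json"):
    # Write the current session cookies to disk
    with open(path, "w", encoding="utf-8") as f:
        json.dump(driver.get_cookies(), f)

def load_cookies(driver, path="cookies.json"):
    # Re-apply cookies from a previous run; the driver must already be
    # on a twitter.com page so the cookie domain matches
    with open(path, "r", encoding="utf-8") as f:
        for cookie in json.load(f):
            driver.add_cookie(cookie)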
Here Selenium simulates scrolling to page through results. The scroll distance is randomized with the random module each time, which lowers the chance of being flagged by the site. The tweet URLs found on the page are stored in a list; after each scroll the URLs are collected again and checked against that list to see whether new content has appeared. To detect the bottom of the page, the code allows up to five retries: once the page height stops changing for five consecutive scrolls, crawling of the current page ends.
def get_data(driver, keyword):
    import time
    import random
    from datetime import datetime
    # Database connection settings
    config = {
        'host': 'localhost',
        'user': 'root',
        'password': 'root',
        'database': 'weibo',
        'charset': 'utf8mb4',
    }
    # Open the database connection
    mysql_db = pymysql.connect(**config)
    # Create a cursor
    cursor = mysql_db.cursor()
    # SQL to create the table (if it does not exist yet)
    create_table_sql = """
    CREATE TABLE IF NOT EXISTS tweets (
        tweet_url VARCHAR(255) NOT NULL PRIMARY KEY,
        username VARCHAR(100) NOT NULL,
        tweet_content TEXT NOT NULL,
        publish_date DATETIME,
        comments INT DEFAULT 0,
        retweets INT DEFAULT 0,
        likes INT DEFAULT 0,
        views INT DEFAULT 0,
        get_time DATETIME NOT NULL,
        keyword VARCHAR(100)
    );
    """
    # Run the create-table statement
    try:
        cursor.execute(create_table_sql)
        mysql_db.commit()
        print("Table created successfully.")
    except Exception as e:
        mysql_db.rollback()
        print(f"Failed to create table: {e}")
    finally:
        cursor.close()
    crawled_tweets_urls = []
    # State used to track scrolling progress
    last_height = driver.execute_script("return document.body.scrollHeight")
    max_retries = 5  # maximum number of retries allowed
    retries = 0  # current retry count
    while retries < max_retries:
        driver.execute_script("window.scrollBy(0, {});".format(random.randint(200, 800)))
        time.sleep(random.uniform(2, 4.5))
        # Wait at most 10 seconds for tweet articles to be present
        article_content = WebDriverWait(driver, 10).until(
            EC.presence_of_all_elements_located((By.XPATH, "//div[@data-testid='cellInnerDiv']/div/div/article"))
        )
        for article in article_content:
            try:
                # Tweet URL (the <time> element's parent link)
                tweet_url = article.find_element(By.XPATH, ".//time/..").get_attribute("href")
                if tweet_url not in crawled_tweets_urls:
                    crawled_tweets_urls.append(tweet_url)
                    try:
                        # Username
                        username = article.find_element(By.XPATH, ".//div[@data-testid='User-Name']//span").text
                        # Tweet text
                        tweet_content = article.find_element(By.XPATH, ".//div[@data-testid='tweetText']").text
                        # Publish date
                        publish_date = article.find_element(By.XPATH, ".//time").get_attribute('datetime')
                        publish_date = standardization_time(publish_date)
                        # Reply count
                        comments = article.find_element(By.XPATH, ".//div[@data-testid='reply']//span").text if article.find_elements(By.XPATH, ".//div[@data-testid='reply']//span") else "0"
                        comments = standardization_cout(comments)
                        # Retweet count
                        retweets = article.find_element(By.XPATH, ".//div[@data-testid='retweet']//span").text if article.find_elements(By.XPATH, ".//div[@data-testid='retweet']//span") else "0"
                        retweets = standardization_cout(retweets)
                        # Like count
                        likes = article.find_element(By.XPATH, ".//div[@data-testid='like']//span").text if article.find_elements(By.XPATH, ".//div[@data-testid='like']//span") else "0"
                        likes = standardization_cout(likes)
                        # View count (when available)
                        views = article.find_element(By.XPATH, ".//a[contains(@href,'analytics')]//span").text if article.find_elements(By.XPATH, ".//a[contains(@href,'analytics')]//span") else "0"
                        views = standardization_cout(views)
                        # Crawl timestamp (the keyword comes in from the caller)
                        get_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                        # Collect the scraped fields into a dict
                        tweet_data = {
                            "tweet_url": tweet_url,
                            "username": username,
                            "tweet_content": tweet_content,
                            "publish_date": publish_date,
                            "comments": comments if comments else "0",
                            "retweets": retweets if retweets else "0",
                            "likes": likes if likes else "0",
                            "views": views if views else "0",
                            "get_time": get_time,
                            "keyword": keyword
                        }
                        # Database connection settings
                        config = {
                            'host': 'localhost',
                            'user': 'root',
                            'password': 'root',
                            'database': 'weibo',
                            'charset': 'utf8mb4',
                        }
                        # Open the database connection
                        mysql_db = pymysql.connect(**config)
                        # Create a cursor
                        cursor = mysql_db.cursor()
                        # Insert-or-update SQL template
                        insert_or_update_sql = """
                        INSERT INTO tweets (
                            tweet_url, username, tweet_content, publish_date,
                            comments, retweets, likes, views, get_time, keyword
                        ) VALUES (%(tweet_url)s, %(username)s, %(tweet_content)s, %(publish_date)s,
                                  %(comments)s, %(retweets)s, %(likes)s, %(views)s, %(get_time)s, %(keyword)s)
                        ON DUPLICATE KEY UPDATE
                            username=VALUES(username),
                            tweet_content=VALUES(tweet_content),
                            publish_date=VALUES(publish_date),
                            comments=VALUES(comments),
                            retweets=VALUES(retweets),
                            likes=VALUES(likes),
                            views=VALUES(views),
                            get_time=VALUES(get_time),
                            keyword=VALUES(keyword);
                        """
                        # Run the insert-or-update statement
                        try:
                            cursor.execute(insert_or_update_sql, tweet_data)
                            mysql_db.commit()
                            print("Data inserted or updated successfully.")
                        except Exception as e:
                            mysql_db.rollback()
                            print(f"Insert or update data error: {e}")
                        finally:
                            cursor.close()
                        print(f"tweet url: {tweet_url}, username: {username}, tweet_content: {tweet_content}, date: {publish_date}, comments: {comments}, retweets: {retweets}, likes: {likes}, views: {views}, get_time: {get_time}, keyword: {keyword}")
                        print('-----------')
                    except Exception as e:
                        print(f'Error extracting tweet info: {e}')
                else:
                    continue
            except Exception as e:
                print(f'Error extracting tweet info: {e}')
        # Check whether the page scroll height has changed
        new_height = driver.execute_script("return document.body.scrollHeight")
        if new_height == last_height:
            retries += 1
            print(f"Retry {retries}...")
        else:
            last_height = new_height
            retries = 0  # reset the retry counter
            print("New content detected, continuing...")
    # input("Reached the bottom of the page or hit the retry limit; press Enter to continue...")
    # Close the browser so each daily crawl does not leak a Chrome instance
    driver.quit()
if __name__ == '__main__':
    crawl_daily_tweets("亚伦.布什内尔", "2024-02-26", 2)
The arguments are, in order: the keyword(s) (multiple keywords can be passed, separated by ","), the start date of the crawl, and the number of days to crawl.
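For example, a hypothetical run over two keywords and a full week:

# Hypothetical keywords and date, just to illustrate the call
crawl_daily_tweets("keyword1,keyword2", "2024-02-26", 7)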
def standardization_time(publish_date):
    # Convert the scraped date into a standard format
    from datetime import datetime, timedelta
    # Parse the ISO-8601 string into a datetime object
    utc_dt = datetime.strptime(publish_date, "%Y-%m-%dT%H:%M:%S.%fZ")
    # China is UTC+8, so add 8 hours to the UTC time
    # Note: this simply adds the fixed offset; for DST-aware conversions, pytz/zoneinfo is more accurate
    china_dt = utc_dt + timedelta(hours=8)
    # Format as needed
    formatted_date = china_dt.strftime("%Y-%m-%d %H:%M:%S")
    return formatted_date
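A quick sanity check of the conversion (the timestamp is a made-up example):

print(standardization_time("2024-02-26T04:30:00.000Z"))  # -> 2024-02-26 12:30:00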
def standardization_cout(count_str):
    try:
        # Strip thousands separators
        views_str_cleaned = count_str.replace(',', '')
        # Convert the cleaned string to an integer
        views_int = int(views_str_cleaned)
        return views_int
    except (ValueError, AttributeError):
        # Return None if the value cannot be parsed
        return None
The first function converts the scraped UTC timestamps to China time; the second converts counts such as comments and likes into integers so they satisfy the column types of the MySQL table.
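Note that Twitter often abbreviates large counts (e.g. "1.2K", "3.4M"), which the plain int() conversion above cannot parse. A minimal sketch of a more tolerant variant; the helper name and the suffix scale table are my own assumptions, not part of the original code:

def standardization_cout_abbrev(count_str):
    # Hypothetical extension: handle abbreviated counts like "1.2K" or "3.4M"
    suffixes = {'K': 1_000, 'M': 1_000_000}  # assumed suffix scales
    try:
        s = count_str.replace(',', '').strip()
        if s and s[-1].upper() in suffixes:
            return int(float(s[:-1]) * suffixes[s[-1].upper()])
        return int(s)
    except (ValueError, AttributeError):
        return None

The complete script is listed below.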
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from twitter.util import standardization_time
from twitter.util import standardization_cout
import pymysql
def crawl_daily_tweets(keywords, start_date, days):
    from datetime import datetime, timedelta
    # Split the keyword string into a list
    keyword_list = keywords.split(',')
    # Parse the start date string into a datetime object
    start = datetime.strptime(start_date, "%Y-%m-%d")
    for keyword in keyword_list:
        for i in range(days):
            day = start + timedelta(days=i)
            # Build one search URL per day
            day_str = day.strftime("%Y-%m-%d")  # back to string form
            next_day_str = (day + timedelta(days=1)).strftime("%Y-%m-%d")
            website_address = f"https://twitter.com/search?q={keyword.strip()}%20until%3A{next_day_str}%20since%3A{day_str}&src=typed_query"
            # Call the scraping function with the constructed URL
            accessing_web_pages(website_address, keyword)
def accessing_web_pages(target_url, keyword):
    # # Method 1: inject previously saved cookies (no interactive login)
    # # Create a ChromeOptions instance
    # options = Options()
    # # Run headless
    # options.add_argument('--headless')
    # # Create the browser instance (pass the options so headless mode takes effect)
    # driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
    #
    # # Visit the Twitter homepage first so the cookie domain can be set
    # driver.get("https://twitter.com")
    #
    # # Cookies to add to the browser instance
    # # (fill in your own session cookies here, e.g. 'auth_token': '...')
    # cookies = {
    #
    # }
    #
    # # Add the cookies
    # for key, value in cookies.items():
    #     cookie = {'name': key, 'value': value, 'domain': '.twitter.com'}
    #     driver.add_cookie(cookie)
    #
    # # Reload / navigate to the target page
    # driver.get(target_url)
    # Method 2: log in manually once, then reuse the session cookies
    driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()))
    # Navigate to the login page
    driver.get("https://twitter.com/login")
    # Give yourself time to log in by hand
    input("Press Enter after you have logged in...")
    # After a successful login, save the cookies
    cookies = driver.get_cookies()
    # Re-apply the saved cookies
    for cookie in cookies:
        driver.add_cookie(cookie)
    # Visit the page that requires a logged-in session
    driver.get(target_url)
    try:
        # Wait until the "Accept all cookies" button appears
        accept_button = WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable((By.XPATH, "//span[contains(@class, 'css-1qaijid') and contains(text(), 'Accept all cookies')]"))
        )
        # Click the consent button
        accept_button.click()
    except Exception:
        pass
    # Scrape the data
    get_data(driver, keyword)
def get_data(driver, keyword):
    import time
    import random
    from datetime import datetime
    # Database connection settings
    config = {
        'host': 'localhost',
        'user': 'root',
        'password': 'root',
        'database': 'weibo',
        'charset': 'utf8mb4',
    }
    # Open the database connection
    mysql_db = pymysql.connect(**config)
    # Create a cursor
    cursor = mysql_db.cursor()
    # SQL to create the table (if it does not exist yet)
    create_table_sql = """
    CREATE TABLE IF NOT EXISTS tweets (
        tweet_url VARCHAR(255) NOT NULL PRIMARY KEY,
        username VARCHAR(100) NOT NULL,
        tweet_content TEXT NOT NULL,
        publish_date DATETIME,
        comments INT DEFAULT 0,
        retweets INT DEFAULT 0,
        likes INT DEFAULT 0,
        views INT DEFAULT 0,
        get_time DATETIME NOT NULL,
        keyword VARCHAR(100)
    );
    """
    # Run the create-table statement
    try:
        cursor.execute(create_table_sql)
        mysql_db.commit()
        print("Table created successfully.")
    except Exception as e:
        mysql_db.rollback()
        print(f"Failed to create table: {e}")
    finally:
        cursor.close()
    crawled_tweets_urls = []
    # State used to track scrolling progress
    last_height = driver.execute_script("return document.body.scrollHeight")
    max_retries = 5  # maximum number of retries allowed
    retries = 0  # current retry count
    while retries < max_retries:
        driver.execute_script("window.scrollBy(0, {});".format(random.randint(200, 800)))
        time.sleep(random.uniform(2, 4.5))
        # Wait at most 10 seconds for tweet articles to be present
        article_content = WebDriverWait(driver, 10).until(
            EC.presence_of_all_elements_located((By.XPATH, "//div[@data-testid='cellInnerDiv']/div/div/article"))
        )
        for article in article_content:
            try:
                # Tweet URL (the <time> element's parent link)
                tweet_url = article.find_element(By.XPATH, ".//time/..").get_attribute("href")
                if tweet_url not in crawled_tweets_urls:
                    crawled_tweets_urls.append(tweet_url)
                    try:
                        # Username
                        username = article.find_element(By.XPATH, ".//div[@data-testid='User-Name']//span").text
                        # Tweet text
                        tweet_content = article.find_element(By.XPATH, ".//div[@data-testid='tweetText']").text
                        # Publish date
                        publish_date = article.find_element(By.XPATH, ".//time").get_attribute('datetime')
                        publish_date = standardization_time(publish_date)
                        # Reply count
                        comments = article.find_element(By.XPATH, ".//div[@data-testid='reply']//span").text if article.find_elements(By.XPATH, ".//div[@data-testid='reply']//span") else "0"
                        comments = standardization_cout(comments)
                        # Retweet count
                        retweets = article.find_element(By.XPATH, ".//div[@data-testid='retweet']//span").text if article.find_elements(By.XPATH, ".//div[@data-testid='retweet']//span") else "0"
                        retweets = standardization_cout(retweets)
                        # Like count
                        likes = article.find_element(By.XPATH, ".//div[@data-testid='like']//span").text if article.find_elements(By.XPATH, ".//div[@data-testid='like']//span") else "0"
                        likes = standardization_cout(likes)
                        # View count (when available)
                        views = article.find_element(By.XPATH, ".//a[contains(@href,'analytics')]//span").text if article.find_elements(By.XPATH, ".//a[contains(@href,'analytics')]//span") else "0"
                        views = standardization_cout(views)
                        # Crawl timestamp (the keyword comes in from the caller)
                        get_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                        # Collect the scraped fields into a dict
                        tweet_data = {
                            "tweet_url": tweet_url,
                            "username": username,
                            "tweet_content": tweet_content,
                            "publish_date": publish_date,
                            "comments": comments if comments else "0",
                            "retweets": retweets if retweets else "0",
                            "likes": likes if likes else "0",
                            "views": views if views else "0",
                            "get_time": get_time,
                            "keyword": keyword
                        }
                        # Database connection settings
                        config = {
                            'host': 'localhost',
                            'user': 'root',
                            'password': 'root',
                            'database': 'weibo',
                            'charset': 'utf8mb4',
                        }
                        # Open the database connection
                        mysql_db = pymysql.connect(**config)
                        # Create a cursor
                        cursor = mysql_db.cursor()
                        # Insert-or-update SQL template
                        insert_or_update_sql = """
                        INSERT INTO tweets (
                            tweet_url, username, tweet_content, publish_date,
                            comments, retweets, likes, views, get_time, keyword
                        ) VALUES (%(tweet_url)s, %(username)s, %(tweet_content)s, %(publish_date)s,
                                  %(comments)s, %(retweets)s, %(likes)s, %(views)s, %(get_time)s, %(keyword)s)
                        ON DUPLICATE KEY UPDATE
                            username=VALUES(username),
                            tweet_content=VALUES(tweet_content),
                            publish_date=VALUES(publish_date),
                            comments=VALUES(comments),
                            retweets=VALUES(retweets),
                            likes=VALUES(likes),
                            views=VALUES(views),
                            get_time=VALUES(get_time),
                            keyword=VALUES(keyword);
                        """
                        # Run the insert-or-update statement
                        try:
                            cursor.execute(insert_or_update_sql, tweet_data)
                            mysql_db.commit()
                            print("Data inserted or updated successfully.")
                        except Exception as e:
                            mysql_db.rollback()
                            print(f"Insert or update data error: {e}")
                        finally:
                            cursor.close()
                        print(f"tweet url: {tweet_url}, username: {username}, tweet_content: {tweet_content}, date: {publish_date}, comments: {comments}, retweets: {retweets}, likes: {likes}, views: {views}, get_time: {get_time}, keyword: {keyword}")
                        print('-----------')
                    except Exception as e:
                        print(f'Error extracting tweet info: {e}')
                else:
                    continue
            except Exception as e:
                print(f'Error extracting tweet info: {e}')
        # Check whether the page scroll height has changed
        new_height = driver.execute_script("return document.body.scrollHeight")
        if new_height == last_height:
            retries += 1
            print(f"Retry {retries}...")
        else:
            last_height = new_height
            retries = 0  # reset the retry counter
            print("New content detected, continuing...")
    # input("Reached the bottom of the page or hit the retry limit; press Enter to continue...")
    # Close the browser so each daily crawl does not leak a Chrome instance
    driver.quit()
if __name__ == '__main__':
    crawl_daily_tweets("亚伦.布什内尔", "2024-02-26", 2)
In total the script captures ten fields per tweet: the tweet URL, author, text, like count, retweet count, view count, reply count, crawl time, publish time, and the search keyword.
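Once data has been collected, it can be read back from MySQL for analysis. A minimal sketch, assuming the same connection settings as the crawler uses (the keyword value here is hypothetical):

import pymysql

# Assumed to match the connection settings used by the crawler above
conn = pymysql.connect(host='localhost', user='root', password='root',
                       database='weibo', charset='utf8mb4')
with conn.cursor() as cursor:
    # Ten most-liked tweets for one keyword
    cursor.execute(
        "SELECT tweet_url, username, likes FROM tweets "
        "WHERE keyword = %s ORDER BY likes DESC LIMIT 10",
        ("python",),
    )
    for row in cursor.fetchall():
        print(row)
conn.close()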