
A Selenium-based tweet scraper (no Twitter API required)


Disclaimer:

        Everything in this article is for learning and exchange purposes only. Commercial or illegal use is strictly forbidden, and any consequences arising from such use have nothing to do with the author. If anything here infringes your rights, please contact me and I will remove it immediately.

Preface: this is a simple tweet scraper. If its behavior does not match your expectations, please bear with me, and feel free to suggest improvements.

Required tools: the Selenium driver for your browser, plus a Twitter account. The Python dependencies can be installed with pip install selenium webdriver-manager pymysql.

1. Libraries used:

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
import pymysql
from twitter.util import standardization_time
from twitter.util import standardization_cout
Note: the last two imports are self-built helper functions, listed in section 6 below.

2. Building the target page URLs

def crawl_daily_tweets(keywords, start_date, days):
    from datetime import datetime, timedelta

    # Split the keyword string into a list
    keyword_list = keywords.split(',')

    # Convert the start-date string into a datetime object
    start = datetime.strptime(start_date, "%Y-%m-%d")

    for keyword in keyword_list:
        for i in range(days):
            day = start + timedelta(days=i)
            # Build one search URL per day
            day_str = day.strftime("%Y-%m-%d")  # convert back to string form
            next_day_str = (day + timedelta(days=1)).strftime("%Y-%m-%d")
            website_address = f"https://twitter.com/search?q={keyword.strip()}%20until%3A{next_day_str}%20since%3A{day_str}&src=typed_query"

            # Call the scraping function with the URL built above
            accessing_web_pages(website_address, keyword)
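Note: the f-string above inserts the keyword into the URL verbatim, which works for plain ASCII terms but is fragile for spaces or non-ASCII keywords (such as the Chinese example in section 5). A small sketch using the standard library instead (the helper name build_search_url is illustrative, not part of the original code):

from urllib.parse import quote

def build_search_url(keyword, since, until):
    # quote() percent-encodes spaces as %20 and colons as %3A,
    # which matches the hand-built URL above
    query = quote(f"{keyword.strip()} until:{until} since:{since}")
    return f"https://twitter.com/search?q={query}&src=typed_query"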

3. Using Selenium to work around the Twitter API limits (two versions)

Note: automated login is not recommended, because it is easily detected and can get the account banned. Neither of the two methods below logs in automatically.

1. Manually added cookies (log in on the website first, grab your own account's cookies, and add them by hand)

def accessing_web_pages(target_url, keyword):
    # Create a ChromeOptions instance
    options = Options()
    # Run in headless mode
    options.add_argument('--headless')
    # Create the browser instance
    driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)

    # Visit the Twitter homepage first so the cookie domain can be set
    driver.get("https://twitter.com")

    # Cookies to add to the browser instance
    cookies = {
        # paste your account's cookies here as name: value pairs
    }

    # Add the cookies
    for key, value in cookies.items():
        cookie = {'name': key, 'value': value, 'domain': '.twitter.com'}
        driver.add_cookie(cookie)

    # Reload or visit the target page
    driver.get(target_url)
    try:
        # Wait until the "Accept all cookies" button appears
        accept_button = WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable((By.XPATH, "//span[contains(@class, 'css-1qaijid') and contains(text(), 'Accept all cookies')]"))
        )

        # Click the consent button
        accept_button.click()
    except Exception:
        # The consent banner may not appear; ignore
        pass

    # Scrape the data
    get_data(driver, keyword)
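To keep the manually collected cookie values out of the source file, you can export them once from the browser's developer tools into a local JSON file and load that at runtime. A minimal sketch (the file name and helper are assumptions, not part of the original code):

import json

def load_cookies(path="cookies.json"):
    # cookies.json holds plain name/value pairs, e.g. {"auth_token": "...", "ct0": "..."}
    with open(path, encoding="utf-8") as f:
        return json.load(f)

cookies = load_cookies()  # replaces the hard-coded dict above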

2. Automatically captured cookies (requires a one-time manual login)

def accessing_web_pages(target_url, keyword):
    # Create the browser instance
    driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()))

    # Navigate to the login page
    driver.get("https://twitter.com/login")

    # Allow enough time to log in manually
    input("Press Enter after logging in...")

    # After a successful login, save the cookies to a variable
    cookies = driver.get_cookies()

    # Re-add the saved cookies (redundant within the same session,
    # but useful if the driver is restarted)
    for cookie in cookies:
        driver.add_cookie(cookie)

    # Visit the page that requires a logged-in session
    driver.get(target_url)

    try:
        # Wait until the "Accept all cookies" button appears
        accept_button = WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable((By.XPATH, "//span[contains(@class, 'css-1qaijid') and contains(text(), 'Accept all cookies')]"))
        )

        # Click the consent button
        accept_button.click()
    except Exception:
        pass

    # Scrape the data
    get_data(driver, keyword)
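To avoid logging in by hand on every run, the cookies captured after the input() prompt can be persisted to disk and restored next time. A hedged sketch (the file name and helper functions are assumptions, not in the original):

import os
import pickle

COOKIE_FILE = "twitter_cookies.pkl"

def save_cookies(driver, path=COOKIE_FILE):
    # call this once, right after the manual login
    with open(path, "wb") as f:
        pickle.dump(driver.get_cookies(), f)

def restore_cookies(driver, path=COOKIE_FILE):
    # returns True if saved cookies were found and loaded
    if not os.path.exists(path):
        return False
    driver.get("https://twitter.com")  # the domain must match before add_cookie()
    with open(path, "rb") as f:
        for cookie in pickle.load(f):
            driver.add_cookie(cookie)
    return True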

4. Scraping the data (stored here in MySQL)

        This part uses Selenium to simulate scrolling. Each scroll distance is randomized with the random module to reduce the chance of being detected by the site. The URLs of tweets already seen are kept in a list; after each scroll the tweet URLs on the page are collected again and compared against that list to decide whether new content has appeared. Reaching the bottom is detected by checking whether the page's scroll height stops growing; after five failed retries, scraping of the current page ends.

def get_data(driver, keyword):
    import time
    import random
    from datetime import datetime
    # Database connection settings
    config = {
        'host': 'localhost',
        'user': 'root',
        'password': 'root',
        'database': 'weibo',
        'charset': 'utf8mb4',
    }

    # Open the database connection
    mysql_db = pymysql.connect(**config)

    # Create a cursor object
    cursor = mysql_db.cursor()

    # SQL statement to create the table (if it does not exist yet)
    create_table_sql = """
                            CREATE TABLE IF NOT EXISTS tweets (
                                tweet_url VARCHAR(255) NOT NULL PRIMARY KEY,
                                username VARCHAR(100) NOT NULL,
                                tweet_content TEXT NOT NULL,
                                publish_date DATETIME,
                                comments INT DEFAULT 0,
                                retweets INT DEFAULT 0,
                                likes INT DEFAULT 0,
                                views INT DEFAULT 0,
                                get_time DATETIME NOT NULL,
                                keyword VARCHAR(100)
                            );
                            """

    # Execute the CREATE TABLE statement
    try:
        cursor.execute(create_table_sql)
        mysql_db.commit()
        print("Table created successfully.")
    except Exception as e:
        mysql_db.rollback()
        print(f"Failed to create table: {e}")
    finally:
        cursor.close()
        mysql_db.close()  # per-tweet connections are opened below

    crawled_tweets_urls = []
    # Variables that track scrolling
    last_height = driver.execute_script("return document.body.scrollHeight")
    max_retries = 5  # maximum number of retries allowed
    retries = 0  # current retry count
    while retries < max_retries:
        driver.execute_script("window.scrollBy(0, {});".format(random.randint(200, 800)))
        time.sleep(random.uniform(2, 4.5))

        # Wait at most 10 seconds for tweets to appear
        article_content = WebDriverWait(driver, 10).until(
            EC.presence_of_all_elements_located((By.XPATH, "//div[@data-testid='cellInnerDiv']/div/div/article"))
        )
        for article in article_content:
            try:
                # Get the tweet URL
                tweet_url = article.find_element(By.XPATH, ".//time/..").get_attribute("href")

                if tweet_url not in crawled_tweets_urls:
                    crawled_tweets_urls.append(tweet_url)
                    try:
                        # Get the username
                        username = article.find_element(By.XPATH, ".//div[@data-testid='User-Name']//span").text

                        # Get the tweet text
                        tweet_content = article.find_element(By.XPATH, ".//div[@data-testid='tweetText']").text

                        # Get the publish date
                        publish_date = article.find_element(By.XPATH, ".//time").get_attribute('datetime')
                        publish_date = standardization_time(publish_date)

                        # Get the comment count
                        comments = article.find_element(By.XPATH,
                                                        ".//div[@data-testid='reply']//span").text if article.find_elements(
                            By.XPATH, ".//div[@data-testid='reply']//span") else "0"
                        comments = standardization_cout(comments)

                        # Get the retweet count
                        retweets = article.find_element(By.XPATH,
                                                        ".//div[@data-testid='retweet']//span").text if article.find_elements(
                            By.XPATH, ".//div[@data-testid='retweet']//span") else "0"
                        retweets = standardization_cout(retweets)

                        # Get the like count
                        likes = article.find_element(By.XPATH,
                                                     ".//div[@data-testid='like']//span").text if article.find_elements(
                            By.XPATH, ".//div[@data-testid='like']//span") else "0"
                        likes = standardization_cout(likes)

                        # Get the view count (if available)
                        views = article.find_element(By.XPATH,
                                                     ".//a[contains(@href,'analytics')]//span").text if article.find_elements(
                            By.XPATH, ".//a[contains(@href,'analytics')]//span") else "0"
                        views = standardization_cout(views)

                        # Scrape time
                        get_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

                        # Collect the data into a dictionary
                        tweet_data = {
                            "tweet_url": tweet_url,
                            "username": username,
                            "tweet_content": tweet_content,
                            "publish_date": publish_date,
                            "comments": comments if comments else "0",
                            "retweets": retweets if retweets else "0",
                            "likes": likes if likes else "0",
                            "views": views if views else "0",
                            "get_time": get_time,
                            "keyword": keyword
                        }

                        # Database connection settings
                        config = {
                            'host': 'localhost',
                            'user': 'root',
                            'password': 'root',
                            'database': 'weibo',
                            'charset': 'utf8mb4',
                        }

                        # Open the database connection
                        mysql_db = pymysql.connect(**config)

                        # Create a cursor object
                        cursor = mysql_db.cursor()

                        # SQL template for inserting or updating a row
                        insert_or_update_sql = """
                        INSERT INTO tweets (
                            tweet_url, username, tweet_content, publish_date,
                            comments, retweets, likes, views, get_time, keyword
                        ) VALUES (%(tweet_url)s, %(username)s, %(tweet_content)s, %(publish_date)s,
                        %(comments)s, %(retweets)s, %(likes)s, %(views)s, %(get_time)s, %(keyword)s)
                        ON DUPLICATE KEY UPDATE
                            username=VALUES(username),
                            tweet_content=VALUES(tweet_content),
                            publish_date=VALUES(publish_date),
                            comments=VALUES(comments),
                            retweets=VALUES(retweets),
                            likes=VALUES(likes),
                            views=VALUES(views),
                            get_time=VALUES(get_time),
                            keyword=VALUES(keyword);
                        """

                        # Execute the insert-or-update statement
                        try:
                            cursor.execute(insert_or_update_sql, tweet_data)
                            mysql_db.commit()
                            print("Data inserted or updated successfully.")
                        except Exception as e:
                            mysql_db.rollback()
                            print(f"Insert or update data error: {e}")
                        finally:
                            cursor.close()
                            mysql_db.close()

                        print(
                            f"tweet url: {tweet_url}, username: {username}, tweet_content: {tweet_content}, date: {publish_date}, comments: {comments}, retweets: {retweets}, likes: {likes}, views: {views}, get_time: {get_time}, keyword: {keyword}")
                        print('-----------')
                    except Exception as e:
                        print(f'Error extracting tweet details: {e}')
                else:
                    continue
            except Exception as e:
                print(f'Error extracting tweet URL: {e}')

        # Check whether the page's scroll height has changed
        new_height = driver.execute_script("return document.body.scrollHeight")
        if new_height == last_height:
            retries += 1
            print(f"Retry {retries}...")
        else:
            last_height = new_height
            retries = 0  # reset the retry count
            print("New content detected, continuing to crawl...")

    #input("Reached the bottom of the page or hit the retry limit; press Enter to continue...")
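One thing the code above never does is quit the browser, so each call leaves a Chrome window (or headless process) behind. A minimal sketch of a wrapper that always releases it (the function name is illustrative, and the cookie/login setup from section 3 is omitted for brevity):

def scrape_page_safely(target_url, keyword):
    driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()))
    try:
        driver.get(target_url)
        get_data(driver, keyword)
    finally:
        driver.quit()  # close the browser even if scraping raises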

5. Calling the functions

if __name__ == '__main__':
    crawl_daily_tweets("亚伦.布什内尔","2024-02-26",2)

The parameters are, in order: the keyword(s) (multiple keywords can be given, separated by ","), the start date, and the number of days to crawl, as in the example below.
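For example, to crawl two keywords for three days each (the keywords here are placeholders):

crawl_daily_tweets("keyword_one,keyword_two", "2024-02-26", 3)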

6. The standardization helper functions

def standardization_time(publish_date):
    # Convert the scraped ISO timestamp to a standard format
    from datetime import datetime, timedelta

    # Parse the string into a datetime object
    utc_dt = datetime.strptime(publish_date, "%Y-%m-%dT%H:%M:%S.%fZ")

    # China is UTC+8, so add 8 hours to the UTC time to get China time.
    # Note: this is a plain offset; for DST and similar edge cases,
    # the pytz (or zoneinfo) library is more accurate.
    china_dt = utc_dt + timedelta(hours=8)

    # Format the date as required
    formatted_date = china_dt.strftime("%Y-%m-%d %H:%M:%S")
    return formatted_date

def standardization_cout(count_str):
    try:
        # Remove thousands separators
        views_str_cleaned = count_str.replace(',', '')

        # Convert the cleaned string to an integer
        views_int = int(views_str_cleaned)
        return views_int
    except (ValueError, AttributeError):
        # Empty strings or suffixed counts such as "1.2K" fall back to 0
        return 0

        The first function converts the scraped timestamps to China Standard Time; the second converts the scraped comment, like, and other counts to integers so they fit the column types of the MySQL table.
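A few quick sanity checks for the two helpers (expected results shown in the comments; note that counts with suffixes such as "1.2K" cannot be parsed by int() and fall back to 0):

print(standardization_time("2024-02-26T12:34:56.000Z"))  # 2024-02-26 20:34:56 (UTC+8)
print(standardization_cout("1,234"))                     # 1234
print(standardization_cout("1.2K"))                      # 0 (suffix not parsed)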

7. Complete code

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from twitter.util import standardization_time
from twitter.util import standardization_cout
import pymysql

def crawl_daily_tweets(keywords, start_date, days):
    from datetime import datetime, timedelta

    # Split the keyword string into a list
    keyword_list = keywords.split(',')

    # Convert the start-date string into a datetime object
    start = datetime.strptime(start_date, "%Y-%m-%d")

    for keyword in keyword_list:
        for i in range(days):
            day = start + timedelta(days=i)
            # Build one search URL per day
            day_str = day.strftime("%Y-%m-%d")  # convert back to string form
            next_day_str = (day + timedelta(days=1)).strftime("%Y-%m-%d")
            website_address = f"https://twitter.com/search?q={keyword.strip()}%20until%3A{next_day_str}%20since%3A{day_str}&src=typed_query"

            # Call the scraping function with the URL built above
            accessing_web_pages(website_address, keyword)

def accessing_web_pages(target_url, keyword):
    # # Version 1: manually added cookies
    # # Create a ChromeOptions instance
    # options = Options()
    # # Run in headless mode
    # options.add_argument('--headless')
    # # Create the browser instance
    # driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
    #
    # # Visit the Twitter homepage first so the cookie domain can be set
    # driver.get("https://twitter.com")
    #
    # # Cookies to add to the browser instance
    # cookies = {
    #     # paste your account's cookies here as name: value pairs
    # }
    #
    # # Add the cookies
    # for key, value in cookies.items():
    #     cookie = {'name': key, 'value': value, 'domain': '.twitter.com'}
    #     driver.add_cookie(cookie)
    #
    # # Reload or visit the target page
    # driver.get(target_url)

    # Version 2: create the browser instance
    driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()))

    # Navigate to the login page
    driver.get("https://twitter.com/login")

    # Allow enough time to log in manually
    input("Press Enter after logging in...")

    # After a successful login, save the cookies to a variable
    cookies = driver.get_cookies()

    # Re-add the saved cookies
    for cookie in cookies:
        driver.add_cookie(cookie)

    # Visit the page that requires a logged-in session
    driver.get(target_url)

    try:
        # Wait until the "Accept all cookies" button appears
        accept_button = WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable((By.XPATH, "//span[contains(@class, 'css-1qaijid') and contains(text(), 'Accept all cookies')]"))
        )

        # Click the consent button
        accept_button.click()
    except Exception:
        pass

    # Scrape the data
    get_data(driver, keyword)

def get_data(driver, keyword):
    import time
    import random
    from datetime import datetime
    # Database connection settings
    config = {
        'host': 'localhost',
        'user': 'root',
        'password': 'root',
        'database': 'weibo',
        'charset': 'utf8mb4',
    }

    # Open the database connection
    mysql_db = pymysql.connect(**config)

    # Create a cursor object
    cursor = mysql_db.cursor()

    # SQL statement to create the table (if it does not exist yet)
    create_table_sql = """
                            CREATE TABLE IF NOT EXISTS tweets (
                                tweet_url VARCHAR(255) NOT NULL PRIMARY KEY,
                                username VARCHAR(100) NOT NULL,
                                tweet_content TEXT NOT NULL,
                                publish_date DATETIME,
                                comments INT DEFAULT 0,
                                retweets INT DEFAULT 0,
                                likes INT DEFAULT 0,
                                views INT DEFAULT 0,
                                get_time DATETIME NOT NULL,
                                keyword VARCHAR(100)
                            );
                            """

    # Execute the CREATE TABLE statement
    try:
        cursor.execute(create_table_sql)
        mysql_db.commit()
        print("Table created successfully.")
    except Exception as e:
        mysql_db.rollback()
        print(f"Failed to create table: {e}")
    finally:
        cursor.close()
        mysql_db.close()  # per-tweet connections are opened below

    crawled_tweets_urls = []
    # Variables that track scrolling
    last_height = driver.execute_script("return document.body.scrollHeight")
    max_retries = 5  # maximum number of retries allowed
    retries = 0  # current retry count
    while retries < max_retries:
        driver.execute_script("window.scrollBy(0, {});".format(random.randint(200, 800)))
        time.sleep(random.uniform(2, 4.5))

        # Wait at most 10 seconds for tweets to appear
        article_content = WebDriverWait(driver, 10).until(
            EC.presence_of_all_elements_located((By.XPATH, "//div[@data-testid='cellInnerDiv']/div/div/article"))
        )
        for article in article_content:
            try:
                # Get the tweet URL
                tweet_url = article.find_element(By.XPATH, ".//time/..").get_attribute("href")

                if tweet_url not in crawled_tweets_urls:
                    crawled_tweets_urls.append(tweet_url)
                    try:
                        # Get the username
                        username = article.find_element(By.XPATH, ".//div[@data-testid='User-Name']//span").text

                        # Get the tweet text
                        tweet_content = article.find_element(By.XPATH, ".//div[@data-testid='tweetText']").text

                        # Get the publish date
                        publish_date = article.find_element(By.XPATH, ".//time").get_attribute('datetime')
                        publish_date = standardization_time(publish_date)

                        # Get the comment count
                        comments = article.find_element(By.XPATH,
                                                        ".//div[@data-testid='reply']//span").text if article.find_elements(
                            By.XPATH, ".//div[@data-testid='reply']//span") else "0"
                        comments = standardization_cout(comments)

                        # Get the retweet count
                        retweets = article.find_element(By.XPATH,
                                                        ".//div[@data-testid='retweet']//span").text if article.find_elements(
                            By.XPATH, ".//div[@data-testid='retweet']//span") else "0"
                        retweets = standardization_cout(retweets)

                        # Get the like count
                        likes = article.find_element(By.XPATH,
                                                     ".//div[@data-testid='like']//span").text if article.find_elements(
                            By.XPATH, ".//div[@data-testid='like']//span") else "0"
                        likes = standardization_cout(likes)

                        # Get the view count (if available)
                        views = article.find_element(By.XPATH,
                                                     ".//a[contains(@href,'analytics')]//span").text if article.find_elements(
                            By.XPATH, ".//a[contains(@href,'analytics')]//span") else "0"
                        views = standardization_cout(views)

                        # Scrape time
                        get_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

                        # Collect the data into a dictionary
                        tweet_data = {
                            "tweet_url": tweet_url,
                            "username": username,
                            "tweet_content": tweet_content,
                            "publish_date": publish_date,
                            "comments": comments if comments else "0",
                            "retweets": retweets if retweets else "0",
                            "likes": likes if likes else "0",
                            "views": views if views else "0",
                            "get_time": get_time,
                            "keyword": keyword
                        }

                        # Database connection settings
                        config = {
                            'host': 'localhost',
                            'user': 'root',
                            'password': 'root',
                            'database': 'weibo',
                            'charset': 'utf8mb4',
                        }

                        # Open the database connection
                        mysql_db = pymysql.connect(**config)

                        # Create a cursor object
                        cursor = mysql_db.cursor()

                        # SQL template for inserting or updating a row
                        insert_or_update_sql = """
                        INSERT INTO tweets (
                            tweet_url, username, tweet_content, publish_date,
                            comments, retweets, likes, views, get_time, keyword
                        ) VALUES (%(tweet_url)s, %(username)s, %(tweet_content)s, %(publish_date)s,
                        %(comments)s, %(retweets)s, %(likes)s, %(views)s, %(get_time)s, %(keyword)s)
                        ON DUPLICATE KEY UPDATE
                            username=VALUES(username),
                            tweet_content=VALUES(tweet_content),
                            publish_date=VALUES(publish_date),
                            comments=VALUES(comments),
                            retweets=VALUES(retweets),
                            likes=VALUES(likes),
                            views=VALUES(views),
                            get_time=VALUES(get_time),
                            keyword=VALUES(keyword);
                        """

                        # Execute the insert-or-update statement
                        try:
                            cursor.execute(insert_or_update_sql, tweet_data)
                            mysql_db.commit()
                            print("Data inserted or updated successfully.")
                        except Exception as e:
                            mysql_db.rollback()
                            print(f"Insert or update data error: {e}")
                        finally:
                            cursor.close()
                            mysql_db.close()

                        print(
                            f"tweet url: {tweet_url}, username: {username}, tweet_content: {tweet_content}, date: {publish_date}, comments: {comments}, retweets: {retweets}, likes: {likes}, views: {views}, get_time: {get_time}, keyword: {keyword}")
                        print('-----------')
                    except Exception as e:
                        print(f'Error extracting tweet details: {e}')
                else:
                    continue
            except Exception as e:
                print(f'Error extracting tweet URL: {e}')

        # Check whether the page's scroll height has changed
        new_height = driver.execute_script("return document.body.scrollHeight")
        if new_height == last_height:
            retries += 1
            print(f"Retry {retries}...")
        else:
            last_height = new_height
            retries = 0  # reset the retry count
            print("New content detected, continuing to crawl...")

    #input("Reached the bottom of the page or hit the retry limit; press Enter to continue...")


if __name__ == '__main__':
    crawl_daily_tweets("亚伦.布什内尔","2024-02-26",2)

Scraped data fields:

        The code scrapes ten fields in total: tweet URL, author, tweet content, like count, retweet count, view count, comment count, scrape time, publish time, and the search keyword.
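Once a run has finished, the stored rows can be read back with an ordinary query. A short read-back example, assuming the same local MySQL settings used above:

import pymysql

db = pymysql.connect(host="localhost", user="root", password="root",
                     database="weibo", charset="utf8mb4")
with db.cursor() as cursor:
    cursor.execute(
        "SELECT tweet_url, username, likes FROM tweets "
        "WHERE keyword = %s ORDER BY publish_date DESC LIMIT 10",
        ("亚伦.布什内尔",),
    )
    for row in cursor.fetchall():
        print(row)
db.close()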
