it编程 > 数据库 > mongodb

使用Locust对MongoDB进行负载测试的操作步骤

91人参与 2025-01-09 mongodb

1.安装环境

pip install pymongo locust

2.设置测试环境

开启mongodb服务

打开navicat,新建mongodb连接

新建test数据库和sample集合

3.编写脚本

load_mongo.py

# coding=utf-8
from locust import user, task, between, events
from pymongo import mongoclient
import random
import string
import logging
 
# 设置日志
logging.basicconfig(level=logging.info)
logger = logging.getlogger("mongodb load test")
 
# mongodb连接配置
mongo_uri = "mongodb://admin:admin@localhost:27017/?authsource=admin"
database_name = "test"
collection_name = "sample"
 
 
# 自定义mongodb客户端
class mongodbclient:
    def __init__(self):
        try:
            self.client = mongoclient(mongo_uri)
            self.db = self.client[database_name]
            self.collection = self.db[collection_name]
            logger.info("mongodb client initialized successfully.")
        except exception as e:
            logger.error(f"failed to connect to mongodb: {e}")
            self.client = none
 
    def insert_document(self, document):
        if self.collection is not none:
            self.collection.insert_one(document)
            logger.info("document inserted successfully.")
        else:
            logger.error("mongodb collection is not available.")
 
    def close_connection(self):
        if self.client is not none:
            self.client.close()
            logger.info("mongodb client connection closed.")
 
 
# locust用户类
class mongodbuser(user):
    wait_time = between(1, 2)
    mongo_client = none
 
    @staticmethod
    def initialize_mongo_client():
        if mongodbuser.mongo_client is none:
            logger.info("initializing mongodb client...")
            mongodbuser.mongo_client = mongodbclient()
 
    @staticmethod
    def cleanup_mongo_client():
        if mongodbuser.mongo_client:
            mongodbuser.mongo_client.close_connection()
            mongodbuser.mongo_client = none
            logger.info("mongodb client cleaned up.")
 
    # 监听测试开始
    @staticmethod
    @events.test_start.add_listener
    def on_test_start(environment, **kwargs):
        mongodbuser.initialize_mongo_client()
 
    # 监听测试结束
    @staticmethod
    @events.test_stop.add_listener
    def on_test_stop(environment, **kwargs):
        mongodbuser.cleanup_mongo_client()
 
    @task(2)
    def insert_data(self):
        """模拟插入数据的任务"""
        if mongodbuser.mongo_client is none:
            logger.error("mongodb client is not initialized.")
            return
        document = {
            "name": ''.join(random.choices(string.ascii_letters, k=10)),
            "age": random.randint(18, 65),
            "address": ''.join(random.choices(string.ascii_letters + string.digits, k=20)),
        }
        mongodbuser.mongo_client.insert_document(document)
        logger.info(f"inserted document: {document}")

4.运行测试

打开终端执行命令

locust -f load_mongo.py --headless -u 5 -r 1 -t 5s

测试结果

打开navicat查看sample表可以看到数据增多

以上就是使用locust对mongodb进行负载测试的操作步骤的详细内容,更多关于locust对mongodb负载测试的资料请关注代码网其它相关文章!

(0)
打赏 微信扫一扫 微信扫一扫

您想发表意见!!点此发布评论

推荐阅读

mongodb数据迁移详细步骤(亲测成功!)

01-08

批量清理mongodb历史数据的方法详解

01-08

MongoDB配置用户名和密码的操作步骤

12-12

基于 MongoTemplate实现MongoDB的复杂查询功能

12-05

MongoDB 的批量查找符号的方法

02-15

如何通过MongoDB Atlas 实现语义搜索与 RAG(迈向AI的搜索机制)

11-16

猜你喜欢

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论