75人参与 • 2025-01-08 • mongodb
#!/usr/local/python3/bin/python3
# Batch-clean historical MongoDB data.
#
# For every selected database this script:
#   * for each collection in need_del_table_list: optionally backs up (mongodump)
#     and then deletes documents whose need_del_table_field is older than
#     del_day days;
#   * for each collection in need_del_null_table_list: optionally backs up and
#     then deletes all documents.
# Settings are read from ./cfg/config.ini (section "main"), logging is
# configured from ./cfg/logger.conf.
import ast
import configparser
import json
import logging.config
import os
import shlex
import subprocess
import sys
from datetime import datetime, timedelta
from urllib import parse

try:
    import pymongo
except ImportError:  # reported by init_mongodb() so the script exits with a clear message
    pymongo = None


def init_mongodb(mongodbauth):
    """Create a MongoClient and verify that the server answers.

    Uses the module-level settings mongodb_ip, mongodb_port, mongodb_user,
    mongodb_passwd and mongodb_auth_db assigned in the __main__ section.

    :param mongodbauth: True to authenticate with user/password.
    :return: a connected pymongo.MongoClient, or False on any failure.
    """
    if pymongo is None:
        logger.error("pymongo is not installed")
        return False

    def _open_client(*args):
        # MongoClient connects lazily; the ping makes an unreachable server or
        # bad credentials fail here instead of later in the run.
        client = pymongo.MongoClient(*args)
        try:
            client.admin.command("ping")
        except Exception:
            client.close()
            raise
        return client

    if mongodbauth:
        username = parse.quote_plus(mongodb_user)
        password = parse.quote_plus(mongodb_passwd)
        auth_source = parse.quote_plus(mongodb_auth_db)
        conn_tail = "@" + mongodb_ip + ":" + mongodb_port + "/?authSource=" + auth_source
        # Never write the real password to the log.
        log_uri = "mongodb://" + username + ":****" + conn_tail
        try:
            clients = _open_client("mongodb://" + username + ":" + password + conn_tail)
            logger.info("init mongodb conn: " + log_uri)
            return clients
        except Exception as e:
            logger.error("use mongodb user pass conn err: " + str(e))
            return False
    try:
        clients = _open_client(mongodb_ip, int(mongodb_port))
        logger.info("init mongodb conn: " + mongodb_ip + ":" + mongodb_port)
        return clients
    except Exception as e:
        logger.error("use mongodb conn err: " + str(e))
        return False


def get_mongodb_dbname():
    """Return every database name except those in need_filter_dbname_list."""
    db_names_list = list(mongo_client.list_database_names())
    for filter_dbname in need_filter_dbname_list:
        if filter_dbname in db_names_list:
            db_names_list.remove(filter_dbname)
            logger.info("delete need filter dbname: " + filter_dbname)
    return db_names_list


def get_mongodb_tables(entid):
    """Return the names of all collections in database `entid`."""
    db_collections_list = list(mongo_client[entid].list_collection_names())
    logger.debug("get " + entid + " all collections: " + str(db_collections_list))
    return db_collections_list


def get_index_key_tables(entid, collection_name):
    """Return (index key specs, sharded flag) for entid.collection_name.

    Key specs whose "_id" field is 1 (the default _id index) are omitted.
    The sharded flag is collStats' "sharded" field, False when absent.
    """
    db = mongo_client[entid]
    collection = db[collection_name]
    # Built locally: newer servers no longer return "ns" from listIndexes.
    ns_name = entid + "." + collection_name
    index_list = []
    for index_info in collection.list_indexes():
        key_spec = dict(index_info["key"])
        if key_spec.get("_id") == 1:
            continue
        index_list.append(key_spec)
    collection_stats = db.command("collStats", collection_name)
    collection_sharded = collection_stats.get("sharded", False)
    if index_list:
        logger.debug("get collection " + ns_name + " index: " + str(index_list))
    return index_list, collection_sharded


def craete_index(entid, collection_name, index):
    """Create an index on entid.collection_name.

    :param index: key spec dict such as {"start_time": 1}; its insertion
        order is the field order of the index.
    """
    collection = mongo_client[entid][collection_name]
    logger.info("need craete index: " + entid + "." + collection_name + " : " + str(index))
    result = collection.create_index(list(index.items()))
    logger.info("mongodb " + entid + "." + collection_name + " create index return msg: " + str(result))


def is_database_sharded(database_name):
    """Deprecated and unused.

    Returns True when listShards reports at least one shard and
    `database_name` appears in listDatabases. This does NOT verify that
    sharding is enabled for that database.
    """
    admin_db = mongo_client["admin"]
    if not admin_db.command("listShards")["shards"]:
        return False
    db_names = {d["name"] for d in admin_db.command("listDatabases")["databases"]}
    return database_name in db_names


def create_sharded_func(entid, collection_name, shard_key):
    """Enable sharding on database `entid` and shard entid.collection_name.

    :param shard_key: shard key spec dict such as {"start_time": "hashed"};
        all of its fields are used, in insertion order.
    """
    db = mongo_client["admin"]
    collection_path = "{}.{}".format(entid, collection_name)
    logger.info("need craete sharded key : " + collection_path + " : " + str(shard_key))
    try:
        db.command("enableSharding", entid)
    except Exception as e:
        # Logged only; shardCollection is still attempted below.
        logger.error("create dbname sharded key error: return: " + str(e))
    try:
        result = db.command("shardCollection", collection_path, key=dict(shard_key))
        logger.info(collection_path + " create sharded key return: " + str(result))
    except Exception as e:
        logger.error("create sharded key error: return: " + str(e))


def read_file_index(index_file):
    """Parse an index / shard-key definition file.

    All spaces are removed from each line first. Blank lines and lines
    starting with "#" are skipped. A line containing "mongodbshard:" yields a
    shard key (the dict literal after that marker); any other line must be
    "<collection>=<dict literal>" and yields an index entry.

    :return: (index_list, shard_list); index entries are {collection: key_dict}.
    """
    index_list = []
    shard_list = []
    with open(index_file, "r", encoding="utf-8") as f:
        for raw_line in f:
            line = raw_line.replace(" ", "").strip()
            if not line or line.startswith("#"):
                continue
            if "mongodbshard:" in line:
                shard_list.append(ast.literal_eval(line.split("mongodbshard:", 1)[1]))
            else:
                table, key_str = line.split("=", 1)
                # literal_eval only accepts Python literals, never arbitrary code.
                index_list.append({table: ast.literal_eval(key_str)})
    return index_list, shard_list


def get_timestamp_days_ago(get_days):
    """Return the Unix timestamp (seconds) of local midnight `get_days` days before now."""
    day = datetime.now() - timedelta(days=int(get_days))
    start_of_day = day.replace(hour=0, minute=0, second=0, microsecond=0)
    return int(start_of_day.timestamp())


def if_string_type(data_stamp, days=None):
    """Return the deletion cutoff in the same representation as `data_stamp`.

    10-character values are treated as second timestamps and 13-character
    values as millisecond timestamps; str samples give a str cutoff and int
    samples give an int cutoff, so the $lt comparison matches the stored type.
    Returns "" for any other type or length.

    :param days: cutoff age in days; defaults to the configured del_day.
    """
    if days is None:
        days = del_day
    cutoff = get_timestamp_days_ago(int(days))
    if isinstance(data_stamp, str):
        if len(data_stamp) == 10:
            return str(cutoff)
        if len(data_stamp) == 13:
            return str(cutoff) + "000"
    elif isinstance(data_stamp, int):
        digits = len(str(data_stamp))
        if digits == 10:
            return cutoff
        if digits == 13:
            return int(cutoff) * 1000
    return ""


def get_one_data(entid, collection_name):
    """Return need_del_table_field from one document of the collection, or False.

    find_one() with no filter returns an arbitrary document; only its value's
    type/length is used (by if_string_type) to format the cutoff.
    """
    collection = mongo_client[entid][collection_name]
    filter_conditions_key = str(need_del_table_field)
    result = collection.find_one({}, {filter_conditions_key: 1, "_id": 0})
    if result and filter_conditions_key in result:
        start_time_value = result.get(filter_conditions_key)
        logger.debug("get " + entid + "." + collection_name + " corresponding " + filter_conditions_key
                     + " field value: " + str(start_time_value))
        return start_time_value
    return False


def del_data(entid, collection_name, get_del_timestamp):
    """Delete documents whose need_del_table_field is less than get_del_timestamp."""
    collection = mongo_client[entid][collection_name]
    filter_conditions_key = str(need_del_table_field)
    try:
        result = collection.delete_many({filter_conditions_key: {"$lt": get_del_timestamp}})
        logger.info(entid + " run sql: db." + collection_name + ".remove({" + filter_conditions_key
                    + ":{$lt:" + str(get_del_timestamp) + "}})")
        if result.deleted_count > 0:
            logger.info("by date delete " + str(entid) + "." + collection_name + " less than "
                        + str(get_del_timestamp) + " del document count: " + str(result.deleted_count))
    except Exception as e:
        logger.error("error occurred while deleting documents: " + str(e))


def del_all_data(entid, collection_name):
    """Delete every document in entid.collection_name (delete_many; the collection is not dropped)."""
    collection = mongo_client[entid][collection_name]
    try:
        result = collection.delete_many({})
    except Exception as e:
        # There is no result to report here; log the exception instead.
        logger.error(entid + "." + collection_name + " del all document error: " + str(e))
        return
    if result.deleted_count > 0:
        logger.info(entid + " run sql: db." + collection_name + ".remove({})")
        logger.info(entid + "." + collection_name + " del all document count: " + str(result.deleted_count))


def build_dump_query(field, del_time):
    """Return the mongodump --query JSON selecting documents with `field` < del_time.

    json.dumps keeps the cutoff's type: an int becomes a JSON number and a str
    a JSON string, so the backup selects the same documents del_data deletes.
    """
    return json.dumps({field: {"$lt": del_time}})


def mask_command_password(args):
    """Return a copy of a command argument list with the value after "-p" replaced by "****"."""
    masked = list(args)
    for i, arg in enumerate(masked[:-1]):
        if arg == "-p":
            masked[i + 1] = "****"
    return masked


def dump_mongodb_data(dbname, table, not_quiet_dump, del_time):
    """Back up dbname.table with mongodump into bakcup_dir_path.

    :param not_quiet_dump: True to dump only documents whose
        need_del_table_field is < del_time; False to dump the whole collection.
    :param del_time: cutoff value, used only when not_quiet_dump is True.
    :return: a one-element list. ["0"] means mongodump exited successfully;
        ["1"] when backup is disabled, mongodump is missing or could not be
        started; otherwise [str(exit_code)].
    """
    status_info = ["1"]
    if not is_del_bakcup_data:
        logger.debug("config file not set is_del_bakcup_data = true, not dump data")
        return status_info
    if not os.path.exists(mongodump_command_path):
        logger.info("mongodump command file not exists ," + mongodump_command_path)
        return status_info

    # An argument list (no shell) keeps passwords and the JSON query intact
    # regardless of the characters they contain.
    dump_args = [mongodump_command_path, "-h", mongodb_ip + ":" + str(mongodb_port)]
    if mongodb_auth:
        dump_args += ["--authenticationDatabase=" + mongodb_auth_db,
                      "-u", mongodb_user, "-p", mongodb_passwd]
    dump_args += ["-d", dbname, "-c", table]
    if not_quiet_dump:
        dump_args += ["-q", build_dump_query(need_del_table_field, del_time)]
    dump_args += ["-o", bakcup_dir_path]

    logger.info("run command: " + " ".join(shlex.quote(a) for a in mask_command_password(dump_args)))
    try:
        return_code = subprocess.call(dump_args)
    except OSError as e:
        logger.error("mongodump command error: " + str(e))
        return status_info
    status_info = [str(return_code)]
    logger.info("mongodump command result: " + str(status_info))
    return status_info


if __name__ == "__main__":
    cfgpath = "./cfg/config.ini"
    conf = configparser.ConfigParser()
    # The shipped config carries UTF-8 comments; don't depend on the locale encoding.
    if not conf.read(cfgpath, encoding="utf-8"):
        sys.exit("config file not found: " + cfgpath)
    mongodb_ip = conf.get("main", "mongodb_ip")
    mongodb_port = conf.get("main", "mongodb_port")
    mongodb_auth = conf.getboolean("main", "mongodb_auth")
    mongodb_user = conf.get("main", "mongodb_user")
    mongodb_passwd = conf.get("main", "mongodb_passwd")
    mongodb_auth_db = conf.get("main", "mongodb_auth_db")
    need_filter_dbname = conf.get("main", "need_filter_dbname")
    is_del_bakcup_data = conf.getboolean("main", "is_del_bakcup_data")
    bakcup_dir_path = conf.get("main", "bakcup_dir_path")
    mongodump_command_path = conf.get("main", "mongodump_command_path")
    del_day = conf.get("main", "del_day")
    need_del_table_field = conf.get("main", "need_del_table_field")
    need_del_table_list = [item for item in conf.get("main", "need_del_table_list").split(",") if item != ""]
    need_del_null_table_list = [item for item in conf.get("main", "need_del_null_table_list").split(",")
                                if item != ""]
    auth_get_entid = conf.getboolean("main", "auth_get_entid")
    need_filter_dbname_list = [item for item in need_filter_dbname.split(",") if item != ""]
    get_dbname_list = [item for item in conf.get("main", "ent_id").split(",") if item != ""]

    logging.config.fileConfig("./cfg/logger.conf")
    logger = logging.getLogger("rotatfile")

    mongo_client = init_mongodb(mongodb_auth)
    if mongo_client:
        logger.info("mongodb init successfully")
    else:
        logger.error("failed to initialize mongodb")
        sys.exit(10)

    try:
        if auth_get_entid:
            get_dbname_list = get_mongodb_dbname()
            logger.info("get all dbname list: " + str(get_dbname_list))
        else:
            logger.info("file get dbname list: " + str(get_dbname_list))

        for dbname in get_dbname_list:
            get_end_all_table = get_mongodb_tables(dbname)

            # Date-based cleanup: delete documents older than del_day days.
            for table in need_del_table_list:
                if table not in get_end_all_table:
                    logger.error(dbname + " not have table: " + table)
                    continue
                get_index_key_tables(dbname, table)
                get_one_data_mes = get_one_data(dbname, table)
                if not get_one_data_mes:
                    continue
                get_del_timestmap = if_string_type(get_one_data_mes)
                if not get_del_timestmap:
                    logger.error("get del timestmap fail")
                    continue
                dump_ok = dump_mongodb_data(dbname, table, True, get_del_timestmap)[:1] == ["0"]
                if dump_ok or not is_del_bakcup_data:
                    del_data(dbname, table, get_del_timestmap)
                else:
                    logger.error("dump mongodb data fail, skip deleting " + dbname + "." + table)

            # Full cleanup: delete every document of these collections.
            for null_table in need_del_null_table_list:
                if null_table not in get_end_all_table:
                    logger.error(dbname + " not have table: " + null_table)
                    continue
                dump_ok = dump_mongodb_data(dbname, null_table, False, "1")[:1] == ["0"]
                if dump_ok or not is_del_bakcup_data:
                    del_all_data(dbname, null_table)
                else:
                    logger.error("dump mongodb data fail, skip deleting " + dbname + "." + null_table)
    finally:
        mongo_client.close()
        logger.info("mongodb closed")
[DEFAULT]
mongodb_ip = 10.130.47.197
mongodb_port = 40000
mongodb_auth = false
mongodb_user = admin
mongodb_passwd = test@123
mongodb_auth_db = admin
#从全部dbname中进行过滤不需要处理的dbname,使用逗号分隔
need_filter_dbname = local,config,admin
#指定需要按照日期删除的集合,使用逗号分隔
need_del_table_list = new_r_ags_e_back,call_detail_back
#指定需要按照日期删除的集合字段过滤
need_del_table_field = start_time
#指定清空删除的集合,使用逗号分隔
need_del_null_table_list = call_duration_cache,duration_cache

[main]
#是否自动获取对应mongodb中全部dbname
auth_get_entid = false
#从配置文件中获取dbname
ent_id = 20241205,20250107
#需要删除多少天以前的数据
del_day = 97
#是否需要备份数据
is_del_bakcup_data = false
#备份目录
bakcup_dir_path = ./data
#备份命令路径
mongodump_command_path = /home/devops/python/mongodb_del_history/mongodump
[devops@db1 mongodb_del_history]$ tar xf mongodb_del_history.tar.gz
[devops@db1 mongodb_del_history]$ cd mongodb_del_history
[devops@db1 mongodb_del_history]$ nohup ./del_history_data &
2025-01-06 14:15:01 139749303605056 del_history_data.py:24 INFO init mongodb conn: 10.130.47.197:40000
2025-01-06 14:15:01 139749303605056 del_history_data.py:303 INFO mongodb init successfully
2025-01-06 14:15:01 139749303605056 del_history_data.py:39 INFO delete need filter dbname: local
2025-01-06 14:15:01 139749303605056 del_history_data.py:310 INFO get all dbname list: ['0103290010', '0103290012', '0103290013', '0103290015']
2025-01-06 14:15:01 139749303605056 del_history_data.py:321 ERROR 0103290010 not have table: jhk_task_status
2025-01-06 14:15:01 139749303605056 del_history_data.py:321 ERROR 0103290010 not have table: sd_call_detail_back
2025-01-06 14:15:01 139749303605056 del_history_data.py:229 INFO run command: /home/devops/python/mongodb_del_history/mongodump -h 10.130.47.197:40000 -d 0103290010 -c call_duration_cache -o ./data
2025-01-06 14:15:01 139749303605056 del_history_data.py:233 INFO mongodump command result: ['0']
2025-01-06 14:15:01 139749303605056 del_history_data.py:229 INFO run command: /home/devops/python/mongodb_del_history/mongodump -h 10.130.47.197:40000 -d 0103290010 -c duration_cache -o ./data
2025-01-06 14:15:01 139749303605056 del_history_data.py:233 INFO mongodump command result: ['0']
2025-01-06 14:15:01 139749303605056 del_history_data.py:347 INFO mongodb closed
wget https://zhao138969.com/linuxpackage/python/del_history_data
到此这篇关于批量清理mongodb历史数据的方法详解的文章就介绍到这了,更多相关清理mongodb历史数据内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
您想发表意见!!点此发布评论
版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。
发表评论