Python 开启多进程(进程池)获取 MongoDB 数据库数据;数据量大的表改用终端命令单独开启下载

日常代码记录
import os,time
import pandas as pd
from pymongo import MongoClient
from multiprocessing import Pool,Manager
class JournalCloud():
    """Download MongoDB collections for one date into per-collection CSV files.

    Collections whose document count is below ``threshold`` are exported
    directly with pandas; larger ones are pushed onto a queue so they can be
    exported afterwards via a terminal command (see ``TerminalDownload``).
    """

    def __init__(self, mongo_path, save_dir_path, date, prefix='XX', threshold=1000000):
        # mongo_path: CSV file holding the MongoDB connection details.
        # save_dir_path: directory the per-collection CSV files are written to.
        # date: date string combined with `prefix` to filter collection names.
        # threshold: document-count cutoff; bigger collections are deferred
        #            (was a hard-coded placeholder `阈值` in the original).
        self.mongo_path = mongo_path
        self.save_dir_path = save_dir_path
        self.prefix = prefix
        self.date = date
        self.threshold = threshold

    def _get_mongo(self):
        # Load the connection/credential details from the CSV file.
        return pd.read_csv(self.mongo_path)

    def login(self):
        """Return an authenticated database handle built from the credentials CSV."""
        # NOTE(review): assumes the CSV has one row with columns
        # host/port/db/user/password -- confirm against the actual file.
        cred = self._get_mongo().iloc[0]  # fixed: original forgot the () call
        client = MongoClient(host=cred['host'], port=int(cred['port']))
        my_db = client[cred['db']]
        # authenticate() was removed in pymongo 4.x; pass username/password to
        # MongoClient instead if the driver is ever upgraded.
        my_db.authenticate(cred['user'], cred['password'])
        return my_db

    def get_total_collection(self):
        """List the collection names matching prefix+date."""
        filter_string = self.prefix + self.date
        name_filter = {"name": {"$regex": filter_string}}  # renamed: `filter` shadowed a builtin
        my_db = self.login()
        collection_names = my_db.list_collection_names(filter=name_filter)
        print(len(collection_names))
        return collection_names

    def collection_put_quene(self, collection_names, q1):
        # Seed the work queue with every collection name.
        # (method name keeps the original "quene" typo for caller compatibility)
        for collection in collection_names:
            q1.put(collection)

    def get_collection_info(self, q1, q2):
        """Worker: pop one collection name from q1 and export it to CSV when it
        is small enough, otherwise push it onto q2 for terminal download."""
        collection = q1.get()
        my_db = self.login()
        table = my_db[collection]
        try:
            file_name = collection.replace(self.prefix, '')  # strip the shared prefix
            file_name = file_name.replace('/', '-') + '.csv'
            file_path = os.path.join(self.save_dir_path, file_name)
            # count_documents({}) replaces the deprecated Collection.count()
            if table.count_documents({}) < self.threshold:
                data = table.find({}, {'_id': 0, 'A': 1, 'B': 1})
                if os.path.exists(file_path):
                    # fixed: appending "1.csv" to "x.csv" produced "x.csv1.csv";
                    # insert the suffix before the extension instead
                    file_path = os.path.join(self.save_dir_path, file_name[:-4] + '_1.csv')
                pd.DataFrame(data).to_csv(file_path, index=False)
            else:
                q2.put(collection)  # too large: defer to terminal download
        except Exception as e:
            print(e)
            q2.put(collection)  # failed exports are also retried via terminal
        finally:
            print('倒数{}个完成'.format(q1.qsize()))  # fixed: Queue has qsize(), not size()

    def run(self):
        """Fan the collections out over a process pool; return the deferred ones."""
        manager = Manager()  # fixed: Manager must be instantiated; Queue, not "Queen"
        q1 = manager.Queue()
        q2 = manager.Queue()
        pool = Pool(6)
        collection_names = self.get_total_collection()  # fixed: missing call parens
        self.collection_put_quene(collection_names, q1)
        q1_size = q1.qsize()
        print(q1_size)
        for i in range(q1_size):
            pool.apply_async(func=self.get_collection_info, args=(q1, q2,))
        pool.close()
        pool.join()
        no_write_table = [q2.get() for i in range(q2.qsize())]
        print(no_write_table)
        return no_write_table
class TerminalDownload(object):
    """Export oversized collections via a terminal command.

    Each collection whose document count exceeded the threshold is exported by
    substituting its name into a command template read from a text file and
    launching that command in its own terminal window.
    """

    def __init__(self, commend_path, list_collection):
        # commend_path: text file holding the command template; the literal
        #               'XX' marks where the collection name is substituted.
        # list_collection: collection names to export.
        self.commend_path = commend_path
        self.list_collection = list_collection
        self.commend = self.get_commend()

    def get_commend(self):
        # Read the command template from disk.
        with open(self.commend_path, encoding='utf_8') as f:
            return f.read()

    def collection_put_queue(self, q3):
        # fixed: run() called this method but it was never defined.
        for collection in self.list_collection:
            q3.put(collection)

    def execute_commend(self, q3):
        """Worker: pop one collection name and launch its download command."""
        collection = q3.get()  # fixed: was q3.ghet()
        # NOTE(review): '字符串的前缀' is the author's placeholder for the shared
        # collection-name prefix -- confirm against the real data.
        collection = collection.replace('字符串的前缀', '')
        # fixed: original overwrote the collection name with itself and then
        # used the still-undefined name `complete_commend` before assigning it.
        complete_commend = self.commend.replace('XX', collection)
        # fixed: 'start abd /k' -> 'start cmd /k ' (Windows: open a new console
        # window that stays open after the command finishes)
        os.system('start cmd /k ' + complete_commend)

    def run(self):
        """Fan the collection names out over a small process pool."""
        q3 = Manager().Queue()  # fixed: Manager must be instantiated
        pool = Pool(3)
        self.collection_put_queue(q3)
        for i in range(q3.qsize()):
            pool.apply_async(func=self.execute_commend, args=(q3,))
        pool.close()
        pool.join()
def _main():
    """Drive the full download: pandas exports first, then any collections too
    large for pandas are handed to the terminal downloader."""
    started_at = time.time()
    # placeholder paths -- fill in before running
    mongo_path = r'xxxxxxxxxxxxx.csv'
    date = '2020-12-11'
    commend_path = r'xxxxxxxxxxxxx.txt'
    # one sub-directory per date for the exported CSV files
    save_dir_path = os.path.join(r'xxxxx', date)
    if not os.path.exists(save_dir_path):
        os.makedirs(save_dir_path)
    leftover = JournalCloud(mongo_path, save_dir_path, date).run()
    if leftover:
        TerminalDownload(commend_path, leftover).run()
    print("运行时间:%.1f秒" % (time.time() - started_at))


if __name__ == '__main__':
    _main()

GitCode 天启AI是一款由 GitCode 团队打造的智能助手,基于先进的LLM(大语言模型)与多智能体 Agent 技术构建,致力于为用户提供高效、智能、多模态的创作与开发支持。它不仅支持自然语言对话,还具备处理文件、生成 PPT、撰写分析报告、开发 Web 应用等多项能力,真正做到“一句话,让 AI 帮你完成复杂任务”。
更多推荐
所有评论(0)