国产成人精品久久免费动漫-国产成人精品天堂-国产成人精品区在线观看-国产成人精品日本-a级毛片无码免费真人-a级毛片毛片免费观看久潮喷

您的位置:首頁技術文章
文章詳情頁

python multiprocessing 多進程并行計算的操作

瀏覽:3日期:2022-06-26 11:42:14

python的multiprocessing包是標準庫提供的多進程并行計算包,提供了和threading(多線程)相似的API函數,但是相比于threading,將任務分配到不同的CPU,避免了GIL(Global Interpreter Lock)的限制。

下面我們對multiprocessing中的Pool和Process類做介紹。

Pool

采用Pool進程池對任務并行處理更加方便,我們可以指定并行的CPU個數,然后 Pool 會自動把任務放到進程池中運行。 Pool 包含了多個并行函數。

apply apply_async

apply 要逐個執行任務,在python3中已經被棄用,而apply_async是apply的異步執行版本。并行計算一定要采用apply_async函數。

import multiprocessingimport timefrom random import randint, seeddef f(num): seed() rand_num = randint(0,10) # 每次都隨機生成一個停頓時間 time.sleep(rand_num) return (num, rand_num)start_time = time.time()cores = multiprocessing.cpu_count()pool = multiprocessing.Pool(processes=cores)pool_list = []result_list = []start_time = time.time()for xx in xrange(10): pool_list.append(pool.apply_async(f, (xx, ))) # 這里不能 get, 會阻塞進程result_list = [xx.get() for xx in pool_list]#在這里不免有人要疑問,為什么不直接在 for 循環中直接 result.get()呢?這是因為pool.apply_async之后的語句都是阻塞執行的,調用 result.get() 會等待上一個任務執行完之后才會分配下一個任務。事實上,獲取返回值的過程最好放在進程池回收之后進行,避免阻塞后面的語句。# 最后我們使用一下語句回收進程池: pool.close()pool.join()print result_listprint ’并行花費時間 %.2f’ % (time.time() - start_time)print ’串行花費時間 %.2f’ % (sum([xx[1] for xx in result_list]))#[(0, 8), (1, 2), (2, 4), (3, 9), (4, 0), (5, 1), (6, 8), (7, 3), (8, 4), (9, 6)]#并行花費時間 14.11#串行花費時間 45.00map map_async

map_async 是 map的異步執行函數。

相比于 apply_async, map_async 只能接受一個參數。

import timefrom multiprocessing import Pooldef run(fn): #fn: 函數參數是數據列表的一個元素 time.sleep(1) return fn*fnif __name__ == '__main__': testFL = [1,2,3,4,5,6] print ’串行:’ #順序執行(也就是串行執行,單進程) s = time.time() for fn in testFL: run(fn) e1 = time.time() print '順序執行時間:', int(e1 - s) print ’并行:’ #創建多個進程,并行執行 pool = Pool(4) #創建擁有5個進程數量的進程池 #testFL:要處理的數據列表,run:處理testFL列表中數據的函數 rl =pool.map(run, testFL) pool.close()#關閉進程池,不再接受新的進程 pool.join()#主進程阻塞等待子進程的退出 e2 = time.time() print '并行執行時間:', int(e2-e1) print rl# 串行:# 順序執行時間: 6# 并行:# 并行執行時間: 2# [1, 4, 9, 16, 25, 36]Process

采用Process必須注意的是,Process對象來創建進程,每一個進程占據一個CPU,所以要建立的進程必須 小于等于 CPU的個數。

如果啟動進程數過多,特別是當遇到CPU密集型任務,會降低并行的效率。

#16.6.1.1. The Process classfrom multiprocessing import Process, cpu_countimport osimport timestart_time = time.time()def info(title):# print(title) if hasattr(os, ’getppid’): # only available on Unix print ’parent process:’, os.getppid() print ’process id:’, os.getpid() time.sleep(3)def f(name): info(’function f’) print ’hello’, nameif __name__ == ’__main__’:# info(’main line’) p_list = [] # 保存Process新建的進程 cpu_num = cpu_count() for xx in xrange(cpu_num): p_list.append(Process(target=f, args=(’xx_%s’ % xx,))) for xx in p_list: xx.start() for xx in p_list: xx.join() print(’spend time: %.2f’ % (time.time() - start_time))parent process: 11741# parent process: 11741# parent process: 11741# process id: 12249# process id: 12250# parent process: 11741# process id: 12251# process id: 12252# hello xx_1# hello xx_0# hello xx_2# hello xx_3# spend time: 3.04進程間通信

Process和Pool均支持Queues 和 Pipes 兩種類型的通信。

Queue 隊列

隊列遵循先進先出的原則,可以在各個進程間使用。

# 16.6.1.2. Exchanging objects between processes# Queuesfrom multiprocessing import Process, Queuedef f(q): q.put([42, None, ’hello’])if __name__ == ’__main__’: q = Queue() p = Process(target=f, args=(q,)) p.start() print q.get() # prints '[42, None, ’hello’]' p.join()pipe

from multiprocessing import Process, Pipedef f(conn): conn.send([42, None, ’hello’]) conn.close()if __name__ == ’__main__’: parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print parent_conn.recv() # prints '[42, None, ’hello’]' p.join()queue 與 pipe比較

Pipe() can only have two endpoints.

Queue() can have multiple producers and consumers.

When to use them

If you need more than two points to communicate, use a Queue().

If you need absolute performance, a Pipe() is much faster because Queue() is built on top of Pipe().

參考:

https://stackoverflow.com/questions/8463008/python-multiprocessing-pipe-vs-queue

共享資源

多進程應該避免共享資源。在多線程中,我們可以比較容易地共享資源,比如使用全局變量或者傳遞參數。

在多進程情況下,由于每個進程有自己獨立的內存空間,以上方法并不合適。

此時我們可以通過共享內存和Manager的方法來共享資源。

但這樣做提高了程序的復雜度,并因為同步的需要而降低了程序的效率。

共享內存

共享內存僅適用于 Process 類,不能用于進程池 Pool

# 16.6.1.4. Sharing state between processes# Shared memoryfrom multiprocessing import Process, Value, Arraydef f(n, a): n.value = 3.1415927 for i in range(len(a)): a[i] = -a[i]if __name__ == ’__main__’: num = Value(’d’, 0.0) arr = Array(’i’, range(10)) p = Process(target=f, args=(num, arr)) p.start() p.join() print num.value print arr[:]# 3.1415927# [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]Manager Class

Manager Class 既可以用于Process 也可以用于進程池 Pool。

from multiprocessing import Manager, Processdef f(d, l, ii): d[ii] = ii l.append(ii)if __name__ == ’__main__’: manager = Manager() d = manager.dict() l = manager.list(range(10)) p_list = [] for xx in range(4): p_list.append(Process(target=f, args=(d, l, xx))) for xx in p_list: xx.start() for xx in p_list: xx.join() print d print l# {0: 0, 1: 1, 2: 2, 3: 3}# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3]

補充:python程序多進程運行時間計算/多進程寫數據/多進程讀數據

import timetime_start=time.time()time_end=time.time()print(’time cost’,time_end-time_start,’s’)

單位為秒,也可以換算成其他單位輸出

注意寫測試的時候,函數名要以test開頭,否則運行不了。

多線程中的問題:

1)多線程存數據:

def test_save_features_to_db(self): df1 = pd.read_csv(’/home/sc/PycharmProjects/risk-model/xg_test/statis_data/shixin_company.csv’) com_list = df1[’company_name’].values.tolist() # com_list = com_list[400015:400019] # print ’test_save_features_to_db’ # print(com_list) p_list = [] # 進程列表 i = 1 p_size = len(com_list) for company_name in com_list: # 創建進程 p = Process(target=self.__save_data_iter_method, args=[company_name]) # p.daemon = True p_list.append(p) # 間歇執行進程 if i % 20 == 0 or i == p_size: # 20頁處理一次, 最后一頁處理剩余for p in p_list: p.start()for p in p_list: p.join() # 等待進程結束p_list = [] # 清空進程列表 i += 1

總結:多進程寫入的時候,不需要lock,也不需要返回值。

核心p = Process(target=self.__save_data_iter_method, args=[company_name]),其中target指向多進程的一次完整的迭代,arg則是該迭代的輸入。

注意寫法args=[company_name]才對,原來寫成:args=company_name,args=(company_name)會報如下錯:只需要1個參數,而給出了34個參數。

多進程外層循環則是由輸入決定的,有多少個輸入就為多少次循環,理解p.start和p.join;

def __save_data_iter_method(self, com): # time_start = time.time() # print(com) f_d_t = ShiXinFeaturesDealSvc() res = f_d_t.get_time_features(company_name=com) # 是否失信 shixin_label = res.shixin_label key1 = res.shixin_time if key1: public_at = res.shixin_time company_name = res.time_map_features[key1].company_name # print(company_name) established_years = res.time_map_features[key1].established_years industry_dx_rate = res.time_map_features[key1].industry_dx_rate regcap_change_cnt = res.time_map_features[key1].regcap_change_cnt share_change_cnt = res.time_map_features[key1].share_change_cnt industry_dx_cnt = res.time_map_features[key1].industry_dx_cnt address_change_cnt = res.time_map_features[key1].address_change_cnt fr_change_cnt = res.time_map_features[key1].fr_change_cnt judgedoc_cnt = res.time_map_features[key1].judgedoc_cnt bidding_cnt = res.time_map_features[key1].bidding_cnt trade_mark_cnt = res.time_map_features[key1].trade_mark_cnt network_share_cancel_cnt = res.time_map_features[key1].network_share_cancel_cnt cancel_cnt = res.time_map_features[key1].cancel_cnt industry_all_cnt = res.time_map_features[key1].industry_all_cnt network_share_zhixing_cnt = res.time_map_features[key1].network_share_zhixing_cnt network_share_judge_doc_cnt = res.time_map_features[key1].network_share_judge_doc_cnt net_judgedoc_defendant_cnt = res.time_map_features[key1].net_judgedoc_defendant_cnt judge_doc_cnt = res.time_map_features[key1].judge_doc_cnt f_d_do = ShixinFeaturesDto(company_name=company_name, established_years=established_years, industry_dx_rate=industry_dx_rate, regcap_change_cnt=regcap_change_cnt, share_change_cnt=share_change_cnt, industry_all_cnt=industry_all_cnt, industry_dx_cnt=industry_dx_cnt, address_change_cnt=address_change_cnt, fr_change_cnt=fr_change_cnt, judgedoc_cnt=judgedoc_cnt, bidding_cnt=bidding_cnt, trade_mark_cnt=trade_mark_cnt, network_share_cancel_cnt=network_share_cancel_cnt, cancel_cnt=cancel_cnt, network_share_zhixing_cnt=network_share_zhixing_cnt, network_share_judge_doc_cnt=network_share_judge_doc_cnt, net_judgedoc_defendant_cnt=net_judgedoc_defendant_cnt, judge_doc_cnt=judge_doc_cnt, public_at=public_at, shixin_label=shixin_label) # time_end = time.time() # print(’totally cost’, time_end - time_start) self.cfdbsvc.save_or_update_features(f_d_do)def save_or_update_features(self, shixin_features_dto): ''' 添加或更新: 插入一行數據, 如果不存在則插入,存在則更新 ''' self._pg_util = PgUtil() p_id = None if isinstance(shixin_features_dto, ShixinFeaturesDto): p_id = str(uuid.uuid1()) self._pg_util.execute_sql(self.s_b.insert_or_update_row( self.model.COMPANY_NAME, { self.model.ID: p_id, # 公司名 self.model.COMPANY_NAME: shixin_features_dto.company_name, # 失信時間 self.model.PUBLIC_AT: shixin_features_dto.public_at, self.model.SHIXIN_LABEL : shixin_features_dto.shixin_label, self.model.ESTABLISHED_YEARS: shixin_features_dto.established_years, self.model.INDUSTRY_DX_RATE: shixin_features_dto.industry_dx_rate, self.model.REGCAP_CHANGE_CNT: shixin_features_dto.regcap_change_cnt, self.model.SHARE_CHANGE_CNT: shixin_features_dto.share_change_cnt, self.model.INDUSTRY_ALL_CNT: shixin_features_dto.industry_all_cnt, self.model.INDUSTRY_DX_CNT: shixin_features_dto.industry_dx_cnt, self.model.ADDRESS_CHANGE_CNT: shixin_features_dto.address_change_cnt, self.model.NETWORK_SHARE_CANCEL_CNT: shixin_features_dto.network_share_cancel_cnt, self.model.CANCEL_CNT: shixin_features_dto.cancel_cnt, self.model.NETWORK_SHARE_ZHIXING_CNT: shixin_features_dto.network_share_zhixing_cnt, self.model.FR_CHANGE_CNT: shixin_features_dto.fr_change_cnt, self.model.JUDGEDOC_CNT: shixin_features_dto.judgedoc_cnt, self.model.NETWORK_SHARE_JUDGE_DOC_CNT: shixin_features_dto.network_share_judge_doc_cnt, self.model.BIDDING_CNT: shixin_features_dto.bidding_cnt, self.model.TRADE_MARK_CNT: shixin_features_dto.trade_mark_cnt, self.model.JUDGE_DOC_CNT: shixin_features_dto.judge_doc_cnt }, [self.model.ADDRESS_CHANGE_CNT,self.model.BIDDING_CNT,self.model.CANCEL_CNT, self.model.ESTABLISHED_YEARS,self.model.FR_CHANGE_CNT,self.model.INDUSTRY_ALL_CNT, self.model.INDUSTRY_DX_RATE,self.model.INDUSTRY_DX_CNT,self.model.JUDGE_DOC_CNT, self.model.JUDGEDOC_CNT,self.model.NETWORK_SHARE_CANCEL_CNT,self.model.NETWORK_SHARE_JUDGE_DOC_CNT, self.model.NETWORK_SHARE_ZHIXING_CNT,self.model.REGCAP_CHANGE_CNT,self.model.TRADE_MARK_CNT, self.model.SHARE_CHANGE_CNT,self.model.SHIXIN_LABEL,self.model.PUBLIC_AT]) ) return p_id

函數中重新初始化了self._pg_util = PgUtil(),否則會報ssl error 和ssl decryption 的錯誤,背后原因有待研究!

**2)多進程取數據——(思考取數據為何要多進程)** def flush_process(self, lock): #需要傳入lock; ''' 運行待處理的方法隊列 :type lock Lock :return 返回一個dict ''' # process_pool = Pool(processes=20) # data_list = process_pool.map(one_process, self.__process_data_list) # # for (key, value) in data_list: # # 覆蓋上期變量 self.__dct_share = self.__manager.Value(’tmp’, {}) # 進程共享變量 p_list = [] # 進程列表 i = 1 p_size = len(self.__process_data_list) for process_data in self.__process_data_list: **#循環遍歷需要同時查找的公司列表!!!self.__process_data_list包含多個process_data,每個process_data包含三種屬性?類對象也可以循環????** # 創建進程 p = Process(target=self.__one_process, args=(process_data, lock)) #參數需要lock # p.daemon = True p_list.append(p) # 間歇執行進程 if i % 20 == 0 or i == p_size: # 20頁處理一次, 最后一頁處理剩余for p in p_list: p.start()for p in p_list: p.join() # 等待進程結束p_list = [] # 清空進程列表 i += 1 # end for self.__process_data_list = [] # 清空訂閱 return self.__dct_share.value def __one_process(self, process_data, lock): #迭代函數 ''' 處理進程 :param process_data: 方法和參數集等 :param lock: 保護鎖 ''' fcn = process_data.fcn params = process_data.params data_key = process_data.data_key if isinstance(params, tuple): data = fcn(*params) #**注意:*params 與 params區別** else: data = fcn(params) with lock: temp_dct = dict(self.__dct_share.value) if data_key not in temp_dct:temp_dct[data_key] = [] temp_dct[data_key].append(data) self.__dct_share.value = temp_dct

主程序調用:

def exe_process(self, company_name, open_from, time_nodes): ''' 多進程執行pre訂閱的數據 :param company_name: 公司名 :return: ''' mul_process_helper = MulProcessHelper() lock = Lock() self.__get_time_bidding_statistic(company_name, mul_process_helper) data = mul_process_helper.flush_process(lock) return data def __get_time_bidding_statistic(self, company_name, mul_process_helper): # 招投標信息 process_data = ProcessData(f_e_t_svc.get_bidding_statistic_time_node_api, company_name, self.__BIDDING_STATISTIC_TIME) **#此處怎么理解?ProcessData是一個類!!!** mul_process_helper.add_process_data_list(process_data) #同時調用多個api???將api方法當做迭代????用于同時查找多個公司???? def add_process_data_list(self, process_data): ''' 添加用于進程處理的方法隊列 :type process_data ProcessData :param process_data: :return: ''' self.__process_data_list.append(process_data) class ProcessData(object): ''' 用于進程處理的的數據 ''' def __init__(self, fcn, params, data_key): self.fcn = fcn # 方法 self.params = params # 參數 self.data_key = data_key # 存儲到進程共享變量中的名字

以上為個人經驗,希望能給大家一個參考,也希望大家多多支持好吧啦網。如有錯誤或未考慮完全的地方,望不吝賜教。

標簽: Python 編程
相關文章:
主站蜘蛛池模板: 国产男女免费完整视频 | 热伊人99re久久精品最新地 | 国产91美女 | 精品三级视频 | 久久精品片| 欧美日韩视频一区二区在线观看 | 免费aa在线观看 男人的天堂 | 亚洲人成网站观看在线播放 | 久久国产欧美日韩精品免费 | 日韩成人精品日本亚洲 | 精品国产三级在线观看 | 久久成人精品视频 | 欧美a一级 | 波多野结衣在线视频观看 | 宅男69免费永久网站 | avtt加勒比手机版天堂网 | 日韩一级片免费在线观看 | 日韩特级毛片免费观看视频 | 久草在线在线观看 | 免费看欧美毛片大片免费看 | 和日本免费不卡在线v | 亚洲免费小视频 | 亚洲毛片一级巨乳 | 一本本久综合久久爱 | 美国成人免费视频 | 九九九精品视频 | 亚洲性在线 | 国产成人做受免费视频 | 国产一区二区在线视频播放 | 国产成人综合高清在线观看 | 久久精品呦女 | 成人免费手机在线看网站 | 亚洲国产精品综合久久 | 欧美视频一区二区 | 91久久亚洲精品国产一区二区 | 中文字幕曰韩一区二区不卡 | 明星国产欧美日韩在线观看 | 特级深夜a级毛片免费观看 特级生活片 | 99国产精品高清一区二区二区 | 久久精品免视国产 | 干女人逼视频 |