MySQL單表千萬(wàn)級(jí)數(shù)據(jù)處理的思路分享
在處理過(guò)程中,今天上午需要更新A字段,下午爬蟲組完成了規(guī)格書或圖片的爬取又需要更新圖片和規(guī)格書字段,由于單表千萬(wàn)級(jí)深度翻頁(yè)會(huì)導(dǎo)致處理速度越來(lái)越慢。
select a,b,c from db.tb limit 10000 offset 9000000
但是時(shí)間是有限的,是否有更好的方法去解決這種問(wèn)題呢?
改進(jìn)思路是否有可以不需要深度翻頁(yè)也可以進(jìn)行數(shù)據(jù)更新的憑據(jù)?是的,利用自增id列
觀察數(shù)據(jù)特征此單表有自增id列且為主鍵,根據(jù)索引列查詢數(shù)據(jù)和更新數(shù)據(jù)是最理想的途徑。
select a,b, c from db.tb where id=9999999;update db.tb set a=x where id=9999999;多進(jìn)程處理
每個(gè)進(jìn)程處理一定id范圍內(nèi)的數(shù)據(jù),這樣既避免的深度翻頁(yè)又可以同時(shí)多進(jìn)程處理數(shù)據(jù)。提高數(shù)據(jù)查詢速度的同時(shí)也提高了數(shù)據(jù)處理速度。下面是我編寫的任務(wù)分配函數(shù),供參考:
def mission_handler(all_missions, worker_mission_size): ''' 根據(jù)總?cè)蝿?wù)數(shù)和每個(gè)worker的任務(wù)數(shù)計(jì)算出任務(wù)列表, 任務(wù)列表元素為(任務(wù)開始id, 任務(wù)結(jié)束id)。 例: 總?cè)蝿?wù)數(shù)100個(gè),每個(gè)worker的任務(wù)數(shù)40, 那么任務(wù)列表為:[(1, 40), (41, 80), (81, 100)] :param all_missions: 總?cè)蝿?wù)數(shù) :param worker_mission_size: 每個(gè)worker的最大任務(wù)數(shù) :return: [(start_id, end_id), (start_id, end_id), ...] ''' worker_mission_ids = [] current_id = 0 while current_id <= all_missions:start_id = all_missions if current_id + 1 >= all_missions else current_id + 1end_id = all_missions if current_id + worker_mission_size >= all_missions else current_id + worker_mission_sizeif start_id == end_id: if worker_mission_ids[-1][1] == start_id:breakworker_mission_ids.append((start_id, end_id))current_id += worker_mission_size return worker_mission_ids
假設(shè)單表id最大值為100, 然后我們希望每個(gè)進(jìn)程處理20個(gè)id,那么任務(wù)列表將為:
>>> mission_handler(100, 40)[(1, 40), (41, 80), (81, 100)]
那么,進(jìn)程1將只需要處理id between 1 to 40的數(shù)據(jù);進(jìn)程2將只需要處理id between 41 to 80的數(shù)據(jù);進(jìn)程3將只需要處理id between 81 to 100的數(shù)據(jù)。
from concurrent.futures import ProcessPoolExecutordef main(): # 自增id最大值 max_id = 30000000 # 單worker處理數(shù)據(jù)量 worker_mission_size = 1000000 # 使用多進(jìn)程進(jìn)行處理 missions = mission_handler(max_id, worker_mission_size) workers = [] executor = ProcessPoolExecutor() for idx, mission in enumerate(missions):start_id, end_id = missionworkers.append(executor.submit(data_handler, start_id, end_id, idx))def data_handler(start_id, end_id, worker_id): pass思路總結(jié) 避免深度翻頁(yè)進(jìn)而使用自增id進(jìn)行查詢數(shù)據(jù)和數(shù)據(jù) 使用多進(jìn)程處理數(shù)據(jù) 數(shù)據(jù)處理技巧
記錄處理成功與處理失敗的數(shù)據(jù)id,以便后續(xù)跟進(jìn)處理
# 用另外一張表記錄處理狀態(tài)insert into db.tb_handle_status(row_id, success) values (999, 0);
循環(huán)體內(nèi)進(jìn)行異常捕獲,避免程序異常退出
def data_handler(start_id, end_id, worker_id): # 數(shù)據(jù)連接 conn, cursor = mysql() current_id = start_idtry: while current_id <= end_id:try: # TODO 數(shù)據(jù)處理代碼 passexcept Exception as e: # TODO 記錄處理結(jié)果 # 數(shù)據(jù)移動(dòng)到下一條 current_id += 1 continueelse: # 無(wú)異常,繼續(xù)處理下一條數(shù)據(jù) current_id += 1except Exception as e: return ’worker_id({}): result({})’.format(worker_id, False)finally: # 數(shù)據(jù)庫(kù)資源釋放 cursor.close() conn.close()return ’worker_id({}): result({})’.format(worker_id, True)
更新數(shù)據(jù)庫(kù)數(shù)據(jù)盡量使用批量提交
sql = '''update db.tb set a=%s, b=%s where id=%s'''values = [ (’a_value’, ’b_value’, 9999), (’a_value’, ’b_value’, 9998), ... ]# 批量提交,減少網(wǎng)絡(luò)io以及鎖獲取頻率cursor.executemany(sql, values)
以上就是MySQL單表千萬(wàn)級(jí)數(shù)據(jù)處理的思路分享的詳細(xì)內(nèi)容,更多關(guān)于MySQL單表千萬(wàn)級(jí)數(shù)據(jù)處理的資料請(qǐng)關(guān)注好吧啦網(wǎng)其它相關(guān)文章!
相關(guān)文章:
1. Oracle災(zāi)難防護(hù)的關(guān)鍵技術(shù)2. Microsoft Office Access凍結(jié)字段的方法3. 提高商業(yè)智能環(huán)境中DB2查詢的性能(2)4. 關(guān)于SQL server中字段值為null的查詢5. 傳甲骨文將增加對(duì)MySQL投資與微軟競(jìng)爭(zhēng)6. 關(guān)于Sql server數(shù)據(jù)庫(kù)日志滿的快速解決辦法7. Access創(chuàng)建一個(gè)簡(jiǎn)單MIS管理系統(tǒng)8. SQL Server數(shù)據(jù)庫(kù)連接查詢和子查詢實(shí)戰(zhàn)案例9. Microsoft Office Access復(fù)制數(shù)據(jù)表的方法10. SQL Server靜態(tài)頁(yè)面導(dǎo)出技術(shù)2
