当前位置:首页 > PHP教程 > php高级应用 > 列表

Python实现 多进程导入CSV数据到 MySQL

发布:smiling 来源: PHP粉丝网  添加日期:2018-08-02 17:20:18 浏览: 评论:0 

前段时间帮同事处理了一个把 CSV 数据导入到 MySQL 的需求。两个很大的 CSV 文件, 分别有 3GB、2100 万条记录和  7GB、3500 万条记录。对于这个量级的数据,用简单的单进程/单线程导入  会耗时很久,最终用了多进程的方式来实现。具体过程不赘述,记录一下几个要点:

批量插入而不是逐条插入

为了加快插入速度,先不要建索引

生产者和消费者模型,主进程读文件,多个 worker 进程执行插入

注意控制 worker 的数量,避免对 MySQL 造成太大的压力

注意处理脏数据导致的异常

原始数据是 GBK 编码,所以还要注意转换成 UTF-8

用 click 封装命令行工具

具体的代码实现如下:

  1. #!/usr/bin/env python 
  2. # -*- coding: utf-8 -*- 
  3.   
  4. importcodecs 
  5. importcsv 
  6. importlogging 
  7. importmultiprocessing 
  8. importos 
  9. importwarnings 
  10.   
  11. importclick 
  12. importMySQLdb 
  13. importsqlalchemy 
  14.   
  15. warnings.filterwarnings('ignore', category=MySQLdb.Warning) 
  16.   
  17. # 批量插入的记录数量 
  18. BATCH=5000 
  19.   
  20. DB_URI='mysql://root@localhost:3306/example?charset=utf8' 
  21.   
  22. engine=sqlalchemy.create_engine(DB_URI) 
  23.   
  24.   
  25. defget_table_cols(table): 
  26.   sql='SELECT * FROM `{table}` LIMIT 0'.format(table=table) 
  27.   res=engine.execute(sql) 
  28.   returnres.keys() 
  29.   
  30.   
  31. definsert_many(table, cols, rows, cursor): 
  32.   sql='INSERT INTO `{table}` ({cols}) VALUES ({marks})'.format( 
  33.       table=table, 
  34.       cols=', '.join(cols), 
  35.       marks=', '.join(['%s']*len(cols))) 
  36.   cursor.execute(sql,*rows) 
  37.   logging.info('process %s inserted %s rows into table %s', os.getpid(),len(rows), table) 
  38.   
  39.   
  40. definsert_worker(table, cols, queue): 
  41.   rows=[] 
  42.   # 每个子进程创建自己的 engine 对象 
  43.   cursor=sqlalchemy.create_engine(DB_URI) 
  44.   whileTrue: 
  45.     row=queue.get() 
  46.     ifrowisNone: 
  47.       ifrows: 
  48.         insert_many(table, cols, rows, cursor) 
  49.       break 
  50.   
  51.     rows.append(row) 
  52.     iflen(rows)==BATCH: 
  53.       insert_many(table, cols, rows, cursor) 
  54.       rows=[] 
  55.   
  56.   
  57. definsert_parallel(table, reader, w=10): 
  58.   cols=get_table_cols(table) 
  59.   
  60.   # 数据队列,主进程读文件并往里写数据,worker 进程从队列读数据 
  61.   # 注意一下控制队列的大小,避免消费太慢导致堆积太多数据,占用过多内存 
  62.   queue=multiprocessing.Queue(maxsize=w*BATCH*2) 
  63.   workers=[] 
  64.   foriinrange(w): 
  65.     p=multiprocessing.Process(target=insert_worker, args=(table, cols, queue)) 
  66.     p.start() 
  67.     workers.append(p) 
  68.     logging.info('starting # %s worker process, pid: %s...', i+1, p.pid) 
  69.   
  70.   dirty_data_file='./{}_dirty_rows.csv'.format(table) 
  71.   xf=open(dirty_data_file,'w'
  72.   writer=csv.writer(xf, delimiter=reader.dialect.delimiter) 
  73.   
  74.   forlineinreader: 
  75.     # 记录并跳过脏数据: 键值数量不一致 
  76.     iflen(line) !=len(cols): 
  77.       writer.writerow(line) 
  78.       continue 
  79.   
  80.     # 把 None 值替换为 'NULL' 
  81.     clean_line=[Noneifx=='NULL'elsexforxinline] 
  82.   
  83.     # 往队列里写数据 
  84.     queue.put(tuple(clean_line)) 
  85.     ifreader.line_num%500000==0: 
  86.       logging.info('put %s tasks into queue.', reader.line_num) 
  87.   
  88.   xf.close() 
  89.   
  90.   # 给每个 worker 发送任务结束的信号 
  91.   logging.info('send close signal to worker processes'
  92.   foriinrange(w): 
  93.     queue.put(None) 
  94.   
  95.   forpinworkers: 
  96.     p.join() 
  97.   
  98.   
  99. defconvert_file_to_utf8(f, rv_file=None): 
  100.   ifnotrv_file: 
  101.     name, ext=os.path.splitext(f) 
  102.     ifisinstance(name,unicode): 
  103.       name=name.encode('utf8'
  104.     rv_file='{}_utf8{}'.format(name, ext) 
  105.   logging.info('start to process file %s', f) 
  106.   withopen(f) as infd: 
  107.     withopen(rv_file,'w'as outfd: 
  108.       lines=[] 
  109.       loop=0 
  110.       chunck=200000 
  111.       first_line=infd.readline().strip(codecs.BOM_UTF8).strip()+'\n' 
  112.       lines.append(first_line) 
  113.       forlineininfd: 
  114.         clean_line=line.decode('gb18030').encode('utf8'
  115.         clean_line=clean_line.rstrip()+'\n' 
  116.         lines.append(clean_line) 
  117.         iflen(lines)==chunck: 
  118.           outfd.writelines(lines) 
  119.           lines=[] 
  120.           loop+=1 
  121.           logging.info('processed %s lines.', loop*chunck) 
  122.   
  123.       outfd.writelines(lines) 
  124.       logging.info('processed %s lines.', loop*chunck+len(lines)) 
  125.   
  126.   
  127. @click.group() 
  128. defcli(): 
  129.   logging.basicConfig(level=logging.INFO, 
  130.             format='%(asctime)s - %(levelname)s - %(name)s - %(message)s'
  131.   
  132.   
  133. @cli.command('gbk_to_utf8'
  134. @click.argument('f'
  135. defconvert_gbk_to_utf8(f): 
  136.   convert_file_to_utf8(f) 
  137.   
  138.   
  139. @cli.command('load'
  140. @click.option('-t','--table', required=True,help='表名'
  141. @click.option('-i','--filename', required=True,help='输入文件'
  142. @click.option('-w','--workers'default=10,help='worker 数量,默认 10'
  143. defload_fac_day_pro_nos_sal_table(table, filename, workers): 
  144.   withopen(filename) as fd: 
  145.     fd.readline() # skip header 
  146.     reader=csv.reader(fd) 
  147.     insert_parallel(table, reader, w=workers) 
  148.  //phpfensi.com 
  149.   
  150. if__name__=='__main__'
  151.   cli() 

Tags: 进程 数据

分享到: