事情缘由:
昨日下午工信部前来,几个看似很专业搞安全的非要让现场写脚本导出几百万条Redis记录中的IP字段,由于之间确实没想过如何快速导出这么多数据,只能尴尬认怂~但下来仔细想想我们可以做到~办法总比困难多~
具体需求:
1. 快速导出Redis中只包含[0-9a-z]组成的16序列号下的WlanIP字段
实现思路:
1. 必然先想到多线程/多进程/多协程,最终选择gevent协程池的原因的是涉及到Redis读和文件写操作,相对于多进程/多线程更容易控制且时间不会浪费在阻塞上,异步来回切换更适合
2. 为了减轻每次调用Redis接口keys指令,所以先计算出[0-9a-z]组成的2位序列号排列组合,顺便将其作为250多个文件名
3. 为了减轻每次调用Redis接口hgetall指令,所以使用pipeline,超过1000个指令后统一通过各自管道发送来防止反复连接执行耗时
4. 由于Redis接口连接池只是提供了keepalive功能,所以在协程内部直接维护100个"假"连接池,但还有一个最主要的功能是为了解决pipeline混乱的问题
具体代码:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import os
from gevent import monkey
from gevent.pool import Pool
from itertools import product
from redis.exceptions import ResponseError
from redis import StrictRedis, ConnectionPool
# 说明: 导入其它模块
host = ‘127.0.0.1‘
port = 5123
monkey.patch_all()
def write_serialdata(key):
rds_pool = ConnectionPool(host=host, port=port)
rds_inst = StrictRedis(connection_pool=rds_pool, retry_on_timeout=True, max_connections=10)
rds_pipe = rds_inst.pipeline()
prefix = ‘‘.join(key)
redis_key = ‘{0}*‘.format(prefix)
rds_pipe.keys(redis_key)
redis_val = rds_pipe.execute()
if not redis_val:
return
with open(prefix, ‘a+b‘) as fd:
for index, serial_key in enumerate(redis_val[0]):
if index % 1000 == 0:
serial_val = rds_pipe.execute()
if not serial_val:
continue
for item in serial_val:
if ‘WanIP‘ not in item:
continue
ip = item[‘WanIP‘]
print ‘record ip => {0} to file {1}‘.format(ip, prefix)
fd.write(‘‘.join([ip, os.linesep]))
if not serial_key.isalnum():
continue
rds_pipe.hgetall(serial_key)
serial_val = rds_pipe.execute()
if not serial_val:
return
for item in serial_val:
if ‘WanIP‘ not in item:
continue
ip = item[‘WanIP‘]
print ‘record ip => {0} to file {1}‘.format(ip, prefix)
fd.write(‘‘.join([ip, os.linesep]))
def generat_product():
s_bit = ‘abcdef0123456789‘
return product(s_bit, repeat=2)
if __name__ == ‘__main__‘:
pool = Pool(100)
keys = generat_product()
path = []
for key_pairs in keys:
fpath = ‘‘.join(key_pairs)
path.append(fpath)
if not os.path.exists(fpath):
file(fpath, ‘w+b‘).close()
pool.map(write_serialdata, path)有图有相:
本文出自 “满满李 - 运维开发之路” 博客,请务必保留此出处http://xmdevops.blog.51cto.com/11144840/1869691
每日一题_Python.利用gevent和pipeline快速导出近千万Redis字段值?
原文:http://xmdevops.blog.51cto.com/11144840/1869691