
Reading from Kafka with Python and writing to a Vertica database

Posted: 2017-02-22 13:14:19
# Main test
# https://docs.python.org/2/library/json.html
import sys
import json
import vertica_python
import time
import os
from pykafka import KafkaClient  # the vertica_python and pykafka packages must be installed with pip install

# print the current time
print('Start time:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
print(sys.getdefaultencoding())
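client = KafkaClient(hosts="192.168.1.1:9092")  # Kafka host and port; Kafka usually listens on port 9092
# client.topics  # lists all topics
topic = client.topics[b'topic']  # pick a topic ('topic' is a placeholder name)
# pykafka only auto-commits when a consumer_group is also set; this script tracks offsets itself in offset.txt
consumer = topic.get_simple_consumer(consumer_timeout_ms=2000, auto_commit_enable=True)  # stop after 2 seconds without new data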
conn_info = {'host': '192.168.1.1', 'port': 1, 'user': 'a', 'password': 'b',
             'database': 'c', 'read_timeout': 600, 'unicode_error': 'strict',
             'ssl': False}  # fill in your database connection info (Vertica's default port is 5433)
# simple connection, with manual close
connection = vertica_python.connect(**conn_info)
cur = connection.cursor()
a_error_count = 0
a_success_count = 0
path_os = os.path.abspath('offset.txt')  # the last processed offset is stored in this file
with open(path_os, 'r', encoding='utf8') as f1:
    a_offset_start = int(f1.readline())  # resume reading after this offset
print(a_offset_start)
# a_offset_start = 3000  # set the starting offset by hand, for troubleshooting
for message in consumer:  # loop 0; ends after consumer_timeout_ms without new messages
    if message is not None and message.offset > a_offset_start:
        try:
            a = message.value.decode('UTF-8')
            item = json.loads(a)  # each message holds one JSON object
            c = message.offset
            # table_name, column_names, column1 and column2 are placeholders.
            # The offset is written to the database as well. Passing the values
            # separately lets vertica_python quote them, so a value containing
            # a quote cannot break the SQL statement.
            str1 = "insert into table_name (column_names) values (%s, %s, %s)"
            params = (str(c), item['column1'], item['column2'])
            print(str1, params)
            cur.execute(str1, params)
            connection.commit()
            a_success_count += 1
        except Exception as e:  # bad encoding, bad JSON, missing key or SQL error
            print('error:', e)
            a_error_count += 1
            continue
        c1 = message.offset
        # overwrite the offset file: first line = last offset, then a short status report
        with open(path_os, 'w', encoding='utf8') as f:
            f.write(str(c1))
            f.write('\n' + 'time=' + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
            f.write('\n' + 'a_success_count=' + str(a_success_count))
            f.write('\n' + 'a_error_count=' + str(a_error_count))
cur.close()
connection.close()
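The script hard-codes the Vertica credentials in `conn_info`. A minimal sketch, assuming you would rather keep them out of the source, builds the same dictionary from environment variables; the `VERTICA_*` variable names and the `build_conn_info` helper are hypothetical, and 5433 is used as the fallback because it is Vertica's default client port.

```python
import os


def build_conn_info(env=None):
    """Build a conn_info dict for vertica_python.connect(**conn_info).

    The VERTICA_* environment variable names are hypothetical placeholders.
    """
    env = os.environ if env is None else env
    return {
        'host': env.get('VERTICA_HOST', '127.0.0.1'),
        'port': int(env.get('VERTICA_PORT', '5433')),  # Vertica's default client port
        'user': env.get('VERTICA_USER', ''),
        'password': env.get('VERTICA_PASSWORD', ''),
        'database': env.get('VERTICA_DATABASE', ''),
        'read_timeout': 600,  # same options as in the script above
        'unicode_error': 'strict',
        'ssl': False,
    }


if __name__ == '__main__':
    info = build_conn_info({'VERTICA_HOST': '192.168.1.1', 'VERTICA_USER': 'a'})
    print(info['host'], info['port'], info['user'])  # 192.168.1.1 5433 a
```

With this helper, the connection line would become `connection = vertica_python.connect(**build_conn_info())`.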

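The script resumes from the offset on the first line of `offset.txt` and rewrites that file after every successful insert. Because the file is truncated before the new offset is written, a crash at the wrong moment leaves it empty, and `int(f1.readline())` then fails on the next start. A minimal sketch of a safer variant uses the common "write a temporary file, then `os.replace` it over the old one" pattern (a swapped-in technique, not the author's); the helper names `load_offset`/`save_offset` and the `-1` default for a missing file are assumptions.

```python
import os
import tempfile
import time


def load_offset(path, default=-1):
    """Return the offset on the first line of `path`, or `default` if missing/empty."""
    try:
        with open(path, 'r', encoding='utf8') as f:
            first = f.readline().strip()
    except FileNotFoundError:
        return default
    return int(first) if first else default


def save_offset(path, offset, success_count, error_count):
    """Replace `path` with the new offset plus a status report, never leaving it half-written."""
    directory = os.path.dirname(os.path.abspath(path))
    # The temp file lives in the same directory, so os.replace stays on one filesystem.
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix='.tmp')
    try:
        with os.fdopen(fd, 'w', encoding='utf8') as f:
            f.write(str(offset))
            f.write('\ntime=' + time.strftime('%Y-%m-%d %H:%M:%S'))
            f.write('\na_success_count=' + str(success_count))
            f.write('\na_error_count=' + str(error_count))
            f.flush()
            os.fsync(f.fileno())  # make sure the data is on disk before the swap
        os.replace(tmp_path, path)  # atomic rename on POSIX
    except BaseException:
        os.remove(tmp_path)
        raise


if __name__ == '__main__':
    demo_path = os.path.join(tempfile.mkdtemp(), 'offset.txt')
    print(load_offset(demo_path))  # -1 (no file yet)
    save_offset(demo_path, 42, 10, 1)
    print(load_offset(demo_path))  # 42
```

The first line keeps the same format as the script's `offset.txt`, so existing files can still be read.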
 

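The body of the main loop boils down to: skip anything at or below the stored offset, decode the UTF-8 JSON payload, turn it into one row, insert it, and count successes and failures. A minimal sketch of that logic with Kafka and Vertica stubbed out, so it can be tried without either system; messages are modeled as hypothetical `(offset, bytes)` pairs, `insert` stands in for the database call, and `column1`/`column2` are placeholder keys like the column-name placeholders in the script.

```python
import json


def process(messages, start_offset, insert):
    """Process (offset, raw_bytes) pairs the way the script's main loop does.

    Returns (last_successful_offset, success_count, error_count).
    """
    last_offset = start_offset
    success = error = 0
    for offset, raw in messages:
        if offset <= start_offset:  # already handled in an earlier run
            continue
        try:
            item = json.loads(raw.decode('utf-8'))
            # The row is passed as a tuple, separate from any SQL text, as with
            # a parameterized cur.execute(sql, params); quotes need no escaping.
            insert((str(offset), item['column1'], item['column2']))
        except Exception as exc:  # bad encoding, bad JSON, missing key, insert failure
            print('error at offset', offset, exc)
            error += 1
            continue
        success += 1
        last_offset = offset
    return last_offset, success, error


if __name__ == '__main__':
    rows = []
    msgs = [(3, b'{"column1": "a", "column2": "b"}'),
            (5, b'{"column1": "O\'Brien", "column2": "c"}'),
            (6, b'not json')]
    result = process(msgs, 3, rows.append)
    print(result)  # (5, 1, 1)
    print(rows)    # [('5', "O'Brien", 'c')]
```

In the script itself, `insert` corresponds to the `cur.execute(...)` call followed by `connection.commit()`. As in the original, a failed message is counted and then skipped for good once a later message succeeds.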

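Kafka offsets are only ordered within a single partition, so the single number in `offset.txt` is only reliable for a topic with one partition; with several partitions, the `message.offset > a_offset_start` check can skip or reprocess messages. A minimal sketch of keeping one offset per partition in a JSON document instead; it assumes the partition id is available for each message (pykafka's `Message` objects carry a `partition_id` attribute), and the `PartitionOffsets` class and its file layout are hypothetical.

```python
import json


class PartitionOffsets:
    """Track the last processed offset separately for each partition."""

    def __init__(self, offsets=None):
        # JSON object keys are always strings, so normalize partition ids to int.
        self.offsets = {int(k): v for k, v in (offsets or {}).items()}

    def is_new(self, partition_id, offset):
        """True if this message was not processed in an earlier run."""
        return offset > self.offsets.get(partition_id, -1)

    def mark(self, partition_id, offset):
        """Record the offset after the row has been committed."""
        self.offsets[partition_id] = offset

    def dumps(self):
        return json.dumps(self.offsets, sort_keys=True)

    @classmethod
    def loads(cls, text):
        return cls(json.loads(text) if text.strip() else {})


if __name__ == '__main__':
    state = PartitionOffsets()
    state.mark(0, 5)
    state.mark(1, 2)
    saved = state.dumps()
    print(saved)  # {"0": 5, "1": 2}
    restored = PartitionOffsets.loads(saved)
    print(restored.is_new(0, 3), restored.is_new(1, 3))  # False True
```

In the consumer loop, the offset check would then become `state.is_new(message.partition_id, message.offset)`, with `state.mark(...)` after each successful commit and `state.dumps()` written to the offset file.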
Original article: http://www.cnblogs.com/castlevania/p/6428213.html
