python hbase读取数据发送kafka的方法

本例子实现从hbase获取数据,并发送kafka。

使用

#!/usr/bin/env python

#coding=utf-8

import sys

import time

import json

sys.path.append('/usr/local/lib/python3.5/site-packages')

from thrift import Thrift

from thrift.transport import TSocket

from thrift.transport import TTransport

from thrift.protocol import TBinaryProtocol

from hbase1 import Hbase #调用hbase thrif1

from hbase1.ttypes import *

from kafka import KafkaConsumer

from kafka import KafkaProducer

from kafka.errors import KafkaError

import unittest

class HbaseOpreator:

def __init__(self,host,port,table='test'):

self.tableName=table

self.transport=TTransport.TBufferedTransport(TSocket.TSocket(host,port))

self.protocol=TBinaryProtocol.TBinaryProtocol(self.transport)

self.client=Hbase.Client(self.protocol)

self.transport.open()

def __del__(self):

self.transport.close()

def scanTablefilter(self,table,*args):

d=dict()

L=[]

try:

tableName=table

# scan = Hbase.TScan(startRow, stopRow)

scan=TScan()

#主键首字母123

# filter = "PrefixFilter('123_')"

# filter = "RowFilter(=,'regexstring:.aaa')"

#过滤条件,当前为 statis_date 字段,值为20170223

# fitler = "SingleColumnValueFilter(tableName,'f','statis_date','20170223')"

# filter="SingleColumnValueFilter('f','statis_date',=,'binary:20170223') AND SingleColumnValueFilter('f','name',=,'binary:LXS')"

filter="SingleColumnValueFilter('info','name',=,'binary:lilei') OR SingleColumnValueFilter('info','name',=,'binary:lily')"

scan.filterString=filter

id=self.client.scannerOpenWithScan(tableName,scan,None)

result=self.client.scannerGet(id)

# result=self.client.scannerGetList(id,100)

while result:

for r in result:

key=r.row

name=r.columns.get('info:name').value

age=r.columns.get('info:age').value

phone=r.columns.get('info:phone').value

d['key']=key

d['name']=name

d['age']=age

d['phone']=phone

# encode_result_json=json.dumps(d).encode(encoding="utf-8")

# print(encode_result_json)

L.append(d)

result=self.client.scannerGet(id)

return json.dumps(L).encode(encoding="utf-8")

finally:

# self.client.scannerClose(scan)

print("scan finish")

def sendKfafkaProduct(data):

# self.host_port='localhost:9092'

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

for d in data:

producer.send('test', key=b'lxs', value=d)

time.sleep(5)

print(d)

while True:

producer.send('test', key=b'lxs', value=data)

time.sleep(5)

print(data)

if __name__== '__main__':

# unittest.main()

B=HbaseOpreator('10.27.1.138',9090)

value=B.scanTablefilter('ns_lbi:test_hbase_student')

print(value)

#sendKfafkaProduct(value)

以上这篇python hbase读取数据发送kafka的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持。

以上是 python hbase读取数据发送kafka的方法 的全部内容, 来源链接: utcz.com/z/359070.html

回到顶部