网络编程
位置:首页>> 网络编程>> Python编程>> python hbase读取数据发送kafka的方法

python hbase读取数据发送kafka的方法

作者:meiguopai1  发布时间:2023-09-07 14:39:46 

标签:python,hbase,kafka

本例子演示如何通过 Thrift 接口从 HBase 读取数据,并将其发送到 Kafka。

使用方法(需要由 Thrift 生成的 HBase thrift1 Python 客户端模块 hbase1,以及 kafka-python 库):


#!/usr/bin/env python
#coding=utf-8

import sys
import time
import json

sys.path.append('/usr/local/lib/python3.5/site-packages')
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from hbase1 import Hbase #调用hbase thrif1
from hbase1.ttypes import *
from kafka import KafkaConsumer
from kafka import KafkaProducer
from kafka.errors import KafkaError
import unittest

class HbaseOpreator:
    """Read rows from HBase through its Thrift (v1) gateway.

    The constructor opens a buffered, binary-protocol Thrift connection to
    ``host:port``. Call ``close()`` (or use the instance in a ``with``
    statement) to release the socket deterministically; ``__del__`` is only
    a fallback. The class name keeps its original spelling so existing
    callers keep working.
    """

    # Server-side HBase filter-language expression used by scanTablefilter().
    # The default keeps rows whose info:name is exactly 'lilei' or 'lily'.
    # Override it on the class or an instance for other filters, e.g.
    # "PrefixFilter('123_')" or
    # "SingleColumnValueFilter('f','statis_date',=,'binary:20170223')".
    SCAN_FILTER = ("SingleColumnValueFilter('info','name',=,'binary:lilei') OR "
                   "SingleColumnValueFilter('info','name',=,'binary:lily')")

    # Columns copied into every result record: (output field, 'family:qualifier').
    COLUMNS = (
        ('name', 'info:name'),
        ('age', 'info:age'),
        ('phone', 'info:phone'),
    )

    def __init__(self, host, port, table='test'):
        """Open a Thrift connection to the HBase gateway.

        :param host: Thrift gateway host name or IP address.
        :param port: Thrift gateway port.
        :param table: default table for scanTablefilter() when it is called
            without an explicit table name.
        """
        self.tableName = table
        self.transport = TTransport.TBufferedTransport(TSocket.TSocket(host, port))
        self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
        self.client = Hbase.Client(self.protocol)
        self.transport.open()

    def close(self):
        """Close the Thrift transport opened in __init__.

        Repeated calls are no-ops. The method is also safe on an instance
        whose __init__ failed before ``self.transport`` was assigned.
        """
        transport = getattr(self, 'transport', None)
        if transport is not None and not getattr(self, '_closed', False):
            self._closed = True
            transport.close()

    def __del__(self):
        # Fallback only: finalizer timing is not guaranteed, so prefer close().
        self.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.close()
        return False  # never suppress exceptions raised inside the with-body

    @staticmethod
    def _text(value):
        """Return *value* decoded as UTF-8 when it is bytes, else unchanged."""
        if isinstance(value, (bytes, bytearray)):
            return value.decode('utf-8')
        return value

    def _row_to_dict(self, row):
        """Convert one Thrift row result into a new JSON-serialisable dict.

        The dict has a 'key' entry plus one entry per COLUMNS field. A column
        that is absent from the row maps to None.
        """
        columns = row.columns or {}
        record = {'key': self._text(row.row)}
        for field, column in self.COLUMNS:
            cell = columns.get(column)
            if cell is None:
                # NOTE(review): depending on the Thrift code generator the
                # column map may be keyed by bytes rather than str; try both.
                cell = columns.get(column.encode('utf-8'))
            record[field] = self._text(cell.value) if cell is not None else None
        return record

    def scanTablefilter(self, table=None, *args):
        """Scan *table* with SCAN_FILTER and return the matching rows as JSON.

        :param table: table name, e.g. 'ns_lbi:test_hbase_student'. Defaults
            to the table passed to the constructor.
        :param args: accepted for backward compatibility and ignored.
        :returns: UTF-8 encoded JSON array with one object per matching row,
            each holding 'key', 'name', 'age' and 'phone'.
        """
        table_name = self.tableName if table is None else table
        scanner_id = None
        rows = []
        try:
            scan = TScan()
            scan.filterString = self.SCAN_FILTER
            scanner_id = self.client.scannerOpenWithScan(table_name, scan, None)
            batch = self.client.scannerGet(scanner_id)
            # scannerGet returns an empty list once the scan is exhausted.
            while batch:
                # A fresh dict per row, so list entries never alias each other.
                rows.extend(self._row_to_dict(r) for r in batch)
                batch = self.client.scannerGet(scanner_id)
            return json.dumps(rows).encode(encoding="utf-8")
        finally:
            try:
                if scanner_id is not None:
                    # Release the scanner opened above on the Thrift server.
                    self.client.scannerClose(scanner_id)
            finally:
                print("scan finish")

def _kafka_messages(data):
    """Turn *data* into the list of byte payloads to publish, one per record.

    *data* may be the UTF-8 JSON bytes returned by
    HbaseOpreator.scanTablefilter(), a JSON string, or an already-decoded
    list of records. A JSON array yields one message per element; any other
    value becomes a single message. Elements that are already bytes are
    passed through; everything else is encoded as UTF-8 JSON.
    """
    if isinstance(data, (bytes, bytearray)):
        # json.loads() only accepts str before Python 3.6.
        data = data.decode('utf-8')
    if isinstance(data, str):
        data = json.loads(data)
    records = data if isinstance(data, (list, tuple)) else [data]
    return [item if isinstance(item, (bytes, bytearray))
            else json.dumps(item).encode('utf-8')
            for item in records]


def sendKfafkaProduct(data, topic='test', key=b'lxs', bootstrap_servers=None,
                      interval=5, repeat=False):
    """Publish *data* to Kafka, one message per record.

    :param data: payload accepted by _kafka_messages(), typically the bytes
        returned by HbaseOpreator.scanTablefilter().
    :param topic: destination Kafka topic.
    :param key: message key (bytes) attached to every message.
    :param bootstrap_servers: broker list; defaults to ['localhost:9092'].
    :param interval: seconds to sleep after each send (demo throttle).
    :param repeat: when true, keep re-sending the records every round until
        interrupted; the call then does not return.
    """
    if bootstrap_servers is None:
        bootstrap_servers = ['localhost:9092']
    messages = _kafka_messages(data)
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
    try:
        while True:
            for message in messages:
                producer.send(topic, key=key, value=message)
                time.sleep(interval)
                print(message)
            # Stop after one round unless asked to repeat. An empty payload
            # also stops here to avoid a busy loop.
            if not repeat or not messages:
                break
    finally:
        # send() is asynchronous: flush so buffered messages are delivered
        # before the producer is closed.
        producer.flush()
        producer.close()

if __name__ == '__main__':
    # Demo: scan the student table through the Thrift gateway and print the
    # resulting JSON payload. Pass it to sendKfafkaProduct(value) to publish
    # it to Kafka.
    B = HbaseOpreator('10.27.1.138', 9090)
    try:
        value = B.scanTablefilter('ns_lbi:test_hbase_student')
        print(value)
        # sendKfafkaProduct(value)
    finally:
        # Close the Thrift socket explicitly instead of relying on __del__.
        B.transport.close()

来源:https://blog.csdn.net/meiguopai1/article/details/70175069

0
投稿

猜你喜欢

手机版 网络编程 asp之家 www.aspxhome.com