Python 调用 ES、Solr、Phoenix的示例代码
作者:LeoZhanggg 发布时间:2023-10-03 04:52:57
标签:Python,调用,ES,Solr,Phoenix
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# *************************************
# @Time : 2019/8/12
# @Author : Zhang Fan
# @Desc : Library
# @File : MyDatabases.py
# @Update : 2019/8/23
# *************************************
import elasticsearch
import phoenixdb
import pysolr
import pymysql
class MyELS(object):
"""
===================================================================
===================== MyELS =========================
===================================================================
"""
def __init__(self):
self.els_conn = None
def connect_to_els(self, host, port):
"""
连接到ElasticSearch服务器.
"""
self.els_conn = elasticsearch.Elasticsearch([{'host': host, 'port': port}])
print('Executing : Connect To Elastic Search | %s' % self.els_conn)
def get_els_data(self, query, index):
"""
获取ElasticSearch数据
"""
print('Executing : Search | %s' % query)
try:
rst = self.els_conn.search(index=index, q=query)
return rst['hits']
except Exception as e:
print('Elastic Search Error | %s' % e)
raise Exception(e)
class MyPhoenix(object):
"""
===================================================================
===================== MyPhoenix ======================
===================================================================
"""
def __init__(self):
self.phoenix_conn = None
self.phoenix_cursor = None
def connect_to_phoenix(self, host, port=8765):
"""
连接到phoenix服务器
"""
address = 'http://{0}:{1}/'.format(host, port)
print('Executing : Connect To Phoenix | %s' % address)
self.phoenix_conn = phoenixdb.connect(address, autocommit=True)
self.phoenix_cursor = self.phoenix_conn.cursor()
def set_schema(self, sql, schema):
"""
设置schema
"""
pre_sub, sub, fol_sub = sql.upper().partition('FROM')
fol_sub = ' ' + schema + '.' + fol_sub.strip()
new_sql = ''.join([pre_sub, sub, fol_sub])
return new_sql
def execute_phoenix_sql(self, sql):
"""
执行sql语句
"""
# sql = self.set_schema(sql, schema)
print('Executing : Execute | %s' % sql)
self.phoenix_cursor.execute(sql)
def get_from_phoenix(self, sql):
"""
获取phoenix数据
"""
# sql = self.set_schema(sql, schema)
print('Executing : Query | %s' % sql)
try:
self.phoenix_cursor.execute(sql)
except Exception as e:
print('Phoenix Error | %s' % e)
raise Exception(e)
return self.phoenix_cursor.fetchall()
def disconnect_from_phoenix(self):
"""
断开phoenix连接
"""
print('Executing : Disconnect From HBase')
self.phoenix_cursor.close()
self.phoenix_conn.close()
class MySolr(object):
"""
===================================================================
===================== MySolr =========================
===================================================================
"""
def __init__(self):
self.solr_conn = None
self.base_url = None
def connect_to_solr(self, address, selector):
"""连接到solr服务器.
"""
self.base_url = 'http://{0}/solr/{1}/'.format(address, selector)
self.solr_conn = pysolr.Solr(self.base_url)
print('Executing : Connect To Solr | %s' % self.base_url)
def get_solr_data(self, query):
"""
获取solr数据
"""
results = list()
print('Executing : Search | %s' % query)
try:
items = self.solr_conn.search(query)
for item in items:
results.append(item)
except Exception as e:
print('Solr Error | %s' % e)
raise Exception(e)
return results
def add_solr_data(self, data):
"""
添加solr数据
"""
print('Executing : add | %s' % data)
try:
self.solr_conn.add([data])
self.solr_conn.commit()
except Exception as e:
print('Solr Error | %s' % e)
raise Exception(e)
def del_solr_byId(self, data):
"""
删除solr数据
"""
print('Executing : del | %s' % data)
try:
self.solr_conn.delete(id=data)
self.solr_conn.commit()
except Exception as e:
print('Solr Error | %s' % e)
raise Exception(e)
if __name__ == '__main__':
print('This is test.')
ms = MySolr()
me = MyELS()
mp = MyPhoenix()
来源:https://www.cnblogs.com/leozhanggg/p/11401570.html
0
投稿
猜你喜欢
- 目的: 找出路径坐在的所有python文件(.py结尾),返回列表。代码:def list_py(path = None): if path
- 这是一款简单,方便,功能齐全的分页类,可以根据自己的需要更改CSS样式文件以实现分页颜色的控制,利用p
- 在处理数据的时候,因为没有及时的去重,所以需要重新对生成txt进行去重。可是一个文件夹下有很多txt,总不可能一个一个去操作,这样效率太低了
- 解析json数据并保存为csv文件首先导入两个包:import jsonimport pandas as pd打开json 文件并读取:wi
- 有时候我们想要的数据合并结果是数据的轴向连接,在pandas中这可以通过concat来实现。操作的对象通常是Series。Ipython中的
- 导语为什么有这么一个简单的游戏?这个游戏如此受欢迎?仅仅是因为它在游戏行业异常匮乏的年代出现,从而成为了一代人的记忆吗?恐怕并不是。玩过俄罗
- 前言学习Python的过程中,比较喜欢通过实际的小项目进行巩固学习,决定写一个弹跳小球的程序。这个实战例程是在公众号上看到的,他的编写过程比
- 前言原理:Windows逆向,通过内联汇编的形式调用发消息的函数下面的代码PC微信版本是:3.7.0.26 , python使用的32位的3
- 升序import pandas as pdimport numpy as npdata = np.random.randint(low=2,
- 概要:Oracle关系数据库系统以其卓越的性能获得了广泛的应用,而保证数据库的安全性 是数据库管理工作的重要内容。本文是笔者在总结Oracl
- 一、导入所需的库import turtleimport randomfrom math import *二、生成斐波那契数列斐波那契数列是指
- 一. 概述首先需要先介绍一下无监督学习,所谓无监督学习,就是训练样本中的标记信息是位置的,目标是通过对无标记训练样本的学习来揭示数据的内在性
- 环境:OS:Red Hat Enterprise Linux AS release 4 (Nahant)DB:Oracle Database
- 第一个坑:'geckodriver' executable needs to be in PATH1.如果启动浏览器过程中报
- 今天分享 3 个 Python 编程小技巧,来看看你是否用过?1、如何按照字典的值的大小进行排序我们知道,字典的本质是哈希表,本身是无法排序
- Python获取时间范围内日期列表和周列表的函数 1、获取日期列表# -*- coding=utf-8 -*-import datetime
- asp时间加减运算 和转换问题 a=2007-07-24 2:23:15 b=2005-06-25 2:23:15 问题1 如何将a转换成2
- 一个简单的php文件下载源代码,虽不支持断点续传等,但是可以满足一些常用的需求了。php下载文件其实用一个a标签就能实现,比如 <a
- PHP getName() 函数实例返回 XML 元素及其子元素的名称:<?php $xml=<<<XML<?
- 前言:我们在日常生活中,都离不开时间和日期。不仅我们的汉字都有大量描述时间日期的词语如斗转星移、分秒必争、只针朝夕、转眼之间等。同样,在我们