java连接ElasticSearch集群操作
作者:java界的守门员 发布时间:2023-11-28 04:06:24
标签:java,ElasticSearch,集群
我就废话不多说了,大家还是直接看代码吧~
/*
*es配置类
*
*/
@Configuration
public class ElasticSearchDataSourceConfigurer {
private static final Logger LOG = LogManager.getLogger(ElasticSearchDataSourceConfigurer.class);
@Bean
public TransportClient getESClient() {
//设置集群名称
Settings settings = Settings.builder().put("cluster.name", "bigData-cluster").put("client.transport.sniff", true).build();
//创建client
TransportClient client = null;
try {
client = new PreBuiltTransportClient(settings)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(""), 9300));//集群ip
LOG.info("ESClient连接建立成功");
} catch (UnknownHostException e) {
LOG.info("ESClient连接建立失败");
e.printStackTrace();
}
return client;
}
}
/**
* Simple to Introduction
*
* @Description: [添加类]
*/
@Repository
public class UserDaoImpl implements userDao {
private static final String INDEXNAME = "user";//小写
private static final String TYPENAME = "info";
@Resource
TransportClient transportClient;
@Override
public int addUser(User[] user) {
IndexResponse indexResponse = null;
int successNum = 0;
for (int i = 0; i < user.length; i++) {
UUID uuid = UUID.randomUUID();
String str = uuid.toString();
String jsonValue = null;
try {
jsonValue = JsonUtil.object2JsonString(user[i]);
if (jsonValue != null) {
indexResponse = transportClient.prepareIndex(INDEXNAME, TYPENAME, str).setSource(jsonValue)
.execute().actionGet();
successNum++;
}
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
return successNum;
}
}
/**
*批量插入
*/
public static void bathAddUser(TransportClient client, List<User> users) {
BulkRequestBuilder bulkRequest = transportClient.prepareBulk();
for (int i = 0; i < users.size(); i++) {
UUID uuid = UUID.randomUUID();
String str = uuid.toString();
String jsonValue = null;
try {
jsonValue = JsonUtil.object2JsonString(users.get(i));
} catch (JsonProcessingException e) {
e.printStackTrace();
}
bulkRequest.add(client.prepareIndex("user", "info", str).setSource(jsonValue));
// 一万条插入一次
if (i % 10000 == 0) {
bulkRequest.execute().actionGet();
}
System.out.println("已经插入第" + i + "多少条");
}
}
补充知识:使用java创建ES(ElasticSearch)连接池
1.首先要有一个创建连接的工厂类
package com.aly.util;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
/**
* EliasticSearch连接池工厂对象
* @author 00000
*
*/
public class EsClientPoolFactory implements PooledObjectFactory<RestHighLevelClient>{
@Override
public void activateObject(PooledObject<RestHighLevelClient> arg0) throws Exception {
System.out.println("activateObject");
}
/**
* 销毁对象
*/
@Override
public void destroyObject(PooledObject<RestHighLevelClient> pooledObject) throws Exception {
RestHighLevelClient highLevelClient = pooledObject.getObject();
highLevelClient.close();
}
/**
* 生产对象
*/
//@SuppressWarnings({ "resource" })
@Override
public PooledObject<RestHighLevelClient> makeObject() throws Exception {
//Settings settings = Settings.builder().put("cluster.name","elasticsearch").build();
RestHighLevelClient client = null;
try {
/*client = new PreBuiltTransportClient(settings)
.addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"),9300));*/
client = new RestHighLevelClient(RestClient.builder(
new HttpHost("192.168.1.121", 9200, "http"), new HttpHost("192.168.1.122", 9200, "http"),
new HttpHost("192.168.1.123", 9200, "http"), new HttpHost("192.168.1.125", 9200, "http"),
new HttpHost("192.168.1.126", 9200, "http"), new HttpHost("192.168.1.127", 9200, "http")));
} catch (Exception e) {
e.printStackTrace();
}
return new DefaultPooledObject<RestHighLevelClient>(client);
}
@Override
public void passivateObject(PooledObject<RestHighLevelClient> arg0) throws Exception {
System.out.println("passivateObject");
}
@Override
public boolean validateObject(PooledObject<RestHighLevelClient> arg0) {
return true;
}
}
2.然后再写我们的连接池工具类
package com.aly.util;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.elasticsearch.client.RestHighLevelClient;
/**
* ElasticSearch 连接池工具类
*
* @author 00000
*
*/
public class ElasticSearchPoolUtil {
// 对象池配置类,不写也可以,采用默认配置
private static GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
// 采用默认配置maxTotal是8,池中有8个client
static {
poolConfig.setMaxTotal(8);
}
// 要池化的对象的工厂类,这个是我们要实现的类
private static EsClientPoolFactory esClientPoolFactory = new EsClientPoolFactory();
// 利用对象工厂类和配置类生成对象池
private static GenericObjectPool<RestHighLevelClient> clientPool = new GenericObjectPool<>(esClientPoolFactory,
poolConfig);
/**
* 获得对象
*
* @return
* @throws Exception
*/
public static RestHighLevelClient getClient() throws Exception {
// 从池中取一个对象
RestHighLevelClient client = clientPool.borrowObject();
return client;
}
/**
* 归还对象
*
* @param client
*/
public static void returnClient(RestHighLevelClient client) {
// 使用完毕之后,归还对象
clientPool.returnObject(client);
}
}
来源:https://blog.csdn.net/baidu_16217779/article/details/71633284


猜你喜欢
- DownloadManager三大组件介绍DownloadManager类似于下载队列,管理所有当前正在下载或者等待下载的项目;他可以维持
- 在JavaWeb的相关开发中经常会涉及到多级菜单的展示,为了方便菜单的管理需要使用数据库进行支持,本例采用相关算法讲数据库中的条形记录进行相
- 本人新手,有什么问题 还请指出来,大家一起学习进步,话不多说。首先,搭建dubbo项目,要有三个工程。它们分别是:maven java工程
- 概述源码就是能够被用来执行,生成机器能够识别的代码,通过开源源码,可以引用其功能。重要性1、mybatis中的sql执行,不仅要知道返回的结
- 本文实例为大家分享了Android读写文件工具类的具体代码,供大家参考,具体内容如下public class Utils { p
- 很长时间以来一直代码中用的比较多的数据列表主要是List,而且都是ArrayList,感觉有这个玩意就够了。ArrayList是用于实现动态
- 本文实例为大家分享了Android自定义广播接收的具体代码,供大家参考,具体内容如下实现效果:MainActivity.java代码:pac
- Android UI中TextView的使用方法一、TextView不同区域设置颜色,大小、点击事件String msg = getReso
- XSS是一种经常出现在web应用中的计算机安全漏洞,具体信息请自行Google。本文只分享在Spring Cloud Gateway中执行通
- 本文实例讲述了Java基于swing实现的弹球游戏代码。分享给大家供大家参考。主要功能代码如下:package Game;import ja
- 前言开发做得久了,总免不了会遇到各种坑。而在Android开发的路上,『软键盘挡住了输入框』这个坑,可谓是一个旷日持久的巨坑——来来来,我们
- Java 两种延时thread和timer详解及实例代码在Java中有时候需要使程序暂停一点时间,称为延时。普通延时用Thread.slee
- 在ios手机上经常看到页面上下滑动回弹效果,安卓中没有原生控件支持,这里自己就去自定义一个scrollview实现回弹效果1. 新建MySc
- 线程死锁是指由于两个或者多个线程互相持有对方所需要的资源,导致这些线程处于等待状态,无法前往执行。当线程进入对象的synchronized代
- 实现的功能1.导入非xls和xlsx格式的文件2.导入空数据的excel文件3.数据缺失4.导入的excel文件中有重复的数据5.导入的ex
- 1.对原生态jdbc程序中问题总结1.1 jdbc程序需求:使用jdbc查询mysql数据库中用户表的记录statement:向数据库中发送
- 当我们使用Jmeter工具进行接口测试,可利用CSV Data Set Config配置元件,对测试数据进行参数化,循环读取csv文档中每一
- 背景某项目,客户要求使用已有的 weblogic 部署已经开发好的 springboot,于是乎对 springboot 进行了部分配置的调
- 异常处理增强错误恢复能力是提高代码健壮性的最有力的途径之一,C语言中采用的错误处理方法被认为是紧耦合的,函数的使用者必须在非常靠近函数调用的
- 在Android 5.0以后的版本中,定义一个button时,系统自动会加一个阴影的效果,有的时候这种效果看起来比较好,有的时候不符合UI的