java连接ElasticSearch集群操作
作者:java界的守门员 发布时间:2023-11-28 04:06:24
标签:java,ElasticSearch,集群
我就废话不多说了,大家还是直接看代码吧~
/*
*es配置类
*
*/
@Configuration
public class ElasticSearchDataSourceConfigurer {
private static final Logger LOG = LogManager.getLogger(ElasticSearchDataSourceConfigurer.class);
@Bean
public TransportClient getESClient() {
//设置集群名称
Settings settings = Settings.builder().put("cluster.name", "bigData-cluster").put("client.transport.sniff", true).build();
//创建client
TransportClient client = null;
try {
client = new PreBuiltTransportClient(settings)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(""), 9300));//集群ip
LOG.info("ESClient连接建立成功");
} catch (UnknownHostException e) {
LOG.info("ESClient连接建立失败");
e.printStackTrace();
}
return client;
}
}
/**
* Simple to Introduction
*
* @Description: [添加类]
*/
@Repository
public class UserDaoImpl implements userDao {
private static final String INDEXNAME = "user";//小写
private static final String TYPENAME = "info";
@Resource
TransportClient transportClient;
@Override
public int addUser(User[] user) {
IndexResponse indexResponse = null;
int successNum = 0;
for (int i = 0; i < user.length; i++) {
UUID uuid = UUID.randomUUID();
String str = uuid.toString();
String jsonValue = null;
try {
jsonValue = JsonUtil.object2JsonString(user[i]);
if (jsonValue != null) {
indexResponse = transportClient.prepareIndex(INDEXNAME, TYPENAME, str).setSource(jsonValue)
.execute().actionGet();
successNum++;
}
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
return successNum;
}
}
/**
*批量插入
*/
public static void bathAddUser(TransportClient client, List<User> users) {
BulkRequestBuilder bulkRequest = transportClient.prepareBulk();
for (int i = 0; i < users.size(); i++) {
UUID uuid = UUID.randomUUID();
String str = uuid.toString();
String jsonValue = null;
try {
jsonValue = JsonUtil.object2JsonString(users.get(i));
} catch (JsonProcessingException e) {
e.printStackTrace();
}
bulkRequest.add(client.prepareIndex("user", "info", str).setSource(jsonValue));
// 一万条插入一次
if (i % 10000 == 0) {
bulkRequest.execute().actionGet();
}
System.out.println("已经插入第" + i + "多少条");
}
}
补充知识:使用java创建ES(ElasticSearch)连接池
1.首先要有一个创建连接的工厂类
package com.aly.util;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
/**
* EliasticSearch连接池工厂对象
* @author 00000
*
*/
public class EsClientPoolFactory implements PooledObjectFactory<RestHighLevelClient>{
@Override
public void activateObject(PooledObject<RestHighLevelClient> arg0) throws Exception {
System.out.println("activateObject");
}
/**
* 销毁对象
*/
@Override
public void destroyObject(PooledObject<RestHighLevelClient> pooledObject) throws Exception {
RestHighLevelClient highLevelClient = pooledObject.getObject();
highLevelClient.close();
}
/**
* 生产对象
*/
//@SuppressWarnings({ "resource" })
@Override
public PooledObject<RestHighLevelClient> makeObject() throws Exception {
//Settings settings = Settings.builder().put("cluster.name","elasticsearch").build();
RestHighLevelClient client = null;
try {
/*client = new PreBuiltTransportClient(settings)
.addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"),9300));*/
client = new RestHighLevelClient(RestClient.builder(
new HttpHost("192.168.1.121", 9200, "http"), new HttpHost("192.168.1.122", 9200, "http"),
new HttpHost("192.168.1.123", 9200, "http"), new HttpHost("192.168.1.125", 9200, "http"),
new HttpHost("192.168.1.126", 9200, "http"), new HttpHost("192.168.1.127", 9200, "http")));
} catch (Exception e) {
e.printStackTrace();
}
return new DefaultPooledObject<RestHighLevelClient>(client);
}
@Override
public void passivateObject(PooledObject<RestHighLevelClient> arg0) throws Exception {
System.out.println("passivateObject");
}
@Override
public boolean validateObject(PooledObject<RestHighLevelClient> arg0) {
return true;
}
}
2.然后再写我们的连接池工具类
package com.aly.util;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.elasticsearch.client.RestHighLevelClient;
/**
* ElasticSearch 连接池工具类
*
* @author 00000
*
*/
public class ElasticSearchPoolUtil {
// 对象池配置类,不写也可以,采用默认配置
private static GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
// 采用默认配置maxTotal是8,池中有8个client
static {
poolConfig.setMaxTotal(8);
}
// 要池化的对象的工厂类,这个是我们要实现的类
private static EsClientPoolFactory esClientPoolFactory = new EsClientPoolFactory();
// 利用对象工厂类和配置类生成对象池
private static GenericObjectPool<RestHighLevelClient> clientPool = new GenericObjectPool<>(esClientPoolFactory,
poolConfig);
/**
* 获得对象
*
* @return
* @throws Exception
*/
public static RestHighLevelClient getClient() throws Exception {
// 从池中取一个对象
RestHighLevelClient client = clientPool.borrowObject();
return client;
}
/**
* 归还对象
*
* @param client
*/
public static void returnClient(RestHighLevelClient client) {
// 使用完毕之后,归还对象
clientPool.returnObject(client);
}
}
来源:https://blog.csdn.net/baidu_16217779/article/details/71633284
0
投稿
猜你喜欢
- 提到java里的注解,和我们平时的注释还是有很大的区别,主要是作为java特性来使用的,跟我们常见的类是同一个使用的层面。关于java注解的
- SpringBoot找不到映射文件org.apache.ibatis.binding.BindingException: Invalid b
- 一 应用规划: ※ 确定功能。 ※ 必须的界面及界面跳转的流程。
- 一、Thread.start()与Thread.run()的区别通过调用Thread类的start()方法来启动一个线程,这时此线程是处于就
- 本文实例讲述了C#使用Ado.net读取Excel表的方法。分享给大家供大家参考。具体分析如下:微软NET提供了一个交互的方法,通过使用AD
- 引言C#应用通过 Microsoft.Toolkit.Uwp.Notifications NuGet包可以很方便的发送本地通知(Window
- 本文设计一个简单的班级管理系统,满足如下要求:1、设计学生类Student,包含学号(String型)、姓名(String型)、
- 目录断言对象、数组、集合ObjectUtilsStringUtilsCollectionUtils文件、资源、IO 流FileCopyUti
- 前言:文件的上传和下载在日常开发中很是常见,那么这一功能是如何实现的呢,下面我给大家介绍一下实现条件:1、需要一个form标签,method
- 1安装eclipse插件步骤,点击help,选择Eclipse Marketplace2.输入Scala,点击go3.选择搜索到的Scala
- @Profile注解详解@Profile:Spring为我们提供的可以根据当前环境,动态的激活和切换一系列组件的功能;开发环境develop
- 上篇文章中介绍了聊天功能,这里介绍通讯录是如何实现的。首先要加载公司的所有部门,树形结构,然后点击进入部门的人员列表,点击人员能查看详细信息
- 本文实例为大家分享了C++实现企业职工工资管理系统的具体代码,供大家参考,具体内容如下课程设计目的和要求工资管理要和人事管理相联系,生成企业
- 实际开发中订单往往都包含着订单状态,用户每进行一次操作都要切换对应的状态,而每次切换判断当前的状态是必须的,就不可避免的引入一系列判断语句,
- 文章主要涉及到以下几个问题:怎么实现Java的序列化为什么实现了java.io.Serializable接口才能被序列化transient的
- 最初,XML 语言仅仅是意图用来作为 HTML 语言的替代品而出现的,但是随着该语言的不断发展和完善,人们越来越发现它所具有的优点:例如标记
- 三层架构将整个业务应用划分为:(1)界面UI层(2)业务逻辑层(3)数据访问层对于复杂的系统分层可以让结构更加清晰,模块更加独立,便于维护。
- 这篇文章主要介绍了Jmeter如何添加循环控制器,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以
- 一、场景描述接《Java设计模式(一)工厂模式》工厂模式有一缺点,就是破坏了类的封闭性原则。例如,如果需要增加Word文件的数据采集,此时按
- 导入redis的jar包<!-- redis --> <dependency>