Springboot使用influxDB时序数据库的实现
作者:从此寂静无声 发布时间:2024-01-18 13:42:10
目录
引入依赖
配置
构建实体类
保存数据
查询数据
项目中需要存放大量设备日志,且需要对其进行简单的数据分析,信息提取工作.
结合众多考量因素,项目决定使用时序数据库中的领头羊InfluxDB.
引入依赖
项目中使用influxdb-java,在pom文件中添加如下依赖(github地址:https://github.com/influxdata/influxdb-java):
<dependency>
<groupId>org.influxdb</groupId>
<artifactId>influxdb-java</artifactId>
<version>2.15</version>
</dependency>
application.yaml文件配置如下所示(请按照实际情况填写):
spring:
influx:
url: *
password: admin
user: 123
database: log_management
配置
(1) 创建配置类
@Configuration
public class InfluxDbConfig {
@Value("${spring.influx.url:''}")
private String influxDBUrl;
@Value("${spring.influx.user:''}")
private String userName;
@Value("${spring.influx.password:''}")
private String password;
@Value("${spring.influx.database:''}")
private String database;
@Bean
public InfluxDbUtils influxDbUtils() {
return new InfluxDbUtils(userName, password, influxDBUrl, database, "");
}
}
@Data
public class InfluxDbUtils {
private String userName;
private String password;
private String url;
public String database;
private String retentionPolicy;
// InfluxDB实例
private InfluxDB influxDB;
// 数据保存策略
public static String policyNamePix = "logRetentionPolicy_";
public InfluxDbUtils(String userName, String password, String url, String database,
String retentionPolicy) {
this.userName = userName;
this.password = password;
this.url = url;
this.database = database;
this.retentionPolicy = retentionPolicy == null || "".equals(retentionPolicy) ? "autogen" : retentionPolicy;
this.influxDB = influxDbBuild();
}
/**
* 连接数据库 ,若不存在则创建
*
* @return influxDb实例
*/
private InfluxDB influxDbBuild() {
if (influxDB == null) {
influxDB = InfluxDBFactory.connect(url, userName, password);
}
try {
createDB(database);
influxDB.setDatabase(database);
} catch (Exception e) {
log.error("create influx db failed, error: {}", e.getMessage());
} finally {
influxDB.setRetentionPolicy(retentionPolicy);
}
influxDB.setLogLevel(InfluxDB.LogLevel.BASIC);
return influxDB;
}
}
构建实体类
InfluxDB中,measurement对应于传统关系型数据库中的table(database为配置文件中的log_management).
InfluxDB里存储的数据称为时间序列数据,时序数据有零个或多个数据点.
数据点包括time(一个时间戳),measurement(例如logInfo),零个或多个tag,其对应于level,module,device_id),至少一个field(即日志内容,msg=something error).
InfluxDB会根据tag数值建立时间序列(因此tag数值不能选取诸如UUID作为特征值,易导致时间序列过多,导致InfluxDB崩溃),并建立相应索引,以便优化诸如查询速度.
@Builder
@Data
@Measurement(name = "logInfo")
public class LogInfo {
// Column中的name为measurement中的列名
// 此外,需要注意InfluxDB中时间戳均是以UTC时保存,在保存以及提取过程中需要注意时区转换
@Column(name = "time")
private String time;
// 注解中添加tag = true,表示当前字段内容为tag内容
@Column(name = "module", tag = true)
private String module;
@Column(name = "level", tag = true)
private String level;
@Column(name = "device_id", tag = true)
private String deviceId;
@Column(name = "msg")
private String msg;
}
保存数据
以下代码为单条日志保存,influxdb-java亦支持批量保存(因为与InfluxDB通讯均是通过http,因此建议批量保存以减少性能损耗).
LogInfo logInfo = LogInfo.builder()
.level(jsonObject.getString("level"))
.module(module)
.deviceId(deviceId)
.msg(jsonObject.getString("msg"))
.build();
Point point = Point.measurementByPOJO(logInfo.getClass())
.addFieldsFromPOJO(logInfo)
.time(jsonObject.getLong("time"), TimeUnit.MILLISECONDS)
.build();
// 出于业务考量,设备可以设置不同的保存策略(策略名为固定前缀+设备ID)
influxDB.write(influxDBUtils.database, InfluxDbUtils.policyNamePix + deviceId, point);
查询数据
因为代码与业务耦合比较厉害,因此此处仅截选做概要示范.
// InfluxDB支持分页查询,因此可以设置分页查询条件
String pageQuery = " LIMIT " + request.getPageSize() + " OFFSET " + ((request.getPageNum() - 1) * request.getPageSize());
// 此处查询所有内容,如果
String queryCmd = "SELECT * FROM "
// 查询指定设备下的日志信息
// 要指定从 RetentionPolicyName(保存策略前缀+设备ID).measurement(logInfo) 中查询指定数据)
+ InfluxDbUtils.policyNamePix + request.getDeviceId() + "." + "logInfo"
// 添加查询条件(注意查询条件选择tag值,选择field数值会严重拖慢查询速度)
+ queryCondition
// 查询结果需要按照时间排序
+ " ORDER BY time DESC"
// 添加分页查询条件
+ pageQuery;
选择时序数据库,不建议使用删除以及更新操作,因此不做介绍.
可以通过创建或者RetentionPolicy,来添加或者更新数据的删除时间.
来源:https://www.cnblogs.com/jason1990/p/11076310.html
猜你喜欢
- 引言微信群的用户添加逻辑是 —— 当群人数达到100人后,用户无法再通过扫描群二维码加入,只能让用户先添加群内联系人微信,再由联系人把用户拉
- 最近准备使用Python+Hadoop+Pandas进行一些深度的分析与机器学习相关工作。(当然随着学习过程的进展,现在准备使用Python
- 打开php.ini,首先找到file_uploads = on ;是否允许通过HTTP上传文件的开关。默认为ON即是开upload_tmp_
- 一般上电子商务网站买东西的用户分三种:随便看看,就是不买先看看,买不买再说就是来买东西的这样的需求反应到产品页的购买按钮上,我们一般会看到购
- 本文实例讲述了Python多线程应用于自动化测试操作。分享给大家供大家参考,具体如下:多线程执行测试用例实例:import threadin
- 我们知道为了提高代码的运行速度,我们需要对书写的python代码进行性能测试,而代码性能的高低的直接反馈是电脑运行代码所需要的时间。这里将介
- 创建Deque序列:from collections import dequed = deque()Deque提供了类似list的操作方法:
- 要实现标题的功能,总共分四步:1.创建html错误页2.配置settings3.编写视图4.配置url我的开发环境:django1.10.3
- 方法一:torch.nn.DataParallel1. 原理如下图所示:小朋友一个人做4份作业,假设1份需要60min,共需要240min。
- 零、配置Tomcat默认情况下Tomcat是没有配置用户角色权限的但是,后续Jenkins部署项目到Tomcat服务器,需要用到Tomcat
- 1,检查默认安装的mysql的字符集mysql> show variables like '%char%';+----
- 本文实例讲述了Python list列表中删除多个重复元素操作。分享给大家供大家参考,具体如下:我们以下面这个list为例,删除其中所有值为
- 本文实例讲述了Python使用defaultdict读取文件各列的方法。分享给大家供大家参考,具体如下:#!/usr/bin/python&
- 本章将覆盖所有在Python中使用的基本I/O功能。有关更多函数,请参考标准Python文档。打印到屏幕上:产生输出最简单的方法
- 之前,我们在另外一篇文章中使用Prim算法生成了一个完美迷宫,利用的是遍历网格的方法,这一次,我们要教教大家用遍历墙的方法生成,上一篇文章链
- PHP mysqli_set_charset()函数设置默认客户端字符集:<?php// 假定数据库用户名:root,密码:12345
- 上篇文章介绍了什么是进程、进程与程序的关系、进程的创建与使用、创建进程池等,接下来就来介绍一下进程同步及进程通信。进程同步当多个进程使用同一
- Python实现模拟时钟代码推荐# coding=utf8import sys, pygame, math, randomfrom pyga
- 1:为什么每个layout下都有个inlayout?我们将layout的宽/浮动等属性设置好之后,对于layout内的padding和mar
- 1 Support Vector Machines1.1 Example Dataset 1%matplotlib inlineimport