网络编程
位置:首页>> 网络编程>> 数据库>> Springboot使用influxDB时序数据库的实现

Springboot使用influxDB时序数据库的实现

作者:从此寂静无声  发布时间:2024-01-18 13:42:10 

标签:Springboot,influxDB
目录
  • 引入依赖

  • 配置

  • 构建实体类

  • 保存数据

  • 查询数据

项目中需要存放大量设备日志,且需要对其进行简单的数据分析,信息提取工作.

结合众多考量因素,项目决定使用时序数据库中的领头羊InfluxDB.

引入依赖

项目中使用influxdb-java,在pom文件中添加如下依赖(github地址:https://github.com/influxdata/influxdb-java):


   <dependency>
       <groupId>org.influxdb</groupId>
       <artifactId>influxdb-java</artifactId>
       <version>2.15</version>
   </dependency>

application.yaml文件配置如下所示(请按照实际情况填写):


spring:
 influx:
   url: *
   password: admin
   user: 123
   database: log_management

配置

(1) 创建配置类


@Configuration
public class InfluxDbConfig {

@Value("${spring.influx.url:''}")
   private String influxDBUrl;

@Value("${spring.influx.user:''}")
   private String userName;

@Value("${spring.influx.password:''}")
   private String password;

@Value("${spring.influx.database:''}")
   private String database;

@Bean
   public InfluxDbUtils influxDbUtils() {
       return new InfluxDbUtils(userName, password, influxDBUrl, database, "");
   }
}

@Data
public class InfluxDbUtils {
   private String userName;
   private String password;
   private String url;
   public String database;
   private String retentionPolicy;
   // InfluxDB实例
   private InfluxDB influxDB;

// 数据保存策略
   public static String policyNamePix = "logRetentionPolicy_";

public InfluxDbUtils(String userName, String password, String url, String database,
                        String retentionPolicy) {
       this.userName = userName;
       this.password = password;
       this.url = url;
       this.database = database;
       this.retentionPolicy = retentionPolicy == null || "".equals(retentionPolicy) ? "autogen" : retentionPolicy;
       this.influxDB = influxDbBuild();
   }

/**
    * 连接数据库 ,若不存在则创建
    *
    * @return influxDb实例
    */
   private InfluxDB influxDbBuild() {
       if (influxDB == null) {
           influxDB = InfluxDBFactory.connect(url, userName, password);
       }
       try {
           createDB(database);
           influxDB.setDatabase(database);
       } catch (Exception e) {
           log.error("create influx db failed, error: {}", e.getMessage());
       } finally {
           influxDB.setRetentionPolicy(retentionPolicy);
       }
       influxDB.setLogLevel(InfluxDB.LogLevel.BASIC);
       return influxDB;
   }
}

构建实体类

InfluxDB中,measurement对应于传统关系型数据库中的table(database为配置文件中的log_management).
InfluxDB里存储的数据称为时间序列数据,时序数据有零个或多个数据点.
数据点包括time(一个时间戳),measurement(例如logInfo),零个或多个tag,其对应于level,module,device_id),至少一个field(即日志内容,msg=something error).
InfluxDB会根据tag数值建立时间序列(因此tag数值不能选取诸如UUID作为特征值,易导致时间序列过多,导致InfluxDB崩溃),并建立相应索引,以便优化诸如查询速度.


@Builder
@Data
@Measurement(name = "logInfo")
public class LogInfo {

// Column中的name为measurement中的列名
   // 此外,需要注意InfluxDB中时间戳均是以UTC时保存,在保存以及提取过程中需要注意时区转换
   @Column(name = "time")
   private String time;
   // 注解中添加tag = true,表示当前字段内容为tag内容
   @Column(name = "module", tag = true)
   private String module;
   @Column(name = "level", tag = true)
   private String level;
   @Column(name = "device_id", tag = true)
   private String deviceId;
   @Column(name = "msg")
   private String msg;
}

保存数据

以下代码为单条日志保存,influxdb-java亦支持批量保存(因为与InfluxDB通讯均是通过http,因此建议批量保存以减少性能损耗).


   LogInfo logInfo = LogInfo.builder()
       .level(jsonObject.getString("level"))
       .module(module)
       .deviceId(deviceId)
       .msg(jsonObject.getString("msg"))
       .build();
   Point point = Point.measurementByPOJO(logInfo.getClass())
       .addFieldsFromPOJO(logInfo)
       .time(jsonObject.getLong("time"), TimeUnit.MILLISECONDS)
       .build();
   // 出于业务考量,设备可以设置不同的保存策略(策略名为固定前缀+设备ID)
   influxDB.write(influxDBUtils.database, InfluxDbUtils.policyNamePix + deviceId, point);

查询数据

因为代码与业务耦合比较厉害,因此此处仅截选做概要示范.


   // InfluxDB支持分页查询,因此可以设置分页查询条件
   String pageQuery = " LIMIT " + request.getPageSize() + " OFFSET " + ((request.getPageNum() - 1) * request.getPageSize());
   // 此处查询所有内容,如果
   String queryCmd = "SELECT * FROM "
       // 查询指定设备下的日志信息
       // 要指定从 RetentionPolicyName(保存策略前缀+设备ID).measurement(logInfo) 中查询指定数据)
       + InfluxDbUtils.policyNamePix + request.getDeviceId() + "." + "logInfo"
       // 添加查询条件(注意查询条件选择tag值,选择field数值会严重拖慢查询速度)
       + queryCondition
       // 查询结果需要按照时间排序
       + " ORDER BY time DESC"
       // 添加分页查询条件
       + pageQuery;

选择时序数据库,不建议使用删除以及更新操作,因此不做介绍.

可以通过创建或者RetentionPolicy,来添加或者更新数据的删除时间.

来源:https://www.cnblogs.com/jason1990/p/11076310.html

0
投稿

猜你喜欢

手机版 网络编程 asp之家 www.aspxhome.com