MySQL 与 Elasticsearch 数据不对称问题解决办法
作者:lqh 发布时间:2024-01-23 13:43:57
标签:MySQL,Elasticsearch
MySQL 与 Elasticsearch 数据不对称问题解决办法
jdbc-input-plugin 只能实现数据库的追加,对于 elasticsearch 增量写入,但经常jdbc源一端的数据库可能会做数据库删除或者更新操作。这样一来数据库与搜索引擎的数据库就出现了不对称的情况。
当然你如果有开发团队可以写程序在删除或者更新的时候同步对搜索引擎操作。如果你没有这个能力,可以尝试下面的方法。
这里有一个数据表 article , mtime 字段定义了 ON UPDATE CURRENT_TIMESTAMP 所以每次更新mtime的时间都会变化
mysql> desc article;
+-------------+--------------+------+-----+--------------------------------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------------+--------------+------+-----+--------------------------------+-------+
| id | int(11) | NO | | 0 | |
| title | mediumtext | NO | | NULL | |
| description | mediumtext | YES | | NULL | |
| author | varchar(100) | YES | | NULL | |
| source | varchar(100) | YES | | NULL | |
| content | longtext | YES | | NULL | |
| status | enum('Y','N')| NO | | 'N' | |
| ctime | timestamp | NO | | CURRENT_TIMESTAMP | |
| mtime | timestamp | YES | | ON UPDATE CURRENT_TIMESTAMP | |
+-------------+--------------+------+-----+--------------------------------+-------+
7 rows in set (0.00 sec)
logstash 增加 mtime 的查询规则
jdbc {
jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
jdbc_user => "cms"
jdbc_password => "password"
schedule => "* * * * *" #定时cron的表达式,这里是每分钟执行一次
statement => "select * from article where mtime > :sql_last_value"
use_column_value => true
tracking_column => "mtime"
tracking_column_type => "timestamp"
record_last_run => true
last_run_metadata_path => "/var/tmp/article-mtime.last"
}
创建回收站表,这个事用于解决数据库删除,或者禁用 status = 'N' 这种情况的。
CREATE TABLE `elasticsearch_trash` (
`id` int(11) NOT NULL,
`ctime` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
为 article 表创建触发器
CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_UPDATE` BEFORE UPDATE ON `article` FOR EACH ROW
BEGIN
-- 此处的逻辑是解决文章状态变为 N 的时候,需要将搜索引擎中对应的数据删除。
IF NEW.status = 'N' THEN
insert into elasticsearch_trash(id) values(OLD.id);
END IF;
-- 此处逻辑是修改状态到 Y 的时候,方式elasticsearch_trash仍然存在该文章ID,导致误删除。所以需要删除回收站中得回收记录。
IF NEW.status = 'Y' THEN
delete from elasticsearch_trash where id = OLD.id;
END IF;
END
CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_DELETE` BEFORE DELETE ON `article` FOR EACH ROW
BEGIN
-- 此处逻辑是文章被删除同事将改文章放入搜索引擎回收站。
insert into elasticsearch_trash(id) values(OLD.id);
END
接下来我们需要写一个简单地 Shell 每分钟运行一次,从 elasticsearch_trash 数据表中取出数据,然后使用 curl 命令调用 elasticsearch restful 接口,删除被收回的数据。
你还可以开发相关的程序,这里提供一个 Spring boot 定时任务例子。
实体
package cn.netkiller.api.domain.elasticsearch;
import java.util.Date;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;
@Entity
@Table
public class ElasticsearchTrash {
@Id
private int id;
@Column(columnDefinition = "TIMESTAMP DEFAULT CURRENT_TIMESTAMP")
private Date ctime;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public Date getCtime() {
return ctime;
}
public void setCtime(Date ctime) {
this.ctime = ctime;
}
}
仓库
package cn.netkiller.api.repository.elasticsearch;
import org.springframework.data.repository.CrudRepository;
import com.example.api.domain.elasticsearch.ElasticsearchTrash;
public interface ElasticsearchTrashRepository extends CrudRepository<ElasticsearchTrash, Integer>{
}
定时任务
package cn.netkiller.api.schedule;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.rest.RestStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import com.example.api.domain.elasticsearch.ElasticsearchTrash;
import com.example.api.repository.elasticsearch.ElasticsearchTrashRepository;
@Component
public class ScheduledTasks {
private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
@Autowired
private TransportClient client;
@Autowired
private ElasticsearchTrashRepository alasticsearchTrashRepository;
public ScheduledTasks() {
}
@Scheduled(fixedRate = 1000 * 60) // 60秒运行一次调度任务
public void cleanTrash() {
for (ElasticsearchTrash elasticsearchTrash : alasticsearchTrashRepository.findAll()) {
DeleteResponse response = client.prepareDelete("information", "article", elasticsearchTrash.getId() + "").get();
RestStatus status = response.status();
logger.info("delete {} {}", elasticsearchTrash.getId(), status.toString());
if (status == RestStatus.OK || status == RestStatus.NOT_FOUND) {
alasticsearchTrashRepository.delete(elasticsearchTrash);
}
}
}
}
Spring boot 启动主程序。
package cn.netkiller.api;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
来源:https://my.oschina.net/neochen/blog/1518679
0
投稿
猜你喜欢
- 前言:1.前面基于Springboot的单体项目介绍已经完结了,至于项目中的其他功能实现我这里就不打算介绍了,因为涉及的知识点不难,而且都是
- 前言这篇博客将介绍光流的概念以及如何使用 Lucas-Kanade 方法估计光流,并演示如何使用 cv2.calcOpticalFlowPy
- 1.手动协程操作:# pip install geventfrom greenlet import greenletdef test():
- Python 开发学习的意义:(1)学习相关安全工具原理.(2)掌握自定义工具及拓展开发解决实战中无工具或手工麻烦批量化等情况.(3)在二次
- python各类经纬度转换,具体代码如下所示:import mathimport urllibimport jsonx_pi = 3.141
- 本文实例讲述了PHP实现根据数组某个键值大小进行排序的方法。分享给大家供大家参考,具体如下:问题:针对给定数组的某个键的键值进行排序解决方法
- 借助zookeeper可以实现服务器的注册与发现,有需求的时候调用zookeeper来发现可用的服务器,将任务均匀分配到各个服务器上去.这样
- 即将上线的百度C2C平台百度“有啊”开始对百度HI用户进行邀请,其首页页面、“有啊”LOGO也首次曝光。从曝光的图片看,百度“有啊”的主色调
- 目录何为模式匹配下载pampy栗子单个字符匹配匹配开头和结尾匹配字典的key使用特性1: HEAD 和 TAIL特性2:甚至能匹配字典中的键
- 今天遇到一个奇怪的现象,使用tensorflow-gpu的时候,出现内存超额~~如果我训练什么大型数据也就算了,关键我就写了一个y=W*x…
- 1.SQL Server对于SQL Server 2000来说,它提供了两个全新的函数(IDENT_CURRENT,SCOPE_IDENTI
- mysql4.1以上版本连接时出现Client does not support authentic
- 前言: Socket又称为套接字,它是所有网络通信的基础。网络通信其实就是进程间的通信,Socket主要是使用IP
- 二级域名的解析指向ASP源代码,懂程序的人一看就明白怎么实现了。呵呵!真简单<%@ LANGUAGE =&nb
- 在使用Celery统计每日访问数量的时候,发现一个任务会同时执行两次,发现同一时间内(1s内)竟然同时发送了两次任务,也就是同时产生了两个w
- 本文实例讲述了php返回相对时间(如:20分钟前,3天前)的方法。分享给大家供大家参考。具体如下:function plural($num)
- PyMySQLPyMySQL概述PyMySQL是一个用于Python编程语言的纯Python MySQL客户端库,它实现了MySQL数据库协
- 本文实例讲述了Python使用百度翻译开发平台实现英文翻译为中文功能。分享给大家供大家参考,具体如下:#coding=utf8import
- 编程中有时候需要一个初始极大值(或极小值)作为temp,当然可以自定义设置为10000(whatever),不过python中有一个值可以代
- Azkaban是什么?Azkaban是由Linkedin公司推出的一个批量工作流任务调度器,主要用于在一个工作流内以一个特定的顺