Java之Rsync并发迁移数据并校验详解
作者:王者丶丿风范 发布时间:2023-07-17 23:01:22
标签:Java,Rsync
java调用Rsync并发迁移数据并执行校验
java代码如下
RsyncFile.java
import lombok.NoArgsConstructor;
import lombok.SneakyThrows;
import java.io.*;
import java.util.ArrayList;
import java.util.Date;
import java.util.concurrent.*;
/**
* @ClassName RsyncFile
* @Descriptiom TODO rsync多线程同步迁移数据
* @Author KING
* @Date 2019/11/25 09:17
* @Version 1.2.2
* rsync -vzrtopg --progress --delete //镜像同步
**/
@NoArgsConstructor
public class RsyncFile implements Runnable{
private static final int availProcessors = Runtime.getRuntime().availableProcessors();
//构造以cpu核心数为核心池,cpu线程数为最大池,超时时间为1s,线程队列为大小为 * 的安全阻塞线程队列,拒绝策略为DiscardOldestPolicy()的线程池。(同步数据当然不能丢下拒绝任务)
private ExecutorService ThreadPool = new ThreadPoolExecutor(availProcessors >> 1,
availProcessors,
1L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy());
//保存扫描得到的文件列表
private static ArrayList<String> fileNameList = new ArrayList<String>();
private String shellname;
private String filename;
private String userip;
private CountDownLatch countDownLatch;
private static int deep = 0;
public RsyncFile(String ShellName, String filename, String UserIP, CountDownLatch countDownLatch) {
this.shellname = ShellName;
this.filename = filename;
this.userip = UserIP;
this.countDownLatch = countDownLatch;
}
public static void main(String[] args) {
try {
new RsyncFile().Do(args[0],args[1],Integer.parseInt(args[2]));
}catch (ArrayIndexOutOfBoundsException e){
System.out.println(e);
System.out.println("Error , args send fault");
System.out.println("please send localAddress remote username @ remote IP or hostname and catalogue");
System.out.println("like this [ /home/test/ root@node1:/test/ 1 ]");
}catch(NumberFormatException e1){
System.out.println(e1);
System.out.println("please input Right Directory depth, this number must be int");
System.out.println("like this [ /home/test/ root@node1:/test/ 1 ]");
}
}
@SneakyThrows
private void Do(String content,String UserIP,int setdeep){
System.out.println("开始执行");
System.out.println("开始时间:" + new Date());
Long a = System.nanoTime();
File file = new File(content);
System.out.println("开始扫描本地指定目录");
GetAllFile(file,setdeep);//按深度扫描非空文件夹和文件
System.out.println("扫描本地目录完成");
//给脚本赋予权限
String [] cmd={"/bin/sh","-c","chmod 755 ./do*"};
Runtime.getRuntime().exec(cmd);
//创建远端目录操作
System.out.println("开始创建远端目录结构");
//一次计数锁用于保证目录创建完成
CountDownLatch doDirLock = new CountDownLatch(1);
ThreadPool.execute(new RsyncFile("./doDirc.sh",content,UserIP,doDirLock));
doDirLock.await();
System.out.println("创建远端目录结构完成");
//开始同步工作
System.out.println("开始执行同步工作");
System.out.println("同步的文件夹或文件总数: " + fileNameList.size());
System.out.println("正在同步。。。。。");
//fileNameList.size()次计数锁用于保证数据同步完成(保证计时准确)
CountDownLatch rsyncLock = new CountDownLatch(fileNameList.size());
System.out.println(fileNameList.size());
for (String fileName:fileNameList) {
//除去文件名中与UserIP重复的文件路径
String RemoteDir = UserIP.concat(fileName.replace(content, ""));
System.out.println("要同步的本地目录或文件: " + fileName);
System.out.println("要同步的远端目录或文件: " + RemoteDir);
ThreadPool.execute(new RsyncFile("./doRync.sh",fileName, RemoteDir,rsyncLock));
}
rsyncLock.await();
System.out.println("执行同步工作完成");
//开始文件校验工作
System.out.println("执行文件校验及重传");
//一次计数锁用于保证校验完成
CountDownLatch chechSumLock = new CountDownLatch(1);
ThreadPool.execute(new RsyncFile("./doChecksum.sh",content,UserIP,chechSumLock));
chechSumLock.await();
System.out.println("文件校验及重传完成");
ThreadPool.shutdown();
Long b = System.nanoTime();
Long aLong = (b - a)/1000000L;
System.out.println("处理时间" + aLong + "ms");
System.out.println("结束时间:" + new Date());
}
/**
* 执行rsync脚本的线程方法,使用PrintWriter来与linux Terminal交互
*/
@Override
public void run() {
try {
String command=shellname.concat(" ").concat(filename).concat(" ").concat(userip);
File wd = new File("/bin");
Process process = null;
process = Runtime.getRuntime().exec("/bin/bash", null, wd);
if (process != null) {
InputStream is = process.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
PrintWriter out = new PrintWriter(new BufferedWriter(new OutputStreamWriter(process.getOutputStream())),
true);
//切换到当前class文件所在目录
out.println("cd " + System.getProperty("user.dir"));
out.println(command);
out.println("exit");
StringBuilder sb = new StringBuilder();
String line;
while ((line = reader.readLine()) != null) {
sb.append(line + System.lineSeparator());
}
process.waitFor();
reader.close();
out.close();
process.destroy();
System.out.println("result:" + sb.toString());
}else {
System.out.println("找不到系统bash工具,请检查系统是否异常,并为系统创建/bin/sh的bash工具软连接");
}
} catch (Exception e) {
System.err.println(e.getMessage());
}finally {
//倒记数锁释放一次
countDownLatch.countDown();
}
}
/**遍历指定的目录并能指定深度
* @param file 指定要遍历的目录
* @param setDeep 设定遍历深度
*/
@SneakyThrows
private static void GetAllFile(File file, int setDeep) {
if(file != null){
if(file.isDirectory() && deep<setDeep){
deep++;
File f[] = file.listFiles();
if(f != null) {
int length = f.length;
for(int i = 0; i < length; i++)
GetAllFile(f[i],setDeep);
deep--;
}
} else {
if(file.isDirectory()){
//如果为目录末尾添加 / 保证rsync正常处理
fileNameList.add(file.getAbsolutePath().concat("/"));
}else {
fileNameList.add(file.getAbsolutePath());
}
}
}
}
}
doDir.sh
rsync -av --include='*/' --exclude='*' $1 $2 |tee -a /tmp/rsync.log 2>&1
echo "创建目录结构操作"
doRsync.sh
rsync -avzi --stats --progress $1 $2 |tee -a /tmp/rsync.log 2>&1
doChecksum.sh
rsync -acvzi --stats --progress $1 $2 |tee -a /tmp/checksum.log 2>&1
附录
rsync输出日志说明如下
YXcstpoguax path/to/file
|||||||||||
||||||||||╰- x: The extended attribute information changed
|||||||||╰-- a: The ACL information changed
||||||||╰--- u: The u slot is reserved for future use
|||||||╰---- g: Group is different
||||||╰----- o: Owner is different
|||||╰------ p: Permission are different
||||╰------- t: Modification time is different
|||╰-------- s: Size is different
||╰--------- c: Different checksum (for regular files), or
|| changed value (for symlinks, devices, and special files)
|╰---------- the file type:
| f: for a file,
| d: for a directory,
| L: for a symlink,
| D: for a device,
| S: for a special file (e.g. named sockets and fifos)
╰----------- the type of update being done::
<: file is being transferred to the remote host (sent)
>: file is being transferred to the local host (received)
c: local change/creation for the item, such as:
- the creation of a directory
- the changing of a symlink,
- etc.
h: the item is a hard link to another item (requires
--hard-links).
.: the item is not being updated (though it might have
attributes that are being modified)
*: means that the rest of the itemized-output area contains
a message (e.g. "deleting")
来源:https://blog.csdn.net/w4187402/article/details/103274618
0
投稿
猜你喜欢
- 刚接触了一个微服务架构的项目,了解到了启动方式,记录一下1、找到workspace.xml2.打开workspace.xml,找到其中的配置
- AbstractAutowireCapableBeanFactory#applyMergedBeanDefinitionPostProces
- 写在前面SpringBoot创建定时任务的方式很简单,主要有两种方式:一、基于注解的方式(@Scheduled)二、数据库动态配置。实际开发
- 属性属性是一种成员,它提供灵活的机制来读取、写入或计算私有字段的值。属性可用作公共数据成员,但它们实际上是称为“访问器”的特殊方法。这使得可
- 中断线程线程的thread.interrupt()方法是中断线程,将会设置该线程的中断状态位,即设置为true,中断的结果线程是死亡、还是等
- 英文意思随机数可以做什么?生成一些随机的数字用途非常的广泛, 例如随机抽取数据库的一条记录,把生成的数字给变量,某一个时间点执行一些代码,随
- 话说2016年的直播比较火,2017年短视频又火了。但对于开发者来说隐藏在这背后的技术才是我们所关心的,毕竟我们是靠技术吃饭的。现在在安卓中
- yml配置规则属性跟属性值之间使用“:”和一个“空格”隔开,
- 本文实例为大家分享了Android绘制钟表的具体代码,供大家参考,具体内容如下首先要画一个表,我们要先知道步骤如何:1、仪表盘----外面最
- 前言使用Java8的新特性Stream流式处理,可以提高对于集合的一些操作效率,再配合lambda表达式,可以极致的简化代码,尤其还有并行流
- 前言先放一个官网吧,其实本案例就是根据官网案例来的,只是进行了修改配置。Mybatis-plus官网一、搭建一个springboot项目&n
- 实现“摇一摇”功能,其实很简单,就是检测手机的重力感应,具体实现代码如下:1、在 AndroidManifest.xml 中添加操作权限2、
- 在 fluro 中,定义路由处理器 Handler 时可以指定该页面的默认转场形式,或者在使用 navigateTo 方法是可以设置页面跳转
- using System; using System.Drawing; using System.Windows.Forms; using
- UI 妹纸又给了个图叫我做,我一看是这样的:我们首先把这个控件划分成 几个部分:1.底下部分的直线 :2.左右两边的半圆
- 本文实例讲述了Android编程中activity启动时出现白屏、黑屏问题的解决方法。分享给大家供大家参考,具体如下:默认情况下 activ
- 1、注解是什么Java 注解用于为 Java 代码提供元数据,看完这句话也许你还是一脸懵逼,用人话说就是注解不直接影响你的代码执行,仅提供信
- Spring AOP预处理Controller的参数实际编程中,可能会有这样一种情况,前台传过来的参数,我们需要一定的处理才能使用比如有这样
- 一:获取根目录的方法取得控制台应用程序的根目录方法方法1、Environment.CurrentDirectory 取得或设置当前工作目录的
- //计算字符串的MD5值 public string G