博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
基于Rsync算法的简单云盘实现(上)
阅读量:6238 次
发布时间:2019-06-22

本文共 10880 字,大约阅读时间需要 36 分钟。

  hot3.png

1、Rsync算法包括4大部分:

    1)分块Checksum算法;

    2)传输算法;

    3)checksum查找算法;

    4)比对算法。

以上算法不懂的可以百度哟,也可以参考我给的链接~~~

(PS:)

在此着重讲一下比对算法中弱校验和,采用Adler32算法;强校验和,采用MD5算法。而Rsync算法的真正的精髓在于roll一次之后不用重新计算checksum,而是仅仅在原有checksum的基础上进行调整就可以得出新的checksum,所以弱校验和算法是核心。

其中Adler32算法如下:

Adler-32通过求解两个16位的数值A、B实现,并将结果连结成一个32位整数。

A就是字符串中每个字节的和,而B是A在相加时每一步的阶段值之和。在Adler-32开始运行时,A初始化为1,B初始化为0,最后的校验和要模上65521(最大的素数)。

A = 1 + D1 + D2 + ... + Dn (mod 65521)

B = (1 + D1) + (1 + D1 + D2) + ... + (1 + D1 + D2 + ... + Dn) (mod 65521) = n×D1 + (n−1)×D2 + (n−2)×D3 + ... + Dn + n (mod 65521)

Adler-32(D) = B × 65536 + A

其中D为字符串的字节,n是D的字节长度。

可以根据上述公式推出移动k个字节后的A和B。

2、本地文件和云端文件如下图:

161455_Jxvv_2756867.png

操作解析:

Remote端:1)将数据按照Chunck进行切分

                    2)计算每一个chunk的弱校验和与强校验和

                    3)保存至Map中,key为弱校验ID,value为chunk链表。

                        (Map<Long,List<Chunk>>)

Local端:1)读取一个Chunck 判断数据是否够一个chunck大小

                2)根据读取到的Chunk 根Remote的Map中是否匹配,如果匹配则检查是否有differ                       Data,如果有differ Data则将其加入patch中

                3)最终得到一个匹配的chunk index + differ Data的链表

最后遍历补丁(patch),在Remote端生成新的文件。

3、已经知道了上述原理,接下来就是代码实现,首先需准备Adler32算法和MD5算法类,如下:

/**

 * 粗略比较Adler32算法,比对的时候为O(1),总时间复杂度为O(n)
 * Adler32算法:
 *         A(1, n) = 1 + D1 + D2 + ... + Dn (mod 65521)
 *     A(2,    n+1) = 1+ D2 + D3 +.....+ (Dn+1)  (mod 65521)
 *     B(1,n) =    (1 + D1) + (1 + D1 + D2) + ... + (1 + D1 + D2 + ... + Dn) (mod 65521)
 *            = n×D1 + (n−1)×D2 + (n−2)×D3 + ... + Dn + n (mod 65521)
 *        B(2,n+1) = (1 + D2) + (1 + D2 + D3) + ... + (1 + D2 + D3 + ... + Dn+1) (mod 65521)
 *     Adler-32(D) = B × 65536 + A
 *     
 *   
 */
public class Adler32 {
    /*
     * the largest prime number smaller than 2^16
     */
    protected static final int MOD_ADLER = 65521;

    /**

     * datas  待计算的数据块
     * offset   待计算的偏移
     * length 待计算的长度
     *  校验和
     */
    public static int adler32( byte[] datas , int offset , int length ){
        int A = 1;
        int B = 0;
        for( int i = offset ; i< offset + length ; i++ ){
            //1. 计算A 
            A += datas[i];
            if( A >= MOD_ADLER ){
                A -= MOD_ADLER;
            }
            //2. 计算B
            B += A;
            if( B >= MOD_ADLER){
                B -= MOD_ADLER;
            }
        }
        //3. 返回校验和
        return B << 16 | A;
    }
    
    /**
     *  Adler-32 核心算法在于计算下一块的checksum复杂度为 O( 1 )
     * oldAdler32 初始chunksum
     * @param chunkSize  块大小
     * @param preByte  前一个字节
     * @param nextByte 后一个字节
     * @return
     */
    public static int nextAdler32(int oldAdler32 , int chunkSize, byte preByte , byte nextByte ){
        int oldA = oldAdler32 & 0xFFFF ;
        int oldB = (oldAdler32 >>> 16) & 0xFFFF;
        int A = oldA - preByte + nextByte;  //向右移动一字节
        int B = oldB - preByte *  (chunkSize ) -1 + A  ;
        return (B << 16) | A;
    }
    
    /**
     * 
     * @param datas 待加密数据块
     * @return  校验和
     */
    public static int adler32(byte[] datas){
        return adler32( datas , 0 , datas.length );
    }    
}

public class MD5 {

    static MessageDigest md = null;
    static {
        try {
            md = MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException ne) {
            ne.printStackTrace();
        }
    }

    public static String getMD5(byte[] source) {

        String s = null;
        char hexDigits[] = { // 用来将字节转换成 16 进制表示的字符
        '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd',
                'e', 'f' };
        try {
            md.update(source);
            byte tmp[] = md.digest(); // MD5 的计算结果是一个 128 位的长整数,
                                        // 用字节表示就是 16 个字节
            char str[] = new char[16 * 2]; // 每个字节用 16 进制表示的话,使用两个字符,
                                            // 所以表示成 16 进制需要 32 个字符
            int k = 0; // 表示转换结果中对应的字符位置
            for (int i = 0; i < 16; i++) { // 从第一个字节开始,对 MD5 的每一个字节
                                            // 转换成 16 进制字符的转换
                byte byte0 = tmp[i]; // 取第 i 个字节
                str[k++] = hexDigits[byte0 >>> 4 & 0xf]; // 取字节中高 4 位的数字转换,
                                                            // >>>
                                                            // 为逻辑右移,将符号位一起右移
                str[k++] = hexDigits[byte0 & 0xf]; // 取字节中低 4 位的数字转换
            }
            s = new String(str); // 换后的结果转换为字符串

        } catch (Exception e) {

            e.printStackTrace();
        }
        return s;
    }
}

 

然后需要Chunk类和补丁类(Patch),因为Local端最终得到可能是一个匹配的chunk index + differ Data的链表,所以补丁内容(PatchPart)需要分为两类,一个是匹配chunk的index(PatchPartChunk),另一个是不存在的或修改的数据内容(PatchPartData)。如下:

/**

 *  校验和,根据Rsync算法原理,里面包括块号,弱校验和,强校验和
 *  分别采取Adler32与MD5算法
 */
public class Chunk implements Serializable{
    private static final long serialVersionUID = 1L;
    private long index; //块号
    private int weakCheckSum;//弱校验和 adler32算法
    private String strongCheckSum;//强校验和 md5算法
    private int length;//长度

    public long getIndex() {

        return index;
    }
    
    public void setIndex(long index) {
        this.index = index;
    }
    
    public int getWeakCheckSum() {
        return weakCheckSum;
    }
    
    public void setWeakCheckSum(int weakCheckSum) {
        this.weakCheckSum = weakCheckSum;
    }
    
    public String getStrongCheckSum() {
        return strongCheckSum;
    }
    
    public void setStrongCheckSum(String strongCheckSum) {
        this.strongCheckSum = strongCheckSum;
    }
    
    public int getLength() {
        return length;
    }
    
    public void setLength(int length) {
        this.length = length;
    }
}

/**

 *  抽象的补丁类 , 源文件 + 补丁  = 目标文件 
 */
public class Patch implements Serializable{
    private static final long serialVersionUID = 1L;
    private List<PatchPart> parts = new ArrayList<PatchPart>();
    
    public void add(List<Byte> datas){
        parts.add(new PatchPartData(datas));
    }
    
    public void add(Chunk chunk){
        parts.add(new PatchPartChunk(chunk.getIndex()));
    }

    public List<PatchPart> getParts() {

        return parts;
    }

    public void setParts(List<PatchPart> parts) {

        this.parts = parts;
    }
}

 /**

 *  Patch内容抽象类 , 因为有2种不同的内容,对于已经存在的内容
 *  我们只需要记录index , 不存在的内容,我们需要存byte数据。
 */
public abstract class PatchPart implements Serializable{
    private static final long serialVersionUID = 1L;
}

 /** 

 *  匹配的chunk,只需要记录index;
 */
public class PatchPartChunk extends PatchPart{
    private static final long serialVersionUID = 1L;
    private long index;
    
    public PatchPartChunk(long index){
        this.index = index;
    }

    public long getIndex() {

        return index;
    }

    public void setIndex(long index) {

        this.index = index;
    }
}

/**

 * 不存在数据内容保存其数据
 */
public class PatchPartData extends PatchPart{
    private static final long serialVersionUID = 1L;
    private byte[] datas;
    
    public PatchPartData(List<Byte> datas){
        this.datas = new byte[datas.size()];
        for(int i = 0 ; i < datas.size() ; i++){
            this.datas[i] = datas.get(i);
        }
    }

    public byte[] getDatas() {

        return datas;
    }

    public void setDatas(byte[] datas) {

        this.datas = datas;
    }
}

最后将分块、比对、记录id和data在Remote端生成新的文件等操作形成一个工具类,并验证其正确性。代码如下:

/**

 *  将Rsync中的一些方法抽象为工具方法。
 */
public class RsyncUtil {
    //块大小
    //public final static int CHUNK_SIZE = 512;
    
    /**
     * 计算文件的CheckSum 列表
     * @param fileName 文件名
     * @return checksum 集合
     */
    public static Map<Long,List<Chunk>> calcCheckSumFromFile(String fileName) {
        Map<Long,List<Chunk>> chunkMaps = new HashMap<Long,List<Chunk>>();
        try {
            File file = new File(fileName);
            long fileLength = file.length();
            long remainedLength = fileLength ;
            int read = 0;
            long  index = 0;//块号
            RandomAccessFile raf = new RandomAccessFile(fileName,"r"); //RandomAccessFile随机文件读取流
            while (remainedLength > 0) {
                read =  (remainedLength < Constant.CHUNK_SIZE) ? (int)remainedLength : Constant.CHUNK_SIZE; 
                byte[] buffer = new byte[read];
                raf.read(buffer);
                Chunk  chunk = calcCheckSum(buffer,read);
                chunk.setIndex(index++);
                //判断chunks中是否存在
                if(chunkMaps.containsKey(chunk.getWeakCheckSum())){
                    //如果存在,则加入链表中
                    chunkMaps.get(chunk.getWeakCheckSum()).add(chunk);
                }else{
                    //如果不存在,则创建链表加入其中
                    List<Chunk> chunks = new ArrayList<Chunk>();
                    chunks.add(chunk);
                    chunkMaps.put(new Long(chunk.getWeakCheckSum()), chunks);
                }
                remainedLength -=read;
            }
            raf.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return chunkMaps;
    }
    
    /**
     * 计算每块的checksum
     * @param buffer
     * @param length
     * @return
     */
    private static Chunk calcCheckSum(byte[] buffer,int length){
        Chunk chunk = new Chunk();
        chunk.setWeakCheckSum(Adler32.adler32(buffer,0, length));
        chunk.setStrongCheckSum(MD5.getMD5(buffer));
        chunk.setLength(length);
        return chunk;
    }
    
    /**
     * 根据传入的字节数组去匹配,如果匹配到则返回该Chunk ,否者为空
     * @param chunkMaps
     * @param datas
     * @return
     */
    public static Chunk match(Map<Long,List<Chunk>> chunkMaps , byte[] datas){
        Long weakChecksum = new Long(Adler32.adler32(datas));
        //先比较弱校验和 再比较强校验和,如果相等则返回这块数据
        if(chunkMaps.containsKey(weakChecksum)){
            for(Chunk chunk : chunkMaps.get(weakChecksum)){
                if(MD5.getMD5(datas).equals(chunk.getStrongCheckSum())){
                    return chunk;
                }
            }
        }
        return null;
    }
    
    /**
     * 读取下一个block
     * @param raf
     * @param remainedLength
     * @return
     * @throws Exception
     */
    public static byte[] readNextBlock(RandomAccessFile raf , long remainedLength) throws Exception{
        int bufferSize =  (remainedLength < Constant.CHUNK_SIZE) ? (int)remainedLength : Constant.CHUNK_SIZE; 
        byte[] buffer = new byte[bufferSize];
        raf.read(buffer);
        return buffer;
    }
    
    /**
     * 读取下一个byte
     * @param raf
     * @param block
     * @return
     * @throws Exception
     */
    public static byte[] readNextByte(RandomAccessFile raf , byte[] block) throws Exception{
        byte[] next = new byte[1];
        raf.read(next);
        for(int i = 0 ; i < block.length-1 ; i++){
            block[i]=block[i+1];
        }
        block[block.length-1] = next[0];
        return block;
    }
    
    /**
     * 创建补丁
     * @param localFile
     * @return
     * @throws Exception
     */
    public static Patch createPatch(File localFile ,  Map<Long,List<Chunk>> chunkMaps) throws Exception{
        //分块计算文件的弱校验与强校验,并且设置块号
        Patch patch = new Patch();
        RandomAccessFile raf = new RandomAccessFile(localFile,"r");
        long fileLength = localFile.length();
        long remainedLength = fileLength;
        byte[] bytes = {};
        Chunk chunk = null;
        List<Byte> diffBytes = new ArrayList<Byte>();
        boolean nextBlock = true;

        while(remainedLength > 0){

            if(nextBlock){
                bytes =  readNextBlock(raf, remainedLength);
                remainedLength -=bytes.length;
            }else{
                bytes = readNextByte(raf,bytes);
                remainedLength--;
            }
            //判断是否匹配
            chunk = match(chunkMaps, bytes);    
            if(chunk != null){
                //匹配的chunk之前存在differ data ,先将它加进patch
                if( diffBytes.size() > 0 ){
                    patch.add(diffBytes);
                    diffBytes  = new ArrayList<Byte>(); //重新创建一个空的differ bytes
                }
                patch.add(chunk);
                nextBlock = true;
            }else{
                //将最左的byte保存, 不能读block 只能读下一个byte
                diffBytes.add(bytes[0]); 
                nextBlock = false;
            }
        }
        //最后一个block没有匹配上 需要把bytes 中的剩余数据加入到diffBytes中,0 已经加入了。
        if(chunk == null ){
            for(int i = 1 ; i < bytes.length ; i++){
                diffBytes.add(bytes[i]);
            }
        }
        patch.add(diffBytes);
        raf.close();
        return patch;
        
    }

    public static void applyPatch(Patch patch, String fileName) throws Exception{        

        RandomAccessFile remoteFile = new RandomAccessFile(fileName,"r");
        RandomAccessFile tempFile = new RandomAccessFile( fileName+".temp" , "rw" );
        long srcFileLength = remoteFile.length();
        //遍历补丁    
        for(PatchPart part : patch.getParts()){
            if(part instanceof PatchPartData){
                tempFile.write(((PatchPartData) part).getDatas());
            }else{
                PatchPartChunk chunkPart = (PatchPartChunk)part;
                long off =  chunkPart.getIndex()*Constant.CHUNK_SIZE;
                long remainedFileLength = srcFileLength - off;
                int length = remainedFileLength < Constant.CHUNK_SIZE ? (int)remainedFileLength : Constant.CHUNK_SIZE;
                byte[] bytes = new byte[length];
                remoteFile.seek(off);
                remoteFile.read(bytes);
                tempFile.write(bytes);
            }
        }
        remoteFile.close();
        tempFile.close();
        // 移动文件
        File file = new File(fileName);
        if(file.exists()){
            file.delete();
        }
        File temp = new File(fileName+".temp");
        temp.renameTo(file);
        temp.delete();
    }
    
    public static void main(String[] args) throws Exception{
        Map<Long,List<Chunk>> checksums = calcCheckSumFromFile("F:\\rsyncTemp\\remote\\Hello.txt");
        Patch patch = createPatch(new File("F:\\rsyncTemp\\local\\Hello.txt"),checksums);
        applyPatch(patch,"F:\\rsyncTemp\\remote\\Hello.txt");
    }
}

PS:local和remote均需存在Hello.txt文件。 

运行RsyncUtil类的main方法后,remote下的Hello.txt文件内容更新,与local端文件内容相同。

写在最后:本次仅仅提炼出了Rsync算法方法,还未完全实现简单云盘。下次将在此基础上继续实现。若有错误,望纠正。

转载于:https://my.oschina.net/eager/blog/684168

你可能感兴趣的文章
ASP.NET中GridView数据导出到Excel
查看>>
我的友情链接
查看>>
我的友情链接
查看>>
swoole项目思维转换 -- 前篇
查看>>
我的友情链接
查看>>
Redis之----Redis的数据类型和操作
查看>>
只读字段与标签字段
查看>>
ubuntu修改时区和时间的方法
查看>>
maven实战 读书笔记三#高级程序员进阶之路#
查看>>
硬盘安装windows 7
查看>>
编译器编译原理--详解
查看>>
第五章 择偶
查看>>
用Fiddler模拟低速网络环境
查看>>
《跟阿铭学Linux》第8章 文档的压缩与打包:课后习题与答案
查看>>
Python练习2
查看>>
新安装的python2.7无法加载error while loading shared libraries: libpython2.7.so.1.0
查看>>
js反混淆解密
查看>>
Exchange Server 2010 DAG搭建及灾难恢复部署方案(准备环境)
查看>>
Android使用本地页面调用android代码
查看>>
MyBatise配置使用
查看>>