1、Rsync算法包括4大部分:
1)分块Checksum算法;
2)传输算法;
3)checksum查找算法;
4)比对算法。
以上算法不懂的可以百度哟,也可以参考我给的链接~~~
(PS:)
在此着重讲一下比对算法中弱校验和,采用Adler32算法;强校验和,采用MD5算法。而Rsync算法的真正的精髓在于roll一次之后不用重新计算checksum,而是仅仅在原有checksum的基础上进行调整就可以得出新的checksum,所以弱校验和算法是核心。
其中Adler32算法如下:
Adler-32通过求解两个16位的数值A、B实现,并将结果连结成一个32位整数。
A就是字符串中每个字节的和,而B是A在相加时每一步的阶段值之和。在Adler-32开始运行时,A初始化为1,B初始化为0,最后的校验和要模上65521(最大的素数)。
A = 1 + D1 + D2 + ... + Dn (mod 65521)
B = (1 + D1) + (1 + D1 + D2) + ... + (1 + D1 + D2 + ... + Dn) (mod 65521) = n×D1 + (n−1)×D2 + (n−2)×D3 + ... + Dn + n (mod 65521)
Adler-32(D) = B × 65536 + A
其中D为字符串的字节,n是D的字节长度。
可以根据上述公式推出移动k个字节后的A和B。
2、本地文件和云端文件如下图:
操作解析:
Remote端:1)将数据按照Chunck进行切分
2)计算每一个chunk的弱校验和与强校验和
3)保存至Map中,key为弱校验ID,value为chunk链表。
(Map<Long,List<Chunk>>)
Local端:1)读取一个Chunck 判断数据是否够一个chunck大小
2)根据读取到的Chunk 根Remote的Map中是否匹配,如果匹配则检查是否有differ Data,如果有differ Data则将其加入patch中
3)最终得到一个匹配的chunk index + differ Data的链表
最后遍历补丁(patch),在Remote端生成新的文件。
3、已经知道了上述原理,接下来就是代码实现,首先需准备Adler32算法和MD5算法类,如下:
/**
* 粗略比较Adler32算法,比对的时候为O(1),总时间复杂度为O(n) * Adler32算法: * A(1, n) = 1 + D1 + D2 + ... + Dn (mod 65521) * A(2, n+1) = 1+ D2 + D3 +.....+ (Dn+1) (mod 65521) * B(1,n) = (1 + D1) + (1 + D1 + D2) + ... + (1 + D1 + D2 + ... + Dn) (mod 65521) * = n×D1 + (n−1)×D2 + (n−2)×D3 + ... + Dn + n (mod 65521) * B(2,n+1) = (1 + D2) + (1 + D2 + D3) + ... + (1 + D2 + D3 + ... + Dn+1) (mod 65521) * Adler-32(D) = B × 65536 + A * * */ public class Adler32 { /* * the largest prime number smaller than 2^16 */ protected static final int MOD_ADLER = 65521;/**
* datas 待计算的数据块 * offset 待计算的偏移 * length 待计算的长度 * 校验和 */ public static int adler32( byte[] datas , int offset , int length ){ int A = 1; int B = 0; for( int i = offset ; i< offset + length ; i++ ){ //1. 计算A A += datas[i]; if( A >= MOD_ADLER ){ A -= MOD_ADLER; } //2. 计算B B += A; if( B >= MOD_ADLER){ B -= MOD_ADLER; } } //3. 返回校验和 return B << 16 | A; } /** * Adler-32 核心算法在于计算下一块的checksum复杂度为 O( 1 ) * oldAdler32 初始chunksum * @param chunkSize 块大小 * @param preByte 前一个字节 * @param nextByte 后一个字节 * @return */ public static int nextAdler32(int oldAdler32 , int chunkSize, byte preByte , byte nextByte ){ int oldA = oldAdler32 & 0xFFFF ; int oldB = (oldAdler32 >>> 16) & 0xFFFF; int A = oldA - preByte + nextByte; //向右移动一字节 int B = oldB - preByte * (chunkSize ) -1 + A ; return (B << 16) | A; } /** * * @param datas 待加密数据块 * @return 校验和 */ public static int adler32(byte[] datas){ return adler32( datas , 0 , datas.length ); } }
public class MD5 {
static MessageDigest md = null; static { try { md = MessageDigest.getInstance("MD5"); } catch (NoSuchAlgorithmException ne) { ne.printStackTrace(); } }public static String getMD5(byte[] source) {
String s = null; char hexDigits[] = { // 用来将字节转换成 16 进制表示的字符 '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' }; try { md.update(source); byte tmp[] = md.digest(); // MD5 的计算结果是一个 128 位的长整数, // 用字节表示就是 16 个字节 char str[] = new char[16 * 2]; // 每个字节用 16 进制表示的话,使用两个字符, // 所以表示成 16 进制需要 32 个字符 int k = 0; // 表示转换结果中对应的字符位置 for (int i = 0; i < 16; i++) { // 从第一个字节开始,对 MD5 的每一个字节 // 转换成 16 进制字符的转换 byte byte0 = tmp[i]; // 取第 i 个字节 str[k++] = hexDigits[byte0 >>> 4 & 0xf]; // 取字节中高 4 位的数字转换, // >>> // 为逻辑右移,将符号位一起右移 str[k++] = hexDigits[byte0 & 0xf]; // 取字节中低 4 位的数字转换 } s = new String(str); // 换后的结果转换为字符串} catch (Exception e) {
e.printStackTrace(); } return s; } }
然后需要Chunk类和补丁类(Patch),因为Local端最终得到可能是一个匹配的chunk index + differ Data的链表,所以补丁内容(PatchPart)需要分为两类,一个是匹配chunk的index(PatchPartChunk),另一个是不存在的或修改的数据内容(PatchPartData)。如下:
/**
* 校验和,根据Rsync算法原理,里面包括块号,弱校验和,强校验和 * 分别采取Adler32与MD5算法 */ public class Chunk implements Serializable{ private static final long serialVersionUID = 1L; private long index; //块号 private int weakCheckSum;//弱校验和 adler32算法 private String strongCheckSum;//强校验和 md5算法 private int length;//长度public long getIndex() {
return index; } public void setIndex(long index) { this.index = index; } public int getWeakCheckSum() { return weakCheckSum; } public void setWeakCheckSum(int weakCheckSum) { this.weakCheckSum = weakCheckSum; } public String getStrongCheckSum() { return strongCheckSum; } public void setStrongCheckSum(String strongCheckSum) { this.strongCheckSum = strongCheckSum; } public int getLength() { return length; } public void setLength(int length) { this.length = length; } }
/**
* 抽象的补丁类 , 源文件 + 补丁 = 目标文件 */ public class Patch implements Serializable{ private static final long serialVersionUID = 1L; private List<PatchPart> parts = new ArrayList<PatchPart>(); public void add(List<Byte> datas){ parts.add(new PatchPartData(datas)); } public void add(Chunk chunk){ parts.add(new PatchPartChunk(chunk.getIndex())); }public List<PatchPart> getParts() {
return parts; }public void setParts(List<PatchPart> parts) {
this.parts = parts; } }
/**
* Patch内容抽象类 , 因为有2种不同的内容,对于已经存在的内容 * 我们只需要记录index , 不存在的内容,我们需要存byte数据。 */ public abstract class PatchPart implements Serializable{ private static final long serialVersionUID = 1L; }
/**
* 匹配的chunk,只需要记录index; */ public class PatchPartChunk extends PatchPart{ private static final long serialVersionUID = 1L; private long index; public PatchPartChunk(long index){ this.index = index; }public long getIndex() {
return index; }public void setIndex(long index) {
this.index = index; } }
/**
* 不存在数据内容保存其数据 */ public class PatchPartData extends PatchPart{ private static final long serialVersionUID = 1L; private byte[] datas; public PatchPartData(List<Byte> datas){ this.datas = new byte[datas.size()]; for(int i = 0 ; i < datas.size() ; i++){ this.datas[i] = datas.get(i); } }public byte[] getDatas() {
return datas; }public void setDatas(byte[] datas) {
this.datas = datas; } }
最后将分块、比对、记录id和data在Remote端生成新的文件等操作形成一个工具类,并验证其正确性。代码如下:
/**
* 将Rsync中的一些方法抽象为工具方法。 */ public class RsyncUtil { //块大小 //public final static int CHUNK_SIZE = 512; /** * 计算文件的CheckSum 列表 * @param fileName 文件名 * @return checksum 集合 */ public static Map<Long,List<Chunk>> calcCheckSumFromFile(String fileName) { Map<Long,List<Chunk>> chunkMaps = new HashMap<Long,List<Chunk>>(); try { File file = new File(fileName); long fileLength = file.length(); long remainedLength = fileLength ; int read = 0; long index = 0;//块号 RandomAccessFile raf = new RandomAccessFile(fileName,"r"); //RandomAccessFile随机文件读取流 while (remainedLength > 0) { read = (remainedLength < Constant.CHUNK_SIZE) ? (int)remainedLength : Constant.CHUNK_SIZE; byte[] buffer = new byte[read]; raf.read(buffer); Chunk chunk = calcCheckSum(buffer,read); chunk.setIndex(index++); //判断chunks中是否存在 if(chunkMaps.containsKey(chunk.getWeakCheckSum())){ //如果存在,则加入链表中 chunkMaps.get(chunk.getWeakCheckSum()).add(chunk); }else{ //如果不存在,则创建链表加入其中 List<Chunk> chunks = new ArrayList<Chunk>(); chunks.add(chunk); chunkMaps.put(new Long(chunk.getWeakCheckSum()), chunks); } remainedLength -=read; } raf.close(); } catch (Exception e) { e.printStackTrace(); } return chunkMaps; } /** * 计算每块的checksum * @param buffer * @param length * @return */ private static Chunk calcCheckSum(byte[] buffer,int length){ Chunk chunk = new Chunk(); chunk.setWeakCheckSum(Adler32.adler32(buffer,0, length)); chunk.setStrongCheckSum(MD5.getMD5(buffer)); chunk.setLength(length); return chunk; } /** * 根据传入的字节数组去匹配,如果匹配到则返回该Chunk ,否者为空 * @param chunkMaps * @param datas * @return */ public static Chunk match(Map<Long,List<Chunk>> chunkMaps , byte[] datas){ Long weakChecksum = new Long(Adler32.adler32(datas)); //先比较弱校验和 再比较强校验和,如果相等则返回这块数据 if(chunkMaps.containsKey(weakChecksum)){ for(Chunk chunk : chunkMaps.get(weakChecksum)){ if(MD5.getMD5(datas).equals(chunk.getStrongCheckSum())){ return chunk; } } } return null; } /** * 读取下一个block * @param raf * @param remainedLength * @return * @throws Exception */ public static byte[] readNextBlock(RandomAccessFile raf , long remainedLength) throws Exception{ int bufferSize = (remainedLength < Constant.CHUNK_SIZE) ? (int)remainedLength : Constant.CHUNK_SIZE; byte[] buffer = new byte[bufferSize]; raf.read(buffer); return buffer; } /** * 读取下一个byte * @param raf * @param block * @return * @throws Exception */ public static byte[] readNextByte(RandomAccessFile raf , byte[] block) throws Exception{ byte[] next = new byte[1]; raf.read(next); for(int i = 0 ; i < block.length-1 ; i++){ block[i]=block[i+1]; } block[block.length-1] = next[0]; return block; } /** * 创建补丁 * @param localFile * @return * @throws Exception */ public static Patch createPatch(File localFile , Map<Long,List<Chunk>> chunkMaps) throws Exception{ //分块计算文件的弱校验与强校验,并且设置块号 Patch patch = new Patch(); RandomAccessFile raf = new RandomAccessFile(localFile,"r"); long fileLength = localFile.length(); long remainedLength = fileLength; byte[] bytes = {}; Chunk chunk = null; List<Byte> diffBytes = new ArrayList<Byte>(); boolean nextBlock = true;while(remainedLength > 0){
if(nextBlock){ bytes = readNextBlock(raf, remainedLength); remainedLength -=bytes.length; }else{ bytes = readNextByte(raf,bytes); remainedLength--; } //判断是否匹配 chunk = match(chunkMaps, bytes); if(chunk != null){ //匹配的chunk之前存在differ data ,先将它加进patch if( diffBytes.size() > 0 ){ patch.add(diffBytes); diffBytes = new ArrayList<Byte>(); //重新创建一个空的differ bytes } patch.add(chunk); nextBlock = true; }else{ //将最左的byte保存, 不能读block 只能读下一个byte diffBytes.add(bytes[0]); nextBlock = false; } } //最后一个block没有匹配上 需要把bytes 中的剩余数据加入到diffBytes中,0 已经加入了。 if(chunk == null ){ for(int i = 1 ; i < bytes.length ; i++){ diffBytes.add(bytes[i]); } } patch.add(diffBytes); raf.close(); return patch; }public static void applyPatch(Patch patch, String fileName) throws Exception{
RandomAccessFile remoteFile = new RandomAccessFile(fileName,"r"); RandomAccessFile tempFile = new RandomAccessFile( fileName+".temp" , "rw" ); long srcFileLength = remoteFile.length(); //遍历补丁 for(PatchPart part : patch.getParts()){ if(part instanceof PatchPartData){ tempFile.write(((PatchPartData) part).getDatas()); }else{ PatchPartChunk chunkPart = (PatchPartChunk)part; long off = chunkPart.getIndex()*Constant.CHUNK_SIZE; long remainedFileLength = srcFileLength - off; int length = remainedFileLength < Constant.CHUNK_SIZE ? (int)remainedFileLength : Constant.CHUNK_SIZE; byte[] bytes = new byte[length]; remoteFile.seek(off); remoteFile.read(bytes); tempFile.write(bytes); } } remoteFile.close(); tempFile.close(); // 移动文件 File file = new File(fileName); if(file.exists()){ file.delete(); } File temp = new File(fileName+".temp"); temp.renameTo(file); temp.delete(); } public static void main(String[] args) throws Exception{ Map<Long,List<Chunk>> checksums = calcCheckSumFromFile("F:\\rsyncTemp\\remote\\Hello.txt"); Patch patch = createPatch(new File("F:\\rsyncTemp\\local\\Hello.txt"),checksums); applyPatch(patch,"F:\\rsyncTemp\\remote\\Hello.txt"); } }
PS:local和remote均需存在Hello.txt文件。
运行RsyncUtil类的main方法后,remote下的Hello.txt文件内容更新,与local端文件内容相同。
写在最后:本次仅仅提炼出了Rsync算法方法,还未完全实现简单云盘。下次将在此基础上继续实现。若有错误,望纠正。