librsync  2.3.4
delta.c
Go to the documentation of this file.
1 /*= -*- c-basic-offset: 4; indent-tabs-mode: nil; -*-
2  *
3  * librsync -- library for network deltas
4  *
5  * Copyright (C) 2000, 2001 by Martin Pool <mbp@sourcefrog.net>
6  * Copyright (C) 2003 by Donovan Baarda <abo@minkirri.apana.org.au>
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU Lesser General Public License as published by
10  * the Free Software Foundation; either version 2.1 of the License, or
11  * (at your option) any later version.
12  *
13  * This program is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16  * GNU Lesser General Public License for more details.
17  *
18  * You should have received a copy of the GNU Lesser General Public License
19  * along with this program; if not, write to the Free Software
20  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  */
22 
23  /*=
24  | Let's climb to the TOP of that
25  | MOUNTAIN and think about STRIP
26  | MINING!!
27  */
28 
29 /** \file delta.c
30  * Generate in streaming mode an rsync delta given a set of signatures, and a
31  * new file.
32  *
33  * The size of blocks for signature generation is determined by the block size
34  * in the incoming signature.
35  *
36  * To calculate a signature, we need to be able to see at least one block of
37  * the new file at a time. Once we have that, we calculate its weak signature,
38  * and see if there is any block in the signature hash table that has the same
39  * weak sum. If there is one, then we also compute the strong sum of the new
40  * block, and cross check that. If they're the same, then we can assume we have
41  * a match.
42  *
43  * The final block of the file has to be handled a little differently, because
44  * it may be a short match. Short blocks in the signature don't include their
45  * length -- we just allow for the final short block of the file to match any
46  * block in the signature, and if they have the same checksum we assume they
47  * must have the same length. Therefore, when we emit a COPY command, we have
48  * to send it with a length that is the same as the block matched, and not the
49  * block length from the signature.
50  *
51  * Profiling results as of v1.26, 2001-03-18:
52  *
53  * If everything matches, then we spend almost all our time in rs_mdfour64 and
54  * rs_weak_sum, which is unavoidable and therefore a good profile.
55  *
56  * If nothing matches, it is not so good.
57  *
58  * 2002-06-26: Donovan Baarda
59  *
60  * The following is based entirely on pysync. It is much cleaner than the
61  * previous incarnation of this code. It is slightly complicated because in
62  * this case the output can block, so the main delta loop needs to stop when
63  * this happens.
64  *
65  * In pysync a 'last' attribute is used to hold the last miss or match for
66  * extending if possible. In this code, basis_len and scan_pos are used instead
67  * of 'last'. When basis_len > 0, last is a match. When basis_len = 0 and
68  * scan_pos is > 0, last is a miss. When both are 0, last is None (ie,
69  * nothing).
70  *
71  * Pysync is also slightly different in that a 'flush' method is available to
72  * force output of accumulated data. This 'flush' is use to finalise delta
73  * calculation. In librsync input is terminated with an eof flag on the input
74  * stream. I have structured this code similar to pysync with a seperate flush
75  * function that is used when eof is reached. This allows for a flush style API
76  * if one is ever needed. Note that flush in pysync can be used for more than
77  * just terminating delta calculation, so a flush based API can in some ways be
78  * more flexible...
79  *
80  * The input data is first scanned, then processed. Scanning identifies input
81  * data as misses or matches, and emits the instruction stream. Processing the
82  * data consumes it off the input scoop and outputs the processed miss data
83  * into the tube.
84  *
85  * The scoop contains all data yet to be processed. The scan_pos is an index
86  * into the scoop that indicates the point scanned to. As data is scanned,
87  * scan_pos is incremented. As data is processed, it is removed from the scoop
88  * and scan_pos adjusted. Everything gets complicated because the tube can
89  * block. When the tube is blocked, no data can be processed. */
90 
91 #include "config.h" /* IWYU pragma: keep */
92 #include <assert.h>
93 #include <stdlib.h>
94 #include "librsync.h"
95 #include "job.h"
96 #include "sumset.h"
97 #include "checksum.h"
98 #include "scoop.h"
99 #include "emit.h"
100 #include "trace.h"
101 
102 /** Max length of a miss is 64K including 3 command bytes. */
103 #define MAX_MISS_LEN (MAX_DELTA_CMD - 3)
104 
105 static rs_result rs_delta_s_scan(rs_job_t *job);
106 static rs_result rs_delta_s_flush(rs_job_t *job);
107 static rs_result rs_delta_s_end(rs_job_t *job);
108 static inline rs_result rs_getinput(rs_job_t *job, size_t block_len);
109 static inline int rs_findmatch(rs_job_t *job, rs_long_t *match_pos,
110  size_t *match_len);
111 static inline rs_result rs_appendmatch(rs_job_t *job, rs_long_t match_pos,
112  size_t match_len);
113 static inline rs_result rs_appendmiss(rs_job_t *job, size_t miss_len);
114 static inline rs_result rs_appendflush(rs_job_t *job);
115 static inline rs_result rs_processmatch(rs_job_t *job);
116 static inline rs_result rs_processmiss(rs_job_t *job);
117 
118 /** Get a block of data if possible, and see if it matches.
119  *
120  * On each call, we try to process all of the input data available on the scoop
121  * and input buffer. */
123 {
124  const size_t block_len = job->signature->block_len;
125  rs_long_t match_pos;
126  size_t match_len;
127  rs_result result;
128 
129  rs_job_check(job);
130  /* output any pending output from the tube */
131  if ((result = rs_tube_catchup(job)) != RS_DONE)
132  return result;
133  /* read the input into the scoop */
134  if ((result = rs_getinput(job, block_len)) != RS_DONE)
135  return result;
136  /* while output is not blocked and there is a block of data */
137  while ((result == RS_DONE) && ((job->scan_pos + block_len) < job->scan_len)) {
138  /* check if this block matches */
139  if (rs_findmatch(job, &match_pos, &match_len)) {
140  /* append the match and reset the weak_sum */
141  result = rs_appendmatch(job, match_pos, match_len);
142  weaksum_reset(&job->weak_sum);
143  } else {
144  /* rotate the weak_sum and append the miss byte */
145  weaksum_rotate(&job->weak_sum, job->scan_buf[job->scan_pos],
146  job->scan_buf[job->scan_pos + block_len]);
147  result = rs_appendmiss(job, 1);
148  }
149  }
150  /* if we completed OK */
151  if (result == RS_DONE) {
152  /* if we reached eof, we can flush the last fragment */
153  if (job->stream->eof_in) {
154  job->statefn = rs_delta_s_flush;
155  return RS_RUNNING;
156  } else {
157  /* we are blocked waiting for more data */
158  return RS_BLOCKED;
159  }
160  }
161  return result;
162 }
163 
164 static rs_result rs_delta_s_flush(rs_job_t *job)
165 {
166  const size_t block_len = job->signature->block_len;
167  rs_long_t match_pos;
168  size_t match_len;
169  rs_result result;
170 
171  rs_job_check(job);
172  /* output any pending output from the tube */
173  if ((result = rs_tube_catchup(job)) != RS_DONE)
174  return result;
175  /* read the input into the scoop */
176  if ((result = rs_getinput(job, block_len)) != RS_DONE)
177  return result;
178  /* while output is not blocked and there is any remaining data */
179  while ((result == RS_DONE) && (job->scan_pos < job->scan_len)) {
180  /* check if this block matches */
181  if (rs_findmatch(job, &match_pos, &match_len)) {
182  /* append the match and reset the weak_sum */
183  result = rs_appendmatch(job, match_pos, match_len);
184  weaksum_reset(&job->weak_sum);
185  } else {
186  /* rollout from weak_sum and append the miss byte */
187  weaksum_rollout(&job->weak_sum, job->scan_buf[job->scan_pos]);
188  rs_trace("block reduced to " FMT_SIZE "",
189  weaksum_count(&job->weak_sum));
190  result = rs_appendmiss(job, 1);
191  }
192  }
193  /* if we are not blocked, flush and set end statefn. */
194  if (result == RS_DONE) {
195  result = rs_appendflush(job);
196  job->statefn = rs_delta_s_end;
197  }
198  if (result == RS_DONE) {
199  return RS_RUNNING;
200  }
201  return result;
202 }
203 
204 static rs_result rs_delta_s_end(rs_job_t *job)
205 {
206  rs_emit_end_cmd(job);
207  return RS_DONE;
208 }
209 
210 static inline rs_result rs_getinput(rs_job_t *job, size_t block_len)
211 {
212  size_t min_len = block_len + MAX_DELTA_CMD;
213 
214  job->scan_len = rs_scoop_avail(job);
215  if (job->scan_len < min_len && !job->stream->eof_in)
216  job->scan_len = min_len;
217  return rs_scoop_readahead(job, job->scan_len, (void **)&job->scan_buf);
218 }
219 
220 /** find a match at scan_pos, returning the match_pos and match_len.
221  *
222  * Note that this will calculate weak_sum if required. It will also determine
223  * the match_len.
224  *
225  * This routine could be modified to do xdelta style matches that would extend
226  * matches past block boundaries by matching backwards and forwards beyond the
227  * block boundaries. Extending backwards would require decrementing scan_pos as
228  * appropriate. */
229 static inline int rs_findmatch(rs_job_t *job, rs_long_t *match_pos,
230  size_t *match_len)
231 {
232  const size_t block_len = job->signature->block_len;
233 
234  /* calculate the weak_sum if we don't have one */
235  if (weaksum_count(&job->weak_sum) == 0) {
236  /* set match_len to min(block_len, scan_avail) */
237  *match_len = job->scan_len - job->scan_pos;
238  if (*match_len > block_len) {
239  *match_len = block_len;
240  }
241  /* Update the weak_sum */
242  weaksum_update(&job->weak_sum, job->scan_buf + job->scan_pos,
243  *match_len);
244  rs_trace("calculate weak sum from scratch length " FMT_SIZE "",
245  weaksum_count(&job->weak_sum));
246  } else {
247  /* set the match_len to the weak_sum count */
248  *match_len = weaksum_count(&job->weak_sum);
249  }
250  *match_pos =
251  rs_signature_find_match(job->signature, weaksum_digest(&job->weak_sum),
252  job->scan_buf + job->scan_pos, *match_len);
253  return *match_pos != -1;
254 }
255 
256 /** Append a match at match_pos of length match_len to the delta, extending a
257  * previous match if possible, or flushing any previous miss/match. */
258 static inline rs_result rs_appendmatch(rs_job_t *job, rs_long_t match_pos,
259  size_t match_len)
260 {
261  rs_result result = RS_DONE;
262 
263  /* if last was a match that can be extended, extend it */
264  if (job->basis_len && (job->basis_pos + job->basis_len) == match_pos) {
265  job->basis_len += match_len;
266  } else {
267  /* else appendflush the last value */
268  result = rs_appendflush(job);
269  /* make this the new match value */
270  job->basis_pos = match_pos;
271  job->basis_len = match_len;
272  }
273  /* increment scan_pos to point at next unscanned data */
274  job->scan_pos += match_len;
275  /* we can only process from the scoop if output is not blocked */
276  if (result == RS_DONE) {
277  /* process the match data off the scoop */
278  result = rs_processmatch(job);
279  }
280  return result;
281 }
282 
283 /** Append a miss of length miss_len to the delta, extending a previous miss
284  * if possible, or flushing any previous match.
285  *
286  * This also breaks misses up into 32KB segments to avoid accumulating too much
287  * in memory. */
288 static inline rs_result rs_appendmiss(rs_job_t *job, size_t miss_len)
289 {
290  rs_result result = RS_DONE;
291 
292  /* If last was a match, or MAX_MISS_LEN misses, appendflush it. */
293  if (job->basis_len || (job->scan_pos >= MAX_MISS_LEN)) {
294  result = rs_appendflush(job);
295  }
296  /* increment scan_pos */
297  job->scan_pos += miss_len;
298  return result;
299 }
300 
301 /** Flush any accumulating hit or miss, appending it to the delta. */
302 static inline rs_result rs_appendflush(rs_job_t *job)
303 {
304  /* if last is a match, emit it and reset last by resetting basis_len */
305  if (job->basis_len) {
306  rs_trace("matched " FMT_LONG " bytes at " FMT_LONG "!", job->basis_len,
307  job->basis_pos);
308  rs_emit_copy_cmd(job, job->basis_pos, job->basis_len);
309  job->basis_len = 0;
310  return rs_processmatch(job);
311  /* else if last is a miss, emit and process it */
312  } else if (job->scan_pos) {
313  rs_trace("got " FMT_SIZE " bytes of literal data", job->scan_pos);
314  rs_emit_literal_cmd(job, (int)job->scan_pos);
315  return rs_processmiss(job);
316  }
317  /* otherwise, nothing to flush so we are done */
318  return RS_DONE;
319 }
320 
321 /** Process matching data in the scoop.
322  *
323  * The scoop contains match data at scan_buf of length scan_pos. This function
324  * processes that match data, returning RS_DONE if it completes, or RS_BLOCKED
325  * if it gets blocked. After it completes scan_pos is reset to still point at
326  * the next unscanned data.
327  *
328  * This function currently just removes data from the scoop and adjusts
329  * scan_pos appropriately. In the future this could be used for something like
330  * context compressing of miss data. Note that it also calls rs_tube_catchup to
331  * output any pending output. */
332 static inline rs_result rs_processmatch(rs_job_t *job)
333 {
334  assert(job->copy_len == 0);
335  rs_scoop_advance(job, job->scan_pos);
336  job->scan_buf += job->scan_pos;
337  job->scan_len -= job->scan_pos;
338  job->scan_pos = 0;
339  return rs_tube_catchup(job);
340 }
341 
342 /** Process miss data in the scoop.
343  *
344  * The scoop contains miss data at scan_buf of length scan_pos. This function
345  * processes that miss data, returning RS_DONE if it completes, or RS_BLOCKED
346  * if it gets blocked. After it completes scan_pos is reset to still point at
347  * the next unscanned data.
348  *
349  * This function uses rs_tube_copy to queue copying from the scoop into output.
350  * and uses rs_tube_catchup to do the copying. This automaticly removes data
351  * from the scoop, but this can block. While rs_tube_catchup is blocked,
352  * scan_pos does not point at legit data, so scanning can also not proceed.
353  *
354  * In the future this could do compression of miss data before outputing it. */
355 static inline rs_result rs_processmiss(rs_job_t *job)
356 {
357  assert(job->write_len > 0);
358  rs_tube_copy(job, job->scan_pos);
359  job->scan_buf += job->scan_pos;
360  job->scan_len -= job->scan_pos;
361  job->scan_pos = 0;
362  return rs_tube_catchup(job);
363 }
364 
365 /** State function that does a slack delta containing only literal data to
366  * recreate the input. */
368 {
369  size_t avail = rs_scoop_avail(job);
370 
371  if (avail) {
372  rs_trace("emit slack delta for " FMT_SIZE " available bytes", avail);
373  rs_emit_literal_cmd(job, (int)avail);
374  rs_tube_copy(job, avail);
375  return RS_RUNNING;
376  } else if (rs_scoop_eof(job)) {
377  job->statefn = rs_delta_s_end;
378  return RS_RUNNING;
379  }
380  return RS_BLOCKED;
381 }
382 
383 /** State function for writing out the header of the encoding job. */
385 {
387  if (job->signature) {
388  job->statefn = rs_delta_s_scan;
389  } else {
390  rs_trace("no signature provided for delta, using slack deltas");
391  job->statefn = rs_delta_s_slack;
392  }
393  return RS_RUNNING;
394 }
395 
397 {
398  rs_job_t *job;
399 
400  job = rs_job_new("delta", rs_delta_s_header);
401  /* Caller can pass NULL sig or empty sig for "slack deltas". */
402  if (sig && sig->count > 0) {
403  rs_signature_check(sig);
404  /* Caller must have called rs_build_hash_table() by now. */
405  assert(sig->hashtable);
406  job->signature = sig;
407  weaksum_init(&job->weak_sum, rs_signature_weaksum_kind(sig));
408  }
409  return job;
410 }
Abstract wrappers around different weaksum and strongsum implementations.
static int rs_findmatch(rs_job_t *job, rs_long_t *match_pos, size_t *match_len)
find a match at scan_pos, returning the match_pos and match_len.
Definition: delta.c:229
static rs_result rs_processmiss(rs_job_t *job)
Process miss data in the scoop.
Definition: delta.c:355
static rs_result rs_processmatch(rs_job_t *job)
Process matching data in the scoop.
Definition: delta.c:332
rs_job_t * rs_delta_begin(rs_signature_t *sig)
Prepare to compute a streaming delta.
Definition: delta.c:396
static rs_result rs_delta_s_scan(rs_job_t *job)
Get a block of data if possible, and see if it matches.
Definition: delta.c:122
static rs_result rs_delta_s_slack(rs_job_t *job)
State function that does a slack delta containing only literal data to recreate the input.
Definition: delta.c:367
static rs_result rs_appendmiss(rs_job_t *job, size_t miss_len)
Append a miss of length miss_len to the delta, extending a previous miss if possible,...
Definition: delta.c:288
static rs_result rs_appendflush(rs_job_t *job)
Flush any accumulating hit or miss, appending it to the delta.
Definition: delta.c:302
static rs_result rs_delta_s_header(rs_job_t *job)
State function for writing out the header of the encoding job.
Definition: delta.c:384
static rs_result rs_appendmatch(rs_job_t *job, rs_long_t match_pos, size_t match_len)
Append a match at match_pos of length match_len to the delta, extending a previous match if possible,...
Definition: delta.c:258
#define MAX_MISS_LEN
Max length of a miss is 64K including 3 command bytes.
Definition: delta.c:103
encoding output routines.
void rs_emit_delta_header(rs_job_t *)
Write the magic for the start of a delta.
Definition: emit.c:31
void rs_emit_copy_cmd(rs_job_t *job, rs_long_t where, rs_long_t len)
Write a COPY command for given offset and length.
Definition: emit.c:66
void rs_emit_end_cmd(rs_job_t *)
Write an END command.
Definition: emit.c:105
void rs_emit_literal_cmd(rs_job_t *, int len)
Write a LITERAL command.
Definition: emit.c:37
Generic state-machine interface.
#define rs_job_check(job)
Assert that a job is valid.
Definition: job.h:130
#define MAX_DELTA_CMD
Max length of a singled delta command is including command bytes.
Definition: job.h:44
Public header for librsync.
rs_result
Return codes from nonblocking rsync operations.
Definition: librsync.h:180
@ RS_RUNNING
The job is still running, and not yet finished or blocked.
Definition: librsync.h:183
@ RS_DONE
Completed successfully.
Definition: librsync.h:181
@ RS_BLOCKED
Blocked waiting for more data.
Definition: librsync.h:182
rs_result rs_scoop_readahead(rs_job_t *job, size_t len, void **ptr)
Read from scoop without advancing.
Definition: scoop.c:148
void rs_scoop_advance(rs_job_t *job, size_t len)
Advance the input cursor forward len bytes.
Definition: scoop.c:117
Manage librsync streams of IO.
rs_result rs_tube_catchup(rs_job_t *job)
Put whatever will fit from the tube into the output of the stream.
Definition: tube.c:120
static bool rs_scoop_eof(rs_job_t *job)
Test if the scoop has reached eof.
Definition: scoop.h:100
void rs_tube_copy(rs_job_t *job, size_t len)
Queue up a request to copy through len bytes from the input to the output of the stream.
Definition: tube.c:160
int eof_in
True if there is no more data after this.
Definition: librsync.h:345
The contents of this structure are private.
Definition: job.h:47
rs_long_t basis_pos
Copy from the basis position.
Definition: job.h:117
rs_byte_t * scan_buf
The delta scan buffer, where scan_buf[scan_pos..scan_len] is the data yet to be scanned.
Definition: job.h:104
size_t copy_len
If copy_len is >0, then that much data should be copied through from the input.
Definition: job.h:114
size_t scan_pos
The delta scan position.
Definition: job.h:106
size_t scan_len
The delta scan buffer length.
Definition: job.h:105
rs_result(* statefn)(rs_job_t *)
Callback for each processing step.
Definition: job.h:56
rs_signature_t * signature
Pointer to the signature that's being used by the operation.
Definition: job.h:72
weaksum_t weak_sum
The rollsum weak signature accumulator used by delta.c.
Definition: job.h:84
Signature of a whole file.
Definition: sumset.h:44
int count
Total number of blocks.
Definition: sumset.h:48
hashtable_t * hashtable
The hashtable for finding matches.
Definition: sumset.h:51
int block_len
The block length.
Definition: sumset.h:46
The rs_signature class implementation of a file signature.
rs_long_t rs_signature_find_match(rs_signature_t *sig, rs_weak_sum_t weak_sum, void const *buf, size_t len)
Find a matching block offset in a signature.
Definition: sumset.c:242
static weaksum_kind_t rs_signature_weaksum_kind(rs_signature_t const *sig)
Get the weaksum kind for a signature.
Definition: sumset.h:114
#define rs_signature_check(sig)
Assert that a signature is valid.
Definition: sumset.h:107
logging functions.