LCOV - code coverage report
Current view: top level - lib - shredder.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 559 598 93.5 %
Date: 2015-09-30 14:09:30 Functions: 31 31 100.0 %

          Line data    Source code
       1             : /*
       2             :  *  This file is part of rmlint.
       3             :  *
       4             :  *  rmlint is free software: you can redistribute it and/or modify
       5             :  *  it under the terms of the GNU General Public License as published by
       6             :  *  the Free Software Foundation, either version 3 of the License, or
       7             :  *  (at your option) any later version.
       8             :  *
       9             :  *  rmlint is distributed in the hope that it will be useful,
      10             :  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
      11             :  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      12             :  *  GNU General Public License for more details.
      13             :  *
      14             :  *  You should have received a copy of the GNU General Public License
      15             :  *  along with rmlint.  If not, see <http://www.gnu.org/licenses/>.
      16             :  *
      17             :  * Authors:
      18             :  *
      19             :  *  - Christopher <sahib> Pahl 2010-2015 (https://github.com/sahib)
      20             :  *  - Daniel <SeeSpotRun> T.   2014-2015 (https://github.com/SeeSpotRun)
      21             :  *
      22             :  * Hosted on http://github.com/sahib/rmlint
      23             :  *
      24             :  */
      25             : 
      26             : #include <glib.h>
      27             : #include <unistd.h>
      28             : #include <stdio.h>
      29             : #include <string.h>
      30             : 
      31             : #include <sys/uio.h>
      32             : 
      33             : #include "checksum.h"
      34             : #include "hasher.h"
      35             : 
      36             : #include "preprocess.h"
      37             : #include "utilities.h"
      38             : #include "formats.h"
      39             : 
      40             : #include "shredder.h"
      41             : #include "xattr.h"
      42             : #include "md-scheduler.h"
      43             : 
      44             : /* Enable extra debug messages? */
      45             : #define _RM_SHRED_DEBUG 0
      46             : 
      47             : /* This is the engine of rmlint for file duplicate matching.
      48             :  *
      49             :  * Files are compared in progressive "generations" to identify matching
      50             :  * clusters termed "ShredGroup"s:
      51             :  * Generation 0: Same size files
      52             :  * Generation 1: Same size and same hash of first  ~16kB
      53             :  * Generation 2: Same size and same hash of first  ~50MB
      54             :  * Generation 3: Same size and same hash of first ~100MB
      55             :  * Generation 3: Same size and same hash of first ~150MB
      56             :  * ... and so on until the end of the file is reached.
      57             :  *
      58             :  * The default step size can be configured below.
      59             :  *
      60             :  *
      61             :  * The clusters and generations look something like this:
      62             :  *
      63             :  *+-------------------------------------------------------------------------+
      64             :  *|     Initial list after filtering and preprocessing                      |
      65             :  *+-------------------------------------------------------------------------+
      66             :  *          | same size                   | same size           | same size
      67             :  *   +------------------+           +------------------+    +----------------+
      68             :  *   |   ShredGroup 1   |           |   ShredGroup 2   |    |   ShredGroup 3 |
      69             :  *   |F1,F2,F3,F4,F5,F6 |           |F7,F8,F9,F10,F11  |    |   F12,F13      |
      70             :  *   +------------------+           +------------------+    +----------------+
      71             :  *       |            |                 |            |
      72             :  *  +------------+ +----------+     +------------+  +---------+  +----+ +----+
      73             :  *  | Child 1.1  | |Child 1.2 |     | Child 2.1  |  |Child 2.2|  |3.1 | |3.2 |
      74             :  *  | F1,F3,F6   | |F2,F4,F5  |     |F7,F8,F9,F10|  |  F11    |  |F12 | |F13 |
      75             :  *  |(hash=hash1 | |(hash=h2) |     |(hash=h3)   |  |(hash=h4)|  |(h5)| |(h6)|
      76             :  *  +------------+ +----------+     +------------+  +---------+  +----+ +----+
      77             :  *       |            |                |        |              \       \
      78             :  *   +----------+ +-----------+  +-----------+ +-----------+    free!   free!
      79             :  *   |Child1.1.1| |Child 1.2.1|  |Child 2.2.1| |Child 2.2.2|
      80             :  *   |F1,F3,F6  | |F2,F4,F5   |  |F7,F9,F10  | |   F8      |
      81             :  *   +----------+ +-----------+  +-----------+ +-----------+
      82             :  *               \             \              \             \
      83             :  *                rm!           rm!            rm!           free!
      84             :  *
      85             :  *
      86             :  * The basic workflow is:
      87             :  * 1. One worker thread is established for each physical device
      88             :  * 2. The device thread picks a file from its queue, reads the next increment of that
      89             :  *    file, and sends it to a hashing thread.
      90             :  * 3. Depending on some logic ("worth_waiting"), the device thread may wait for the
      91             :  *    file increment to finish hashing, or may move straight on to the next file in
      92             :  *    the queue.  The "worth_waiting" logic aims to reduce disk seeks on rotational
      93             :  *    devices.
      94             :  * 4. The hashed fragment result is "sifted" into a child RmShredGroup of its parent
      95             :  *    group, and unlinked it from its parent.
      96             :  * 5. (a) If the child RmShredGroup needs hashing (ie >= 2 files and not completely hashed
      97             :  *    yet) then the file is pushed back to the device queue for further hashing;
      98             :  *    (b) If the file is not completely hashed but is the only file in the group (or
      99             :  *    otherwise fails criteria such as --must-match-tagged) then it is retained by the
     100             :  *    child RmShredGroup until a suitable sibling arrives, whereupon it is released to
     101             :  *    the device queue.
     102             :  *    (c) If the file has finished hashing, it is retained by the child RmShredGroup
     103             :  *    until its parent and all ancestors have finished processing, whereupon the file
     104             :  *    is sent to the "result factory" (if >= 2 files in the group) or discarded.
     105             :  *
     106             :  * In the above example, the hashing order will depend on the "worth_waiting" logic.
     107             :  *    On a rotational device the hashing order should end up being something like:
     108             :  *         F1.1 F2.1 (F3.1,F3.2), (F4.1,F4.2), (F5.1,F5.2,F5.3)...
     109             :  *                        ^            ^            ^    ^
     110             :  *        (^ indicates where hashing could continue on to a second increment (avoiding a
     111             :  *           disk seek) because there was already a matching file after the first
     112             :  *           increment)
     113             :  *
     114             :  *    On a non-rotational device where there is no seek penalty, the hashing order is:
     115             :  *         F1.1 F2.1 F3.1 F4.1 F5.1...
     116             :  *
     117             :  *
     118             :  * The threading looks somewhat like this for two devices:
     119             :  *
     120             :  *                          +----------+
     121             :  *                          |  Result  |
     122             :  *                          |  Factory |
     123             :  *                          |  Pipe    |
     124             :  *                          +----------+
     125             :  *                                ^
     126             :  *                                |
     127             :  *                        +--------------+
     128             :  *                        | Matched      |
     129             :  *                        | fully-hashed |
     130             :  *                        | dupe groups  |
     131             :  *    Device #1           +--------------+      Device #2
     132             :  *                                ^
     133             :  * +-------------------+          |          +-------------------+
     134             :  * | RmShredDevice     |          |          | RmShredDevice     |
     135             :  * | Worker            |          |          | Worker            |
     136             :  * | +-------------+   |          |          | +-------------+   |
     137             :  * | | File Queue  |<--+----+     |     +----+>| File Queue  |   |
     138             :  * | +-------------+   |    |     |     |    | +-------------+   |
     139             :  * | pop from          |    |     |     |    |        pop from   |
     140             :  * |  queue            |    |     |     |    |         queue     |
     141             :  * |     |             |    |     |     |    |            |      |
     142             :  * |     |<--Continue  |    |     |     |    | Continue-->|      |
     143             :  * |     |     ^       |    |     |     |    |      ^     |      |
     144             :  * |     v     |       |    |     |     |    |      |     v      |
     145             :  * |   Read    |       |    |     |     |    |      |    Read    |
     146             :  * |     |     |       |    |     |     |    |      |     |      |
     147             :  * |     |     |       |    |     |     |    |      |     |      |
     148             :  * |     |     |       |  Device  |  Device  |      |     |      |
     149             :  * |    [1]    |       |   Not    |    Not   |      |    [1]     |
     150             :  * +-----|-----+-------+ Waiting  |  Waiting +------|-----|------+
     151             :  *       |     |            |     |     |           |     |
     152             :  *       |     |            |     |     |           |     |
     153             :  *       |  Device  +-------+-----+-----+------+  Device  |
     154             :  *       | Waiting  |         Sifting          | Waiting  |
     155             :  *       |     |    |  (Identifies which       |    |     |
     156             :  *       |     -----+  partially-hashed files  +----+     |
     157             :  *       |          |  qualify for further     |          |
     158             :  *       |     +--->|  hashing)                |<--+      |
     159             :  *       |     |    |                          |   |      |
     160             :  *       |     |    +--------------------------+   |      |
     161             :  *       |     |         ^            |            |      |
     162             :  *       |     |         |            v            |      |
     163             :  *       |     |  +----------+   +----------+      |      |
     164             :  *       |     |  |Initial   |   | Rejects  |      |      |
     165             :  *       |     |  |File List |   |          |      |      |
     166             :  *       |     |  +----------+   +----------+      |      |
     167             :  *       |     |                                   |      |
     168             :  *  +----+-----+-----------------------------------+------+----+
     169             :  *  |    v     |        Hashing Pool               |      v    |
     170             :  *  |  +----------+                              +----------+  |
     171             :  *  |  |Hash Pipe |                              |Hash Pipe |  |
     172             :  *  |  +----------+                              +----------+  |
     173             :  *  +----------------------------------------------------------+
     174             :  *
     175             :  *  Note [1] - at this point the read results are sent to the hashpipe
     176             :  *             and the Device must decide if it is worth waiting for
     177             :  *             the hashing/sifting result; if not then the device thread
     178             :  *             will immediately pop the next file from its queue.
     179             :  *
     180             :  *
     181             :  *
     182             :  * Every subbox left and right are the task that are performed.
     183             :  *
     184             :  * The Device Workers, Hash Pipes and Finisher Pipe run as separate threads
     185             :  * managed by GThreadPool.  Note that while they are implemented as
     186             :  * GThreadPools, the hashers and finisher are limited to 1 thread eash
     187             :  * hence the term "pipe" is more appropriate than "pool".  This is
     188             :  * particularly important for hashing because hash functions are generally
     189             :  * order-dependent, ie hash(ab) != hash(ba); the only way to ensure hashing
     190             :  * tasks are complete in correct sequence is to use a single pipe.
     191             :  *
     192             :  * The Device Workers work sequentially through the queue of hashing
     193             :  * jobs; if the device is rotational then the files are sorted in order of
     194             :  * disk offset in order to reduce seek times.
     195             :  *
     196             :  * The Devlist Manager calls the hasher library (see hasher.c) to read one
     197             :  * file at a time.  The hasher library takes care of read buffers, hash
     198             :  * pipe allocation, etc.  Once the hasher is done, the result is sent back
     199             :  * via callback to rm_shred_hash_callback.
     200             :  *
     201             :  * If "worth_waiting" has been flagged then the callback sends the file
     202             :  * back to the Device Worker thread via a GAsyncQueue, whereupon the Device
     203             :  * Manager does a quick check to see if it can continue with the same file;
     204             :  * if not then a new file is taken from the device queue.
     205             :  *
     206             :  * The RmShredGroups don't have a thread managing them, instead the individual
     207             :  * Device Workers and/or hash pipe callbacks write to the RmShredGroups
     208             :  * under mutex protection.
     209             :  *
     210             :  *
     211             :  * The main ("foreground") thread waits for the Devlist Managers to
     212             :  * finish their sequential walk through the files.  If there are still
     213             :  * files to process on the device, the initial thread sends them back to
     214             :  * the GThreadPool for another pass through the files.
     215             :  *
     216             :  *
     217             :  *
     218             :  * Additional notes regarding "paranoid" hashing:
     219             :  *   The default file matching method uses the SHA1 cryptographic hash; there are
     220             :  * several other hash functions available as well.  The data hashing is somewhat
     221             :  * cpu-intensive but this is handled by separate threads (the hash pipes) so generally
     222             :  * doesn't bottleneck rmlint (as long as CPU exceeds disk reading speed).  The subsequent
     223             :  * hash matching is very fast because we only need to compare 20 bytes (in the case of
     224             :  * SHA1) to find matching files.
     225             :  *   The "paranoid" method uses byte-by-byte comparison.  In the implementation, this is
     226             :  * masqueraded as a hash function, but there is no hashing involved.  Instead, the whole
     227             :  * data increment is kept in memory.  This introduces 2 new challenges:
     228             :  * (1) Memory management.  In order to avoid overflowing mem availability, we limit the
     229             :  * number of concurrent active RmShredGroups and also limit the size of each file
     230             :  * increment.
     231             :  * (2) Matching time.  Unlike the conventional hashing strategy (CPU-intensive hashing
     232             :  * followed by simple matching), the paranoid method requires almost no CPU during
     233             :  * reading/hashing, but requires a large memcmp() at the end to find matching
     234             :  *files/groups.
     235             :  * That would not be a bottleneck as long as the reader thread still has other files
     236             :  * that it can go and read while the hasher/sorter does the memcmp in parallel... but
     237             :  * unfortunately the memory management issue means that's not always an option and so
     238             :  * reading gets delayed while waiting for the memcmp() to catch up.
     239             :  * Two strategies are used to speed this up:
     240             :  * (a) Pre-matching of candidate digests.  During reading/hashing, as each buffer (4096
     241             :  * bytes) is read in, it can be checked against a "twin candidate".  We can send twin
     242             :  * candidates to the hash pipe at any time via rm_digest_send_match_candidate().  If the
     243             :  * correct twin candidate has been sent, then when the increment is finished the
     244             :  * matching has already been done, and rm_digest_equal() is almost instantaneous.
     245             :  * (b) Shadow hash.  A lightweight hash (Murmor) is calculated and used for hashtable
     246             :  * lookup to quickly identify potential matches.  This saves time in the case of
     247             :  * RmShredGroups with large number of child groups and where the pre-matching strategy
     248             :  * failed.
     249             :  * */
     250             : 
     251             : /*
     252             : * Below some performance controls are listed that may impact performance.
     253             : * Controls are sorted by subjectve importanceness.
     254             : */
     255             : 
     256             : ////////////////////////////////////////////
     257             : // OPTIMISATION PARAMETERS FOR DECIDING   //
     258             : // HOW MANY BYTES TO READ BEFORE STOPPING //
     259             : // TO COMPARE PROGRESSIVE HASHES          //
     260             : ////////////////////////////////////////////
     261             : 
     262             : /* how many pages can we read in (seek_time)/(CHEAP)? (use for initial read) */
     263             : #define SHRED_BALANCED_PAGES (4)
     264             : 
     265             : /* How large a single page is (typically 4096 bytes but not always)*/
     266             : #define SHRED_PAGE_SIZE (sysconf(_SC_PAGESIZE))
     267             : 
     268             : #define SHRED_MAX_READ_FACTOR \
     269             :     ((256 * 1024 * 1024) / SHRED_BALANCED_PAGES / SHRED_PAGE_SIZE)
     270             : 
     271             : /* Maximum increment size for paranoid digests.  This is smaller than for other
     272             :  * digest types due to memory management issues.
     273             :  * 16MB should be big enough buffer size to make seek time fairly insignificant
     274             :  * relative to sequential read time, eg 16MB read at typical 100 MB/s read
     275             :  * rate = 160ms read vs typical seek time 10ms*/
     276             : #define SHRED_PARANOID_BYTES (16 * 1024 * 1024)
     277             : 
     278             : /* Whether to use buffered fread() or direct preadv()
     279             :  * The latter is preferred, since it's slightly faster on linux.
     280             :  * Other platforms may have different results though or not even have preadv.
     281             :  * */
     282             : #define SHRED_USE_BUFFERED_READ (0)
     283             : 
     284             : /* When paranoid hashing, if a file increments is larger
     285             :  * than SHRED_PREMATCH_THRESHOLD, we take a guess at the likely
     286             :  * matching file and do a progressive memcmp() on each buffer
     287             :  * rather than waiting until the whole increment has been read
     288             :  * */
     289             : #define SHRED_PREMATCH_THRESHOLD (0)
     290             : 
     291             : /* Minimum number of files or bytes that should be in an update sent to
     292             :  * the statistics counters.
     293             :  */
     294             : #define SHRED_MIN_FILE_STATS_PACK_FILES (16)
     295             : #define SHRED_MIN_FILE_STATS_PACK_BYTES (1024 * 1024 * 16)
     296             : 
     297             : /* empirical estimate of mem usage per file (excluding read buffers and
     298             :  * paranoid digests) */
     299             : #define RM_AVERAGE_MEM_PER_FILE (100)
     300             : 
     301             : ////////////////////////
     302             : //  MATHS SHORTCUTS   //
     303             : ////////////////////////
     304             : 
     305             : #define SIGN_DIFF(X, Y) (((X) > (Y)) - ((X) < (Y))) /* handy for comparing unit64's */
     306             : 
     307             : ///////////////////////////////////////////////////////////////////////
     308             : //    INTERNAL STRUCTURES, WITH THEIR INITIALISERS AND DESTROYERS    //
     309             : ///////////////////////////////////////////////////////////////////////
     310             : 
     311             : /////////* The main extra data for the duplicate finder *///////////
     312             : 
     313             : typedef struct RmShredTag {
     314             :     RmSession *session;
     315             :     GAsyncQueue *device_return;
     316             :     GMutex hash_mem_mtx;
     317             :     gint64 paranoid_mem_alloc; /* how much memory to allocate for paranoid checks */
     318             :     gint32 active_groups; /* how many shred groups active (only used with paranoid) */
     319             :     RmHasher *hasher;
     320             :     GThreadPool *result_pool;
     321             :     gint32 page_size;
     322             :     bool mem_refusing;
     323             : 
     324             :     GMutex lock;
     325             : 
     326             :     gint32 remaining_files;
     327             :     gint64 remaining_bytes;
     328             : 
     329             :     bool after_preprocess : 1;
     330             : 
     331             :     /* cached counters to avoid blocking delays in rm_shred_adjust_counters */
     332             :     gint cache_file_count;
     333             :     gint cache_filtered_count;
     334             :     gint64 cache_byte_count;
     335             : 
     336             : } RmShredTag;
     337             : 
     338             : typedef enum RmShredGroupStatus {
     339             :     RM_SHRED_GROUP_DORMANT = 0,
     340             :     RM_SHRED_GROUP_START_HASHING,
     341             :     RM_SHRED_GROUP_HASHING,
     342             :     RM_SHRED_GROUP_FINISHING,
     343             :     RM_SHRED_GROUP_FINISHED
     344             : } RmShredGroupStatus;
     345             : 
     346             : #define NEEDS_PREF(group) \
     347             :     (group->session->cfg->must_match_tagged || group->session->cfg->keep_all_untagged)
     348             : #define NEEDS_NPREF(group) \
     349             :     (group->session->cfg->must_match_untagged || group->session->cfg->keep_all_tagged)
     350             : #define NEEDS_NEW(group) (group->session->cfg->min_mtime)
     351             : 
     352             : #define HAS_CACHE(session) \
     353             :     (session->cfg->read_cksum_from_xattr || session->cache_list.length)
     354             : 
     355             : #define NEEDS_SHADOW_HASH(cfg) \
     356             :     (TRUE || cfg->merge_directories || cfg->read_cksum_from_xattr)
     357             : /* @sahib - performance is faster with shadow hash, probably due to hash
     358             :  * collisions in large RmShredGroups */
     359             : 
     360             : typedef struct RmShredGroup {
     361             :     /* holding queue for files; they are held here until the group first meets
     362             :      * criteria for further hashing (normally just 2 or more files, but sometimes
     363             :      * related to preferred path counts)
     364             :      * */
     365             :     GQueue *held_files;
     366             : 
     367             :     /* link(s) to next generation of RmShredGroups(s) which have this RmShredGroup as
     368             :      * parent*/
     369             :     GHashTable *children;
     370             : 
     371             :     /* RmShredGroup of the same size files but with lower RmFile->hash_offset;
     372             :      * getsset to null when parent dies
     373             :      * */
     374             :     struct RmShredGroup *parent;
     375             : 
     376             :     /* total number of files that have passed through this group*/
     377             :     gulong num_files;
     378             : 
     379             :     /* number of pending digests */
     380             :     gulong num_pending;
     381             : 
     382             :     /* list of in-progress paranoid digests, used for pre-matching */
     383             :     GList *in_progress_digests;
     384             : 
     385             :     /* set if group has 1 or more files from "preferred" paths */
     386             :     bool has_pref : 1;
     387             : 
     388             :     /* set if group has 1 or more files from "non-preferred" paths */
     389             :     bool has_npref : 1;
     390             : 
     391             :     /* set if group has 1 or more files newer than cfg->min_mtime */
     392             :     bool has_new : 1;
     393             : 
     394             :     /* set if group has been greenlighted by paranoid mem manager */
     395             :     bool is_active : 1;
     396             : 
     397             :     /* true if all files in the group have an external checksum */
     398             :     bool has_only_ext_cksums : 1;
     399             : 
     400             :     /* incremented for each file in the group that obtained it's checksum from ext.
     401             :      * If all files came from there we do not even need to hash the group.
     402             :      */
     403             :     gulong num_ext_cksums;
     404             : 
     405             :     /* if whole group has same basename, pointer to first file, else null */
     406             :     RmFile *unique_basename;
     407             : 
     408             :     /* initially RM_SHRED_GROUP_DORMANT; triggered as soon as we have >= 2 files
     409             :      * and meet preferred path and will go to either RM_SHRED_GROUP_HASHING or
     410             :      * RM_SHRED_GROUP_FINISHING.  When switching from dormant to hashing, all
     411             :      * held_files are released and future arrivals go straight to hashing
     412             :      * */
     413             :     RmShredGroupStatus status;
     414             : 
     415             :     /* file size of files in this group */
     416             :     RmOff file_size;
     417             : 
     418             :     /* file hash_offset when files arrived in this group */
     419             :     RmOff hash_offset;
     420             : 
     421             :     /* file hash_offset for next increment */
     422             :     RmOff next_offset;
     423             : 
     424             :     /* Factor of SHRED_BALANCED_PAGES to read next time */
     425             :     gint64 offset_factor;
     426             : 
     427             :     /* allocated memory for paranoid hashing */
     428             :     RmOff mem_allocation;
     429             : 
     430             :     /* checksum structure taken from first file to enter the group.  This allows
     431             :      * digests to be released from RmFiles and memory freed up until they
     432             :      * are required again for further hashing.*/
     433             :     RmDigestType digest_type;
     434             :     RmDigest *digest;
     435             : 
     436             :     /* lock for access to this RmShredGroup */
     437             :     GMutex lock;
     438             : 
     439             :     /* Reference to main */
     440             :     const RmSession *session;
     441             : } RmShredGroup;
     442             : 
     443             : typedef struct RmSignal {
     444             :     GMutex lock;
     445             :     GCond cond;
     446             :     gboolean done;
     447             : } RmSignal;
     448             : 
     449           6 : static RmSignal *rm_signal_new(void) {
     450           6 :     RmSignal *self = g_slice_new(RmSignal);
     451           6 :     g_mutex_init(&self->lock);
     452           6 :     g_cond_init(&self->cond);
     453           6 :     self->done = FALSE;
     454           6 :     return self;
     455             : }
     456             : 
     457           6 : static void rm_signal_wait(RmSignal *signal) {
     458           6 :     g_mutex_lock(&signal->lock);
     459             :     {
     460          18 :         while(!signal->done) {
     461           6 :             g_cond_wait(&signal->cond, &signal->lock);
     462             :         }
     463             :     }
     464           6 :     g_mutex_unlock(&signal->lock);
     465           6 :     g_mutex_clear(&signal->lock);
     466           6 :     g_cond_clear(&signal->cond);
     467           6 :     g_slice_free(RmSignal, signal);
     468           6 : }
     469             : 
     470           6 : static void rm_signal_done(RmSignal *signal) {
     471           6 :     g_mutex_lock(&signal->lock);
     472             :     {
     473           6 :         signal->done = TRUE;
     474           6 :         g_cond_signal(&signal->cond);
     475             :     }
     476           6 :     g_mutex_unlock(&signal->lock);
     477           6 : }
     478             : 
     479             : /////////// RmShredGroup ////////////////
     480             : 
     481             : /* allocate and initialise new RmShredGroup */
     482      109490 : static RmShredGroup *rm_shred_group_new(RmFile *file) {
     483      109490 :     RmShredGroup *self = g_slice_new0(RmShredGroup);
     484             : 
     485      109490 :     if(file->digest) {
     486       55948 :         self->digest_type = file->digest->type;
     487       55948 :         self->digest = file->digest;
     488       55948 :         file->digest = NULL;
     489             :     } else {
     490             :         /* initial groups have no checksum */
     491       53542 :         g_assert(!file->shred_group);
     492             :     }
     493             : 
     494      109490 :     self->parent = file->shred_group;
     495      109490 :     self->session = file->session;
     496             : 
     497      109490 :     if(self->parent) {
     498       55948 :         self->offset_factor = MIN(self->parent->offset_factor * 8, SHRED_MAX_READ_FACTOR);
     499             :     } else {
     500       53542 :         self->offset_factor = 1;
     501             :     }
     502             : 
     503      109490 :     self->held_files = g_queue_new();
     504      109490 :     self->file_size = file->file_size;
     505      109490 :     self->hash_offset = file->hash_offset;
     506             : 
     507      109490 :     self->session = file->session;
     508             : 
     509      109490 :     g_mutex_init(&self->lock);
     510             : 
     511      109490 :     return self;
     512             : }
     513             : 
     514             : //////////////////////////////////
     515             : // OPTIMISATION AND MEMORY      //
     516             : // MANAGEMENT ALGORITHMS        //
     517             : //////////////////////////////////
     518             : 
     519             : /* Compute optimal size for next hash increment
     520             :  * call this with group locked
     521             :  * */
     522      225665 : static gint32 rm_shred_get_read_size(RmFile *file, RmShredTag *tag) {
     523      225665 :     RmShredGroup *group = file->shred_group;
     524      225665 :     g_assert(group);
     525             : 
     526      225665 :     gint32 result = 0;
     527             : 
     528             :     /* calculate next_offset property of the RmShredGroup */
     529      225665 :     RmOff balanced_bytes = tag->page_size * SHRED_BALANCED_PAGES;
     530      225665 :     RmOff target_bytes = balanced_bytes * group->offset_factor;
     531      225665 :     if(group->next_offset == 2) {
     532           0 :         file->fadvise_requested = 1;
     533             :     }
     534             : 
     535             :     /* round to even number of pages, round up to MIN_READ_PAGES */
     536      225665 :     RmOff target_pages = MAX(target_bytes / tag->page_size, 1);
     537      225665 :     target_bytes = target_pages * tag->page_size;
     538             : 
     539             :     /* test if cost-effective to read the whole file */
     540      225665 :     if(group->hash_offset + target_bytes + (balanced_bytes) >= group->file_size) {
     541      225309 :         group->next_offset = group->file_size;
     542      225309 :         file->fadvise_requested = 1;
     543             :     } else {
     544         356 :         group->next_offset = group->hash_offset + target_bytes;
     545             :     }
     546             : 
     547             :     /* for paranoid digests, make sure next read is not > max size of paranoid buffer */
     548      225665 :     if(group->digest_type == RM_DIGEST_PARANOID) {
     549       19862 :         group->next_offset =
     550       19862 :             MIN(group->next_offset, group->hash_offset + SHRED_PARANOID_BYTES);
     551             :     }
     552             : 
     553      225665 :     file->status = RM_FILE_STATE_NORMAL;
     554      225665 :     result = (group->next_offset - file->hash_offset);
     555             : 
     556      225665 :     return result;
     557             : }
     558             : 
     559             : /* Memory manager (only used for RM_DIGEST_PARANOID at the moment
     560             :  * but could also be adapted for other digests if very large
     561             :  * filesystems are contemplated)
     562             :  */
     563             : 
     564      108526 : static void rm_shred_mem_return(RmShredGroup *group) {
     565      108526 :     if(group->is_active) {
     566        3826 :         RmShredTag *tag = group->session->shredder;
     567        3826 :         g_mutex_lock(&tag->hash_mem_mtx);
     568             :         {
     569        3826 :             tag->paranoid_mem_alloc += group->mem_allocation;
     570        3826 :             tag->active_groups--;
     571        3826 :             group->is_active = FALSE;
     572             : #if _RM_SHRED_DEBUG
     573             :             rm_log_debug_line("Mem avail %" LLI ", active groups %d. " YELLOW "Returned %" LLU " bytes for paranoid hashing.",
     574             :                          tag->paranoid_mem_alloc,
     575             :                          tag->active_groups,
     576             :                          group->mem_allocation);
     577             : #endif
     578        3826 :             tag->mem_refusing = FALSE;
     579        3826 :             if(group->digest) {
     580           8 :                 g_assert(group->digest->type == RM_DIGEST_PARANOID);
     581           8 :                 rm_digest_free(group->digest);
     582           8 :                 group->digest = NULL;
     583             :             }
     584             :         }
     585        3826 :         g_mutex_unlock(&tag->hash_mem_mtx);
     586        3826 :         group->mem_allocation = 0;
     587             :     }
     588      108526 : }
     589             : 
     590             : /* what is the maximum number of files that a group may end up with (including
     591             :  * parent, grandparent etc group files that haven't been hashed yet)?
     592             :  */
     593      103638 : static gulong rm_shred_group_potential_file_count(RmShredGroup *group) {
     594      103638 :     if(group) {
     595       51823 :         return group->num_pending + rm_shred_group_potential_file_count(group->parent);
     596             :     } else {
     597       51815 :         return 0;
     598             :     }
     599             : }
     600             : 
     601             : /* Governer to limit memory usage by limiting how many RmShredGroups can be
     602             :  * active at any one time
     603             :  * NOTE: group_lock must be held before calling rm_shred_check_paranoid_mem_alloc
     604             :  */
     605       64033 : static bool rm_shred_check_paranoid_mem_alloc(RmShredGroup *group,
     606             :                                               int active_group_threshold) {
     607       64033 :     if(group->status >= RM_SHRED_GROUP_HASHING) {
     608             :         /* group already committed */
     609       12218 :         return true;
     610             :     }
     611             : 
     612       51815 :     gint64 mem_required =
     613      103630 :         (rm_shred_group_potential_file_count(group) / 2 + 1) *
     614       51815 :         MIN(group->file_size - group->hash_offset, SHRED_PARANOID_BYTES);
     615             : 
     616       51815 :     bool result = FALSE;
     617       51815 :     RmShredTag *tag = group->session->shredder;
     618       51815 :     g_mutex_lock(&tag->hash_mem_mtx);
     619             :     {
     620       51815 :         gint64 inherited = group->parent ? group->parent->mem_allocation : 0;
     621             : 
     622      101677 :         if(0 || mem_required <= tag->paranoid_mem_alloc + inherited ||
     623       53688 :            (tag->active_groups <= active_group_threshold)) {
     624             :             /* ok to proceed */
     625             :             /* only take what we need from parent */
     626        3826 :             inherited = MIN(inherited, mem_required);
     627        3826 :             if(inherited > 0) {
     628           4 :                 group->parent->mem_allocation -= inherited;
     629           4 :                 group->mem_allocation += inherited;
     630             :             }
     631             : 
     632             :             /* take the rest from bank */
     633        3826 :             gint64 borrowed =
     634        3826 :                 MIN(mem_required - inherited, (gint64)tag->paranoid_mem_alloc);
     635        3826 :             tag->paranoid_mem_alloc -= borrowed;
     636        3826 :             group->mem_allocation += borrowed;
     637             : 
     638        3826 :             if(tag->mem_refusing) {
     639           0 :                 rm_log_debug_line(
     640             :                     "Mem avail %"LLI", active groups %d. Borrowed %"LLI". Inherited: %"LLI" bytes for paranoid hashing",
     641             :                              tag->paranoid_mem_alloc,
     642             :                              tag->active_groups, borrowed,
     643             :                              inherited
     644             :                     );
     645             : 
     646           0 :                 if(mem_required > borrowed + inherited) {
     647           0 :                     rm_log_debug_line("...due to %i active group limit", active_group_threshold);
     648             :                 }
     649             : 
     650           0 :                 tag->mem_refusing = FALSE;
     651             :             }
     652             : 
     653        3826 :             tag->active_groups++;
     654        3826 :             group->is_active = TRUE;
     655        3826 :             group->status = RM_SHRED_GROUP_HASHING;
     656        3826 :             result = TRUE;
     657             :         } else {
     658       47989 :             if(!tag->mem_refusing) {
     659         581 :                 rm_log_debug_line("Mem avail %"LLI", active groups %d. " RED
     660             :                              "Refused request for %" LLU
     661             :                              " bytes for paranoid hashing.",
     662             :                              tag->paranoid_mem_alloc, tag->active_groups, mem_required);
     663         581 :                 tag->mem_refusing = TRUE;
     664             :             }
     665       47989 :             result = FALSE;
     666             :         }
     667             :     }
     668       51815 :     g_mutex_unlock(&tag->hash_mem_mtx);
     669             : 
     670       51815 :     return result;
     671             : }
     672             : 
     673             : ///////////////////////////////////
     674             : //    RmShredDevice UTILITIES    //
     675             : ///////////////////////////////////
     676             : 
     677      666733 : static void rm_shred_adjust_counters(RmShredTag *tag, int files, gint64 bytes) {
     678      666733 :     g_mutex_lock(&tag->lock);
     679             :     {
     680      666807 :         RmSession *session = tag->session;
     681      666807 :         tag->cache_byte_count += bytes;
     682      666807 :         tag->cache_file_count += files;
     683      666807 :         if(files < 0) {
     684      222489 :             tag->cache_filtered_count += files;
     685             :         }
     686             : 
     687     1333558 :         if(abs(tag->cache_byte_count) >= SHRED_MIN_FILE_STATS_PACK_BYTES ||
     688      666751 :            abs(tag->cache_file_count) >= SHRED_MIN_FILE_STATS_PACK_FILES) {
     689         112 :             rm_fmt_lock_state(session->formats);
     690             :             {
     691             : #if RM_SHRED_DEBUG
     692             :                 gint64 bytes_remaining =
     693             :                     session->shred_bytes_remaining + tag->cache_byte_count;
     694             :                 gint64 files_remaining =
     695             :                     session->shred_files_remaining + tag->cache_file_count;
     696             :                 g_assert(check_bytes >= 0);
     697             :                 g_assert(check_files >= 0);
     698             : #endif
     699         112 :                 session->shred_files_remaining += tag->cache_file_count;
     700         112 :                 session->total_filtered_files += tag->cache_filtered_count;
     701         112 :                 session->shred_bytes_remaining += tag->cache_byte_count;
     702             : 
     703         112 :                 rm_fmt_set_state(session->formats, (tag->after_preprocess)
     704             :                                                        ? RM_PROGRESS_STATE_SHREDDER
     705             :                                                        : RM_PROGRESS_STATE_PREPROCESS);
     706         112 :                 tag->cache_file_count = 0;
     707         112 :                 tag->cache_filtered_count = 0;
     708         112 :                 tag->cache_byte_count = 0;
     709             :             }
     710         112 :             rm_fmt_unlock_state(session->formats);
     711             :         }
     712             :     }
     713      666807 :     g_mutex_unlock(&tag->lock);
     714      666800 : }
     715             : 
     716      221889 : static void rm_shred_write_cksum_to_xattr(const RmSession *session, RmFile *file) {
     717      221889 :     if(session->cfg->write_cksum_to_xattr) {
     718         392 :         if(file->has_ext_cksum == false) {
     719         392 :             rm_xattr_write_hash((RmSession *)session, file);
     720             :         }
     721             :     }
     722      221889 : }
     723             : 
     724             : /* Unlink RmFile from device queue
     725             :  */
     726      222919 : static void rm_shred_discard_file(RmFile *file, bool free_file) {
     727      222919 :     const RmSession *session = file->session;
     728      222919 :     RmShredTag *tag = session->shredder;
     729             :     /* update device counters (unless this file was a bundled hardlink) */
     730      222919 :     if(!file->hardlinks.hardlink_head) {
     731      222489 :         rm_mds_ref_dev(session->mds, file->disk, -1);
     732      222489 :         rm_shred_adjust_counters(tag, -1, -(gint64)(file->file_size - file->hash_offset));
     733             : 
     734             :         /* ShredGroup that was going nowhere */
     735             :         g_assert(session->cfg->write_unfinished || TRUE);
     736      227885 :         if(file->shred_group && file->shred_group->num_files <= 1 &&
     737        5396 :            session->cfg->write_unfinished) {
     738         168 :             file->lint_type = RM_LINT_TYPE_UNFINISHED_CKSUM;
     739         168 :             file->digest = (file->digest) ? file->digest : file->shred_group->digest;
     740             : 
     741         168 :             if(file->digest) {
     742          84 :                 rm_fmt_write(file, session->formats, -1);
     743          84 :                 rm_shred_write_cksum_to_xattr(session, file);
     744          84 :                 file->digest = NULL;
     745             :             }
     746             :         }
     747             :     }
     748             : 
     749      222919 :     if(free_file) {
     750             :         /* toss the file (and any embedded hardlinks)*/
     751      105473 :         rm_file_destroy(file);
     752             :     }
     753      222919 : }
     754             : 
     755             : /* Push file to scheduler queue.
     756             :  * */
     757      221829 : static void rm_shred_push_queue(RmFile *file) {
     758      221829 :     const RmSession *session = file->session;
     759      221829 :     if(file->hash_offset == 0) {
     760             :         /* first-timer; lookup disk offset */
     761      434335 :         if(file->session->cfg->build_fiemap &&
     762      221101 :            !rm_mounts_is_nonrotational(file->session->mounts, file->dev)) {
     763        7867 :             RM_DEFINE_PATH(file);
     764        7867 :             file->disk_offset = rm_offset_get_from_path(file_path, 0, NULL);
     765             :         } else {
     766             :             /* use inode number instead of disk offset */
     767      213234 :             file->disk_offset = file->inode;
     768             :         }
     769             :     }
     770      443658 :     rm_mds_push_task_by_dev(
     771      443658 :         session->mds, file->disk, file->disk_offset, NULL, file);
     772      221829 : }
     773             : 
     774             : //////////////////////////////////
     775             : //    RMSHREDGROUP UTILITIES    //
     776             : //    AND SIFTING ALGORITHM     //
     777             : //////////////////////////////////
     778             : 
     779             : /* Free RmShredGroup and any dormant files still in its queue
     780             :  */
     781      109490 : static void rm_shred_group_free(RmShredGroup *self, bool force_free) {
     782      109490 :     g_assert(self->parent == NULL); /* children should outlive their parents! */
     783             : 
     784      109490 :     RmCfg *cfg = self->session->cfg;
     785             : 
     786      109490 :     bool needs_free = !(cfg->cache_file_structs) || force_free;
     787             : 
     788             :     /* May not free though when unfinished checksums are written.
     789             :      * Those are freed by the output module.
     790             :      */
     791      109490 :     if(cfg->write_unfinished) {
     792         420 :         needs_free = false;
     793             :     }
     794             : 
     795      109490 :     if(self->held_files) {
     796       56800 :         g_queue_foreach(self->held_files, (GFunc)rm_shred_discard_file,
     797       56800 :                         GUINT_TO_POINTER(needs_free));
     798       56800 :         g_queue_free(self->held_files);
     799       56800 :         self->held_files = NULL;
     800             :     }
     801             : 
     802      109490 :     if(self->digest && needs_free) {
     803       21708 :         rm_digest_free(self->digest);
     804       21708 :         self->digest = NULL;
     805             :     }
     806             : 
     807      109490 :     if(self->children) {
     808       52690 :         g_hash_table_unref(self->children);
     809             :     }
     810             : 
     811      109488 :     g_assert(!self->in_progress_digests);
     812             : 
     813      109488 :     g_mutex_clear(&self->lock);
     814             : 
     815      109490 :     g_slice_free(RmShredGroup, self);
     816      109490 : }
     817             : 
     818             : /* call unlocked; should be no contention issues since group is finished */
     819      108525 : static void rm_shred_group_finalise(RmShredGroup *self) {
     820             :     /* return any paranoid mem allocation */
     821      108525 :     rm_shred_mem_return(self);
     822             : 
     823      108526 :     switch(self->status) {
     824             :     case RM_SHRED_GROUP_DORMANT:
     825             :         /* Dead-ended files; don't force free since we may want to write the partial
     826             :          * checksums */
     827        4686 :         rm_shred_group_free(self, FALSE);
     828        4686 :         break;
     829             :     case RM_SHRED_GROUP_START_HASHING:
     830             :     case RM_SHRED_GROUP_HASHING:
     831             :         /* intermediate increment group no longer required; force free */
     832       52690 :         rm_shred_group_free(self, TRUE);
     833       52690 :         break;
     834             :     case RM_SHRED_GROUP_FINISHING:
     835             :         /* free any paranoid buffers held in group->digest (should not be needed for
     836             :          * results processing */
     837       51150 :         if(self->digest_type == RM_DIGEST_PARANOID) {
     838        3707 :             rm_digest_release_buffers(self->digest);
     839             :         }
     840             :         /* send it to finisher (which takes responsibility for calling
     841             :          * rm_shred_group_free())*/
     842       51150 :         rm_util_thread_pool_push(self->session->shredder->result_pool, self);
     843             : 
     844       51150 :         break;
     845             :     case RM_SHRED_GROUP_FINISHED:
     846             :     default:
     847           0 :         g_assert_not_reached();
     848             :     }
     849      108526 : }
     850             : 
     851             : /* Checks whether group qualifies as duplicate candidate (ie more than
     852             :  * two members and meets has_pref and NEEDS_PREF criteria).
     853             :  * Assume group already protected by group_lock.
     854             :  * */
     855      444314 : static void rm_shred_group_update_status(RmShredGroup *group) {
     856      444314 :     if(group->status == RM_SHRED_GROUP_DORMANT) {
     857      213403 :         if(1 && group->num_files >= 2 /* it takes 2 to tango */
     858      104338 :            &&
     859      208028 :            (group->has_pref || !NEEDS_PREF(group))
     860             :            /* we have at least one file from preferred path, or we don't care */
     861      104011 :            &&
     862      104255 :            (group->has_npref || !NEEDS_NPREF(group))
     863             :            /* we have at least one file from non-pref path, or we don't care */
     864      103874 :            &&
     865      103880 :            (group->has_new || !NEEDS_NEW(group))
     866             :            /* we have at least one file newer than cfg->min_mtime, or we don't care */
     867      103868 :            &&
     868      103896 :            (!group->unique_basename || !group->session->cfg->unmatched_basenames)
     869             :            /* we have more than one unique basename, or we don't care */
     870             :            ) {
     871      156530 :             if(group->hash_offset < group->file_size &&
     872       52690 :                group->has_only_ext_cksums == false) {
     873             :                 /* group can go active */
     874       52690 :                 group->status = RM_SHRED_GROUP_START_HASHING;
     875             :             } else {
     876       51150 :                 group->status = RM_SHRED_GROUP_FINISHING;
     877             :             }
     878             :         }
     879             :     }
     880      444314 : }
     881             : 
     882             : /* Only called by rm_shred_group_free (via GDestroyNotify of group->children).
     883             :  * Call with group->lock unlocked.
     884             :  */
     885       55948 : static void rm_shred_group_make_orphan(RmShredGroup *self) {
     886       55948 :     gboolean group_finished = FALSE;
     887       55948 :     g_mutex_lock(&self->lock);
     888             :     {
     889       55948 :         self->parent = NULL;
     890       55948 :         group_finished = (self->num_pending == 0);
     891             :     }
     892       55948 :     g_mutex_unlock(&self->lock);
     893             : 
     894       55948 :     if(group_finished) {
     895       55836 :         rm_shred_group_finalise(self);
     896             :     }
     897       55948 : }
     898             : 
     899             : /* Call with shred_group->lock unlocked.
     900             :  * */
     901      444314 : static RmFile *rm_shred_group_push_file(RmShredGroup *shred_group, RmFile *file,
     902             :                                         gboolean initial) {
     903      444314 :     RmFile *result = NULL;
     904      444314 :     file->shred_group = shred_group;
     905             : 
     906      444314 :     if(file->digest) {
     907      165881 :         rm_digest_free(file->digest);
     908      165880 :         file->digest = NULL;
     909             :     }
     910             : 
     911      444313 :     g_mutex_lock(&shred_group->lock);
     912             :     {
     913      444316 :         shred_group->has_pref |= file->is_prefd | file->hardlinks.has_prefd;
     914      444316 :         shred_group->has_npref |= (!file->is_prefd) | file->hardlinks.has_non_prefd;
     915      444316 :         shred_group->has_new |= file->is_new_or_has_new;
     916             : 
     917      444316 :         if (shred_group->num_files == 0 && shred_group->session->cfg->unmatched_basenames) {
     918          84 :             shred_group->unique_basename = file;
     919      444316 :         } else if (shred_group->unique_basename &&
     920          84 :                    !rm_file_basenames_match(file, shred_group->unique_basename)) {
     921          56 :             shred_group->unique_basename = NULL;
     922             :         }
     923             : 
     924      444316 :         shred_group->num_files++;
     925      444316 :         if(file->hardlinks.is_head) {
     926         692 :             g_assert(file->hardlinks.files);
     927         692 :             shred_group->num_files += file->hardlinks.files->length;
     928         692 :             if (shred_group->unique_basename && shred_group->session->cfg->unmatched_basenames) {
     929           0 :                 for(GList *iter = file->hardlinks.files->head; iter; iter = iter->next) {
     930           0 :                     if (!rm_file_basenames_match(iter->data, shred_group->unique_basename)) {
     931           0 :                         shred_group->unique_basename = NULL;
     932           0 :                         break;
     933             :                     }
     934             :                 }
     935             :             }
     936             :         }
     937             : 
     938      444316 :         g_assert(file->hash_offset == shred_group->hash_offset);
     939             : 
     940      444316 :         rm_shred_group_update_status(shred_group);
     941      444316 :         switch(shred_group->status) {
     942             :         case RM_SHRED_GROUP_START_HASHING:
     943             :             /* clear the queue and push all its rmfiles to the appropriate device queue */
     944      169309 :             if(shred_group->held_files) {
     945       52690 :                 shred_group->num_pending += g_queue_get_length(shred_group->held_files);
     946       52690 :                 g_queue_free_full(shred_group->held_files,
     947             :                                   (GDestroyNotify)rm_shred_push_queue);
     948       52690 :                 shred_group->held_files = NULL; /* won't need shred_group queue any more,
     949             :                                                    since new arrivals will bypass */
     950             :             }
     951      169309 :             if(shred_group->digest_type == RM_DIGEST_PARANOID && !initial) {
     952           8 :                 rm_shred_check_paranoid_mem_alloc(shred_group, 1);
     953             :             }
     954             :         /* FALLTHROUGH */
     955             :         case RM_SHRED_GROUP_HASHING:
     956      169317 :             shred_group->num_pending++;
     957      169317 :             if(initial || !file->devlist_waiting) {
     958             :                 /* add file to device queue */
     959      169317 :                 rm_shred_push_queue(file);
     960             :             } else {
     961             :                 /* calling routine will handle the file */
     962           0 :                 result = file;
     963             :             }
     964      169317 :             break;
     965             :         case RM_SHRED_GROUP_DORMANT:
     966             :         case RM_SHRED_GROUP_FINISHING:
     967             :             /* add file to held_files */
     968      274998 :             g_queue_push_head(shred_group->held_files, file);
     969      274998 :             break;
     970             :         case RM_SHRED_GROUP_FINISHED:
     971             :         default:
     972           0 :             g_assert_not_reached();
     973             :         }
     974             :     }
     975      444315 :     g_mutex_unlock(&shred_group->lock);
     976             : 
     977      444318 :     return result;
     978             : }
     979             : 
     980             : /* After partial hashing of RmFile, add it back into the sieve for further
     981             :  * hashing if required.  If waiting option is set, then try to return the
     982             :  * RmFile to the calling routine so it can continue with the next hashing
     983             :  * increment (this bypasses the normal device queue and so avoids an unnecessary
     984             :  * file seek operation ) returns true if the file can be immediately be hashed
     985             :  * some more.
     986             :  * */
     987      221801 : static RmFile *rm_shred_sift(RmFile *file) {
     988      221801 :     RmFile *result = NULL;
     989      221801 :     gboolean current_group_finished = FALSE;
     990             : 
     991      221801 :     g_assert(file);
     992      221801 :     RmShredGroup *current_group = file->shred_group;
     993      221801 :     g_assert(current_group);
     994             : 
     995      221801 :     g_mutex_lock(&current_group->lock);
     996             :     {
     997      221826 :         current_group->num_pending--;
     998      221826 :         if(current_group->in_progress_digests) {
     999             :             /* remove this file from current_group's pending digests list */
    1000       16008 :             current_group->in_progress_digests =
    1001       16007 :                 g_list_remove(current_group->in_progress_digests, file->digest);
    1002             :         }
    1003             : 
    1004      221827 :         if(file->status == RM_FILE_STATE_IGNORE) {
    1005             :             /* reading/hashing failed somewhere */
    1006           0 :             if(file->digest) {
    1007           0 :                 rm_digest_free(file->digest);
    1008             :             }
    1009           0 :             rm_shred_discard_file(file, true);
    1010             : 
    1011             :         } else {
    1012      221827 :             g_assert(file->digest);
    1013             : 
    1014             :             /* check is child group hashtable has been created yet */
    1015      221827 :             if(current_group->children == NULL) {
    1016       52689 :                 current_group->children =
    1017       52690 :                     g_hash_table_new_full((GHashFunc)rm_digest_hash,
    1018             :                                           (GEqualFunc)rm_digest_equal,
    1019             :                                           NULL,
    1020             :                                           (GDestroyNotify)rm_shred_group_make_orphan);
    1021             :             }
    1022             : 
    1023             :             /* check if there is already a descendent of current_group which
    1024             :              * matches snap... if yes then move this file into it; if not then
    1025             :              * create a new group ... */
    1026      221826 :             RmShredGroup *child_group =
    1027      221826 :                 g_hash_table_lookup(current_group->children, file->digest);
    1028      221826 :             if(!child_group) {
    1029       55948 :                 child_group = rm_shred_group_new(file);
    1030       55948 :                 g_hash_table_insert(current_group->children, child_group->digest,
    1031             :                                     child_group);
    1032       55948 :                 child_group->has_only_ext_cksums = current_group->has_only_ext_cksums;
    1033             : 
    1034             :                 /* signal any pending (paranoid) digests that there is a new match
    1035             :                  * candidate digest */
    1036       55948 :                 g_list_foreach(current_group->in_progress_digests,
    1037             :                                (GFunc)rm_digest_send_match_candidate,
    1038       55948 :                                child_group->digest);
    1039             :             }
    1040      221826 :             result =
    1041             :                 rm_shred_group_push_file(child_group, file, FALSE);
    1042             :         }
    1043             : 
    1044             :         /* is current shred group needed any longer? */
    1045      221828 :         current_group_finished =
    1046      221828 :             !current_group->parent && current_group->num_pending == 0;
    1047             :     }
    1048      221828 :     g_mutex_unlock(&current_group->lock);
    1049             : 
    1050      221823 :     if(current_group_finished) {
    1051       52689 :         rm_shred_group_finalise(current_group);
    1052             :     }
    1053             : 
    1054      221820 :     return result;
    1055             : }
    1056             : 
    1057             : /* Hasher callback file. Runs as threadpool in parallel / tandem with
    1058             :  * rm_shred_read_factory above
    1059             :  * */
    1060      221820 : static void rm_shred_hash_callback(_U RmHasher *hasher, RmDigest *digest, RmShredTag *tag,
    1061             :                                    RmFile *file) {
    1062             :     /* Report the progress to rm_shred_devlist_factory */
    1063      221820 :     g_assert(file->digest == digest);
    1064             : 
    1065      221820 :     if(file->hash_offset == file->shred_group->next_offset ||
    1066           0 :        file->status == RM_FILE_STATE_IGNORE) {
    1067      221820 :         if(file->status != RM_FILE_STATE_IGNORE) {
    1068             :             /* remember that checksum */
    1069      221819 :             rm_shred_write_cksum_to_xattr(tag->session, file);
    1070             :         }
    1071             : 
    1072      443626 :         if(file->signal) {
    1073             :             /* MDS scheduler is waiting for result */
    1074           6 :             rm_signal_done(file->signal);
    1075             :         } else {
    1076             :             /* handle the file ourselves; MDS scheduler has moved on to the next file */
    1077      221799 :             rm_shred_sift(file);
    1078             :         }
    1079             :     } else {
    1080           0 :         RM_DEFINE_PATH(file);
    1081           0 :         rm_log_error_line("Unexpected hash offset for %s, got %" LLU ", expected %" LLU,
    1082             :                      file_path, file->hash_offset, file->shred_group->next_offset);
    1083           0 :         g_assert_not_reached();
    1084             :     }
    1085      221821 : }
    1086             : 
    1087             : ////////////////////////////////////
    1088             : //  SHRED-SPECIFIC PREPROCESSING  //
    1089             : ////////////////////////////////////
    1090             : 
    1091             : /* Basically this unloads files from the initial list build (which has
    1092             :  * hardlinks already grouped).
    1093             :  * Outline:
    1094             :  * 1. Use g_hash_table_foreach_remove to send RmFiles from node_table
    1095             :  *    to size_groups via rm_shred_file_preprocess.
    1096             :  * 2. Use g_hash_table_foreach_remove to delete all singleton and other
    1097             :  *    non-qualifying groups from size_groups via rm_shred_group_preprocess.
    1098             :  * 3. Use g_hash_table_foreach to do the FIEMAP lookup for all remaining
    1099             :  *        files via rm_shred_device_preprocess.
    1100             :  * */
    1101      222489 : static void rm_shred_file_preprocess(_U gpointer key, RmFile *file, RmShredTag *main) {
    1102             :     /* initial population of RmShredDevice's and first level RmShredGroup's */
    1103      222489 :     RmSession *session = main->session;
    1104             : 
    1105      222489 :     g_assert(file);
    1106      222489 :     g_assert(file->lint_type == RM_LINT_TYPE_DUPE_CANDIDATE);
    1107      222489 :     g_assert(file->file_size > 0);
    1108             : 
    1109      222489 :     file->is_new_or_has_new = (file->mtime >= session->cfg->min_mtime);
    1110             : 
    1111             :     /* if file has hardlinks then set file->hardlinks.has_[non_]prefd*/
    1112      222489 :     if(file->hardlinks.is_head) {
    1113         776 :         for(GList *iter = file->hardlinks.files->head; iter; iter = iter->next) {
    1114         430 :             RmFile *link = iter->data;
    1115         430 :             file->hardlinks.has_non_prefd |= !(link->is_prefd);
    1116         430 :             file->hardlinks.has_prefd |= link->is_prefd;
    1117         430 :             file->is_new_or_has_new |= (link->mtime >= session->cfg->min_mtime);
    1118             :         }
    1119             :     }
    1120             : 
    1121             :     /* cfg->fake_pathindex_as_disk is for debugging/testing... */
    1122      222489 :     file->disk = (session->cfg->fake_pathindex_as_disk) ? file->path_index : file->dev;
    1123             : 
    1124             :     /* add reference for this file to the MDS scheduler */
    1125      222489 :     rm_mds_ref_dev(session->mds, file->disk, 1);
    1126      222489 :     rm_shred_adjust_counters(main, 1, (gint64)file->file_size - file->hash_offset);
    1127             : 
    1128      222489 :     RmShredGroup *group = g_hash_table_lookup(session->tables->size_groups, file);
    1129             : 
    1130      222489 :     if(group == NULL) {
    1131       53542 :         group = rm_shred_group_new(file);
    1132       53542 :         group->digest_type = session->cfg->checksum_type;
    1133       53542 :         g_hash_table_insert(session->tables->size_groups, file, group);
    1134             :     }
    1135             : 
    1136      222489 :     rm_shred_group_push_file(group, file, true);
    1137             : 
    1138      222489 :     if(main->session->cfg->read_cksum_from_xattr) {
    1139         392 :         char *ext_cksum = rm_xattr_read_hash(main->session, file);
    1140         392 :         if(ext_cksum != NULL) {
    1141           0 :             file->folder->data = ext_cksum;
    1142             :         }
    1143             :     }
    1144             : 
    1145      222489 :     if(HAS_CACHE(session)) {
    1146        1064 :         RM_DEFINE_PATH(file);
    1147        1064 :         if(rm_trie_search(&session->cfg->file_trie, file_path)) {
    1148         135 :             group->num_ext_cksums += 1;
    1149         135 :             file->has_ext_cksum = 1;
    1150             :         }
    1151             :     }
    1152      222489 : }
    1153             : 
    1154       53542 : static gboolean rm_shred_group_preprocess(_U gpointer key, RmShredGroup *group,
    1155             :                                           _U RmShredTag *tag) {
    1156       53542 :     g_assert(group);
    1157       53542 :     if(group->status == RM_SHRED_GROUP_DORMANT) {
    1158         964 :         rm_shred_group_free(group, true);
    1159         964 :         return true;
    1160             :     } else {
    1161       52578 :         return false;
    1162             :     }
    1163             : }
    1164             : 
    1165       36458 : static void rm_shred_preprocess_input(RmShredTag *main) {
    1166       36458 :     RmSession *session = main->session;
    1167       36458 :     guint removed = 0;
    1168             : 
    1169             :     /* move remaining files to RmShredGroups */
    1170       36458 :     g_assert(session->tables->node_table);
    1171             : 
    1172             :     /* Read any cache files */
    1173       36542 :     for(GList *iter = main->session->cache_list.head; iter; iter = iter->next) {
    1174          84 :         char *cache_path = iter->data;
    1175          84 :         rm_json_cache_read(&session->cfg->file_trie, cache_path);
    1176             :     }
    1177             : 
    1178       36458 :     rm_log_debug_line("Moving files into size groups...");
    1179             : 
    1180             :     /* move files from node tables into initial RmShredGroups */
    1181       36458 :     g_hash_table_foreach_remove(session->tables->node_table,
    1182             :                                 (GHRFunc)rm_shred_file_preprocess, main);
    1183       36458 :     g_hash_table_unref(session->tables->node_table);
    1184       36458 :     session->tables->node_table = NULL;
    1185             : 
    1186             :     GHashTableIter iter;
    1187             :     gpointer size, p_group;
    1188             : 
    1189       36458 :     if(HAS_CACHE(main->session)) {
    1190         140 :         g_assert(session->tables->size_groups);
    1191         140 :         g_hash_table_iter_init(&iter, session->tables->size_groups);
    1192         644 :         while(g_hash_table_iter_next(&iter, &size, &p_group)) {
    1193         364 :             RmShredGroup *group = p_group;
    1194         364 :             if(group->num_files == group->num_ext_cksums) {
    1195           0 :                 group->has_only_ext_cksums = true;
    1196             :             }
    1197             :         }
    1198             :     }
    1199             : 
    1200       36458 :     rm_log_debug_line("move remaining files to size_groups finished at time %.3f",
    1201             :                  g_timer_elapsed(session->timer, NULL));
    1202             : 
    1203       36458 :     rm_log_debug_line("Discarding unique sizes and read fiemap data for others...");
    1204       36458 :     g_assert(session->tables->size_groups);
    1205       36458 :     removed = g_hash_table_foreach_remove(session->tables->size_groups,
    1206             :                                           (GHRFunc)rm_shred_group_preprocess, main);
    1207       36458 :     g_hash_table_unref(session->tables->size_groups);
    1208       36458 :     session->tables->size_groups = NULL;
    1209             : 
    1210       36458 :     rm_log_debug_line("...done at time %.3f; removed %u of %" LLU,
    1211             :                  g_timer_elapsed(session->timer, NULL), removed,
    1212             :                  session->total_filtered_files);
    1213       36458 : }
    1214             : 
    1215             : /////////////////////////////////
    1216             : //       POST PROCESSING       //
    1217             : /////////////////////////////////
    1218             : 
    1219             : /* post-processing sorting of files by criteria (-S and -[kmKM])
    1220             :  * this is slightly different to rm_shred_cmp_orig_criteria in the case of
    1221             :  * either -K or -M options
    1222             :  */
    1223      366047 : int rm_shred_cmp_orig_criteria(RmFile *a, RmFile *b, RmSession *session) {
    1224      366047 :     RmCfg *cfg = session->cfg;
    1225             : 
    1226             :     /* Make sure to *never* make a symlink to be the original */
    1227      366047 :     if(a->is_symlink != b->is_symlink) {
    1228         140 :         return a->is_symlink - b->is_symlink;
    1229      366459 :     } else if((a->is_prefd != b->is_prefd) &&
    1230         928 :               (cfg->keep_all_untagged || cfg->must_match_untagged)) {
    1231         235 :         return (a->is_prefd - b->is_prefd);
    1232             :     } else {
    1233      365672 :         int comparasion = rm_pp_cmp_orig_criteria(a, b, session);
    1234      365672 :         if(comparasion == 0) {
    1235       69658 :             return b->is_original - a->is_original;
    1236             :         }
    1237             : 
    1238      296014 :         return comparasion;
    1239             :     }
    1240             : }
    1241             : 
    1242             : /* iterate over group to find highest ranked; return it and tag it as original    */
    1243             : /* also in special cases (eg keep_all_tagged) there may be more than one original,
    1244             :  * in which case tag them as well
    1245             :  */
    1246       52018 : void rm_shred_group_find_original(RmSession *session, GQueue *group) {
    1247             :     /* iterate over group, unbundling hardlinks and identifying "tagged" originals */
    1248      271103 :     for(GList *iter = group->head; iter; iter = iter->next) {
    1249      219085 :         RmFile *file = iter->data;
    1250      219085 :         file->is_original = false;
    1251             : 
    1252      219085 :         if(file->hardlinks.is_head && file->hardlinks.files) {
    1253             :             /* if group member has a hardlink cluster attached to it then
    1254             :              * unbundle the cluster and append it to the queue
    1255             :              */
    1256         346 :             GQueue *hardlinks = file->hardlinks.files;
    1257         776 :             for(GList *link = hardlinks->head; link; link = link->next) {
    1258         430 :                 g_queue_push_tail(group, link->data);
    1259             :             }
    1260         346 :             g_queue_free(hardlinks);
    1261         346 :             file->hardlinks.files = NULL;
    1262             :         }
    1263             :         /* identify "tagged" originals: */
    1264      438002 :         if(((file->is_prefd) && (session->cfg->keep_all_tagged)) ||
    1265      437330 :            ((!file->is_prefd) && (session->cfg->keep_all_untagged))) {
    1266         336 :             file->is_original = true;
    1267             : 
    1268             : #if _RM_SHRED_DEBUG
    1269             :             RM_DEFINE_PATH(file);
    1270             :             rm_log_debug_line("tagging %s as original because %s",
    1271             :                          file_path,
    1272             :                          ((file->is_prefd) && (session->cfg->keep_all_tagged))
    1273             :                              ? "tagged"
    1274             :                              : "untagged");
    1275             : #endif
    1276             :         }
    1277             :     }
    1278             : 
    1279             :     /* sort the unbundled group */
    1280       52018 :     g_queue_sort(group, (GCompareDataFunc)rm_shred_cmp_orig_criteria, session);
    1281             : 
    1282       52018 :     RmFile *headfile = group->head->data;
    1283       52018 :     if(!headfile->is_original) {
    1284       51850 :         headfile->is_original = true;
    1285             : #if _RM_SHRED_DEBUG
    1286             :         RM_DEFINE_PATH(headfile);
    1287             :         rm_log_debug_line("tagging %s as original because it is highest ranked",
    1288             :                      headfile_path);
    1289             : #endif
    1290             :     }
    1291       52018 :     if (session->cfg->unmatched_basenames) {
    1292             :         /* remove files which match headfile's basename */
    1293          28 :         GList *iter = group->head->next;
    1294          84 :         while(iter) {
    1295          28 :             RmFile *iter_file = iter->data;
    1296          28 :             GList *temp = iter;
    1297          28 :             iter = iter->next;
    1298          28 :             if (rm_file_basenames_match(iter_file, headfile)) {
    1299           0 :                 rm_shred_discard_file(iter_file, TRUE);
    1300           0 :                 g_queue_delete_link(group, temp);
    1301             :             }
    1302             :         }
    1303             :     }
    1304       52018 : }
    1305             : 
    1306       51374 : void rm_shred_forward_to_output(RmSession *session, GQueue *group) {
    1307       51374 :     g_assert(group);
    1308       51374 :     g_assert(group->head);
    1309             : 
    1310             : #if _RM_SHRED_DEBUG
    1311             :     RmFile *head = group->head->data;
    1312             :     RM_DEFINE_PATH(head);
    1313             :     rm_log_debug_line("Forwarding %s's group", head_path);
    1314             : #endif
    1315             : 
    1316             :     /* Hand it over to the printing module */
    1317      267855 :     for(GList *iter = group->head; iter; iter = iter->next) {
    1318      216481 :         RmFile *file = iter->data;
    1319      216481 :         rm_fmt_write(file, session->formats, group->length);
    1320             :     }
    1321       51374 : }
    1322             : 
    1323      216901 : static void rm_shred_dupe_totals(RmFile *file, RmSession *session) {
    1324      216901 :     if(!file->is_original) {
    1325      165583 :         session->dup_counter++;
    1326             : 
    1327             :         /* Only check file size if it's not a hardlink.
    1328             :          * Since deleting hardlinks does not free any space
    1329             :          * they should not be counted unless all of them would
    1330             :          * be removed.
    1331             :          */
    1332      165583 :         if(file->hardlinks.is_head || file->hardlinks.hardlink_head == NULL) {
    1333      165153 :             session->total_lint_size += file->file_size;
    1334             :         }
    1335             :     }
    1336      216901 : }
    1337             : 
    1338       51150 : static void rm_shred_result_factory(RmShredGroup *group, RmShredTag *tag) {
    1339       51150 :     RmCfg *cfg = tag->session->cfg;
    1340             : 
    1341       51150 :     if(g_queue_get_length(group->held_files) > 0) {
    1342             :         /* find the original(s)
    1343             :          * (note this also unbundles hardlinks and sorts the group from
    1344             :          *  highest ranked to lowest ranked
    1345             :          */
    1346       51150 :         rm_shred_group_find_original(tag->session, group->held_files);
    1347             : 
    1348             :         /* Update statistics */
    1349       51150 :         rm_fmt_lock_state(tag->session->formats);
    1350             :         {
    1351       51150 :             tag->session->dup_group_counter++;
    1352       51150 :             g_queue_foreach(group->held_files, (GFunc)rm_shred_dupe_totals, tag->session);
    1353             :         }
    1354       51150 :         rm_fmt_unlock_state(tag->session->formats);
    1355             : 
    1356             :         /* Cache the files for merging them into directories */
    1357      268051 :         for(GList *iter = group->held_files->head; iter; iter = iter->next) {
    1358      216901 :             RmFile *file = iter->data;
    1359      216901 :             file->digest = group->digest;
    1360             : 
    1361      216901 :             if(cfg->merge_directories) {
    1362        4060 :                 rm_tm_feed(tag->session->dir_merger, file);
    1363             :             }
    1364             :         }
    1365             : 
    1366       51150 :         if(cfg->merge_directories == false) {
    1367             :             /* Output them directly, do not merge them first. */
    1368       49778 :             rm_shred_forward_to_output(tag->session, group->held_files);
    1369             :         }
    1370             :     }
    1371             : 
    1372       51150 :     group->status = RM_SHRED_GROUP_FINISHED;
    1373             : #if _RM_SHRED_DEBUG
    1374             :     rm_log_debug_line("Free from rm_shred_result_factory");
    1375             : #endif
    1376             : 
    1377             :     /* Do not force free files here, output module might need do that itself. */
    1378       51150 :     rm_shred_group_free(group, false);
    1379       51150 : }
    1380             : 
    1381             : /////////////////////////////////
    1382             : //    ACTUAL IMPLEMENTATION    //
    1383             : /////////////////////////////////
    1384             : 
    1385      269817 : static bool rm_shred_reassign_checksum(RmShredTag *main, RmFile *file) {
    1386      269817 :     bool can_process = true;
    1387      269817 :     RmCfg *cfg = main->session->cfg;
    1388      269817 :     RmShredGroup *group = file->shred_group;
    1389             : 
    1390      269817 :     if(group->has_only_ext_cksums) {
    1391             :         /* Cool, we were able to read the checksum from disk */
    1392           0 :         file->digest = rm_digest_new(RM_DIGEST_EXT, 0, 0, 0, NEEDS_SHADOW_HASH(cfg));
    1393             : 
    1394           0 :         RM_DEFINE_PATH(file);
    1395             : 
    1396           0 :         char *hexstring = file->folder->data;
    1397             : 
    1398           0 :         if(hexstring != NULL) {
    1399           0 :             rm_digest_update(file->digest, (unsigned char *)hexstring, strlen(hexstring));
    1400           0 :             rm_log_debug_line("%s=%s was read from cache.", hexstring, file_path);
    1401             :         } else {
    1402           0 :             rm_log_warning_line(
    1403             :                 "Unable to read external checksum from interal cache for %s", file_path);
    1404           0 :             file->has_ext_cksum = 0;
    1405           0 :             group->has_only_ext_cksums = 0;
    1406             :         }
    1407      269817 :     } else if(group->digest_type == RM_DIGEST_PARANOID) {
    1408             :         /* check if memory allocation is ok */
    1409       64025 :         if(!rm_shred_check_paranoid_mem_alloc(group, 0)) {
    1410       47989 :             can_process = false;
    1411             :         } else {
    1412             :             /* get the required target offset into group->next_offset, so
    1413             :                 * that we can make the paranoid RmDigest the right size*/
    1414       16036 :             if(group->next_offset == 0) {
    1415        3826 :                 (void)rm_shred_get_read_size(file, main);
    1416             :             }
    1417       16036 :             g_assert(group->hash_offset == file->hash_offset);
    1418             : 
    1419       16036 :             if(file->is_symlink && cfg->see_symlinks) {
    1420          28 :                 file->digest =
    1421          28 :                     rm_digest_new(RM_DIGEST_PARANOID, 0, 0,
    1422             :                                   PATH_MAX + 1 /* max size of a symlink file */,
    1423             :                                   NEEDS_SHADOW_HASH(cfg));
    1424             :             } else {
    1425       16008 :                 file->digest = rm_digest_new(RM_DIGEST_PARANOID, 0, 0,
    1426       16008 :                                              group->next_offset - file->hash_offset,
    1427             :                                              NEEDS_SHADOW_HASH(cfg));
    1428       16008 :                 if(group->next_offset > file->hash_offset + SHRED_PREMATCH_THRESHOLD) {
    1429             :                     /* send candidate twin(s) */
    1430       16008 :                     if(group->children) {
    1431        7166 :                         GList *children = g_hash_table_get_values(group->children);
    1432       21647 :                         while(children) {
    1433        7315 :                             RmShredGroup *child = children->data;
    1434        7315 :                             rm_digest_send_match_candidate(file->digest, child->digest);
    1435        7315 :                             children = g_list_delete_link(children, children);
    1436             :                         }
    1437             :                     }
    1438             :                     /* store a reference so the shred group knows where to send any future
    1439             :                      * twin candidate digests */
    1440       16008 :                     group->in_progress_digests =
    1441       16008 :                         g_list_prepend(group->in_progress_digests, file->digest);
    1442             :                 }
    1443             :             }
    1444             :         }
    1445      205792 :     } else if(group->digest) {
    1446             :         /* pick up the digest-so-far from the RmShredGroup */
    1447         312 :         file->digest = rm_digest_copy(group->digest);
    1448             :     } else {
    1449             :         /* this is first generation of RMGroups, so there is no progressive hash yet */
    1450      410960 :         file->digest = rm_digest_new(cfg->checksum_type,
    1451      205480 :                                      main->session->hash_seed1,
    1452      205480 :                                      main->session->hash_seed2,
    1453             :                                      0,
    1454             :                                      NEEDS_SHADOW_HASH(cfg));
    1455             :     }
    1456             : 
    1457      269818 :     return can_process;
    1458             : }
    1459             : 
    1460             : #define RM_SHRED_TOO_MANY_BYTES_TO_WAIT (64 * 1024 * 1024)
    1461             : 
    1462             : /* call with device unlocked */
    1463      491629 : static bool rm_shred_can_process(RmFile *file, RmShredTag *main) {
    1464             :     /* initialise hash (or recover progressive hash so far) */
    1465      491629 :     if(!file->shred_group) {
    1466           0 :         return FALSE;
    1467             :     }
    1468             : 
    1469      491629 :     bool result = TRUE;
    1470      491629 :     g_mutex_lock(&file->shred_group->lock);
    1471             :     {
    1472      491647 :         if(file->digest == NULL) {
    1473      269818 :             g_assert(file->shred_group);
    1474      269818 :             result = rm_shred_reassign_checksum(main, file);
    1475             :         }
    1476             :     }
    1477      491647 :     g_mutex_unlock(&file->shred_group->lock);
    1478      491635 :     return result;
    1479             : }
    1480             : 
    1481             : /* Callback for RmMDS */
    1482      269797 : static gint rm_shred_process_file(RmFile *file, RmSession *session) {
    1483      269797 :     RmShredTag *tag = session->shredder;
    1484             : 
    1485      269797 :     if(session->aborted || file->shred_group->has_only_ext_cksums) {
    1486           4 :         if (session->aborted) {
    1487           0 :             file->status = RM_FILE_STATE_IGNORE;
    1488             :         }
    1489           4 :         rm_shred_sift(file);
    1490           0 :         return 1;
    1491             :     }
    1492             : 
    1493      269793 :     if(!rm_shred_can_process(file, tag)) {
    1494       47989 :         return 0;
    1495             :     }
    1496             : 
    1497      221829 :     RM_DEFINE_PATH(file);
    1498             : 
    1499      665485 :     while(file && rm_shred_can_process(file, tag)) {
    1500             :         /* hash the next increment of the file */
    1501      221829 :         bool worth_waiting = FALSE;
    1502      221829 :         RmCfg *cfg = session->cfg;
    1503      221829 :         RmOff bytes_to_read = rm_shred_get_read_size(file, tag);
    1504             : 
    1505      221829 :         g_mutex_lock(&file->shred_group->lock);
    1506             :         {
    1507      221829 :             worth_waiting =
    1508      222189 :                 (file->shred_group->next_offset != file->file_size) &&
    1509         660 :                 (cfg->shred_always_wait ||
    1510             :                  (
    1511         336 :                      !rm_mounts_is_nonrotational(session->mounts, file->dev) &&
    1512          12 :                      rm_shred_get_read_size(file, tag) <
    1513          12 :                          RM_SHRED_TOO_MANY_BYTES_TO_WAIT &&
    1514          24 :                      (file->status == RM_FILE_STATE_NORMAL) && !cfg->shred_never_wait));
    1515             :         }
    1516      221829 :         g_mutex_unlock(&file->shred_group->lock);
    1517             : 
    1518      221828 :         RmHasherTask *task = rm_hasher_task_new(tag->hasher, file->digest, file);
    1519      221828 :         if(!rm_hasher_task_hash(task, file_path, file->hash_offset, bytes_to_read,
    1520      221828 :                                 file->is_symlink)) {
    1521             :             /* rm_hasher_start_increment failed somewhere */
    1522           0 :             file->status = RM_FILE_STATE_IGNORE;
    1523           0 :             worth_waiting = FALSE;
    1524             :         }
    1525             : 
    1526             :         /* Update totals for file, device and session*/
    1527      221829 :         file->hash_offset += bytes_to_read;
    1528      221829 :         if(file->is_symlink) {
    1529         140 :             rm_shred_adjust_counters(tag, 0, -(gint64)file->file_size);
    1530             :         } else {
    1531      221689 :             rm_shred_adjust_counters(tag, 0, -(gint64)bytes_to_read);
    1532             :         }
    1533             : 
    1534      221829 :         if(worth_waiting) {
    1535             :             /* some final checks if it's still worth waiting for the hash result */
    1536          24 :             g_mutex_lock(&file->shred_group->lock);
    1537             :             {
    1538          24 :                 worth_waiting = worth_waiting && (file->shred_group->children);
    1539          24 :                 if(file->digest->type == RM_DIGEST_PARANOID) {
    1540           0 :                     worth_waiting =
    1541           0 :                         worth_waiting && file->digest->paranoid->twin_candidate;
    1542             :                 }
    1543             :             }
    1544          24 :             g_mutex_unlock(&file->shred_group->lock);
    1545             :         }
    1546             : 
    1547      221829 :         file->signal = worth_waiting ? rm_signal_new() : NULL;
    1548             : 
    1549             :         /* tell the hasher we have finished */
    1550      221829 :         rm_hasher_task_finish(task);
    1551             : 
    1552      221827 :         if(worth_waiting) {
    1553             :             /* wait until the increment has finished hashing; assert that we get the
    1554             :              * expected file back */
    1555           6 :             rm_signal_wait(file->signal);
    1556           6 :             file->signal = NULL;
    1557             :             /* sift file; if returned then continue processing it */
    1558           6 :             file = rm_shred_sift(file);
    1559             :         } else {
    1560      221821 :             file = NULL;
    1561             :         }
    1562             :     }
    1563      221827 :     return 1;
    1564             : }
    1565             : 
    1566       36458 : void rm_shred_run(RmSession *session) {
    1567       36458 :     g_assert(session);
    1568       36458 :     g_assert(session->tables);
    1569       36458 :     g_assert(session->mounts);
    1570             : 
    1571             :     RmShredTag tag;
    1572       36458 :     tag.active_groups = 0;
    1573       36458 :     tag.session = session;
    1574       36458 :     tag.mem_refusing = false;
    1575       36458 :     session->shredder = &tag;
    1576             : 
    1577       36458 :     tag.device_return = g_async_queue_new();
    1578       36458 :     tag.page_size = SHRED_PAGE_SIZE;
    1579             : 
    1580       36458 :     tag.cache_file_count = 0;
    1581       36458 :     tag.cache_byte_count = 0;
    1582       36458 :     tag.cache_filtered_count = 0;
    1583       36458 :     tag.after_preprocess = FALSE;
    1584             : 
    1585             :     /* would use g_atomic, but helgrind does not like that */
    1586       36458 :     g_mutex_init(&tag.hash_mem_mtx);
    1587             : 
    1588       36458 :     g_mutex_init(&tag.lock);
    1589       36458 :     gint threads = g_hash_table_size(session->mounts->disk_table);
    1590       36458 :     session->mds =
    1591       36458 :         rm_mds_new(threads, session->mounts, session->cfg->fake_pathindex_as_disk);
    1592       36458 :     rm_mds_configure(session->mds,
    1593             :                      (RmMDSFunc)rm_shred_process_file,
    1594             :                      session,
    1595       36458 :                      session->cfg->sweep_count,
    1596             :                      (RmMDSSortFunc)rm_mds_elevator_cmp);
    1597             : 
    1598       36458 :     rm_shred_preprocess_input(&tag);
    1599       36458 :     rm_log_debug_line("Done shred preprocessing");
    1600       36458 :     tag.after_preprocess = TRUE;
    1601       36458 :     session->shred_bytes_after_preprocess = session->shred_bytes_remaining;
    1602             : 
    1603             :     /* estimate mem used for RmFiles and allocate any leftovers to read buffer and/or
    1604             :      * paranoid mem */
    1605       36458 :     RmOff mem_used = RM_AVERAGE_MEM_PER_FILE * session->shred_files_remaining;
    1606             : 
    1607       36458 :     if(session->cfg->checksum_type == RM_DIGEST_PARANOID) {
    1608             :         /* allocate any spare mem for paranoid hashing */
    1609        2665 :         tag.paranoid_mem_alloc = MIN((gint64)session->cfg->paranoid_mem,
    1610             :                                      (gint64)session->cfg->total_mem - (gint64)mem_used -
    1611             :                                          (gint64)session->cfg->read_buffer_mem);
    1612        2665 :         tag.paranoid_mem_alloc = MAX(0, tag.paranoid_mem_alloc);
    1613        2665 :         rm_log_debug_line("Paranoid Mem: %" LLU, tag.paranoid_mem_alloc);
    1614             :     } else {
    1615       67586 :         session->cfg->read_buffer_mem =
    1616       33793 :             MAX((gint64)session->cfg->read_buffer_mem,
    1617             :                 (gint64)session->cfg->total_mem - (gint64)mem_used);
    1618       33793 :         tag.paranoid_mem_alloc = 0;
    1619             :     }
    1620       36458 :     rm_log_debug_line("Read buffer Mem: %" LLU, session->cfg->read_buffer_mem);
    1621             : 
    1622             :     /* Initialise hasher */
    1623             :     /* Optimum buffer size based on /usr without dropping caches:
    1624             :      * SHRED_PAGE_SIZE * 1 => 5.29 seconds
    1625             :      * SHRED_PAGE_SIZE * 2 => 5.11 seconds
    1626             :      * SHRED_PAGE_SIZE * 4 => 5.04 seconds
    1627             :      * SHRED_PAGE_SIZE * 8 => 5.08 seconds
    1628             :      * With dropped caches:
    1629             :      * SHRED_PAGE_SIZE * 1 => 45.2 seconds
    1630             :      * SHRED_PAGE_SIZE * 4 => 45.0 seconds*/
    1631      182290 :     tag.hasher = rm_hasher_new(session->cfg->checksum_type,
    1632       36458 :                                session->cfg->threads,
    1633       36458 :                                session->cfg->use_buffered_read,
    1634       36458 :                                SHRED_PAGE_SIZE * 4,
    1635       36458 :                                session->cfg->read_buffer_mem,
    1636       36458 :                                session->cfg->paranoid_mem,
    1637             :                                (RmHasherCallback)rm_shred_hash_callback,
    1638             :                                &tag);
    1639             : 
    1640             :     /* Create a pool for results processing */
    1641       36458 :     tag.result_pool = rm_util_thread_pool_new((GFunc)rm_shred_result_factory, &tag, 1);
    1642             : 
    1643       36458 :     rm_fmt_set_state(session->formats, RM_PROGRESS_STATE_SHREDDER);
    1644       36458 :     rm_mds_start(session->mds);
    1645             : 
    1646             : 
    1647             :     /* should complete shred session and then free: */
    1648       36458 :     rm_mds_free(session->mds, FALSE);
    1649       36458 :     rm_hasher_free(tag.hasher, TRUE);
    1650             : 
    1651       36458 :     session->shredder_finished = TRUE;
    1652       36458 :     session->shred_files_remaining += tag.cache_file_count;
    1653       36458 :     session->total_filtered_files += tag.cache_filtered_count;
    1654       36458 :     session->shred_bytes_remaining += tag.cache_byte_count;
    1655       36458 :     rm_fmt_set_state(session->formats, RM_PROGRESS_STATE_SHREDDER);
    1656             : 
    1657             :     /* This should not block, or at least only very short. */
    1658       36458 :     g_thread_pool_free(tag.result_pool, FALSE, TRUE);
    1659             : 
    1660       36458 :     g_async_queue_unref(tag.device_return);
    1661             : 
    1662       36458 :     g_mutex_clear(&tag.hash_mem_mtx);
    1663       36458 :     rm_log_debug_line("Remaining %"LLU" bytes in %"LLU" files, cached %i",
    1664             :                       session->shred_bytes_remaining, session->shred_files_remaining,
    1665             :                       tag.cache_filtered_count);
    1666       36458 : }

Generated by: LCOV version 1.11