LCOV - code coverage report
Current view: top level - lib - hasher.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 156 180 86.7 %
Date: 2015-09-30 14:09:30 Functions: 12 13 92.3 %

          Line data    Source code
       1             : /*
       2             :  *  This file is part of rmlint.
       3             :  *
       4             :  *  rmlint is free software: you can redistribute it and/or modify
       5             :  *  it under the terms of the GNU General Public License as published by
       6             :  *  the Free Software Foundation, either version 3 of the License, or
       7             :  *  (at your option) any later version.
       8             :  *
       9             :  *  rmlint is distributed in the hope that it will be useful,
      10             :  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
      11             :  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      12             :  *  GNU General Public License for more details.
      13             :  *
      14             :  *  You should have received a copy of the GNU General Public License
      15             :  *  along with rmlint.  If not, see <http://www.gnu.org/licenses/>.
      16             :  *
      17             :  * Authors:
      18             :  *
      19             :  *  - Christopher <sahib> Pahl 2010-2015 (https://github.com/sahib)
      20             :  *  - Daniel <SeeSpotRun> T.   2014-2015 (https://github.com/SeeSpotRun)
      21             :  *
      22             :  * Hosted on http://github.com/sahib/rmlint
      23             :  *
      24             :  */
      25             : 
      26             : #include <stdio.h>
      27             : #include <string.h>
      28             : #include <glib.h>
      29             : 
      30             : #include <fcntl.h>
      31             : 
      32             : #include "hasher.h"
      33             : #include "utilities.h"
      34             : 
      35             : /* Flags for the fadvise() call that tells the kernel
      36             :  * what we want to do with the file. Each term is only included if the platform defines it. NOTE(review): POSIX_FADV_* are documented as advice values, not bit flags; OR-ing them may yield an invalid advice - confirm.
      37             :  */
      38             : const int HASHER_FADVISE_FLAGS = 0
      39             : #ifdef POSIX_FADV_SEQUENTIAL
      40             :                                  | POSIX_FADV_SEQUENTIAL /* Read from 0 to file-size    */
      41             : #endif
      42             : #ifdef POSIX_FADV_WILLNEED
      43             :                                  | POSIX_FADV_WILLNEED /* Tell the kernel to readahead */
      44             : #endif
      45             : #ifdef POSIX_FADV_NOREUSE
      46             :                                  | POSIX_FADV_NOREUSE /* We will not reuse old data  */
      47             : #endif
      48             :     ;
      49             : 
      50             : #define DIVIDE_CEIL(n, m) ((n) / (m) + !!((n) % (m))) /* n / m rounded up for non-negative operands; evaluates n and m twice, so pass side-effect-free arguments */
      51             : 
      52             : struct _RmHasher { /* State shared by all tasks created from one rm_hasher_new() call */
      53             :     RmDigestType digest_type; /* digest type used when a task is created without its own digest */
      54             :     gboolean use_buffered_read; /* TRUE: read via fopen/fread; FALSE: read via preadv */
      55             :     guint64 cache_quota_bytes; /* stored from rm_hasher_new(); also handed to the buffer pool */
      56             :     gpointer session_user_data; /* passed unchanged to every callback invocation */
      57             :     RmBufferPool *mem_pool; /* source of all RmBuffers used for reading and for the finish marker */
      58             :     RmHasherCallback callback; /* invoked by the worker when a task's finish marker is processed */
      59             : 
      60             :     GAsyncQueue *hashpipe_pool; /* idle single-threaded GThreadPools ("hashpipes") */
      61             :     GAsyncQueue *return_queue; /* only created when no joiner was supplied; receives finished digests */
      62             :     GMutex lock; /* protects active_tasks */
      63             :     GCond cond; /* signalled each time active_tasks is decremented */
      64             : 
      65             :     gsize buf_size; /* number of bytes read into each buffer */
      66             :     guint active_tasks; /* tasks created but not yet finalised by the worker */
      67             : };
      68             : 
      69             : struct _RmHasherTask { /* One hashing job: a digest fed through a dedicated hashpipe */
      70             :     RmHasher *hasher; /* owning hasher */
      71             :     GThreadPool *hashpipe; /* pipe popped from hasher->hashpipe_pool; pushed back when the task is freed */
      72             :     RmDigest *digest; /* digest being updated (caller supplied or created in rm_hasher_task_new) */
      73             :     gpointer task_user_data; /* passed as last argument to the hasher callback */
      74             : };
      75             : 
      76      221824 : static void rm_hasher_task_free(RmHasherTask *self) {
      77      221824 :     g_async_queue_push(self->hasher->hashpipe_pool, self->hashpipe);
      78      221826 :     g_slice_free(RmHasherTask, self);
      79      221815 : }
      80             : 
      81             : /* GThreadPool Worker for hashing */
      82      495550 : static void rm_hasher_hashpipe_worker(RmBuffer *buffer, RmHasher *hasher) {
      83      495550 :     if(buffer->len > 0) {
      84             :         /* Update digest with buffer->data */
      85      273734 :         rm_digest_buffered_update(buffer);
      86             :     } else {
      87             :         /* finalise via callback */
      88      221816 :         RmHasherTask *task = buffer->user_data;
      89      221816 :         hasher->callback(hasher, buffer->digest, hasher->session_user_data,
      90             :                          task->task_user_data);
      91             : 
      92      221810 :         rm_buffer_pool_release(buffer);
      93      221827 :         rm_hasher_task_free(task);
      94             : 
      95      221811 :         g_mutex_lock(&hasher->lock);
      96             :         {
      97             :             /* decrease active task count and signal same */
      98      221829 :             hasher->active_tasks--;
      99      221829 :             g_cond_signal(&hasher->cond);
     100             :         }
     101      221829 :         g_mutex_unlock(&hasher->lock);
     102             :     }
     103      495567 : }
     104             : 
     105             : //////////////////////////////////////
     106             : //  File Reading Utilities          //
     107             : //////////////////////////////////////
     108             : 
     109      221682 : static void rm_hasher_request_readahead(int fd, RmOff seek_offset, RmOff bytes_to_read) {
     110             : /* Give the kernel scheduler some hints */
     111             : #if HAVE_POSIX_FADVISE && HASHER_FADVISE_FLAGS
     112             :     RmOff readahead = bytes_to_read * 8;
     113             :     posix_fadvise(fd, seek_offset, readahead, HASHER_FADVISE_FLAGS);
     114             : #else
     115             :     (void)fd;
     116             :     (void)seek_offset;
     117             :     (void)bytes_to_read;
     118             : #endif
     119      221682 : }
     120             : 
     121         140 : static gint64 rm_hasher_symlink_read(RmHasher *hasher, RmDigest *digest, char *path) {
     122             :     /* Fake an IO operation on the symlink.  */
     123         140 :     RmBuffer *buf = rm_buffer_pool_get(hasher->mem_pool);
     124         140 :     buf->len = 256;
     125         140 :     memset(buf->data, 0, buf->len);
     126             : 
     127             :     RmStat stat_buf;
     128         140 :     if(rm_sys_stat(path, &stat_buf) == -1) {
     129             :         /* Oops, that did not work out, report as an error */
     130           0 :         rm_log_perror("Cannot stat symbolic link");
     131           0 :         return -1;
     132             :     }
     133             : 
     134         140 :     gint data_size =
     135         280 :         snprintf((char *)buf->data, rm_buffer_size(hasher->mem_pool), "%ld:%ld",
     136         280 :                  (long)stat_buf.st_dev, (long)stat_buf.st_ino);
     137         140 :     buf->len = data_size;
     138         140 :     buf->digest = digest;
     139             : 
     140         140 :     rm_digest_buffered_update(buf);
     141             : 
     142             :     /* In case of paranoia: shrink the used data buffer, so comparasion works
     143             :      * as expected. Otherwise a full buffer is used with possibly different
     144             :      * content */
     145         140 :     if(digest->type == RM_DIGEST_PARANOID) {
     146          28 :         rm_digest_paranoia_shrink(digest, data_size);
     147             :     }
     148         140 :     return 0;
     149             : }
     150             : 
     151             : /* Reads data from file and sends to hasher threadpool
     152             :  * returns number of bytes successfully read */
     153             : 
     154        7888 : static gint64 rm_hasher_buffered_read(RmHasher *hasher, GThreadPool *hashpipe,
     155             :                                       RmDigest *digest, char *path, gsize start_offset,
     156             :                                       gsize bytes_to_read) {
     157        7888 :     FILE *fd = NULL;
     158        7888 :     if(bytes_to_read == 0) {
     159           0 :         bytes_to_read = G_MAXSIZE;
     160             :     }
     161             : 
     162        7888 :     gsize total_bytes_read = 0;
     163             : 
     164        7888 :     if((fd = fopen(path, "rb")) == NULL) {
     165           0 :         rm_log_info("fopen(3) failed for %s: %s\n", path, g_strerror(errno));
     166           0 :         goto finish;
     167             :     }
     168             : 
     169        7888 :     gint32 bytes_read = 0;
     170             : 
     171        7888 :     rm_hasher_request_readahead(fileno(fd), start_offset, bytes_to_read);
     172             : 
     173        7888 :     if(fseek(fd, start_offset, SEEK_SET) == -1) {
     174           0 :         rm_log_perror("fseek(3) failed");
     175           0 :         goto finish;
     176             :     }
     177             : 
     178        7888 :     RmBuffer *buffer = rm_buffer_pool_get(hasher->mem_pool);
     179             : 
     180       25524 :     while((bytes_read =
     181       17636 :                fread(buffer->data, 1, MIN(bytes_to_read, hasher->buf_size), fd)) > 0) {
     182        9748 :         bytes_to_read -= bytes_read;
     183             : 
     184        9748 :         buffer->len = bytes_read;
     185        9748 :         buffer->digest = digest;
     186        9748 :         rm_util_thread_pool_push(hashpipe, buffer);
     187             : 
     188        9748 :         total_bytes_read += bytes_read;
     189        9748 :         buffer = rm_buffer_pool_get(hasher->mem_pool);
     190             :     }
     191        7888 :     rm_buffer_pool_release(buffer);
     192             : 
     193        7888 :     if(ferror(fd) != 0) {
     194           0 :         rm_log_perror("fread(3) failed");
     195           0 :         if(total_bytes_read == bytes_to_read) {
     196             :             /* signal error to caller */
     197           0 :             total_bytes_read++;
     198             :         }
     199             :     }
     200             : 
     201             : finish:
     202        7888 :     if(fd != NULL) {
     203        7888 :         fclose(fd);
     204             :     }
     205        7888 :     return total_bytes_read;
     206             : }
     207             : 
     208             : /* Reads data from file and sends to hasher threadpool
     209             :  * returns number of bytes successfully read */
     210             : 
     211      213798 : static gint64 rm_hasher_unbuffered_read(RmHasher *hasher, GThreadPool *hashpipe,
     212             :                                         RmDigest *digest, char *path, gint64 start_offset,
     213      213798 :                                         gint64 bytes_to_read) {
     214      213798 :     gint32 bytes_read = 0;
     215      213798 :     gint64 total_bytes_read = 0;
     216      213798 :     guint64 file_offset = start_offset;
     217             : 
     218      213798 :     if(bytes_to_read == 0) {
     219             :         RmStat stat_buf;
     220           0 :         if(rm_sys_stat(path, &stat_buf) != -1) {
     221           0 :             bytes_to_read = MAX(stat_buf.st_size - start_offset, 0);
     222             :         }
     223             :     }
     224             : 
     225             :     /* how many buffers to read? */
     226      213798 :     const gint16 N_BUFFERS = MIN(4, DIVIDE_CEIL(bytes_to_read, hasher->buf_size));
     227      427593 :     struct iovec readvec[N_BUFFERS + 1];
     228             : 
     229      213798 :     int fd = 0;
     230             : 
     231      213798 :     fd = rm_sys_open(path, O_RDONLY);
     232      213798 :     if(fd == -1) {
     233           0 :         rm_log_info("open(2) failed for %s: %s\n", path, g_strerror(errno));
     234           0 :         goto finish;
     235             :     }
     236             : 
     237             :     /* preadv() is beneficial for large files since it can cut the
     238             :      * number of syscall heavily.  I suggest N_BUFFERS=4 as good
     239             :      * compromise between memory and cpu.
     240             :      *
     241             :      * With 16 buffers: 43% cpu 33,871 total
     242             :      * With  8 buffers: 43% cpu 32,098 total
     243             :      * With  4 buffers: 42% cpu 32,091 total
     244             :      * With  2 buffers: 44% cpu 32,245 total
     245             :      * With  1 buffers: 45% cpu 34,491 total
     246             :      */
     247             : 
     248             :     /* Give the kernel scheduler some hints */
     249      213798 :     rm_hasher_request_readahead(fd, start_offset, bytes_to_read);
     250             : 
     251             :     /* Initialize the buffers to begin with.
     252             :      * After a buffer is full, a new one is retrieved.
     253             :      */
     254             :     RmBuffer **buffers;
     255      213794 :     buffers = g_slice_alloc(sizeof(*buffers) * N_BUFFERS);
     256             : 
     257      213795 :     memset(readvec, 0, sizeof(readvec));
     258      428567 :     for(int i = 0; i < N_BUFFERS; ++i) {
     259             :         /* buffer is one contignous memory block */
     260      214767 :         buffers[i] = rm_buffer_pool_get(hasher->mem_pool);
     261      214772 :         readvec[i].iov_base = buffers[i]->data;
     262      214772 :         readvec[i].iov_len = hasher->buf_size;
     263             :     }
     264             : 
     265      879820 :     while((bytes_to_read == 0 || total_bytes_read < bytes_to_read) &&
     266      226112 :           (bytes_read = rm_sys_preadv(fd, readvec, N_BUFFERS, file_offset)) > 0) {
     267      226107 :         bytes_read =
     268      226107 :             MIN(bytes_read, bytes_to_read - total_bytes_read); /* ignore over-reads */
     269             : 
     270      226107 :         int blocks = DIVIDE_CEIL(bytes_read, hasher->buf_size);
     271      226107 :         g_assert(blocks <= N_BUFFERS);
     272             : 
     273      226105 :         total_bytes_read += bytes_read;
     274      226105 :         file_offset += bytes_read;
     275             : 
     276      490126 :         for(int i = 0; i < blocks; ++i) {
     277             :             /* Get the RmBuffer from the datapointer */
     278      264013 :             RmBuffer *buffer = buffers[i];
     279      264013 :             buffer->len = MIN(hasher->buf_size, bytes_read - i * hasher->buf_size);
     280      264013 :             buffer->digest = digest;
     281             : 
     282             :             /* Send it to the hasher */
     283      264013 :             rm_util_thread_pool_push(hashpipe, buffer);
     284             : 
     285             :             /* Allocate a new buffer - hasher will release the old buffer */
     286      264021 :             buffers[i] = rm_buffer_pool_get(hasher->mem_pool);
     287      264021 :             readvec[i].iov_base = buffers[i]->data;
     288      264021 :             readvec[i].iov_len = hasher->buf_size;
     289             :         }
     290             :     }
     291             : 
     292      213801 :     if(bytes_read == -1) {
     293           0 :         rm_log_perror("preadv failed");
     294      213801 :     } else if(total_bytes_read != bytes_to_read) {
     295           0 :         rm_log_error_line(_("Something went wrong reading %s; expected %li bytes, "
     296             :                             "got %li; ignoring"),
     297             :                           path, (long int)bytes_to_read, (long int)total_bytes_read);
     298             :     }
     299             : 
     300             :     /* Release the rest of the buffers */
     301      428574 :     for(int i = 0; i < N_BUFFERS; ++i) {
     302      214773 :         rm_buffer_pool_release(buffers[i]);
     303             :     }
     304      213801 :     g_slice_free1(sizeof(*buffers) * N_BUFFERS, buffers);
     305             : 
     306             : finish:
     307      213801 :     if(fd > 0) {
     308      213801 :         rm_sys_close(fd);
     309             :     }
     310             : 
     311      213801 :     return total_bytes_read;
     312             : }
     313             : 
     314             : //////////////////////////////////////
     315             : //  RmHasher                        //
     316             : //////////////////////////////////////
     317             : 
     318     1126449 : static void rm_hasher_hashpipe_free(GThreadPool *hashpipe) {
     319             :     /* free the GThreadPool; wait for any in-progress jobs to finish */
     320     1126449 :     g_thread_pool_free(hashpipe, FALSE, TRUE);
     321     1126449 : }
     322             : 
     323             : /* local joiner if user provides no joiner to rm_hasher_new() */
     324           0 : static RmHasherCallback *rm_hasher_joiner(RmHasher *hasher, RmDigest *digest,
     325             :                                           _U gpointer session_user_data,
     326             :                                           _U gpointer task_user_data) {
     327           0 :     g_async_queue_push(hasher->return_queue, digest);
     328           0 :     return 0;
     329             : }
     330             : 
     331             : //////////////////////////////////////
     332             : //     API
     333             : //////////////////////////////////////
     334             : 
     335       36458 : RmHasher *rm_hasher_new(RmDigestType digest_type,
     336             :                         guint num_threads,
     337             :                         gboolean use_buffered_read,
     338             :                         gsize buf_size,
     339             :                         guint64 cache_quota_bytes,
     340             :                         guint64 target_kept_bytes,
     341             :                         RmHasherCallback joiner,
     342             :                         gpointer session_user_data) {
     343       36458 :     RmHasher *self = g_slice_new0(RmHasher);
     344       36458 :     self->digest_type = digest_type;
     345             : 
     346       36458 :     self->use_buffered_read = use_buffered_read;
     347       36458 :     self->buf_size = buf_size;
     348       36458 :     self->cache_quota_bytes = cache_quota_bytes;
     349             : 
     350       36458 :     if(joiner) {
     351       36458 :         self->callback = joiner;
     352             :     } else {
     353           0 :         self->callback = (RmHasherCallback)rm_hasher_joiner;
     354           0 :         self->return_queue = g_async_queue_new();
     355             :     }
     356             : 
     357       36458 :     self->session_user_data = session_user_data;
     358             : 
     359             :     /* initialise mutex & cond */
     360       36458 :     g_mutex_init(&self->lock);
     361       36458 :     g_cond_init(&self->cond);
     362             : 
     363             :     /* Create buffer mem pool */
     364       36458 :     self->mem_pool = rm_buffer_pool_init(buf_size, cache_quota_bytes, target_kept_bytes);
     365             : 
     366             :     /* Create a pool of hashing thread "pools" - each "pool" can only have
     367             :      * one thread because hashing must be done in order */
     368       36458 :     self->hashpipe_pool = g_async_queue_new_full((GDestroyNotify)rm_hasher_hashpipe_free);
     369       36458 :     g_assert(num_threads > 0);
     370     1162907 :     for(guint i = 0; i < num_threads; i++) {
     371     1126449 :         g_async_queue_push(
     372             :             self->hashpipe_pool,
     373     1126449 :             rm_util_thread_pool_new((GFunc)rm_hasher_hashpipe_worker, self, 1));
     374             :     }
     375       36458 :     return self;
     376             : }
     377             : 
     378       36458 : void rm_hasher_free(RmHasher *hasher, gboolean wait) {
     379             :     /* Note that hasher may be multi-threaded, both at the reader level and at
     380             :      * the hashpipe level.  To ensure graceful exit, the hasher is reference counted
     381             :      * via hasher->active_tasks.
     382             :      */
     383       36458 :     if(wait) {
     384       36458 :         g_mutex_lock(&hasher->lock);
     385             :         {
     386       72916 :             while(hasher->active_tasks > 0) {
     387           0 :                 g_cond_wait(&hasher->cond, &hasher->lock);
     388             :             }
     389             :         }
     390       36458 :         g_mutex_unlock(&hasher->lock);
     391             :     }
     392             : 
     393       36458 :     g_async_queue_unref(hasher->hashpipe_pool);
     394             : 
     395       36458 :     rm_buffer_pool_destroy(hasher->mem_pool);
     396       36458 :     g_cond_clear(&hasher->cond);
     397       36458 :     g_mutex_clear(&hasher->lock);
     398       36458 :     g_slice_free(RmHasher, hasher);
     399       36458 : }
     400             : 
     401      221827 : RmHasherTask *rm_hasher_task_new(RmHasher *hasher, RmDigest *digest,
     402             :                                  gpointer task_user_data) {
     403      221827 :     g_mutex_lock(&hasher->lock);
     404      221829 :     { hasher->active_tasks++; }
     405      221829 :     g_mutex_unlock(&hasher->lock);
     406             : 
     407      221829 :     RmHasherTask *self = g_slice_new0(RmHasherTask);
     408      221824 :     self->hasher = hasher;
     409      221824 :     self->digest =
     410             :         digest ? digest
     411      221824 :                : rm_digest_new(hasher->digest_type, 0, 0, rm_digest_paranoia_bytes(),
     412           0 :                                hasher->digest_type == RM_DIGEST_PARANOID);
     413             : 
     414      221824 :     self->hashpipe = g_async_queue_pop(hasher->hashpipe_pool);
     415      221828 :     self->task_user_data = task_user_data;
     416      221828 :     return self;
     417             : }
     418             : 
     419      221829 : gboolean rm_hasher_task_hash(RmHasherTask *task, char *path, guint64 start_offset,
     420             :                              guint64 bytes_to_read, gboolean is_symlink) {
     421      221829 :     guint64 bytes_read = 0;
     422      221829 :     if(is_symlink) {
     423         140 :         bytes_read = rm_hasher_symlink_read(task->hasher, task->digest, path);
     424      221689 :     } else if(task->hasher->use_buffered_read) {
     425        7888 :         bytes_read = rm_hasher_buffered_read(task->hasher, task->hashpipe, task->digest,
     426             :                                              path, start_offset, bytes_to_read);
     427             :     } else {
     428      213801 :         bytes_read = rm_hasher_unbuffered_read(task->hasher, task->hashpipe, task->digest,
     429             :                                                path, start_offset, bytes_to_read);
     430             :     }
     431             : 
     432      221829 :     return ((is_symlink && bytes_read == 0) || bytes_read == bytes_to_read);
     433             : }
     434             : 
     435      221829 : RmDigest *rm_hasher_task_finish(RmHasherTask *task) {
     436             :     /* get a dummy buffer to use to signal the hasher thread that this increment is
     437             :      * finished */
     438      221829 :     RmHasher *hasher = task->hasher;
     439      221829 :     RmBuffer *finisher = rm_buffer_pool_get(task->hasher->mem_pool);
     440      221829 :     finisher->digest = task->digest;
     441      221829 :     finisher->len = 0;
     442      221829 :     finisher->user_data = task;
     443      221829 :     rm_util_thread_pool_push(task->hashpipe, finisher);
     444             : 
     445             :     /* return hashpipe to hashpipe_pool */
     446             :     // g_async_queue_push(task->hasher->hashpipe_pool, task->hashpipe);
     447             : 
     448      221827 :     if(hasher->return_queue) {
     449           0 :         return (g_async_queue_pop(hasher->return_queue));
     450             :     } else {
     451      221827 :         return NULL;
     452             :     }
     453             : }

Generated by: LCOV version 1.11