Line data Source code
1 : /*
2 : * This file is part of rmlint.
3 : *
4 : * rmlint is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * rmlint is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with rmlint. If not, see <http://www.gnu.org/licenses/>.
16 : *
17 : * Authors:
18 : *
19 : * - Christopher <sahib> Pahl 2010-2015 (https://github.com/sahib)
20 : * - Daniel <SeeSpotRun> T. 2014-2015 (https://github.com/SeeSpotRun)
21 : *
22 : * Hosted on http://github.com/sahib/rmlint
23 : *
24 : */
25 :
26 : #include <stdio.h>
27 : #include <string.h>
28 : #include <glib.h>
29 :
30 : #include <fcntl.h>
31 :
32 : #include "hasher.h"
33 : #include "utilities.h"
34 :
/* Flags for the fadvise() call that tells the kernel
 * what we want to do with the file.
 *
 * Each POSIX_FADV_* constant is only OR-ed in when the platform headers
 * define it, so the value is 0 where none of them is available.
 *
 * NOTE(review): this is a `const int` object, not a preprocessor macro.
 * An identifier that is not a macro evaluates to 0 inside an `#if`
 * directive, so it cannot be used to enable code at preprocessing time.
 *
 * NOTE(review): the POSIX_FADV_* constants are presumably distinct advice
 * codes rather than independent bit flags - confirm that the OR-ed value is
 * something posix_fadvise() accepts.
 */
const int HASHER_FADVISE_FLAGS = 0
#ifdef POSIX_FADV_SEQUENTIAL
                                 | POSIX_FADV_SEQUENTIAL /* Read from 0 to file-size */
#endif
#ifdef POSIX_FADV_WILLNEED
                                 | POSIX_FADV_WILLNEED /* Tell the kernel to readahead */
#endif
#ifdef POSIX_FADV_NOREUSE
                                 | POSIX_FADV_NOREUSE /* We will not reuse old data */
#endif
    ;
49 :
/* Integer division of n by m, rounded up to the next whole number.
 * Both arguments are evaluated twice, so only pass expressions without
 * side effects; m must not be zero. */
#define DIVIDE_CEIL(n, m) ((n) / (m) + !!((n) % (m)))
51 :
/* State shared by all hashing tasks that were created from one hasher. */
struct _RmHasher {
    /* digest type used when a task is created without its own digest */
    RmDigestType digest_type;
    /* TRUE: read files via fopen()/fread(); FALSE: via open()/preadv() */
    gboolean use_buffered_read;
    /* stored from rm_hasher_new(); also handed to the buffer pool there */
    guint64 cache_quota_bytes;
    /* opaque pointer passed through to every callback invocation */
    gpointer session_user_data;
    /* pool that all read and finisher buffers are taken from */
    RmBufferPool *mem_pool;
    /* called by the hashpipe worker once a task's finisher buffer arrives */
    RmHasherCallback callback;

    /* queue of idle GThreadPools ("hashpipes"); a task pops one for its
     * lifetime and pushes it back when it is freed */
    GAsyncQueue *hashpipe_pool;
    /* only created when no callback was given to rm_hasher_new(); the
     * built-in joiner pushes finished digests here */
    GAsyncQueue *return_queue;
    /* lock and cond protect and signal changes of active_tasks */
    GMutex lock;
    GCond cond;

    /* size in bytes of a single read buffer */
    gsize buf_size;
    /* number of tasks created but not yet finalised by a worker */
    guint active_tasks;
};
68 :
/* One hashing job: a digest that is fed through a single hashpipe. */
struct _RmHasherTask {
    /* hasher this task was created from */
    RmHasher *hasher;
    /* thread pool borrowed from hasher->hashpipe_pool for this task */
    GThreadPool *hashpipe;
    /* digest that the task's buffers are tagged with */
    RmDigest *digest;
    /* opaque per-task pointer passed to the hasher's callback */
    gpointer task_user_data;
};
75 :
76 221824 : static void rm_hasher_task_free(RmHasherTask *self) {
77 221824 : g_async_queue_push(self->hasher->hashpipe_pool, self->hashpipe);
78 221826 : g_slice_free(RmHasherTask, self);
79 221815 : }
80 :
81 : /* GThreadPool Worker for hashing */
82 495550 : static void rm_hasher_hashpipe_worker(RmBuffer *buffer, RmHasher *hasher) {
83 495550 : if(buffer->len > 0) {
84 : /* Update digest with buffer->data */
85 273734 : rm_digest_buffered_update(buffer);
86 : } else {
87 : /* finalise via callback */
88 221816 : RmHasherTask *task = buffer->user_data;
89 221816 : hasher->callback(hasher, buffer->digest, hasher->session_user_data,
90 : task->task_user_data);
91 :
92 221810 : rm_buffer_pool_release(buffer);
93 221827 : rm_hasher_task_free(task);
94 :
95 221811 : g_mutex_lock(&hasher->lock);
96 : {
97 : /* decrease active task count and signal same */
98 221829 : hasher->active_tasks--;
99 221829 : g_cond_signal(&hasher->cond);
100 : }
101 221829 : g_mutex_unlock(&hasher->lock);
102 : }
103 495567 : }
104 :
105 : //////////////////////////////////////
106 : // File Reading Utilities //
107 : //////////////////////////////////////
108 :
109 221682 : static void rm_hasher_request_readahead(int fd, RmOff seek_offset, RmOff bytes_to_read) {
110 : /* Give the kernel scheduler some hints */
111 : #if HAVE_POSIX_FADVISE && HASHER_FADVISE_FLAGS
112 : RmOff readahead = bytes_to_read * 8;
113 : posix_fadvise(fd, seek_offset, readahead, HASHER_FADVISE_FLAGS);
114 : #else
115 : (void)fd;
116 : (void)seek_offset;
117 : (void)bytes_to_read;
118 : #endif
119 221682 : }
120 :
121 140 : static gint64 rm_hasher_symlink_read(RmHasher *hasher, RmDigest *digest, char *path) {
122 : /* Fake an IO operation on the symlink. */
123 140 : RmBuffer *buf = rm_buffer_pool_get(hasher->mem_pool);
124 140 : buf->len = 256;
125 140 : memset(buf->data, 0, buf->len);
126 :
127 : RmStat stat_buf;
128 140 : if(rm_sys_stat(path, &stat_buf) == -1) {
129 : /* Oops, that did not work out, report as an error */
130 0 : rm_log_perror("Cannot stat symbolic link");
131 0 : return -1;
132 : }
133 :
134 140 : gint data_size =
135 280 : snprintf((char *)buf->data, rm_buffer_size(hasher->mem_pool), "%ld:%ld",
136 280 : (long)stat_buf.st_dev, (long)stat_buf.st_ino);
137 140 : buf->len = data_size;
138 140 : buf->digest = digest;
139 :
140 140 : rm_digest_buffered_update(buf);
141 :
142 : /* In case of paranoia: shrink the used data buffer, so comparasion works
143 : * as expected. Otherwise a full buffer is used with possibly different
144 : * content */
145 140 : if(digest->type == RM_DIGEST_PARANOID) {
146 28 : rm_digest_paranoia_shrink(digest, data_size);
147 : }
148 140 : return 0;
149 : }
150 :
151 : /* Reads data from file and sends to hasher threadpool
152 : * returns number of bytes successfully read */
153 :
154 7888 : static gint64 rm_hasher_buffered_read(RmHasher *hasher, GThreadPool *hashpipe,
155 : RmDigest *digest, char *path, gsize start_offset,
156 : gsize bytes_to_read) {
157 7888 : FILE *fd = NULL;
158 7888 : if(bytes_to_read == 0) {
159 0 : bytes_to_read = G_MAXSIZE;
160 : }
161 :
162 7888 : gsize total_bytes_read = 0;
163 :
164 7888 : if((fd = fopen(path, "rb")) == NULL) {
165 0 : rm_log_info("fopen(3) failed for %s: %s\n", path, g_strerror(errno));
166 0 : goto finish;
167 : }
168 :
169 7888 : gint32 bytes_read = 0;
170 :
171 7888 : rm_hasher_request_readahead(fileno(fd), start_offset, bytes_to_read);
172 :
173 7888 : if(fseek(fd, start_offset, SEEK_SET) == -1) {
174 0 : rm_log_perror("fseek(3) failed");
175 0 : goto finish;
176 : }
177 :
178 7888 : RmBuffer *buffer = rm_buffer_pool_get(hasher->mem_pool);
179 :
180 25524 : while((bytes_read =
181 17636 : fread(buffer->data, 1, MIN(bytes_to_read, hasher->buf_size), fd)) > 0) {
182 9748 : bytes_to_read -= bytes_read;
183 :
184 9748 : buffer->len = bytes_read;
185 9748 : buffer->digest = digest;
186 9748 : rm_util_thread_pool_push(hashpipe, buffer);
187 :
188 9748 : total_bytes_read += bytes_read;
189 9748 : buffer = rm_buffer_pool_get(hasher->mem_pool);
190 : }
191 7888 : rm_buffer_pool_release(buffer);
192 :
193 7888 : if(ferror(fd) != 0) {
194 0 : rm_log_perror("fread(3) failed");
195 0 : if(total_bytes_read == bytes_to_read) {
196 : /* signal error to caller */
197 0 : total_bytes_read++;
198 : }
199 : }
200 :
201 : finish:
202 7888 : if(fd != NULL) {
203 7888 : fclose(fd);
204 : }
205 7888 : return total_bytes_read;
206 : }
207 :
208 : /* Reads data from file and sends to hasher threadpool
209 : * returns number of bytes successfully read */
210 :
211 213798 : static gint64 rm_hasher_unbuffered_read(RmHasher *hasher, GThreadPool *hashpipe,
212 : RmDigest *digest, char *path, gint64 start_offset,
213 213798 : gint64 bytes_to_read) {
214 213798 : gint32 bytes_read = 0;
215 213798 : gint64 total_bytes_read = 0;
216 213798 : guint64 file_offset = start_offset;
217 :
218 213798 : if(bytes_to_read == 0) {
219 : RmStat stat_buf;
220 0 : if(rm_sys_stat(path, &stat_buf) != -1) {
221 0 : bytes_to_read = MAX(stat_buf.st_size - start_offset, 0);
222 : }
223 : }
224 :
225 : /* how many buffers to read? */
226 213798 : const gint16 N_BUFFERS = MIN(4, DIVIDE_CEIL(bytes_to_read, hasher->buf_size));
227 427593 : struct iovec readvec[N_BUFFERS + 1];
228 :
229 213798 : int fd = 0;
230 :
231 213798 : fd = rm_sys_open(path, O_RDONLY);
232 213798 : if(fd == -1) {
233 0 : rm_log_info("open(2) failed for %s: %s\n", path, g_strerror(errno));
234 0 : goto finish;
235 : }
236 :
237 : /* preadv() is beneficial for large files since it can cut the
238 : * number of syscall heavily. I suggest N_BUFFERS=4 as good
239 : * compromise between memory and cpu.
240 : *
241 : * With 16 buffers: 43% cpu 33,871 total
242 : * With 8 buffers: 43% cpu 32,098 total
243 : * With 4 buffers: 42% cpu 32,091 total
244 : * With 2 buffers: 44% cpu 32,245 total
245 : * With 1 buffers: 45% cpu 34,491 total
246 : */
247 :
248 : /* Give the kernel scheduler some hints */
249 213798 : rm_hasher_request_readahead(fd, start_offset, bytes_to_read);
250 :
251 : /* Initialize the buffers to begin with.
252 : * After a buffer is full, a new one is retrieved.
253 : */
254 : RmBuffer **buffers;
255 213794 : buffers = g_slice_alloc(sizeof(*buffers) * N_BUFFERS);
256 :
257 213795 : memset(readvec, 0, sizeof(readvec));
258 428567 : for(int i = 0; i < N_BUFFERS; ++i) {
259 : /* buffer is one contignous memory block */
260 214767 : buffers[i] = rm_buffer_pool_get(hasher->mem_pool);
261 214772 : readvec[i].iov_base = buffers[i]->data;
262 214772 : readvec[i].iov_len = hasher->buf_size;
263 : }
264 :
265 879820 : while((bytes_to_read == 0 || total_bytes_read < bytes_to_read) &&
266 226112 : (bytes_read = rm_sys_preadv(fd, readvec, N_BUFFERS, file_offset)) > 0) {
267 226107 : bytes_read =
268 226107 : MIN(bytes_read, bytes_to_read - total_bytes_read); /* ignore over-reads */
269 :
270 226107 : int blocks = DIVIDE_CEIL(bytes_read, hasher->buf_size);
271 226107 : g_assert(blocks <= N_BUFFERS);
272 :
273 226105 : total_bytes_read += bytes_read;
274 226105 : file_offset += bytes_read;
275 :
276 490126 : for(int i = 0; i < blocks; ++i) {
277 : /* Get the RmBuffer from the datapointer */
278 264013 : RmBuffer *buffer = buffers[i];
279 264013 : buffer->len = MIN(hasher->buf_size, bytes_read - i * hasher->buf_size);
280 264013 : buffer->digest = digest;
281 :
282 : /* Send it to the hasher */
283 264013 : rm_util_thread_pool_push(hashpipe, buffer);
284 :
285 : /* Allocate a new buffer - hasher will release the old buffer */
286 264021 : buffers[i] = rm_buffer_pool_get(hasher->mem_pool);
287 264021 : readvec[i].iov_base = buffers[i]->data;
288 264021 : readvec[i].iov_len = hasher->buf_size;
289 : }
290 : }
291 :
292 213801 : if(bytes_read == -1) {
293 0 : rm_log_perror("preadv failed");
294 213801 : } else if(total_bytes_read != bytes_to_read) {
295 0 : rm_log_error_line(_("Something went wrong reading %s; expected %li bytes, "
296 : "got %li; ignoring"),
297 : path, (long int)bytes_to_read, (long int)total_bytes_read);
298 : }
299 :
300 : /* Release the rest of the buffers */
301 428574 : for(int i = 0; i < N_BUFFERS; ++i) {
302 214773 : rm_buffer_pool_release(buffers[i]);
303 : }
304 213801 : g_slice_free1(sizeof(*buffers) * N_BUFFERS, buffers);
305 :
306 : finish:
307 213801 : if(fd > 0) {
308 213801 : rm_sys_close(fd);
309 : }
310 :
311 213801 : return total_bytes_read;
312 : }
313 :
314 : //////////////////////////////////////
315 : // RmHasher //
316 : //////////////////////////////////////
317 :
318 1126449 : static void rm_hasher_hashpipe_free(GThreadPool *hashpipe) {
319 : /* free the GThreadPool; wait for any in-progress jobs to finish */
320 1126449 : g_thread_pool_free(hashpipe, FALSE, TRUE);
321 1126449 : }
322 :
323 : /* local joiner if user provides no joiner to rm_hasher_new() */
324 0 : static RmHasherCallback *rm_hasher_joiner(RmHasher *hasher, RmDigest *digest,
325 : _U gpointer session_user_data,
326 : _U gpointer task_user_data) {
327 0 : g_async_queue_push(hasher->return_queue, digest);
328 0 : return 0;
329 : }
330 :
331 : //////////////////////////////////////
332 : // API
333 : //////////////////////////////////////
334 :
335 36458 : RmHasher *rm_hasher_new(RmDigestType digest_type,
336 : guint num_threads,
337 : gboolean use_buffered_read,
338 : gsize buf_size,
339 : guint64 cache_quota_bytes,
340 : guint64 target_kept_bytes,
341 : RmHasherCallback joiner,
342 : gpointer session_user_data) {
343 36458 : RmHasher *self = g_slice_new0(RmHasher);
344 36458 : self->digest_type = digest_type;
345 :
346 36458 : self->use_buffered_read = use_buffered_read;
347 36458 : self->buf_size = buf_size;
348 36458 : self->cache_quota_bytes = cache_quota_bytes;
349 :
350 36458 : if(joiner) {
351 36458 : self->callback = joiner;
352 : } else {
353 0 : self->callback = (RmHasherCallback)rm_hasher_joiner;
354 0 : self->return_queue = g_async_queue_new();
355 : }
356 :
357 36458 : self->session_user_data = session_user_data;
358 :
359 : /* initialise mutex & cond */
360 36458 : g_mutex_init(&self->lock);
361 36458 : g_cond_init(&self->cond);
362 :
363 : /* Create buffer mem pool */
364 36458 : self->mem_pool = rm_buffer_pool_init(buf_size, cache_quota_bytes, target_kept_bytes);
365 :
366 : /* Create a pool of hashing thread "pools" - each "pool" can only have
367 : * one thread because hashing must be done in order */
368 36458 : self->hashpipe_pool = g_async_queue_new_full((GDestroyNotify)rm_hasher_hashpipe_free);
369 36458 : g_assert(num_threads > 0);
370 1162907 : for(guint i = 0; i < num_threads; i++) {
371 1126449 : g_async_queue_push(
372 : self->hashpipe_pool,
373 1126449 : rm_util_thread_pool_new((GFunc)rm_hasher_hashpipe_worker, self, 1));
374 : }
375 36458 : return self;
376 : }
377 :
378 36458 : void rm_hasher_free(RmHasher *hasher, gboolean wait) {
379 : /* Note that hasher may be multi-threaded, both at the reader level and at
380 : * the hashpipe level. To ensure graceful exit, the hasher is reference counted
381 : * via hasher->active_tasks.
382 : */
383 36458 : if(wait) {
384 36458 : g_mutex_lock(&hasher->lock);
385 : {
386 72916 : while(hasher->active_tasks > 0) {
387 0 : g_cond_wait(&hasher->cond, &hasher->lock);
388 : }
389 : }
390 36458 : g_mutex_unlock(&hasher->lock);
391 : }
392 :
393 36458 : g_async_queue_unref(hasher->hashpipe_pool);
394 :
395 36458 : rm_buffer_pool_destroy(hasher->mem_pool);
396 36458 : g_cond_clear(&hasher->cond);
397 36458 : g_mutex_clear(&hasher->lock);
398 36458 : g_slice_free(RmHasher, hasher);
399 36458 : }
400 :
401 221827 : RmHasherTask *rm_hasher_task_new(RmHasher *hasher, RmDigest *digest,
402 : gpointer task_user_data) {
403 221827 : g_mutex_lock(&hasher->lock);
404 221829 : { hasher->active_tasks++; }
405 221829 : g_mutex_unlock(&hasher->lock);
406 :
407 221829 : RmHasherTask *self = g_slice_new0(RmHasherTask);
408 221824 : self->hasher = hasher;
409 221824 : self->digest =
410 : digest ? digest
411 221824 : : rm_digest_new(hasher->digest_type, 0, 0, rm_digest_paranoia_bytes(),
412 0 : hasher->digest_type == RM_DIGEST_PARANOID);
413 :
414 221824 : self->hashpipe = g_async_queue_pop(hasher->hashpipe_pool);
415 221828 : self->task_user_data = task_user_data;
416 221828 : return self;
417 : }
418 :
419 221829 : gboolean rm_hasher_task_hash(RmHasherTask *task, char *path, guint64 start_offset,
420 : guint64 bytes_to_read, gboolean is_symlink) {
421 221829 : guint64 bytes_read = 0;
422 221829 : if(is_symlink) {
423 140 : bytes_read = rm_hasher_symlink_read(task->hasher, task->digest, path);
424 221689 : } else if(task->hasher->use_buffered_read) {
425 7888 : bytes_read = rm_hasher_buffered_read(task->hasher, task->hashpipe, task->digest,
426 : path, start_offset, bytes_to_read);
427 : } else {
428 213801 : bytes_read = rm_hasher_unbuffered_read(task->hasher, task->hashpipe, task->digest,
429 : path, start_offset, bytes_to_read);
430 : }
431 :
432 221829 : return ((is_symlink && bytes_read == 0) || bytes_read == bytes_to_read);
433 : }
434 :
435 221829 : RmDigest *rm_hasher_task_finish(RmHasherTask *task) {
436 : /* get a dummy buffer to use to signal the hasher thread that this increment is
437 : * finished */
438 221829 : RmHasher *hasher = task->hasher;
439 221829 : RmBuffer *finisher = rm_buffer_pool_get(task->hasher->mem_pool);
440 221829 : finisher->digest = task->digest;
441 221829 : finisher->len = 0;
442 221829 : finisher->user_data = task;
443 221829 : rm_util_thread_pool_push(task->hashpipe, finisher);
444 :
445 : /* return hashpipe to hashpipe_pool */
446 : // g_async_queue_push(task->hasher->hashpipe_pool, task->hashpipe);
447 :
448 221827 : if(hasher->return_queue) {
449 0 : return (g_async_queue_pop(hasher->return_queue));
450 : } else {
451 221827 : return NULL;
452 : }
453 : }
|