/*
 * This file is part of rmlint.
 *
 * rmlint is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * rmlint is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with rmlint. If not, see <http://www.gnu.org/licenses/>.
 *
 * Authors:
 *
 * - Christopher <sahib> Pahl 2010-2015 (https://github.com/sahib)
 * - Daniel <SeeSpotRun> T. 2014-2015 (https://github.com/SeeSpotRun)
 *
 * Hosted on http://github.com/sahib/rmlint
 *
 */

#include "md-scheduler.h"

/* handy for comparing 64-bit integers and returning int */
#define SIGN_DIFF(X, Y) (((X) > (Y)) - ((X) < (Y)))
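/* For example (illustrative): SIGN_DIFF(3, 7) == -1, SIGN_DIFF(7, 7) == 0 and
 * SIGN_DIFF(9, 7) == 1. Unlike returning (X - Y) directly, the result cannot
 * overflow or truncate when cast to int, even for 64-bit operands far apart. */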

/* How many microseconds to sleep if we encounter an empty file queue.
 * This prevents a "starving" RmShredDevice from hogging cpu and cluttering up
 * debug messages by continually recycling back to the joiner.
 */
#if _RM_MDS_DEBUG
#define MDS_EMPTYQUEUE_SLEEP_US (60 * 1000 * 1000) /* 60 seconds */
#else
#define MDS_EMPTYQUEUE_SLEEP_US (50 * 1000) /* 0.05 seconds */
#endif

///////////////////////////////////////
//            Structures             //
///////////////////////////////////////

struct _RmMDS {
    /* Structure for RmMDS object/session */

    /* The function called for each task */
    RmMDSFunc func;

    /* Threadpool for device workers */
    GThreadPool *pool;

    /* Sorting function for device task queues */
    RmMDSSortFunc prioritiser;

    /* Mounts table for grouping dev's by physical devices
     * and identifying rotationality */
    RmMountTable *mount_table;

    /* If true then don't use mount table; interpret user-supplied dev as disk id */
    bool fake_disk;

    /* Table of physical disks/devices */
    GHashTable *disks;

    /* Lock for access to:
     * self->disks
     */
    GMutex lock;

    /* flag for whether threadpool is running */
    gboolean running;

    /* quota to limit number of tasks per pass of each device */
    gint pass_quota;

    /* pointer to user data to be passed to func */
    gpointer user_data;

    /* number of pending tasks */
    guint pending_tasks;
};

typedef struct RmMDSDevice {
    /* Structure containing data associated with one Device worker thread */

    /* The RmMDS session parent */
    RmMDS *mds;

    /* Device's physical disk ID (only used for debug info) */
    dev_t disk;

    /* Sorted list of tasks queued for execution */
    GSList *sorted_tasks;

    /* Stack for tasks that will be sorted and carried out next pass */
    GSList *unsorted_tasks;

    /* Lock for access to:
     * self->sorted_tasks
     * self->unsorted_tasks
     * self->ref_count
     */
    GMutex lock;
    GCond cond;

    /* Reference count for self */
    gint ref_count;

    /* is disk rotational? */
    gboolean is_rotational;
} RmMDSDevice;

//////////////////////////////////////////////
//  Internal Structure Init's & Destroyers  //
//////////////////////////////////////////////

/* RmMDSTask */
static RmMDSTask *rm_mds_task_new(const dev_t dev, const guint64 offset,
                                  const gpointer task_data) {
    RmMDSTask *self = g_slice_new0(RmMDSTask);
    self->dev = dev;
    self->offset = offset;
    self->task_data = task_data;
    return self;
}

static void rm_mds_task_free(RmMDSTask *task) {
    g_slice_free(RmMDSTask, task);
}

/* RmMDSDevice */
static RmMDSDevice *rm_mds_device_new(RmMDS *mds, const dev_t disk) {
    RmMDSDevice *self = g_slice_new0(RmMDSDevice);

    g_mutex_init(&self->lock);
    g_cond_init(&self->cond);

    self->mds = mds;
    self->ref_count = 0;
    self->disk = disk;

    if(mds->fake_disk) {
        self->is_rotational = (disk % 2 == 0);
    } else {
        self->is_rotational = !rm_mounts_is_nonrotational(mds->mount_table, disk);
    }

    rm_log_debug_line("Created new RmMDSDevice for %srotational disk #%lu",
                      self->is_rotational ? "" : "non-", (long unsigned)disk);
    return self;
}

/** @brief Wait for a RmMDSDevice to finish all tasks
 **/
static void rm_mds_device_finish(RmMDSDevice *self) {
    g_mutex_lock(&self->lock);
    {
        while(self->ref_count > 0) {
            g_cond_wait(&self->cond, &self->lock);
        }
    }
    g_mutex_unlock(&self->lock);
}

/** @brief Free mem allocated to a RmMDSDevice
 **/
static void rm_mds_device_free(RmMDSDevice *self) {
    rm_mds_device_finish(self);

    g_mutex_clear(&self->lock);
    g_cond_clear(&self->cond);
    g_slice_free(RmMDSDevice, self);
}

///////////////////////////////////////
//    RmMDSDevice Implementation     //
///////////////////////////////////////

/** @brief Mutex-protected task popper
 **/

static RmMDSTask *rm_mds_pop_task(RmMDSDevice *device) {
    RmMDSTask *task = NULL;

    g_mutex_lock(&device->lock);
    {
        if(device->sorted_tasks) {
            task = device->sorted_tasks->data;
            device->sorted_tasks =
                g_slist_delete_link(device->sorted_tasks, device->sorted_tasks);
            device->ref_count--;
        }
    }
    g_mutex_unlock(&device->lock);
    return task;
}

/** @brief Mutex-protected task pusher
 **/

static void rm_mds_push_task(RmMDSDevice *device, RmMDSTask *task) {
    g_mutex_lock(&device->lock);
    {
        device->unsorted_tasks =
            g_slist_prepend(device->unsorted_tasks, task);
        device->ref_count++;
        g_cond_signal(&device->cond);
    }
    g_mutex_unlock(&device->lock);
}

/** @brief GCompareDataFunc wrapper for mds->prioritiser
 **/
static gint rm_mds_compare(const RmMDSTask *a, const RmMDSTask *b,
                           RmMDSSortFunc prioritiser) {
    gint result = prioritiser(a, b);
    return result;
}

/** @brief RmMDSDevice worker thread
 **/
static void rm_mds_factory(RmMDSDevice *device, RmMDS *mds) {
    /* rm_mds_factory processes tasks from the device's task queues.
     * After completing one pass of the device, returns self to the
     * mds->pool threadpool. */
    gint quota = mds->pass_quota;

    g_mutex_lock(&device->lock);
    {
        /* check for empty queues - if so then wait a little while before giving up */
        if(!device->sorted_tasks && !device->unsorted_tasks && device->ref_count > 0) {
            /* timed wait for signal from rm_mds_push_task() */
            gint64 end_time = g_get_monotonic_time() + MDS_EMPTYQUEUE_SLEEP_US;
            g_cond_wait_until(&device->cond, &device->lock, end_time);
        }

        /* merge and sort task lists */
        device->sorted_tasks =
            g_slist_concat(device->unsorted_tasks, device->sorted_tasks);
        device->unsorted_tasks = NULL;
        if(mds->prioritiser) {
            device->sorted_tasks = g_slist_sort_with_data(
                device->sorted_tasks, (GCompareDataFunc)rm_mds_compare,
                (RmMDSSortFunc)mds->prioritiser);
        }
    }
    g_mutex_unlock(&device->lock);

    /* process tasks from device->sorted_tasks */
    RmMDSTask *task = NULL;
    while(quota > 0 &&
          (task = rm_mds_pop_task(device))) {
        if(mds->func(task->task_data, mds->user_data)) {
            /* task succeeded */
            rm_mds_task_free(task);
            /* decrement counters */
            --quota;
            g_atomic_int_dec_and_test(&device->mds->pending_tasks);
        } else {
            /* task failed; push it back to device->unsorted_tasks */
            rm_mds_push_task(device, task);
        }
    }

    if(g_atomic_int_get(&device->ref_count) > 0) {
        /* return self to pool for further processing */
        rm_util_thread_pool_push(mds->pool, device);
    } else {
        /* signal to rm_mds_device_free() */
        g_cond_signal(&device->cond);
    }
}
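
/* Illustrative sketch of the callback contract the loop above relies on,
 * assuming RmMDSFunc returns truthy on completion (as its use here implies);
 * `my_task_func`, `MyJob` and `my_read_increment` are hypothetical:
 *
 *     static gboolean my_task_func(gpointer task_data, gpointer user_data) {
 *         MyJob *job = task_data;
 *         if(!my_read_increment(job, user_data)) {
 *             return FALSE;  // not finished; requeued via rm_mds_push_task()
 *         }
 *         return TRUE;       // done; task freed and counted against quota
 *     }
 */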

/** @brief Increase or decrease RmMDSDevice reference count
 **/
static void rm_mds_device_ref(RmMDSDevice *device, const gint ref_count) {
    g_mutex_lock(&device->lock);
    { device->ref_count += ref_count; }
    g_mutex_unlock(&device->lock);
}

/** @brief Push a RmMDSDevice to the threadpool
 **/
void rm_mds_device_start(_U gpointer disk, RmMDSDevice *device, RmMDS *mds) {
    rm_util_thread_pool_push(mds->pool, device);
}

/** @brief Start the scheduler: push all registered devices to the threadpool
 **/
void rm_mds_start(RmMDS *mds) {
    mds->running = TRUE;
    g_hash_table_foreach(mds->disks, (GHFunc)rm_mds_device_start, mds);
}

RmMountTable *rm_mds_get_mount_table(const RmMDS *mds) {
    return mds->mount_table;
}

static RmMDSDevice *rm_mds_device_get(RmMDS *mds, const dev_t disk) {
    RmMDSDevice *result = NULL;
    g_mutex_lock(&mds->lock);
    {
        g_assert(mds->disks);
        result = g_hash_table_lookup(mds->disks, GINT_TO_POINTER(disk));
        if(!result) {
            result = rm_mds_device_new(mds, disk);
            g_hash_table_insert(mds->disks, GINT_TO_POINTER(disk), result);
            if(g_atomic_int_get(&mds->running) == TRUE) {
                rm_util_thread_pool_push(mds->pool, result);
            }
        }
    }
    g_mutex_unlock(&mds->lock);
    return result;
}

static RmMDSDevice *rm_mds_device_get_by_path(RmMDS *mds, const char *path) {
    dev_t disk = (mds->fake_disk) ? 0 :
                 rm_mounts_get_disk_id_by_path(mds->mount_table, path);
    return rm_mds_device_get(mds, disk);
}

static RmMDSDevice *rm_mds_device_get_by_dev(RmMDS *mds, dev_t dev, const char *path) {
    dev_t disk = (mds->fake_disk) ? dev :
                 rm_mounts_get_disk_id(mds->mount_table, dev, path);
    return rm_mds_device_get(mds, disk);
}

//////////////////////////
//  API Implementation  //
//////////////////////////

RmMDS *rm_mds_new(const gint max_threads, RmMountTable *mount_table, bool fake_disk) {
    RmMDS *self = g_slice_new0(RmMDS);

    g_mutex_init(&self->lock);

    self->pool = rm_util_thread_pool_new((GFunc)rm_mds_factory, self, max_threads);

    if(!mount_table && !fake_disk) {
        self->mount_table = rm_mounts_table_new(FALSE);
    } else {
        self->mount_table = mount_table;
    }

    self->fake_disk = fake_disk;
    self->disks = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL,
                                        (GDestroyNotify)rm_mds_device_free);
    self->running = FALSE;

    return self;
}

void rm_mds_configure(RmMDS *self,
                      const RmMDSFunc func,
                      const gpointer user_data,
                      const gint pass_quota,
                      RmMDSSortFunc prioritiser) {
    g_assert(self->running == FALSE);
    self->func = func;
    self->user_data = user_data;
    self->pass_quota = (pass_quota > 0) ? pass_quota : G_MAXINT;
    self->prioritiser = prioritiser;
}
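
/* Typical lifecycle (illustrative sketch; `my_task_func`, `my_data` and the
 * constants are hypothetical caller-side choices):
 *
 *     RmMDS *mds = rm_mds_new(4, mount_table, FALSE);
 *     rm_mds_configure(mds, my_task_func, my_data, 64, rm_mds_elevator_cmp);
 *     // queue work before (or after) starting; devices spawn on demand:
 *     rm_mds_push_task_by_dev(mds, dev, -1, path, task_data);
 *     rm_mds_start(mds);
 *     rm_mds_finish(mds);   // blocks until all pending tasks complete
 *     rm_mds_free(mds, FALSE);
 */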

void rm_mds_finish(RmMDS *mds) {
    /* wait for any pending threads to finish */
    while(g_atomic_int_get(&mds->pending_tasks)) {
        /* make this threadsafe for rare cases where new disks may
         * be encountered and spawned on-the-fly via rm_mds_factory */
        g_mutex_lock(&mds->lock);
        GList *devices = g_hash_table_get_values(mds->disks);
        g_mutex_unlock(&mds->lock);

        while(devices) {
            RmMDSDevice *device = devices->data;
            rm_log_debug_line("Finishing device %lu", (long unsigned)device->disk);
            rm_mds_device_finish(device);
            devices = g_list_delete_link(devices, devices);
        }
    }
    mds->running = FALSE;
}

void rm_mds_free(RmMDS *mds, gboolean free_mount_table) {
    rm_mds_finish(mds);

    g_thread_pool_free(mds->pool, false, true);
    g_hash_table_destroy(mds->disks);

    if(free_mount_table && mds->mount_table) {
        rm_mounts_table_destroy(mds->mount_table);
    }
    g_slice_free(RmMDS, mds);
}

void rm_mds_ref_path(RmMDS *mds, const char *path, const gint ref_count) {
    RmMDSDevice *device = rm_mds_device_get_by_path(mds, path);
    rm_mds_device_ref(device, ref_count);
}

void rm_mds_ref_dev(RmMDS *mds, dev_t dev, const gint ref_count) {
    RmMDSDevice *device = rm_mds_device_get_by_dev(mds, dev, NULL);
    rm_mds_device_ref(device, ref_count);
}
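
/* Illustrative use (sketch): since a device worker retires once its
 * ref_count drops to zero, a caller that will push more tasks later can
 * pin the device first and unpin when done pushing:
 *
 *     rm_mds_ref_dev(mds, dev, 1);    // keep the worker alive
 *     // ... rm_mds_push_task_by_dev() calls, possibly from other threads ...
 *     rm_mds_ref_dev(mds, dev, -1);   // allow the worker to retire
 */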

static void rm_mds_push_new_task(RmMDSDevice *device, const dev_t dev, const guint64 offset,
                                 const gpointer task_user_data) {
    g_atomic_int_inc(&device->mds->pending_tasks);

    RmMDSTask *task = rm_mds_task_new(dev, offset, task_user_data);
    rm_mds_push_task(device, task);
}

dev_t rm_mds_dev(const char *path) {
    RmStat stat_buf;
    if(rm_sys_stat(path, &stat_buf) == -1) {
        return 0;
    }
    return stat_buf.st_dev;
}

void rm_mds_push_task_by_dev(RmMDS *mds, const dev_t dev, gint64 offset, const char *path,
                             const gpointer task_user_data) {
    bool is_rotational = (mds->fake_disk) ? (dev % 2 == 0) :
                         !rm_mounts_is_nonrotational(mds->mount_table, dev);
    if(offset == -1 && path && is_rotational) {
        offset = rm_offset_get_from_path(path, 0, NULL);
    }
    RmMDSDevice *device = rm_mds_device_get_by_dev(mds, dev, path);
    rm_mds_push_new_task(device, dev, offset, task_user_data);
}
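
/* For example (sketch; the path is hypothetical): passing offset == -1 with
 * a valid path lets the scheduler fetch the file's physical extent offset
 * itself, which is only done for rotational disks where seek order matters:
 *
 *     rm_mds_push_task_by_dev(mds, dev, -1, "/mnt/data/file", task_data);
 */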

void rm_mds_push_task_by_path(RmMDS *mds, const char *path, gint64 offset,
                              const gpointer task_user_data) {
    rm_mds_push_task_by_dev(mds, rm_mds_dev(path), offset, path, task_user_data);
}

/**
 * @brief prioritiser function for basic elevator algorithm
 **/
gint rm_mds_elevator_cmp(const RmMDSTask *task_a, const RmMDSTask *task_b) {
    return (2 * SIGN_DIFF(task_a->dev, task_b->dev) +
            1 * SIGN_DIFF(task_a->offset, task_b->offset));
}
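
/* Worked example: for task_a = {dev 8, offset 4096} and
 * task_b = {dev 8, offset 0} the result is 2*0 + 1*1 = +1, so task_a sorts
 * after task_b (ascending offset within a device). Because the dev term is
 * weighted 2 and the offset term only ±1, tasks are always grouped by device
 * before being ordered by offset. */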
|