LCOV - code coverage report
Current view: top level - lib - md-scheduler.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 160 181 88.4 %
Date: 2015-09-30 14:09:30 Functions: 22 27 81.5 %

          Line data    Source code
       1             : /*
       2             :  *  This file is part of rmlint.
       3             :  *
       4             :  *  rmlint is free software: you can redistribute it and/or modify
       5             :  *  it under the terms of the GNU General Public License as published by
       6             :  *  the Free Software Foundation, either version 3 of the License, or
       7             :  *  (at your option) any later version.
       8             :  *
       9             :  *  rmlint is distributed in the hope that it will be useful,
      10             :  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
      11             :  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      12             :  *  GNU General Public License for more details.
      13             :  *
      14             :  *  You should have received a copy of the GNU General Public License
      15             :  *  along with rmlint.  If not, see <http://www.gnu.org/licenses/>.
      16             :  *
      17             :  * Authors:
      18             :  *
      19             :  *  - Christopher <sahib> Pahl 2010-2015 (https://github.com/sahib)
      20             :  *  - Daniel <SeeSpotRun> T.   2014-2015 (https://github.com/SeeSpotRun)
      21             :  *
      22             :  * Hosted on http://github.com/sahib/rmlint
      23             :  *
      24             :  */
      25             : 
      26             : #include "md-scheduler.h"
      27             : 
      28             : /* handy for comparing 64-bit integers and returning int */
      29             : #define SIGN_DIFF(X, Y) (((X) > (Y)) - ((X) < (Y)))
      30             : 
      31             : /* How many milliseconds to sleep if we encounter an empty file queue.
      32             :  * This prevents a "starving" RmShredDevice from hogging cpu and cluttering up
      33             :  * debug messages by continually recycling back to the joiner.
      34             :  */
      35             : #if _RM_MDS_DEBUG
      36             : #define MDS_EMPTYQUEUE_SLEEP_US (60 * 1000 * 1000) /* 60 seconds */
      37             : #else
      38             : #define MDS_EMPTYQUEUE_SLEEP_US (50 * 1000) /* 0.05 second */
      39             : #endif
      40             : 
      41             : ///////////////////////////////////////
      42             : //            Structures             //
      43             : ///////////////////////////////////////
      44             : 
      45             : struct _RmMDS {
      46             :     /* Structure for RmMDS object/session */
      47             : 
      48             :     /* The function called for each task */
      49             :     RmMDSFunc func;
      50             : 
      51             :     /* Threadpool for device workers */
      52             :     GThreadPool *pool;
      53             : 
      54             :     /* Sorting function for device task queues */
      55             :     RmMDSSortFunc prioritiser;
      56             : 
      57             :     /* Mounts table for grouping dev's by physical devices
      58             :      * and identifying rotationality */
      59             :     RmMountTable *mount_table;
      60             : 
      61             :     /* If true then don't use mount table; interpret user-supplied dev as disk id */
      62             :     bool fake_disk;
      63             : 
      64             :     /* Table of physical disk/devices */
      65             :     GHashTable *disks;
      66             : 
      67             :     /* Lock for access to:
      68             :      *  self->disks
      69             :      */
      70             :     GMutex lock;
      71             : 
      72             :     /* flag for whether threadpool is running */
      73             :     gboolean running;
      74             : 
      75             :     /* quota to limit number of tasks per pass of each device */
      76             :     gint pass_quota;
      77             : 
      78             :     /* pointer to user data to be passed to func */
      79             :     gpointer user_data;
      80             : 
      81             :     /* number of pending tasks */
      82             :     guint pending_tasks;
      83             : 
      84             : };
      85             : 
      86             : typedef struct RmMDSDevice {
      87             :     /* Structure containing data associated with one Device worker thread */
      88             : 
      89             :     /* The RmMDS session parent */
      90             :     RmMDS *mds;
      91             : 
      92             :     /* Device's physical disk ID (only used for debug info) */
      93             :     dev_t disk;
      94             : 
      95             :     /* Sorted list of tasks queued for execution */
      96             :     GSList *sorted_tasks;
      97             : 
      98             :     /* Stack for tasks that will be sorted and carried out next pass */
      99             :     GSList *unsorted_tasks;
     100             : 
     101             :     /* Lock for access to:
     102             :      *  self->sorted_tasks
     103             :      *  self->unsorted_tasks
     104             :      *  self->ref_count
     105             :      */
     106             :     GMutex lock;
     107             :     GCond cond;
     108             : 
     109             :     /* Reference count for self */
     110             :     gint ref_count;
     111             : 
     112             :     /* is disk rotational? */
     113             :     gboolean is_rotational;
     114             : 
     115             : } RmMDSDevice;
     116             : 
     117             : //////////////////////////////////////////////
     118             : //  Internal Structure Init's & Destroyers  //
     119             : //////////////////////////////////////////////
     120             : 
     121             : /* RmMDSTask */
     122      221829 : static RmMDSTask *rm_mds_task_new(const dev_t dev, const guint64 offset,
     123             :                                   const gpointer task_data) {
     124      221829 :     RmMDSTask *self = g_slice_new0(RmMDSTask);
     125      221829 :     self->dev = dev;
     126      221829 :     self->offset = offset;
     127      221829 :     self->task_data = task_data;
     128      221829 :     return self;
     129             : }
     130             : 
     131      221829 : static void rm_mds_task_free(RmMDSTask *task) {
     132      221829 :     g_slice_free(RmMDSTask, task);
     133      221825 : }
     134             : 
     135             : /* RmMDSDevice */
     136       38638 : static RmMDSDevice *rm_mds_device_new(RmMDS *mds, const dev_t disk) {
     137       38638 :     RmMDSDevice *self = g_slice_new0(RmMDSDevice);
     138             : 
     139       38638 :     g_mutex_init(&self->lock);
     140       38638 :     g_cond_init(&self->cond);
     141             : 
     142       38638 :     self->mds = mds;
     143       38638 :     self->ref_count = 0;
     144       38638 :     self->disk = disk;
     145             : 
     146       38638 :     if (mds->fake_disk) {
     147        3585 :         self->is_rotational = (disk % 2 == 0);
     148             :     } else {
     149       35053 :         self->is_rotational = !rm_mounts_is_nonrotational(mds->mount_table, disk);
     150             :     }
     151             : 
     152       38638 :     rm_log_debug_line("Created new RmMDSDevice for %srotational disk #%lu",
     153             :             self->is_rotational ? "" : "non-", (long unsigned)disk);
     154       38638 :     return self;
     155             : }
     156             : 
     157             : /** @brief  Wait for a RmMDSDevice to finish all tasks
     158             :  **/
     159       77134 : static void rm_mds_device_finish(RmMDSDevice *self) {
     160       77134 :     g_mutex_lock(&self->lock);
     161             :     {
     162      199003 :         while(self->ref_count > 0) {
     163       44735 :             g_cond_wait(&self->cond, &self->lock);
     164             :         }
     165             :     }
     166       77134 :     g_mutex_unlock(&self->lock);
     167       77134 : }
     168             : 
     169             : /** @brief  Free mem allocated to a RmMDSDevice
     170             :  **/
     171       38638 : static void rm_mds_device_free(RmMDSDevice *self) {
     172       38638 :     rm_mds_device_finish(self);
     173             : 
     174       38638 :     g_mutex_clear(&self->lock);
     175       38638 :     g_cond_clear(&self->cond);
     176       38638 :     g_slice_free(RmMDSDevice, self);
     177       38638 : }
     178             : 
     179             : ///////////////////////////////////////
     180             : //    RmMDSDevice Implementation   //
     181             : ///////////////////////////////////////
     182             : 
     183             : /** @brief Mutex-protected task popper
     184             :  **/
     185             : 
     186      359540 : static RmMDSTask *rm_mds_pop_task(RmMDSDevice *device) {
     187      359540 :     RmMDSTask *task = NULL;
     188             : 
     189      359540 :     g_mutex_lock(&device->lock);
     190             :     {
     191      359546 :         if(device->sorted_tasks) {
     192      269809 :             task = device->sorted_tasks->data;
     193      269805 :             device->sorted_tasks =
     194      269809 :                 g_slist_delete_link(device->sorted_tasks, device->sorted_tasks);
     195      269805 :             device->ref_count--;
     196             :         }
     197             :     }
     198      359542 :     g_mutex_unlock(&device->lock);
     199      359554 :     return task;
     200             : }
     201             : 
     202             : /** @brief Mutex-protected task pusher
     203             :  **/
     204             : 
     205      269818 : static void rm_mds_push_task(RmMDSDevice *device, RmMDSTask *task) {
     206      269818 :     g_mutex_lock(&device->lock);
     207             :     {
     208      269818 :         device->unsorted_tasks =
     209      269818 :                 g_slist_prepend(device->unsorted_tasks, task);
     210      269818 :         device->ref_count++;
     211      269818 :         g_cond_signal(&device->cond);
     212             :     }
     213      269818 :     g_mutex_unlock(&device->lock);
     214      269818 : }
     215             : 
     216             : /** @brief GCompareDataFunc wrapper for mds->prioritiser
     217             :  **/
     218      434703 : static gint rm_mds_compare(const RmMDSTask *a, const RmMDSTask *b,
     219             :                            RmMDSSortFunc prioritiser) {
     220      434703 :     gint result = prioritiser(a, b);
     221      434688 :     return result;
     222             : }
     223             : 
     224             : /** @brief RmMDSDevice worker thread
     225             :  **/
     226       89737 : static void rm_mds_factory(RmMDSDevice *device, RmMDS *mds) {
     227             :     /* rm_mds_factory processes tasks from device->task_list.
     228             :      * After completing one pass of the device, returns self to the
     229             :      * mds->pool threadpool. */
     230       89737 :     gint quota = mds->pass_quota;
     231             : 
     232       89737 :     g_mutex_lock(&device->lock);
     233             :     {
     234             :         /* check for empty queues - if so then wait a little while before giving up */
     235       89735 :         if(!device->sorted_tasks && !device->unsorted_tasks && device->ref_count > 0) {
     236             :             /* timed wait for signal from rm_mds_push_task() */
     237       32924 :             gint64 end_time = g_get_monotonic_time() + MDS_EMPTYQUEUE_SLEEP_US;
     238       32924 :             g_cond_wait_until(&device->cond, &device->lock, end_time);
     239             :         }
     240             : 
     241             :         /* merge and sort task lists */
     242       89720 :         device->sorted_tasks =
     243       89728 :             g_slist_concat(device->unsorted_tasks, device->sorted_tasks);
     244       89720 :         device->unsorted_tasks = NULL;
     245       89720 :         if(mds->prioritiser) {
     246       89733 :             device->sorted_tasks = g_slist_sort_with_data(
     247             :                 device->sorted_tasks, (GCompareDataFunc)rm_mds_compare,
     248       89733 :                 (RmMDSSortFunc)mds->prioritiser);
     249             :         }
     250             :     }
     251       89699 :     g_mutex_unlock(&device->lock);
     252             : 
     253             :     /* process tasks from device->sorted_tasks */
     254       89731 :     RmMDSTask *task = NULL;
     255      449275 :     while(quota > 0 &&
     256             :           (task = rm_mds_pop_task(device))) {
     257      269813 :         if ( mds->func(task->task_data, mds->user_data) ) {
     258             :             /* task succeeded */
     259      221828 :             rm_mds_task_free(task);
     260             :             /* decrement counters */
     261      221829 :             --quota;
     262      221829 :             g_atomic_int_dec_and_test(&device->mds->pending_tasks);
     263             :         } else {
     264             :             /* task failed; push it back to device->unsorted_tasks */
     265       47989 :             rm_mds_push_task(device, task);
     266             :         }
     267             :     }
     268             : 
     269       89737 :     if(g_atomic_int_get(&device->ref_count) > 0) {
     270             :         /* return self to pool for further processing */
     271       51105 :         rm_util_thread_pool_push(mds->pool, device);
     272             :     } else {
     273             :         /* signal to rm_mds_device_free() */
     274       38632 :         g_cond_signal(&device->cond);
     275             :     }
     276       89737 : }
     277             : 
     278             : /** @brief Increase or decrease RmMDSDevice reference count
     279             :  **/
     280      444978 : static void rm_mds_device_ref(RmMDSDevice *device, const gint ref_count) {
     281      444978 :     g_mutex_lock(&device->lock);
     282      444978 :     { device->ref_count += ref_count; }
     283      444978 :     g_mutex_unlock(&device->lock);
     284      444978 : }
     285             : 
     286             : /** @brief Push a RmMDSDevice to the threadpool
     287             :  **/
     288       38638 : void rm_mds_device_start(_U  gpointer disk, RmMDSDevice *device, RmMDS *mds) {
     289       38638 :     rm_util_thread_pool_push(mds->pool, device);
     290       38638 : }
     291             : 
     292             : /** @brief Push a RmMDSDevice to the threadpool
     293             :  **/
     294       36458 : void rm_mds_start(RmMDS *mds) {
     295       36458 :     mds->running = TRUE;
     296       36458 :     g_hash_table_foreach(mds->disks, (GHFunc)rm_mds_device_start, mds);
     297       36458 : }
     298             : 
     299           0 : RmMountTable *rm_mds_get_mount_table(const RmMDS *mds) {
     300           0 :     return mds->mount_table;
     301             : }
     302             : 
     303      666807 : static RmMDSDevice *rm_mds_device_get(RmMDS *mds, const dev_t disk) {
     304      666807 :     RmMDSDevice *result = NULL;
     305      666807 :     g_mutex_lock(&mds->lock);
     306             :     {
     307      666807 :         g_assert(mds->disks);
     308      666807 :         result = g_hash_table_lookup(mds->disks, GINT_TO_POINTER(disk));
     309      666807 :         if(!result) {
     310       38638 :             result = rm_mds_device_new(mds, disk);
     311       38638 :             g_hash_table_insert(mds->disks, GINT_TO_POINTER(disk), result);
     312       38638 :             if(g_atomic_int_get(&mds->running) == TRUE) {
     313           0 :                 rm_util_thread_pool_push(mds->pool, result);
     314             :             }
     315             :         }
     316             :     }
     317      666807 :     g_mutex_unlock(&mds->lock);
     318      666807 :     return result;
     319             : }
     320             : 
     321           0 : static RmMDSDevice *rm_mds_device_get_by_path(RmMDS *mds, const char *path) {
     322           0 :     dev_t disk = (mds->fake_disk) ? 0 :
     323           0 :             rm_mounts_get_disk_id_by_path(mds->mount_table, path);
     324           0 :     return rm_mds_device_get(mds, disk);
     325             : }
     326             : 
     327      666807 : static RmMDSDevice *rm_mds_device_get_by_dev(RmMDS *mds, dev_t dev, const char *path) {
     328     1309887 :     dev_t disk = (mds->fake_disk) ? dev :
     329      643080 :             rm_mounts_get_disk_id(mds->mount_table, dev, path);
     330      666807 :     return rm_mds_device_get(mds, disk);
     331             : }
     332             : 
     333             : //////////////////////////
     334             : //  API Implementation  //
     335             : //////////////////////////
     336             : 
     337       36458 : RmMDS *rm_mds_new(const gint max_threads, RmMountTable *mount_table, bool fake_disk) {
     338       36458 :     RmMDS *self = g_slice_new0(RmMDS);
     339             : 
     340       36458 :     g_mutex_init(&self->lock);
     341             : 
     342       36458 :     self->pool = rm_util_thread_pool_new((GFunc)rm_mds_factory, self, max_threads);
     343             : 
     344       36458 :     if (!mount_table && !fake_disk) {
     345           0 :         self->mount_table = rm_mounts_table_new(FALSE);
     346             :     } else {
     347       36458 :         self->mount_table = mount_table;
     348             :     }
     349             : 
     350       36458 :     self->fake_disk = fake_disk;
     351       36458 :     self->disks = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL,
     352             :                                         (GDestroyNotify)rm_mds_device_free);
     353       36458 :     self->running = FALSE;
     354             : 
     355       36458 :     return self;
     356             : }
     357             : 
     358       36458 : void rm_mds_configure(RmMDS *self,
     359             :                       const RmMDSFunc func,
     360             :                       const gpointer user_data,
     361             :                       const gint pass_quota,
     362             :                       RmMDSSortFunc prioritiser) {
     363       36458 :     g_assert(self->running == FALSE);
     364       36458 :     self->func = func;
     365       36458 :     self->user_data = user_data;
     366       36458 :     self->pass_quota = (pass_quota>0) ? pass_quota : G_MAXINT;
     367       36458 :     self->prioritiser = prioritiser;
     368       36458 : }
     369             : 
     370       36458 : void rm_mds_finish(RmMDS *mds) {
     371             :     /* wait for any pending threads to finish */
     372      109120 :     while(g_atomic_int_get(&mds->pending_tasks)) {
     373             :         /* make this threadsafe for rare cases where new disks may
     374             :          * be encountered and spawned on-the-fly via rm_mds_factory*/
     375       36204 :         g_mutex_lock(&mds->lock);
     376       36204 :         GList *devices = g_hash_table_get_values(mds->disks);
     377       36204 :         g_mutex_unlock(&mds->lock);
     378             : 
     379      110904 :         while(devices) {
     380       38496 :             RmMDSDevice *device = devices->data;
     381       38496 :             rm_log_debug_line("Finishing device %lu", device->disk);
     382       38496 :             rm_mds_device_finish(device);
     383       38496 :             devices = g_list_delete_link(devices, devices);
     384             :         }
     385             :     }
     386       36458 :     mds->running = FALSE;
     387       36458 : }
     388             : 
     389       36458 : void rm_mds_free(RmMDS *mds, gboolean free_mount_table) {
     390       36458 :     rm_mds_finish(mds);
     391             : 
     392       36458 :     g_thread_pool_free(mds->pool, false, true);
     393       36458 :     g_hash_table_destroy(mds->disks);
     394             : 
     395       36458 :     if(free_mount_table && mds->mount_table) {
     396           0 :         rm_mounts_table_destroy(mds->mount_table);
     397             :     }
     398       36458 :     g_slice_free(RmMDS, mds);
     399       36458 : }
     400             : 
     401           0 : void rm_mds_ref_path(RmMDS *mds, const char *path, const gint ref_count) {
     402           0 :     RmMDSDevice *device = rm_mds_device_get_by_path(mds, path);
     403           0 :     rm_mds_device_ref(device, ref_count);
     404           0 : }
     405             : 
     406      444978 : void rm_mds_ref_dev(RmMDS *mds, dev_t dev, const gint ref_count) {
     407      444978 :     RmMDSDevice *device = rm_mds_device_get_by_dev(mds, dev, NULL);
     408      444978 :     rm_mds_device_ref(device, ref_count);
     409      444978 : }
     410             : 
     411      221829 : static void rm_mds_push_new_task(RmMDSDevice *device, const dev_t dev, const guint64 offset,
     412             :                              const gpointer task_user_data) {
     413      221829 :     g_atomic_int_inc(&device->mds->pending_tasks);
     414             : 
     415      221829 :     RmMDSTask *task = rm_mds_task_new(dev, offset, task_user_data);
     416      221829 :     rm_mds_push_task(device, task);
     417      221829 : }
     418             : 
     419           0 : dev_t rm_mds_dev(const char *path) {
     420             :     RmStat stat_buf;
     421           0 :     if(rm_sys_stat(path, &stat_buf) == -1) {
     422           0 :         return 0;
     423             :     }
     424           0 :     return stat_buf.st_dev;
     425             : }
     426             : 
     427      221829 : void rm_mds_push_task_by_dev(RmMDS *mds, const dev_t dev, gint64 offset, const char *path,
     428             :                              const gpointer task_user_data) {
     429      435765 :     bool is_rotational = (mds->fake_disk) ? (dev %2 == 0) :
     430      213936 :             !rm_mounts_is_nonrotational(mds->mount_table, dev);
     431      221829 :     if(offset == -1 && path && is_rotational) {
     432           0 :         offset = rm_offset_get_from_path(path, 0, NULL);
     433             :     }
     434      221829 :     RmMDSDevice *device = rm_mds_device_get_by_dev(mds, dev, path);
     435      221829 :     rm_mds_push_new_task(device, dev, offset, task_user_data);
     436      221829 : }
     437             : 
     438           0 : void rm_mds_push_task_by_path(RmMDS *mds, const char *path, gint64 offset,
     439             :                               const gpointer task_user_data) {
     440           0 :     rm_mds_push_task_by_dev(mds, rm_mds_dev(path), offset, path, task_user_data);
     441           0 : }
     442             : 
     443             : /**
     444             :  * @brief prioritiser function for basic elevator algorithm
     445             :  **/
     446      434694 : gint rm_mds_elevator_cmp(const RmMDSTask *task_a, const RmMDSTask *task_b) {
     447      869388 :     return (2 * SIGN_DIFF(task_a->dev, task_b->dev) +
     448      434694 :             1 * SIGN_DIFF(task_a->offset, task_b->offset));
     449             : }

Generated by: LCOV version 1.11