# NOTE: a version-control viewer header (blame names and a line-number
# gutter) was accidentally captured with this file; it has been removed.
# Locking mechanism to ensure no two compilations occur simultaneously
# in the same compilation directory (which can cause crashes).
from __future__ import absolute_import, print_function, division

import atexit
import os
import socket  # only used for gethostname()
import time
import logging
from six import PY3

from contextlib import contextmanager

import numpy as np

from theano import config

# Module-private RNG with a fixed seed: used for the random wait jitter and
# the random part of lock ids, without touching numpy's global random state.
random = np.random.RandomState([2015, 8, 2])

_logger = logging.getLogger("theano.gof.compilelock")
# If the user provided a logging level, we don't want to override it.
if _logger.level == logging.NOTSET:
    # INFO will show the "Refreshing lock" messages
    _logger.setLevel(logging.INFO)

# Cached once at import time; lock ids embed the hostname so that locks taken
# from another machine (e.g. over NFS) can be distinguished from local ones.
hostname = socket.gethostname()


def force_unlock():
    """
    Delete the compilation lock, even when another process owns it.

    Implemented by grabbing the lock with an (almost) zero timeout — which
    overrides any existing owner immediately — and then dropping it again.
    """
    get_lock(timeout=0, min_wait=0, max_wait=0.001)
    release_lock()


@contextmanager
def lock_ctx(lock_dir=None, keep_lock=False, **kw):
    """
    Context manager that holds the compilation lock while its body runs.

    Parameters
    ----------
    lock_dir : str or None
        Lock directory forwarded to `get_lock` (None selects the default
        compiledir lock directory).
    keep_lock : bool
        If True, the lock is NOT released on exit; the caller then becomes
        responsible for calling `release_lock` itself.
    kw
        Extra keyword arguments forwarded to `get_lock`.

    """
    get_lock(lock_dir=lock_dir, **kw)
    try:
        yield
    finally:
        # Release even if the body raised: previously an exception inside
        # the `with` block skipped release_lock(), leaking the (re-entrant)
        # lock count so the lock was never freed.
        if not keep_lock:
            release_lock()


# We define this name with an underscore so that python shutdown
# deletes this before non-underscore names (like os).  We need to do
# it this way to avoid errors on shutdown.
def _get_lock(lock_dir=None, **kw):
    """
    Obtain lock on compilation directory.

    The lock is re-entrant within a process: nested calls only increment a
    counter, and the inter-process lock is truly acquired on the first call
    only. All state (lock count, current lock directory, unlocker, start
    time) is stored as attributes on the public `get_lock` alias below.

    Parameters
    ----------
    lock_dir : str or None
        Directory to lock; defaults to '<config.compiledir>/lock_dir'.
    kw
        Additional arguments to be forwarded to the `lock` function when
        acquiring the lock.

    Notes
    -----
    We can lock only on 1 directory at a time.

    """
    if lock_dir is None:
        lock_dir = os.path.join(config.compiledir, 'lock_dir')
    if not hasattr(get_lock, 'n_lock'):
        # Initialization.
        get_lock.n_lock = 0
        if not hasattr(get_lock, 'lock_is_enabled'):
            # Enable lock by default.
            get_lock.lock_is_enabled = True
        get_lock.lock_dir = lock_dir
        get_lock.unlocker = Unlocker(get_lock.lock_dir)
    else:
        if lock_dir != get_lock.lock_dir:
            # Compilation directory has changed.
            # First ensure all old locks were released.
            assert get_lock.n_lock == 0
            # Update members for new compilation directory.
            get_lock.lock_dir = lock_dir
            get_lock.unlocker = Unlocker(get_lock.lock_dir)

    if get_lock.lock_is_enabled:
        # Only really try to acquire the lock if we do not have it already.
        if get_lock.n_lock == 0:
            lock(get_lock.lock_dir, **kw)
            # Make sure the lock is released at interpreter exit even if the
            # process never calls release_lock() (e.g. crash or interrupt).
            atexit.register(Unlocker.unlock, get_lock.unlocker)
            # Store time at which the lock was set.
            get_lock.start_time = time.time()
        else:
            # Check whether we need to 'refresh' the lock. We do this
            # every 'config.compile.timeout / 2' seconds to ensure
            # no one else tries to override our lock after their
            # 'config.compile.timeout' timeout period.
            if get_lock.start_time is None:
                # This should not happen; if it does, clean up the lock
                # state and raise an error.
                while get_lock.n_lock > 0:
                    release_lock()
                raise Exception("For some unknow reason, the lock was already "
                                "taken, but no start time was registered.")
            now = time.time()
            if now - get_lock.start_time > config.compile.timeout / 2:
                lockpath = os.path.join(get_lock.lock_dir, 'lock')
                _logger.info('Refreshing lock %s', str(lockpath))
                refresh_lock(lockpath)
                get_lock.start_time = now
    get_lock.n_lock += 1


# Public alias: all the function attributes above are stored on this name.
get_lock = _get_lock


def release_lock():
    """
    Release one level of the (re-entrant) lock on the compilation directory.

    The inter-process lock itself is removed only once every nested
    `get_lock` request has been matched by a `release_lock` call.
    """
    get_lock.n_lock -= 1
    assert get_lock.n_lock >= 0
    if get_lock.n_lock > 0 or not get_lock.lock_is_enabled:
        # Either nested lock requests are still active, or locking is
        # disabled altogether: nothing more to do.
        return
    get_lock.start_time = None
    get_lock.unlocker.unlock(force=False)


def set_lock_status(use_lock):
    """
    Turn the compilation-directory lock on or off (it is on by default).

    Turning it off can make compilation slightly faster, but is unsafe when
    several jobs may compile into the same directory in parallel.

    Parameters
    ----------
    use_lock : bool
        True to use the compilation lock, False to bypass it.

    """
    get_lock.lock_is_enabled = use_lock

# Sentinel default for `lock`'s `timeout` argument: None is itself a valid,
# meaningful value (no timeout), so it cannot serve as the "unset" marker.
notset = object()


def lock(tmp_dir, timeout=notset, min_wait=None, max_wait=None, verbosity=1):
    """
    Obtain lock access by creating a given temporary directory (whose base will
    be created if needed, but will not be deleted after the lock is removed).
    If access is refused by the same lock owner during more than 'timeout'
    seconds, then the current lock is overridden. If timeout is None, then no
    timeout is performed.

    The lock is performed by creating a 'lock' file in 'tmp_dir' that contains
    a unique id identifying the owner of the lock (the process id, followed by
    a random string).

    When there is already a lock, the process sleeps for a random amount of
    time between min_wait and max_wait seconds before trying again.

    If 'verbosity' is >= 1, then a message will be displayed when we need to
    wait for the lock. If it is set to a value >1, then this message will be
    displayed each time we re-check for the presence of the lock. Otherwise it
    is displayed only when we notice the lock's owner has changed.

    Parameters
    ----------
    tmp_dir : str
        Lock directory that will be created when acquiring the lock.
    timeout : int or None
        Time (in seconds) to wait before replacing an existing lock (default
        config 'compile.timeout').
    min_wait: int
        Minimum time (in seconds) to wait before trying again to get the lock
        (default config 'compile.wait').
    max_wait: int
        Maximum time (in seconds) to wait before trying again to get the lock
        (default 2 * min_wait).
    verbosity : int
        Amount of feedback displayed to screen (default 1).

    """
    if min_wait is None:
        min_wait = config.compile.wait
    if max_wait is None:
        max_wait = min_wait * 2
    if timeout is notset:
        timeout = config.compile.timeout
    # Create base of lock directory if required.
    base_lock = os.path.dirname(tmp_dir)
    if not os.path.isdir(base_lock):
        try:
            os.makedirs(base_lock)
        except OSError:
            # Someone else was probably trying to create it at the same time.
            # We wait two seconds just to make sure the following assert does
            # not fail on some NFS systems.
            time.sleep(2)
    assert os.path.isdir(base_lock)

    # Variable initialization.
    lock_file = os.path.join(tmp_dir, 'lock')
    my_pid = os.getpid()
    no_display = (verbosity == 0)

    nb_error = 0
    # Number of times we have already slept while waiting for the lock.
    # Used to skip the "Waiting for existing lock" message on the very first
    # wait, so it is displayed less frequently (and generates less noise,
    # e.g. in cluster job emails).
    nb_wait = 0
    # Acquire lock.
    while True:
        try:
            last_owner = 'no_owner'
            time_start = time.time()
            while os.path.isdir(tmp_dir):
                # BUGFIX: `other_dead` is re-computed on every iteration.
                # It used to be initialized once, outside this loop: after
                # overriding one dead owner's lock, the stale True value
                # made us immediately override a lock re-created by a
                # *live* process on the next iteration.
                other_dead = False
                try:
                    with open(lock_file) as f:
                        read_owner = f.readlines()[0].strip()

                    # The inner try is transition code for old two-field lock
                    # ids (no hostname). It may be removed when people have
                    # upgraded.
                    try:
                        other_host = read_owner.split('_')[2]
                    except IndexError:
                        other_host = ()  # make sure it isn't equal to any host
                    if other_host == hostname:
                        try:
                            # Signal 0 just checks whether the other process
                            # still exists, without actually signaling it.
                            os.kill(int(read_owner.split('_')[0]), 0)
                        except OSError:
                            other_dead = True
                        except AttributeError:
                            pass  # os.kill does not exist on windows
                except Exception:
                    read_owner = 'failure'
                if other_dead:
                    if not no_display:
                        msg = "process '%s'" % read_owner.split('_')[0]
                        _logger.warning("Overriding existing lock by dead %s "
                                        "(I am process '%s')", msg, my_pid)
                    get_lock.unlocker.unlock(force=True)
                    continue
                if last_owner == read_owner:
                    if (timeout is not None and
                            time.time() - time_start >= timeout):
                        # Timeout exceeded or locking process dead.
                        if not no_display:
                            if read_owner == 'failure':
                                msg = 'unknown process'
                            else:
                                msg = "process '%s'" % read_owner.split('_')[0]
                            _logger.warning("Overriding existing lock by %s "
                                            "(I am process '%s')", msg, my_pid)
                        get_lock.unlocker.unlock(force=True)
                        continue
                else:
                    # Owner changed: restart the timeout clock.
                    last_owner = read_owner
                    time_start = time.time()
                    no_display = (verbosity == 0)
                if not no_display and nb_wait > 0:
                    if read_owner == 'failure':
                        msg = 'unknown process'
                    else:
                        msg = "process '%s'" % read_owner.split('_')[0]
                    _logger.info("Waiting for existing lock by %s (I am "
                                 "process '%s')", msg, my_pid)
                    _logger.info("To manually release the lock, delete %s",
                                 tmp_dir)
                    if verbosity <= 1:
                        no_display = True
                nb_wait += 1
                time.sleep(random.uniform(min_wait, max_wait))

            # On Python 3, mkdir raises the more specific FileExistsError.
            if PY3:
                exception = FileExistsError  # noqa
            else:
                exception = OSError

            try:
                os.mkdir(tmp_dir)
            except exception:
                # Error while creating the directory: someone else
                # must have tried at the exact same time.
                nb_error += 1
                if nb_error < 10:
                    continue
                else:
                    raise
            # Safety check: the directory should be here.
            assert os.path.isdir(tmp_dir)

            # Write own id into lock file.
            unique_id = refresh_lock(lock_file)

            # Verify we are really the lock owner (this should not be needed,
            # but better be safe than sorry).
            with open(lock_file) as f:
                owner = f.readlines()[0].strip()

            if owner != unique_id:
                # Too bad, try again.
                continue
            else:
                # We got the lock, hoorray!
                return

        except Exception as e:
            # If something wrong happened, we try again.
            _logger.warning("Something wrong happened: %s %s", type(e), e)
            nb_error += 1
            if nb_error > 10:
                raise
            time.sleep(random.uniform(min_wait, max_wait))
            continue


def refresh_lock(lock_file):
    """
    'Refresh' an existing lock by re-writing the file containing the owner's
    unique id, using a new (randomly generated) id, which is also returned.

    Parameters
    ----------
    lock_file : str
        Path of the lock file to (re)write.

    Returns
    -------
    str
        The new unique id ('<pid>_<random digits>_<hostname>') that was
        written to the lock file.

    """
    unique_id = '%s_%s_%s' % (
        os.getpid(),
        ''.join([str(random.randint(0, 9)) for i in range(10)]),
        hostname)
    try:
        with open(lock_file, 'w') as lock_write:
            lock_write.write(unique_id + '\n')
    except Exception:
        # In some strange cases this can happen. To prevent all tests from
        # failing, we release the lock, but since there is a problem, we
        # still re-raise the original exception. This way, only one test
        # should fail.
        while get_lock.n_lock > 0:
            release_lock()
        # Use `warning`: `Logger.warn` is deprecated (since Python 3.3).
        _logger.warning('Refreshing lock failed, we release the'
                        ' lock before raising again the exception')
        raise
    return unique_id


class Unlocker(object):
    """
    Class wrapper around the release mechanism so that the lock is
    automatically released when the program exits (even when crashing or
    being interrupted), via the atexit registration done by `get_lock`.

    """

    def __init__(self, tmp_dir):
        # Directory whose existence materializes the lock.
        self.tmp_dir = tmp_dir

    def unlock(self, force=False):
        """
        Remove the current lock.

        This method never raises: multiple jobs running in parallel are
        allowed to unlock the same directory at the same time (e.g. when
        they all reach their timeout limit), so failures to delete the
        lock file or directory are silently ignored.

        Parameters
        ----------
        force : bool
            When False, first check that this process (on this host) still
            owns the lock, and do nothing if someone else took it over.
            When True, remove the lock unconditionally.

        """
        lock_path = os.path.join(self.tmp_dir, 'lock')
        if not force:
            # Bail out if another process grabbed the lock in the meantime.
            # Any error reading/parsing the file is treated as "unknown
            # owner" and we proceed with the removal attempt.
            try:
                with open(lock_path) as lock_f:
                    content = lock_f.readlines()[0].strip()
                    owner_pid, _, owner_host = content.split('_')
                    if owner_pid != str(os.getpid()) or owner_host != hostname:
                        return
            except Exception:
                pass

        # The two removals are attempted independently on purpose: even if
        # deleting the file fails (e.g. it is already gone), we still want
        # to try removing the directory.
        for remover, target in ((os.remove, lock_path),
                                (os.rmdir, self.tmp_dir)):
            try:
                remover(target)
            except Exception:
                pass