Как ограничить время выполнения вызова функции в Python

В моем коде есть вызов функции сокетной функции, эта функция находится из другого модуля, поэтому из-под моего контроля проблема состоит в том, что она блокирует время от времени, что совершенно неприемлемо. Как ограничить время выполнения функции из моего кода? Я предполагаю, что решение должно использовать другой поток.

  • Ошибка Python3: initial_value должно быть str или None
  • Scikit Learn Gaussian HMM: ValueError: startprob должен суммироваться до 1.0
  • Автоматически вводить параметры ввода в Python
  • py2app подбирает .git subdir пакета во время сборки
  • Как сделать шахматную доску в numpy?
  • Как удалить дубликаты из списка Python и сохранить порядок?
  • Часто повторяется try / except в Python
  • Построение категориальных данных с помощью pandas и matplotlib
  • 9 Solutions collect form web for “Как ограничить время выполнения вызова функции в Python”

    Я не уверен, насколько кросс-платформенный это может быть, но использование сигналов и сигналов тревоги может быть хорошим способом взглянуть на это. С небольшой работой вы могли бы сделать это полностью общим, а также использовать в любой ситуации.

    http://docs.python.org/library/signal.html

    Таким образом, ваш код будет выглядеть примерно так.

    import signal def signal_handler(signum, frame): raise Exception("Timed out!") signal.signal(signal.SIGALRM, signal_handler) signal.alarm(10) # Ten seconds try: long_function_call() except Exception, msg: print "Timed out!" 

    Улучшение ответа @ rik.the.vik было бы использовать оператор with чтобы дать функции тайм-аута синтаксического сахара:

     import signal from contextlib import contextmanager class TimeoutException(Exception): pass @contextmanager def time_limit(seconds): def signal_handler(signum, frame): raise TimeoutException("Timed out!") signal.signal(signal.SIGALRM, signal_handler) signal.alarm(seconds) try: yield finally: signal.alarm(0) try: with time_limit(10): long_function_call() except TimeoutException as e: print("Timed out!") 

    Вот способ Linux / OSX ограничить время работы функции. Это на случай, если вы не хотите использовать потоки и хотите, чтобы ваша программа дождалась завершения функции или истечения срока.

     from multiprocessing import Process from time import sleep def f(time): sleep(time) def run_with_limited_time(func, args, kwargs, time): """Runs a function with time limit :param func: The function to run :param args: The functions args, given as tuple :param kwargs: The functions keywords, given as dict :param time: The time limit in seconds :return: True if the function ended successfully. False if it was terminated. """ p = Process(target=func, args=args, kwargs=kwargs) p.start() p.join(time) if p.is_alive(): p.terminate() return False return True if __name__ == '__main__': print run_with_limited_time(f, (1.5, ), {}, 2.5) # True print run_with_limited_time(f, (3.5, ), {}, 2.5) # False 

    Выполнение этого из обработчика сигнала опасно: вы можете быть внутри обработчика исключений во время поднятия исключения и оставить вещи в разбитом состоянии. Например,

     def function_with_enforced_timeout(): f = open_temporary_file() try: ... finally: here() unlink(f.filename) 

    Если ваше исключение вызывается здесь (), временный файл никогда не будет удален.

    Решение здесь заключается в том, что асинхронные исключения откладываются до тех пор, пока код не будет находиться внутри кода обработки исключений (исключающий или окончательный блок), но Python этого не делает.

    Обратите внимание, что это не будет прерывать что-либо во время выполнения собственного кода; он только прервет его, когда функция вернется, поэтому это может не помочь этому конкретному случаю. (Сам SIGALRM может прервать блокировку вызова – но код сокета обычно просто повторяет попытку после EINTR.)

    Выполнение этого с помощью потоков – лучшая идея, поскольку она более переносима, чем сигналы. Поскольку вы начинаете рабочий поток и блокируете его до тех пор, пока он не завершится, нет никаких обычных проблем с параллелизмом. К сожалению, нет способа сделать асинхронное исключение для другого потока в Python (это могут использовать другие API-интерфейсы нитей). Это также будет иметь ту же проблему с отправкой исключения во время обработчика исключений и требует того же исправления.

    Единственным «безопасным» способом сделать это на любом языке является использование вторичного процесса для выполнения этого тайм-аута, в противном случае вам нужно построить свой код таким образом, чтобы он безопасно выполнял бы сам по себе, например, проверяя время, прошедшее в цикле или аналогичном. Если изменение метода не является вариантом, поток будет недостаточным.

    Зачем? Потому что вы рискуете оставить вещи в плохом состоянии, когда вы это делаете. Если поток просто убит средним методом, удерживаемые блокировки и т. Д. Будут только удерживаться и не могут быть выпущены.

    Так что посмотрите на процесс, не смотрите на поток.

    Вам не нужно использовать потоки. Вы можете использовать другой процесс для выполнения блокировки, например, возможно, используя модуль подпроцесса . Если вы хотите разделить структуры данных между различными частями вашей программы, Twisted – отличная библиотека для того, чтобы дать вам контроль над этим, и я бы рекомендовал его, если вы заботитесь о блокировании и ожидаете, что у вас будет такая проблема. Плохая новость с Twisted – вам нужно переписать свой код, чтобы избежать блокировки, и есть справедливая кривая обучения.

    Вы можете использовать потоки, чтобы избежать блокировки, но я рассматривал бы это как последнее средство, так как оно подвергает вас целому миру боли. Прочтите хорошую книгу о параллелизме, прежде чем даже подумайте об использовании потоков в производстве, например, «Concurrent Systems» Жана Бэкона. Я работаю с кучей людей, которые действительно охлаждают высокопроизводительные материалы с помощью потоков, и мы не вводим темы в проекты, если они нам действительно не нужны.

    Я предпочитаю подход к контекстному менеджеру, поскольку он позволяет выполнять несколько операторов python внутри a with time_limit инструкции with time_limit . Поскольку в системе Windows нет SIGALARM , более портативный и, возможно, более простой способ может использовать Timer

     from contextlib import contextmanager import threading import _thread class TimeoutException(Exception): def __init__(self, msg=''): self.msg = msg @contextmanager def time_limit(seconds, msg=''): timer = threading.Timer(seconds, lambda: _thread.interrupt_main()) timer.start() try: yield except KeyboardInterrupt: raise TimeoutException("Timed out for operation {}".format(msg)) finally: # if the action ends in specified time, timer is canceled timer.cancel() import time # ends after 5 seconds with time_limit(5, 'sleep'): for i in range(10): time.sleep(1) # this will actually end after 10 seconds with time_limit(5, 'sleep'): time.sleep(10) 

    Ключевым приемом здесь является использование _thread.interrupt_main для прерывания основного потока из потока таймера. Одно из предостережений заключается в том, что основной поток не всегда быстро реагирует на KeyboardInterrupt поднятый Timer . Например, time.sleep() вызывает системную функцию, поэтому KeyboardInterrupt будет обрабатываться после вызова sleep .

    Вот функция тайм-аута, которую, я думаю, я нашел через Google, и она работает для меня.

    От: http://code.activestate.com/recipes/473878/

     def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None): '''This function will spwan a thread and run the given function using the args, kwargs and return the given default value if the timeout_duration is exceeded ''' import threading class InterruptableThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) self.result = default def run(self): try: self.result = func(*args, **kwargs) except: self.result = default it = InterruptableThread() it.start() it.join(timeout_duration) if it.isAlive(): return it.result else: return it.result 

    Обычно я предпочитаю использовать контекст-менеджер, как было предложено @ josh-lee

    Но если кто-то заинтересован в том, чтобы это реализовано как декоратор, вот альтернатива.

    Вот как это будет выглядеть:

     import time from timeout import timeout class Test(object): @timeout(2) def test_a(self, foo, bar): print foo time.sleep(1) print bar return 'A Done' @timeout(2) def test_b(self, foo, bar): print foo time.sleep(3) print bar return 'B Done' t = Test() print t.test_a('python', 'rocks') print t.test_b('timing', 'out') 

    И это модуль timeout.py :

     import threading class TimeoutError(Exception): pass class InterruptableThread(threading.Thread): def __init__(self, func, *args, **kwargs): threading.Thread.__init__(self) self._func = func self._args = args self._kwargs = kwargs self._result = None def run(self): self._result = self._func(*self._args, **self._kwargs) @property def result(self): return self._result class timeout(object): def __init__(self, sec): self._sec = sec def __call__(self, f): def wrapped_f(*args, **kwargs): it = InterruptableThread(f, *args, **kwargs) it.start() it.join(self._sec) if not it.is_alive(): return it.result raise TimeoutError('execution expired') return wrapped_f 

    Выход:

     python rocks A Done timing Traceback (most recent call last): ... timeout.TimeoutError: execution expired out 

    Обратите внимание, что даже если TimeoutError будет брошен, декорированный метод будет продолжать работать в другом потоке. Если вы также хотите, чтобы этот поток был «остановлен», см.: Есть ли способ убить Thread в Python?

    Python - лучший язык программирования в мире.