Как лучше всего выполнять многопроцессорство в запросах с сервером Porn Tornado?

Я использую неблокирующий сервер Python I / O Tornado. У меня есть класс запросов GET которые могут занять значительное количество времени (подумайте в диапазоне 5-10 секунд). Проблема в том, что Tornado блокирует эти запросы, чтобы последующие быстрые запросы поддерживались до тех пор, пока не завершится медленный запрос.

Я просмотрел: https://github.com/facebook/tornado/wiki/Threading-and-concurrency и пришел к выводу, что мне нужна комбинация # 3 (другие процессы) и №4 (другие потоки). # 4 сам по себе имел проблемы, и я не смог получить надежный контроль обратно на ioloop, когда был другой поток, выполняющий «heavy_lifting». (Я предполагаю, что это было связано с GIL и тем фактом, что задача heavy_lifting имеет высокую нагрузку на процессор и продолжает отвлекать управление от основного ioloop, но это предположение).

Поэтому я проработал прототипом, как решить эту проблему, выполнив задачи «тяжелого подъема» в рамках этих медленных запросов GET в отдельном процессе, а затем поместив обратный вызов обратно в Tornado ioloop, когда процесс завершен, чтобы завершить запрос. Это освобождает ioloop для обработки других запросов.

Я создал простой пример, демонстрирующий возможное решение, но мне любопытно получить обратную связь от сообщества на нем.

Мой вопрос двоякий: как можно упростить этот нынешний подход? Какие подводные камни потенциально существуют с ним?

Подход

  1. Используйте встроенный asynchronous декоратор Tornado, который позволяет оставаться открытым, а ioloop – продолжать.

  2. Создайте отдельный процесс для задач «тяжелого подъема» с использованием multiprocessing модуля python. Сначала я попытался использовать модуль threading передачи, но не смог получить надежное отключение управления до ioloop. Также представляется, что mutliprocessing также будет использовать многоядерные процессоры.

  3. Начните поток «наблюдателя» в основном процессе ioloop, используя модуль threading передачи, который должен следить за multiprocessing.Queue Результат для выполнения задачи «тяжелого подъема» при ее завершении. Это было необходимо, потому что мне нужен был способ узнать, что задача heavy_lifting была завершена, хотя она все еще может уведомить ioloop о завершении этого запроса.

  4. Убедитесь, что поток «watcher» автоматически отключает управление основным циклом ioloop с time.sleep(0) чтобы другие запросы продолжали легко обрабатываться.

  5. Когда результат возникает в очереди, добавьте обратный вызов из потока «watcher», используя tornado.ioloop.IOLoop.instance().add_callback() который задокументирован как единственный безопасный способ вызова экземпляров ioloop из других потоков.

  6. Не забудьте затем вызвать finish() в обратном вызове, чтобы завершить запрос и передать ответ.

Ниже приведен пример кода, показывающего этот подход. multi_tornado.py – это сервер, реализующий вышеуказанный контур, а call_multi.py – образец сценария, который вызывает сервер двумя способами для тестирования сервера. Оба теста вызывают сервер с 3 медленными запросами GET за которыми следуют 20 быстрых запросов GET . Результаты показаны как для работы с включенным, так и без него.

В случае запуска его с «no threading» блок 3 медленных запросов (каждый занимает чуть больше секунды для завершения). Несколько из 20 быстрых запросов сжимаются между некоторыми медленными запросами внутри ioloop (не совсем уверен, как это происходит), но могут быть артефактом, когда я запускаю скрипт сервера и клиента на той же машине). Дело в том, что все быстрые запросы поддерживаются в различной степени.

В случае запуска с поточной поддержкой 20 быстрых запросов сразу заканчиваются первым, и три медленных запроса завершаются примерно в одно и то же время после того, как они выполнялись параллельно. Это желаемое поведение. Три медленных запроса занимают 2,5 секунды, чтобы завершить их параллельно – в то время как в несетевом случае три медленных запроса занимают около 3,5 секунд. Таким образом, около 35% ускоряется в целом (я предполагаю из-за многоядерного обмена). Но что более важно – быстрые запросы были немедленно обработаны в лею медленных.

У меня нет большого опыта в многопоточном программировании, поэтому, хотя это, похоже, работает здесь, мне любопытно узнать:

Есть ли более простой способ сделать это? Какие монстры могут скрываться в этом подходе?

(Примечание: будущий компромисс может заключаться в том, чтобы просто запускать больше экземпляров Tornado с обратным прокси-сервером, например nginx, выполняющим балансировку нагрузки. Независимо от того, что я буду запускать несколько экземпляров с балансировщиком нагрузки, – но меня беспокоит просто бросание аппаратного обеспечения по этой проблеме так как кажется, что аппаратное обеспечение так напрямую связано с проблемой с точки зрения блокировки.)

Образец кода

multi_tornado.py (пример сервера):

 import time import threading import multiprocessing import math from tornado.web import RequestHandler, Application, asynchronous from tornado.ioloop import IOLoop # run in some other process - put result in q def heavy_lifting(q): t0 = time.time() for k in range(2000): math.factorial(k) t = time.time() q.put(t - t0) # report time to compute in queue class FastHandler(RequestHandler): def get(self): res = 'fast result ' + self.get_argument('id') print res self.write(res) self.flush() class MultiThreadedHandler(RequestHandler): # Note: This handler can be called with threaded = True or False def initialize(self, threaded=True): self._threaded = threaded self._q = multiprocessing.Queue() def start_process(self, worker, callback): # method to start process and watcher thread self._callback = callback if self._threaded: # launch process multiprocessing.Process(target=worker, args=(self._q,)).start() # start watching for process to finish threading.Thread(target=self._watcher).start() else: # threaded = False just call directly and block worker(self._q) self._watcher() def _watcher(self): # watches the queue for process result while self._q.empty(): time.sleep(0) # relinquish control if not ready # put callback back into the ioloop so we can finish request response = self._q.get(False) IOLoop.instance().add_callback(lambda: self._callback(response)) class SlowHandler(MultiThreadedHandler): @asynchronous def get(self): # start a thread to watch for self.start_process(heavy_lifting, self._on_response) def _on_response(self, delta): _id = self.get_argument('id') res = 'slow result {} <--- {:0.3f} s'.format(_id, delta) print res self.write(res) self.flush() self.finish() # be sure to finish request application = Application([ (r"/fast", FastHandler), (r"/slow", SlowHandler, dict(threaded=False)), (r"/slow_threaded", SlowHandler, dict(threaded=True)), ]) if __name__ == "__main__": application.listen(8888) IOLoop.instance().start() 

call_multi.py (клиентский тестер):

 import sys from tornado.ioloop import IOLoop from tornado import httpclient def run(slow): def show_response(res): print res.body # make 3 "slow" requests on server requests = [] for k in xrange(3): uri = 'http://localhost:8888/{}?id={}' requests.append(uri.format(slow, str(k + 1))) # followed by 20 "fast" requests for k in xrange(20): uri = 'http://localhost:8888/fast?id={}' requests.append(uri.format(k + 1)) # show results as they return http_client = httpclient.AsyncHTTPClient() print 'Scheduling Get Requests:' print '------------------------' for req in requests: print req http_client.fetch(req, show_response) # execute requests on server print '\nStart sending requests....' IOLoop.instance().start() if __name__ == '__main__': scenario = sys.argv[1] if scenario == 'slow' or scenario == 'slow_threaded': run(scenario) 

Результаты теста

python call_multi.py slow (поведение блокировки):

 Scheduling Get Requests: ------------------------ http://localhost:8888/slow?id=1 http://localhost:8888/slow?id=2 http://localhost:8888/slow?id=3 http://localhost:8888/fast?id=1 http://localhost:8888/fast?id=2 http://localhost:8888/fast?id=3 http://localhost:8888/fast?id=4 http://localhost:8888/fast?id=5 http://localhost:8888/fast?id=6 http://localhost:8888/fast?id=7 http://localhost:8888/fast?id=8 http://localhost:8888/fast?id=9 http://localhost:8888/fast?id=10 http://localhost:8888/fast?id=11 http://localhost:8888/fast?id=12 http://localhost:8888/fast?id=13 http://localhost:8888/fast?id=14 http://localhost:8888/fast?id=15 http://localhost:8888/fast?id=16 http://localhost:8888/fast?id=17 http://localhost:8888/fast?id=18 http://localhost:8888/fast?id=19 http://localhost:8888/fast?id=20 Start sending requests.... slow result 1 <--- 1.338 s fast result 1 fast result 2 fast result 3 fast result 4 fast result 5 fast result 6 fast result 7 slow result 2 <--- 1.169 s slow result 3 <--- 1.130 s fast result 8 fast result 9 fast result 10 fast result 11 fast result 13 fast result 12 fast result 14 fast result 15 fast result 16 fast result 18 fast result 17 fast result 19 fast result 20 

python call_multi.py slow_threaded (желаемое поведение):

 Scheduling Get Requests: ------------------------ http://localhost:8888/slow_threaded?id=1 http://localhost:8888/slow_threaded?id=2 http://localhost:8888/slow_threaded?id=3 http://localhost:8888/fast?id=1 http://localhost:8888/fast?id=2 http://localhost:8888/fast?id=3 http://localhost:8888/fast?id=4 http://localhost:8888/fast?id=5 http://localhost:8888/fast?id=6 http://localhost:8888/fast?id=7 http://localhost:8888/fast?id=8 http://localhost:8888/fast?id=9 http://localhost:8888/fast?id=10 http://localhost:8888/fast?id=11 http://localhost:8888/fast?id=12 http://localhost:8888/fast?id=13 http://localhost:8888/fast?id=14 http://localhost:8888/fast?id=15 http://localhost:8888/fast?id=16 http://localhost:8888/fast?id=17 http://localhost:8888/fast?id=18 http://localhost:8888/fast?id=19 http://localhost:8888/fast?id=20 Start sending requests.... fast result 1 fast result 2 fast result 3 fast result 4 fast result 5 fast result 6 fast result 7 fast result 8 fast result 9 fast result 10 fast result 11 fast result 12 fast result 13 fast result 14 fast result 15 fast result 19 fast result 20 fast result 17 fast result 16 fast result 18 slow result 2 <--- 2.485 s slow result 3 <--- 2.491 s slow result 1 <--- 2.517 s 

Если вы хотите использовать concurrent.futures.ProcessPoolExecutor вместо multiprocessing , это на самом деле очень просто. Iloop Tornado уже поддерживает concurrent.futures.Future , поэтому они будут прекрасно играть вместе из коробки. concurrent.futures включен в Python 3.2+ и был отправлен обратно в Python 2.x.

Вот пример:

 import time from concurrent.futures import ProcessPoolExecutor from tornado.ioloop import IOLoop from tornado import gen def f(a, b, c, blah=None): print "got %s %s %s and %s" % (a, b, c, blah) time.sleep(5) return "hey there" @gen.coroutine def test_it(): pool = ProcessPoolExecutor(max_workers=1) fut = pool.submit(f, 1, 2, 3, blah="ok") # This returns a concurrent.futures.Future print("running it asynchronously") ret = yield fut print("it returned %s" % ret) pool.shutdown() IOLoop.instance().run_sync(test_it) 

Вывод:

 running it asynchronously got 1 2 3 and ok it returned hey there 

ProcessPoolExecutor имеет более ограниченный API, чем multiprocessing.Pool , но если вам не нужны более сложные функции multiprocessing.Pool , его стоит использовать, потому что интеграция намного проще.

multiprocessing.Pool может быть интегрирован в цикл ввода-вывода tornado , но это немного грязно. Более чистую интеграцию можно выполнить с помощью concurrent.futures (см. Мой другой ответ для деталей), но если вы застряли на Python 2.x и не можете установить concurrent.futures back, то можете сделать это строго используя multiprocessing :

У методов multiprocessing.Pool.apply_async и multiprocessing.Pool.map_async есть необязательный параметр callback , что означает, что оба они могут быть подключены к tornado.gen.Task . Поэтому в большинстве случаев асинхронный запуск асинхронного кода в подпроцессе так же просто:

 import multiprocessing import contextlib from tornado import gen from tornado.gen import Return from tornado.ioloop import IOLoop from functools import partial def worker(): print "async work here" @gen.coroutine def async_run(func, *args, **kwargs): result = yield gen.Task(pool.apply_async, func, args, kwargs) raise Return(result) if __name__ == "__main__": pool = multiprocessing.Pool(multiprocessing.cpu_count()) func = partial(async_run, worker) IOLoop().run_sync(func) 

Как я уже говорил, в большинстве случаев это хорошо работает. Но если worker() выдает исключение, callback никогда не вызывается, что означает, что gen.Task никогда не заканчивается, и вы вешаете вечно. Теперь, если вы знаете, что ваша работа никогда не будет генерировать исключение (потому что вы завернули все это в try / except , например, например), вы можете с радостью использовать этот подход. Однако, если вы хотите, чтобы исключения исключались из вашего рабочего, единственным решением, которое я нашел, было подклассирование некоторых компонентов многопроцессорности и заставить их вызвать callback даже если подпроцесс рабочего вызвал исключение:

 from multiprocessing.pool import ApplyResult, Pool, RUN import multiprocessing class TornadoApplyResult(ApplyResult): def _set(self, i, obj): self._success, self._value = obj if self._callback: self._callback(self._value) self._cond.acquire() try: self._ready = True self._cond.notify() finally: self._cond.release() del self._cache[self._job] class TornadoPool(Pool): def apply_async(self, func, args=(), kwds={}, callback=None): ''' Asynchronous equivalent of `apply()` builtin This version will call `callback` even if an exception is raised by `func`. ''' assert self._state == RUN result = TornadoApplyResult(self._cache, callback) self._taskqueue.put(([(result._job, None, func, args, kwds)], None)) return result ... if __name__ == "__main__": pool = TornadoPool(multiprocessing.cpu_count()) ... 

С этими изменениями объект исключения будет возвращен gen.Task , а не gen.Task висит на неопределенный срок. Я также обновил свой метод async_run чтобы повторно собрать исключение при его async_run , и сделал некоторые другие изменения, чтобы обеспечить лучшие трассировки для исключений, async_run . Вот полный код:

 import multiprocessing from multiprocessing.pool import Pool, ApplyResult, RUN from functools import wraps import tornado.web from tornado.ioloop import IOLoop from tornado.gen import Return from tornado import gen class WrapException(Exception): def __init__(self): exc_type, exc_value, exc_tb = sys.exc_info() self.exception = exc_value self.formatted = ''.join(traceback.format_exception(exc_type, exc_value, exc_tb)) def __str__(self): return '\n%s\nOriginal traceback:\n%s' % (Exception.__str__(self), self.formatted) class TornadoApplyResult(ApplyResult): def _set(self, i, obj): self._success, self._value = obj if self._callback: self._callback(self._value) self._cond.acquire() try: self._ready = True self._cond.notify() finally: self._cond.release() del self._cache[self._job] class TornadoPool(Pool): def apply_async(self, func, args=(), kwds={}, callback=None): ''' Asynchronous equivalent of `apply()` builtin This version will call `callback` even if an exception is raised by `func`. ''' assert self._state == RUN result = TornadoApplyResult(self._cache, callback) self._taskqueue.put(([(result._job, None, func, args, kwds)], None)) return result @gen.coroutine def async_run(func, *args, **kwargs): """ Runs the given function in a subprocess. This wraps the given function in a gen.Task and runs it in a multiprocessing.Pool. It is meant to be used as a Tornado co-routine. Note that if func returns an Exception (or an Exception sub-class), this function will raise the Exception, rather than return it. """ result = yield gen.Task(pool.apply_async, func, args, kwargs) if isinstance(result, Exception): raise result raise Return(result) def handle_exceptions(func): """ Raise a WrapException so we get a more meaningful traceback""" @wraps(func) def inner(*args, **kwargs): try: return func(*args, **kwargs) except Exception: raise WrapException() return inner # Test worker functions @handle_exceptions def test2(x): raise Exception("eeee") @handle_exceptions def test(x): print x time.sleep(2) return "done" class TestHandler(tornado.web.RequestHandler): @gen.coroutine def get(self): try: result = yield async_run(test, "inside get") self.write("%s\n" % result) result = yield async_run(test2, "hi2") except Exception as e: print("caught exception in get") self.write("Caught an exception: %s" % e) finally: self.finish() app = tornado.web.Application([ (r"/test", TestHandler), ]) if __name__ == "__main__": pool = TornadoPool(4) app.listen(8888) IOLoop.instance().start() 

Вот как это работает для клиента:

 [email protected]:~$ curl localhost:8888/test done Caught an exception: Original traceback: Traceback (most recent call last): File "./mutli.py", line 123, in inner return func(*args, **kwargs) File "./mutli.py", line 131, in test2 raise Exception("eeee") Exception: eeee 

И если я отправлю два одновременных запроса на завивки, мы увидим, что они обрабатываются асинхронно на стороне сервера:

 [email protected]:~$ ./mutli.py inside get inside get caught exception inside get caught exception inside get 

Редактировать:

Обратите внимание, что этот код становится проще с Python 3, потому что он вводит error_callback ключевого слова error_callback для всех асинхронных multiprocessing.Pool методов. Это значительно упрощает интеграцию с Tornado:

 class TornadoPool(Pool): def apply_async(self, func, args=(), kwds={}, callback=None): ''' Asynchronous equivalent of `apply()` builtin This version will call `callback` even if an exception is raised by `func`. ''' super().apply_async(func, args, kwds, callback=callback, error_callback=callback) @gen.coroutine def async_run(func, *args, **kwargs): """ Runs the given function in a subprocess. This wraps the given function in a gen.Task and runs it in a multiprocessing.Pool. It is meant to be used as a Tornado co-routine. Note that if func returns an Exception (or an Exception sub-class), this function will raise the Exception, rather than return it. """ result = yield gen.Task(pool.apply_async, func, args, kwargs) raise Return(result) 

Все, что нам нужно сделать в нашем переопределенном apply_async , вызывает родителя с error_callback ключевого слова error_callback в дополнение к callback kwarg. Нет необходимости переопределять ApplyResult .

Мы можем стать еще более привлекательными, используя MetaClass в нашем TornadoPool , чтобы его методы *_async вызывались напрямую, как если бы они были сопрограммами:

 import time from functools import wraps from multiprocessing.pool import Pool import tornado.web from tornado import gen from tornado.gen import Return from tornado import stack_context from tornado.ioloop import IOLoop from tornado.concurrent import Future def _argument_adapter(callback): def wrapper(*args, **kwargs): if kwargs or len(args) > 1: callback(Arguments(args, kwargs)) elif args: callback(args[0]) else: callback(None) return wrapper def PoolTask(func, *args, **kwargs): """ Task function for use with multiprocessing.Pool methods. This is very similar to tornado.gen.Task, except it sets the error_callback kwarg in addition to the callback kwarg. This way exceptions raised in pool worker methods get raised in the parent when the Task is yielded from. """ future = Future() def handle_exception(typ, value, tb): if future.done(): return False future.set_exc_info((typ, value, tb)) return True def set_result(result): if future.done(): return if isinstance(result, Exception): future.set_exception(result) else: future.set_result(result) with stack_context.ExceptionStackContext(handle_exception): cb = _argument_adapter(set_result) func(*args, callback=cb, error_callback=cb) return future def coro_runner(func): """ Wraps the given func in a PoolTask and returns it. """ @wraps(func) def wrapper(*args, **kwargs): return PoolTask(func, *args, **kwargs) return wrapper class MetaPool(type): """ Wrap all *_async methods in Pool with coro_runner. """ def __new__(cls, clsname, bases, dct): pdct = bases[0].__dict__ for attr in pdct: if attr.endswith("async") and not attr.startswith('_'): setattr(bases[0], attr, coro_runner(pdct[attr])) return super().__new__(cls, clsname, bases, dct) class TornadoPool(Pool, metaclass=MetaPool): pass # Test worker functions def test2(x): print("hi2") raise Exception("eeee") def test(x): print(x) time.sleep(2) return "done" class TestHandler(tornado.web.RequestHandler): @gen.coroutine def get(self): try: result = yield pool.apply_async(test, ("inside get",)) self.write("%s\n" % result) result = yield pool.apply_async(test2, ("hi2",)) self.write("%s\n" % result) except Exception as e: print("caught exception in get") self.write("Caught an exception: %s" % e) raise finally: self.finish() app = tornado.web.Application([ (r"/test", TestHandler), ]) if __name__ == "__main__": pool = TornadoPool() app.listen(8888) IOLoop.instance().start() 

Если ваши запросы будут длиться так долго, то торнадо – неправильная структура.

Я предлагаю вам использовать nginx для маршрутизации быстрого доступа к торнадо, а медленнее – к другому серверу.

У PeterBe есть интересная статья, в которой он запускает несколько серверов Tornado и устанавливает один из них как «медленный» для обработки длительных запросов. См.: Беспокойство-о-io-blocking. Я бы попробовал этот метод.