Клиент веб-сайта Tornado теряет ответные сообщения?

Мне нужно обработать кадры с веб-камеры и отправить несколько выбранных кадров на удаленный сервер веб-сервера. Сервер немедленно отвечает на сообщение с подтверждением (похожее на эхо-сервер). Обработка кадров медленная и интенсивная, поэтому я хочу сделать это, используя отдельный пул потоков (продюсер), чтобы использовать все доступные ядра. Таким образом, клиент (потребитель) просто сидит без дела, пока пул не сможет что-то отправить. Моя текущая реализация, как показано ниже, отлично работает, только если я добавляю небольшой сон внутри цикла тестирования производителя. Если я удалю эту задержку, я перестаю получать ответ от сервера (как эхо-сервера, так и моего реального сервера). Даже первый ответ потерян, поэтому я не думаю, что это механизм защиты от наводнений. Что я делаю не так?

import tornado from tornado.websocket import websocket_connect from tornado import gen, queues import time class TornadoClient(object): url = None onMessageReceived = None onMessageSent = None ioloop = tornado.ioloop.IOLoop.current() q = queues.Queue() def __init__(self, url, onMessageReceived, onMessageSent): self.url = url self.onMessageReceived = onMessageReceived self.onMessageSent = onMessageSent def enqueueMessage(self, msgData, binary=False): print("TornadoClient.enqueueMessage") self.ioloop.add_callback(self.addToQueue, (msgData, binary)) print("TornadoClient.enqueueMessage done") @gen.coroutine def addToQueue(self, msgTuple): yield self.q.put(msgTuple) @gen.coroutine def main_loop(self): connection = None try: while True: while connection is None: try: print("Connecting...") connection = yield websocket_connect(self.url) print("Connected " + str(connection)) except Exception, e: print("Exception on connection " + str(e)) connection = None print("Retry in a few seconds...") yield gen.Task(self.ioloop.add_timeout, time.time() + 3) try: print("Waiting for data to send...") msgData, binaryVal = yield self.q.get() print("Writing...") sendFuture = connection.write_message(msgData, binary=binaryVal) print("Write scheduled...") finally: self.q.task_done() yield sendFuture self.onMessageSent("Sent ok") print("Write done. Reading...") msg = yield connection.read_message() print("Got msg.") self.onMessageReceived(msg) if msg is None: print("Connection lost") connection = None print("main loop completed") except Exception, e: print("ExceptionExceptionException") print(e) connection = None print("Exit main_loop function") def start(self): self.ioloop.run_sync(self.main_loop) print("Main loop completed") ######### TEST METHODS ######### def sendMessages(client): time.sleep(2) #TEST only: wait for client startup while True: client.enqueueMessage("msgData", binary=False) time.sleep(1) # <--- comment this line to break it def testPrintMessage(msg): print("Received: " + str(msg)) def testPrintSentMessage(msg): print("Sent: " + msg) if __name__=='__main__': from threading import Thread client = TornadoClient("ws://echo.websocket.org", testPrintMessage, testPrintSentMessage) thread = Thread(target = sendMessages, args = (client, )) thread.start() client.start() 

Моя настоящая проблема

В моей реальной программе я использую механизм «как окно» для защиты потребителя (сервер autobahn.twisted.websocket): производитель может отправлять до максимального количества сообщений без подтверждения (фреймы веб-камеры), а затем останавливается половину окна для освобождения. Потребитель отправляет сообщение «PROCESSED» обратно, подтверждающее одно или несколько сообщений (только счетчик, а не идентификатор). То, что я вижу в журнале пользователя, – это то, что сообщения обрабатываются и ответ отправляется обратно, но эти данные исчезают где-то в сети.

У меня мало опыта работы с asynchio, поэтому я хотел быть уверенным, что у меня нет недостатка, аннотации или чего-то еще.

Это журнал потребительской стороны:

 2017-05-13 18:59:54+0200 [-] TX Frame to tcp4:192.168.0.5:48964 : fin = True, rsv = 0, opcode = 1, mask = -, length = 21, repeat_length = None, chopsize = None, sync = False, payload = {"type": "PROCESSED"} 2017-05-13 18:59:54+0200 [-] TX Octets to tcp4:192.168.0.5:48964 : sync = False, octets = 81157b2274797065223a202250524f434553534544227d 

One Solution collect form web for “Клиент веб-сайта Tornado теряет ответные сообщения?”

Это аккуратный код. Я верю, что причина, по которой вам нужен сон в потоке sendMessages заключается в том, что в противном случае он продолжает вызывать enqueueMessage как можно быстрее, миллионы раз в секунду. Поскольку enqueueMessage не enqueueMessage обработки обработанного сообщения, он продолжает как можно быстрее вызывать IOLoop.add_callback , не предоставляя петле достаточную возможность выполнить обратные вызовы.

Петля может сделать некоторый прогресс в основном потоке, так как вы на самом деле не блокируете его. Но поток sendMessages добавляет обратные вызовы намного быстрее, чем цикл может обрабатывать их. К тому времени, когда цикл вытолкнул одно сообщение из очереди и начал его обрабатывать, уже добавлены миллионы новых обратных вызовов, которые цикл должен выполнить, прежде чем он сможет перейти на следующий этап обработки сообщений.

Поэтому для вашего тестового кода, я думаю, правильно спать между вызовами enqueueMessage в потоке.

  • Python - Ожидание изменения переменной
  • Openshift Tornado WebSocket (Demo) не отвечает
  • Торнадо отправит сообщение на мероприятие
  • AttributeError: объект 'module' не имеет атрибута 'WebSocketApp'
  • Форматирование сообщений для отправки на сервер socket.io node.js из клиента python
  • python Tornado websockets, как отправлять сообщения каждые X секунд?
  • Можно ли использовать websockets в Flask и Python 3?
  • pyqt и клиент websocket. слушать веб-сайт в фоновом режиме
  • Python - лучший язык программирования в мире.