Стратегия RabbitMQ, Pika и пересоединения

Я использую Pika для обработки данных из RabbitMQ. Поскольку я, казалось, сталкивался с различными проблемами, я решил написать небольшое тестовое приложение, чтобы посмотреть, как я могу работать с разъединениями.

Я написал это тестовое приложение, которое делает следующее:

  1. Подключитесь к Брокеру, повторите попытку до
  2. При подключении создайте очередь.
  3. Потребляйте эту очередь и поместите результат в очередь python Queue.Queue (0)
  4. Получите элемент из Queue.Queue (0) и верните его в очередь брокера.

Я заметил 2 вопроса:

  1. Когда я запускаю свой сценарий с одного хоста, подключающегося к rabbitmq на другом хосте (внутри vm), тогда эти скрипты выходят в случайные моменты без возникновения ошибки.
  2. Когда я запускаю свой скрипт на том же хосте, на котором установлен RabbitMQ, он работает нормально и продолжает работать.

Это может быть объяснено из-за сетевых проблем, пакеты упали, хотя я считаю, что соединение не очень надежное.

Когда скрипт выполняется локально на сервере RabbitMQ, и я убиваю RabbitMQ, тогда скрипт выходит с ошибкой: «ERROR pika SelectConnection: Socket Error on 3: 104»

Похоже, я не могу заставить стратегию пересоединения работать так, как должно быть. Может ли кто-нибудь взглянуть на код, чтобы увидеть, что я делаю неправильно?

Благодаря,

сойка

#!/bin/python import logging import threading import Queue import pika from pika.reconnection_strategies import SimpleReconnectionStrategy from pika.adapters import SelectConnection import time from threading import Lock class Broker(threading.Thread): def __init__(self): threading.Thread.__init__(self) self.logging = logging.getLogger(__name__) self.to_broker = Queue.Queue(0) self.from_broker = Queue.Queue(0) self.parameters = pika.ConnectionParameters(host='sandbox',heartbeat=True) self.srs = SimpleReconnectionStrategy() self.properties = pika.BasicProperties(delivery_mode=2) self.connection = None while True: try: self.connection = SelectConnection(self.parameters, self.on_connected, reconnection_strategy=self.srs) break except Exception as err: self.logging.warning('Cant connect. Reason: %s' % err) time.sleep(1) self.daemon=True def run(self): while True: self.submitData(self.from_broker.get(block=True)) pass def on_connected(self,connection): connection.channel(self.on_channel_open) def on_channel_open(self,new_channel): self.channel = new_channel self.channel.queue_declare(queue='sandbox', durable=True) self.channel.basic_consume(self.processData, queue='sandbox') def processData(self, ch, method, properties, body): self.logging.info('Received data from broker') self.channel.basic_ack(delivery_tag=method.delivery_tag) self.from_broker.put(body) def submitData(self,data): self.logging.info('Submitting data to broker.') self.channel.basic_publish(exchange='', routing_key='sandbox', body=data, properties=self.properties) if __name__ == '__main__': format=('%(asctime)s %(levelname)s %(name)s %(message)s') logging.basicConfig(level=logging.DEBUG, format=format) broker=Broker() broker.start() try: broker.connection.ioloop.start() except Exception as err: print err 

One Solution collect form web for “Стратегия RabbitMQ, Pika и пересоединения”

Основная проблема с вашим скриптом заключается в том, что он взаимодействует с одним каналом как из основного потока (где работает ioloop), так и из потока «Broker» (вызывает submitData в цикле). Это небезопасно .

Кроме того, SimpleReconnectionStrategy , похоже, ничего полезного не делает. Это не вызывает повторное подключение, если соединение прерывается. Я считаю, что это ошибка в Pika: https://github.com/pika/pika/issues/120

Я попытался реорганизовать ваш код, чтобы он работал, как я думаю, вы этого хотели, но столкнулись с другой проблемой. У Pika нет способа обнаружить отказ в доставке, что означает, что данные могут быть потеряны, если соединение падает. Это кажется таким очевидным требованием! Как невозможно обнаружить, что basic_publish не удалось? Я пробовал всевозможные вещи, включая транзакции и add_on_return_callback (все из которых казались неуклюжими и чрезмерно сложными), но ничего не придумал. Если это действительно так, то pika кажется полезным только в ситуациях, которые могут переносить потерю данных, отправленных в RabbitMQ, или в программы, которые нужно потреблять только у RabbitMQ.

Это ненадежно, но для справки, вот какой-то код, который решает вашу проблему с несколькими потоками:

 import logging import pika import Queue import sys import threading import time from functools import partial from pika.adapters import SelectConnection, BlockingConnection from pika.exceptions import AMQPConnectionError from pika.reconnection_strategies import SimpleReconnectionStrategy log = logging.getLogger(__name__) DEFAULT_PROPERTIES = pika.BasicProperties(delivery_mode=2) class Broker(object): def __init__(self, parameters, on_channel_open, name='broker'): self.parameters = parameters self.on_channel_open = on_channel_open self.name = name def connect(self, forever=False): name = self.name while True: try: connection = SelectConnection( self.parameters, self.on_connected) log.debug('%s connected', name) except Exception: if not forever: raise log.warning('%s cannot connect', name, exc_info=True) time.sleep(10) continue try: connection.ioloop.start() finally: try: connection.close() connection.ioloop.start() # allow connection to close except Exception: pass if not forever: break def on_connected(self, connection): connection.channel(self.on_channel_open) def setup_submitter(channel, data_queue, properties=DEFAULT_PROPERTIES): def on_queue_declared(frame): # PROBLEM pika does not appear to have a way to detect delivery # failure, which means that data could be lost if the connection # drops... channel.confirm_delivery(on_delivered) submit_data() def on_delivered(frame): if frame.method.NAME in ['Confirm.SelectOk', 'Basic.Ack']: log.info('submission confirmed %r', frame) # increasing this value seems to cause a higher failure rate time.sleep(0) submit_data() else: log.warn('submission failed: %r', frame) #data_queue.put(...) def submit_data(): log.info('waiting on data queue') data = data_queue.get() log.info('got data to submit') channel.basic_publish(exchange='', routing_key='sandbox', body=data, properties=properties, mandatory=True) log.info('submitted data to broker') channel.queue_declare( queue='sandbox', durable=True, callback=on_queue_declared) def blocking_submitter(parameters, data_queue, properties=DEFAULT_PROPERTIES): while True: try: connection = BlockingConnection(parameters) channel = connection.channel() channel.queue_declare(queue='sandbox', durable=True) except Exception: log.error('connection failure', exc_info=True) time.sleep(1) continue while True: log.info('waiting on data queue') try: data = data_queue.get(timeout=1) except Queue.Empty: try: connection.process_data_events() except AMQPConnectionError: break continue log.info('got data to submit') try: channel.basic_publish(exchange='', routing_key='sandbox', body=data, properties=properties, mandatory=True) except Exception: log.error('submission failed', exc_info=True) data_queue.put(data) break log.info('submitted data to broker') def setup_receiver(channel, data_queue): def process_data(channel, method, properties, body): log.info('received data from broker') data_queue.put(body) channel.basic_ack(delivery_tag=method.delivery_tag) def on_queue_declared(frame): channel.basic_consume(process_data, queue='sandbox') channel.queue_declare( queue='sandbox', durable=True, callback=on_queue_declared) if __name__ == '__main__': if len(sys.argv) != 2: print 'usage: %s RABBITMQ_HOST' % sys.argv[0] sys.exit() format=('%(asctime)s %(levelname)s %(name)s %(message)s') logging.basicConfig(level=logging.DEBUG, format=format) host = sys.argv[1] log.info('connecting to host: %s', host) parameters = pika.ConnectionParameters(host=host, heartbeat=True) data_queue = Queue.Queue(0) data_queue.put('message') # prime the pump # run submitter in a thread setup = partial(setup_submitter, data_queue=data_queue) broker = Broker(parameters, setup, 'submitter') thread = threading.Thread(target= partial(broker.connect, forever=True)) # uncomment these lines to use the blocking variant of the submitter #thread = threading.Thread(target= # partial(blocking_submitter, parameters, data_queue)) thread.daemon = True thread.start() # run receiver in main thread setup = partial(setup_receiver, data_queue=data_queue) broker = Broker(parameters, setup, 'receiver') broker.connect(forever=True) 
  • SparkStreaming, RabbitMQ и MQTT в питоне с использованием pika
  • Невозможно подключиться к RabbitMQ на Heroku с помощью pika из-за ProbableAccessDeniedError
  • Проверка достоверности учетных данных rabbitmq
  • pika, stop_consuming не работает
  • Где вы должны обновить настройки сельдерея? О удаленном работнике или отправителе?
  • Как сделать простой Pika SelectConnection для отправки сообщения в python?
  • Состояние задачи сельдерея зависит от CELERY_TASK_RESULT_EXPIRES
  • Как сделать паузу и возобновить потребление изящно в rabbitmq, pika python
  • Python - лучший язык программирования в мире.