Проверьте, существует ли тема Kafka в Python

Я хочу создать тему Kafka, если она еще не существует. Я знаю, как создать тему через bash, но я не знаю, как проверить, существует ли она.

topic_exists = ?????? if not topic_exists: subprocess.call([os.path.join(KAFKABIN, 'kafka-topics.sh'), '--create', '--zookeeper', '{}:2181'.format(KAFKAHOST), '--topic', str(self.topic), '--partitions', str(self.partitions), '--replication-factor', str(self.replication_factor)]) 

Вы можете использовать --list (List all available topics) для kafka-topics.sh и посмотреть, существует ли self.topic в массиве topics , как показано ниже.

В зависимости от количества тем, которые у вас есть, этот подход может быть немного тяжелым. Если это так, вы можете избавиться от использования --describe (List details for the given topics) которые, скорее всего, вернутся пустым, если тема не существует. Я не проверил это полностью, поэтому не могу точно сказать, насколько это твердое решение ( --describe ), но, возможно, вам стоит немного поработать над этим.

 wanted_topics = ['host_updates_queue', 'foo_bar'] topics = subprocess.check_output([os.path.join(KAFKABIN, 'kafka-topics.sh'), '--list', '--zookeeper', '{}:2181'.format(KAFKAHOST)]) for wanted in wanted_topics: if wanted in topics: print '\'{}\' topic exists!'.format(wanted) else: print '\'{}\' topic does NOT exist!'.format(wanted) topic_desc = subprocess.check_output([os.path.join(KAFKABIN, 'kafka-topics.sh'), '--describe', '--topic', wanted, '--zookeeper', '{}:2181'.format(KAFKAHOST)]) if not topic_desc: print 'No description found for the topic \'{}\''.format(wanted) 

ВЫВОД:

 root@dev:/opt/kafka/kafka_2.10-0.8.2.1# ./t.py 'host_updates_queue' topic exists! 'foo_bar' topic does NOT exist! No description found for the topic 'foo_bar' 

Существует также конфигурация брокера, поэтому вам не нужно предпринимать ни одного из следующих шагов:

auto.create.topics.enable | true | Включить автоматическое создание темы на сервере. Если для этого параметра установлено значение true, попытки создания метаданных данных или получения метаданных для несуществующей темы автоматически создадут его с коэффициентом репликации по умолчанию и количеством разделов.

По возможности я бы взял этот подход.

Обратите внимание, что вы должны установить для вашего брокера темы configs ( server.properties ) для num.partitions и default.replication.factor чтобы они соответствовали вашим настройкам в фрагменте кода.

Другим приятным способом является модуль python kafka:

 kafka_client = kafka.KafkaClient(kafka_server_name) server_topics = kafka_client.topic_partitions if topic_name in server_topics: your code.... 

kafka_client.topic_partitions возвращает список тем.