python学习-python对kafka的相关操作
python对kafka的操作
【1】kafka简介
如果你对kafka有一定的理解可以忽略以下内容
kafka的基础知识如下:
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写,用于处理实时数据流。它以高吞吐量、低延迟和可扩展性等特点而著称,广泛应用于Web数据抓取、日志收集、消息系统等领域。 Kafka的主要特点包括: 1. 高吞吐量:Kafka能够处理大规模的数据流,每秒可以处理几十万条消息。 2. 低延迟:Kafka能够快速地传输消息,通常延迟在毫秒级别。 3. 可扩展性:Kafka能够轻松地扩展到多个节点,以满足不同规模和负载的需求。 4. 数据持久化:Kafka将数据持久化到磁盘上,可以在数据丢失或节点故障时进行恢复。 5. 多语言支持:Kafka支持多种编程语言,包括Java、Scala、Python等。 Kafka的核心概念包括: 1. Topic:主题,即消息的类别或主题。 2. Producer:生产者,用于向Kafka发送消息。 3. Consumer:消费者,用于从Kafka接收消息。 4. Broker:代理,Kafka节点的服务器。 5. Partition:分区,一个主题可以被分为多个分区,每个分区可以在不同的节点上。 6. Offset:偏移量,每个分区中的每个消息都有一个唯一的偏移量。 7. ZooKeeper:Kafka使用ZooKeeper来管理集群中的各个节点。 Kafka的应用场景包括: 1. 实时数据流处理:Kafka能够快速地传输和处理大规模的实时数据流,适用于实时数据分析、实时监控等场景。 2. 日志收集:Kafka能够快速地收集和处理大量的日志数据,适用于日志分析、日志管理等场景。 3. 消息系统:Kafka能够快速地传输消息,适用于消息推送、消息队列等场景。 总之,Kafka是一个高性能、高可靠、可扩展的流处理平台,适用于处理实时数据流、日志收集、消息系统等场景。
开始使用kafka是,我先来描述一下我的配置文件
kafka: brokers: 'kafka46:9092,kafka47:9092,kafka48:9092,kafka49:9092' zookeeper_hosts: 'kafka46:2181,kafka47:2181,kafka48/kafka' topic: srvdbMessage group_id: message_to_srvdb-20210810165601 # 初始kafka消费偏移: smallest - 从最早消费; largest - 从最新消费 offset_reset: largest
【2】创建消费者消费数据
简单说一下,想消费同一个topic里面的数据两次的话,可以用不同的group_id去消费
from confluent_kafka import Consumer from confluent_kafka import KafkaError def get_message(self): def print_assignment(consumer, partitions): logging.info("Assignment: {}".format(partitions)) def print_revoke(consumer, partitions): logging.info("Revoke: {}".format(partitions)) logging.info("Initialize kafka consumer.") consumer_conf = { 'bootstrap.servers': self.server_config['kafka_coach']['brokers'], 'group.id': self.server_config['kafka_coach']['group_id'], 'enable.auto.commit': 'true', 'default.topic.config': { 'auto.offset.reset': self.server_config['kafka_coach']['offset_reset'] } } consumer = Consumer(consumer_conf) consumer.subscribe([self.server_config['kafka_coach']['topic']], on_assign=print_assignment, on_revoke=print_revoke) logging.info("start analysis") while True: message = consumer.poll(timeout=1.0) if message is None: if not consumer.assignment(): logging.error("Partition is not assignment. ") time.sleep(0.1) continue message_partition = message.partition() message_offset = message.offset() if message.error(): if message.error().code() == KafkaError._PARTITION_EOF: # logger.info('partition: %d reached end at offset %d.', partition, offset) pass else: logging.error("kafka consumer error! {}".format(message.error())) continue message_value = message.value() if message_value: print("消费到的数据是:{}".format(message_value))
【3】kafka的数据生产
from confluent_kafka import Producer def save_result_to_kafka(self): """将数据保存到kafka""" def delivery_callback(err, msg): if err: logging.error('Message failed delivery: %s' % err) producer_conf = { 'bootstrap.servers': self.config['kafka']['brokers']) } while 1: try: producer = Producer(**producer_conf) while 1: result = {"key":123} try: producer.produce( self.config['kafka']['topic_in'], json.dumps(result), callback=delivery_callback) except Exception as e: logging.exception('保存kafka时错误: %s', str(e)) self.result_queen.put(result) time.sleep(0.1) producer.poll(0) except Exception as e: logging.exception('连接kafka时错误: %s', str(e)) time.sleep(10)
原文地址:https://blog.csdn.net/wu73guang5jian/article/details/142857318
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!