自学内容网 自学内容网

使用python操作kafka

第一步:安装kafka的模块

pip install kafka-python

第二步:编写代码

from kafka import KafkaProducer
from kafka.errors import KafkaError
import json
import random
import time

class StationLog:
    def __init__(self, station_id, call_out, call_in, call_status, timestamp, call_duration):
        self.station_id = station_id
        self.call_out = call_out
        self.call_in = call_in
        self.call_status = call_status
        self.timestamp = timestamp
        self.call_duration = call_duration

    def to_string(self):
        return json.dumps(self.__dict__)

def main():
    # 设置连接kafka集群的ip和端口
    producer = KafkaProducer(bootstrap_servers='bigdata01:9092',
                             value_serializer=lambda v: json.dumps(v).encode('utf-8'))

    arr = ["fail", "busy", "barring", "success", "success", "success", "success", "success", "success", "success", "success", "success"]

    while True:
        call_out = "1860000" + str(random.randint(0, 9999)).zfill(4)
        call_in = "1890000" + str(random.randint(0, 9999)).zfill(4)
        call_status = random.choice(arr)
        call_duration = 1000 * (10 + random.randint(0, 9)) if call_status == "success" else 0

        # 随机产生一条基站日志数据
        station_log = StationLog(
            "station_" + str(random.randint(0, 9)),
            call_out,
            call_in,
            call_status,
            int(time.time() * 1000),  # 当前时间戳
            call_duration
        )
        print(station_log.to_string())
        time.sleep(0.1 + random.randint(0, 99) / 100)

        try:
            # 发送数据到Kafka
            producer.send('topicA', station_log.to_string())
        except KafkaError as e:
            print(f"Failed to send message: {e}")

        # 确保所有异步消息都被发送
        producer.flush()

if __name__ == "__main__":
    main()

以上案例是通过python操作kafka,将一些模拟数据发送到kafka中。


原文地址:https://blog.csdn.net/wozhendeyumenle/article/details/143778820

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!