博客
关于我
python使用kafka收发消息
阅读量:635 次
发布时间:2019-03-15

本文共 2846 字,大约阅读时间需要 9 分钟。

Kafka消息队列入门:Python发布与订阅实践指南

Kafka简介

Kafka是一款流行的开源消息队列中间件,广泛应用于大数据处理、后端服务等领域。作为发布-订阅模式的核心组件,Kafka以其高效的分区机制和可扩展性,成为现代应用的重要数据通信工具。本文将从基础到实践,向您展示如何使用Python与Kafka进行消息发布与订阅操作。

Kafka消息发布

安装依赖

首先,安装Python的Kafka连接模块:

pip install kafka-python

消息生产代码

以下是实现Kafka消息发布的Python代码示例:

import timefrom kafka import KafkaProducerclass KafkaMsgProducer:    def __init__(self, server):        self.server = server        self.producer = None    def connect(self):        if self.producer is None:            self.producer = KafkaProducer(bootstrap_servers=self.server)    def close(self):        if self.producer is not None:            self.producer.close()            self.producer = None    def send(self, topic, msg):        if self.producer is not None:            if not isinstance(msg, bytes):                msg = msg.encode("utf-8")            self.producer.send(topic=topic, value=msg)def run():    producer = KafkaMsgProducer("localhost:9092")    producer.connect()        topic = "YanChampion-Test"    print("开始向Kafka发送消息!")        for msg in "Hello! This is YanChampion speaking!".split():        producer.send(topic=topic, msg=msg)        time.sleep(1)if __name__ == "__main__":    run()

注意事项

  • 代码中使用bootstrap_servers配置项时,请确保Kafka服务器地址正确。
  • 消息体必须是bytes类型,代码中已提供了将str类型转换为bytes的处理方式。
  • 在实际使用中,建议为每个KafkaMsgProducer实例设置合理的brokering_servers参数。

Kafka消息订阅

消息消费代码

以下是实现Kafka消息订阅的Python代码示例:

import timefrom kafka import KafkaConsumerclass KafkaMsgConsumer:    def __init__(self, server, group_id):        self.server = server        self.group_id = group_id        self.consumer = None    def connect(self):        if self.consumer is None:            self.consumer = KafkaConsumer(                bootstrap_servers=self.server,                group_id=self.group_id            )    def close(self):        if self.consumer is not None:            self.consumer.close()            self.consumer = None    def consume(self, topic):        if self.consumer is not None:            for msg in self.consumer.consume():                if msg is not None:                    print(f"接收到消息:{msg.value.decode('utf-8')}")                    time.sleep(0.1)def run():    consumer = KafkaMsgConsumer("localhost:9092", "yanchampion-group")    consumer.connect()        topic = "YanChampion-Test"    print("开始监听Kafka消息...")        while True:        consumer.consume(topic)if __name__ == "__main__":    run()

消息订阅注意事项

  • group_id可根据实际需求进行设置,确保每个消费者组有唯一的group_id
  • 消费者会持续监听消息,使用time.sleep(0.1)控制消费频率。
  • 消息消费成功后,会打印消息内容,用户可以根据实际需求进行处理。

测试与验证

消息生产与消费验证

  • 打开终端1,运行以下命令:
python3 consumer.py
  • 打开终端2,运行以下命令:
python3 producer.py
  • 消息将通过Kafka进行分发,消费者终端会接收到每条消息。注意到消息会被自动分区处理,相同的消息可能会被多个消费者同时接收。

测试结果示例

消费者终端可能会显示如下输出:

接收到消息:b'111'接收到消息:b'222'接收到消息:b'333'接收到消息:b'444'

总结

通过以上代码示例,我们成功实现了Kafka的消息发布与订阅功能。Kafka的发布-订阅模式使得消息生产者与消费者之间的耦合度极低,适合处理大规模的异步数据通信场景。在实际应用中,可以根据具体需求调整消息分区数、消费者组大小等参数,以优化Kafka的性能表现。

转载地址:http://inplz.baihongyu.com/

你可能感兴趣的文章
浏览器刷新页面
查看>>
代码错误信息,微信报错
查看>>
easyui日期处理(开始时间和结束时间)
查看>>
WPF画椭圆
查看>>
XMLHttpRequest对象的一个简单运用示例
查看>>
java文件上传
查看>>
DHCP跨网段分配IP地址
查看>>
10.多线程与并行
查看>>
Callable中call方法和Runnable中run方法的区别
查看>>
IDEA上移除项目(逻辑删除)
查看>>
Docker方式启动tomcat,访问首页出现404错误
查看>>
Docker方式启动tomcat,访问首页出现404错误(第二篇 -- 将修改过的容器映射成镜像)
查看>>
【蓝桥杯】 java 大学c组 省赛 1、隔行变色
查看>>
BIM轻量化——浏览器展示 | 利用unity
查看>>
超市账单管理系统
查看>>
Springboot实现热部署
查看>>
composer 介绍、安装及基本使用方法
查看>>
PHP 的 ::class 用法
查看>>
Python学习之列表用法
查看>>
升级qiime2
查看>>