springCloud--消息总线Kafka

简介

Kafka是基于消息发布/订阅模式实现的消息系统,其主要设计⽬标如下:

  • 消息持久化:以时间复杂度为O(1)的⽅式提供消息持久化能⼒,即使对TB级以上数据也能保证常 数时间复杂度的访问性能。
  • ⾼吞吐:在廉价的商⽤机器上也能⽀持单机每秒100K条以上的吞吐量
  • 分布式:⽀持消息分区以及分布式消费,并保证分区内的消息顺序
  • 跨平台:⽀持不同技术平台的客户端(如:Java、PHP、Python等)
  • 实时性:⽀持实时数据处理和离线数据处理 伸缩性:⽀持⽔平扩展

Kafka中涉及的⼀些基本概念:

  • Broker:Kafka集群包含⼀个或多个服务器,这些服务器被称为Broker
  • Topic:逻辑上同Rabbit的Queue队列相似,每条发布到Kafka集群的消息都必须有⼀个Topic。 (物理上不同Topic的消息分开存储,逻辑上⼀个Topic的消息虽然保存于⼀个或多个Broker上,但 ⽤户只需指定消息的Topic即可⽣产或消费数据⽽不必关⼼数据存于何处)
  • Partition:Partition是物理概念上的分区,为了提供系统吞吐率,在物理上每个Topic会分成⼀个或 多个Partition,每个Partition对应⼀个⽂件夹(存储对应分区的消息内容和索引⽂件)。
  • Producer:消息⽣产者,负责⽣产消息并发送到Kafka Broker。
  • Consumer:消息消费者,向Kafka Broker读取消息并处理的客户端。
  • Consumer Group:每个Consumer属于⼀个特定的组(可为每个Consumer指定属于⼀个组,若 不指定则属于默认组),组可以⽤来实现⼀条消息被组内多个成员消费等功能。

安装下载

下载地址:http://kafka.apache.org/downloads.html

由于 Kafka 的设计中依赖了ZooKeeper,所以在 config ⽬录中,则是⽤来存放了关于Kafka与ZooKeeper的配置信息

启动Kafka,执⾏命令:kafka-server-start config/server.properties

创建Topic,执⾏命令:kafka-topics --create --zookeeper localhost:2181 -replication-factor 1 --partitions 1 --topic test

查看当前的Topic: kafka-topics --list -zookeeper localhost:2181

创建消息⽣产者:kafka-console-producer --broker-list localhost:9092 -topic test

创建消息消费者:kafka-console-consumer --zookeeper localhost:2181 --topic test --from-beginning

整合Spring Cloud Bus

在上⼀篇使⽤Rabbit实现消息总线的案例中,
1、添加依赖:

1
2
3
4
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency>

2、添加配置:

1
2
3
4
5
6
7
8
//Kafka的服务端列表
spring.cloud.stream.kafka.binder.brokers=localhost
//Kafka服务端的默认端⼝
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092
//ZooKeeper节点列表
spring.cloud.stream.kafka.binder.zkNodes=localhost
//ZooKeeper节点的默认端⼝
spring.cloud.stream.kafka.binder.defaultZkPort=2181

文章作者: gqsu
文章链接: http://www.ipdax.com/2019/03/14/springCloud-消息总线Kafka/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 技术笔记分享
支付宝打赏
微信打赏