你可能坐过Bus,但是你用过Spring Cloud Bus没?实战来袭

目录

  • 第一步,首先在项目中pom.xml添加如下:
  • 第二步,下载kafka并安装,地址:http://kafka.apache.org/downloads.html
  • 第三步,启动zookeeper, 由于Kafka的设计中依赖了ZooKeeper
  • 第四步,启动kafka, 切换到/kafka_2.12-0.11.0.1目录下
  • 第五步,创建topic,切换到/kafka_2.12-0.11.0.1目录下
  • 第六步,创建消息生产者
  • 第七步,创建消息消费者, 打开另外一个命令窗口,切换到/kafka_2.12-0.11.0.1目录下
  • 第八步,在config-server工程中,application.properties中添加如下kafka配置:

Spring Cloud Bus 使用kafka实现消息总线
在这里插入图片描述

第一步,首先在项目中pom.xml添加如下:

		<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-bus-kafka</artifactId></dependency>

第二步,下载kafka并安装,地址:http://kafka.apache.org/downloads.html

选择Binary downloads中的
Scala 2.12 - kafka_2.12-1.0.0.tgz (asc, sha512)
这个版本,并解压:

tar xf kafka_2.12-0.11.0.1.tgz

第三步,启动zookeeper, 由于Kafka的设计中依赖了ZooKeeper

所以我们可以在bin和config目录中除了看到Kafka相关的内容之外,还有ZooKeeper相关的内容。其中bin目录存放了Kafka和ZooKeeper的命令行工具,bin根目录下是适用于Linux/Unix的shell,而bin/windows下的则是适用于windows下的bat.
切换到/kafka_2.12-0.11.0.1目录下:

sh bin/zookeeper-server-start.sh config/zookeeper.properties 

第四步,启动kafka, 切换到/kafka_2.12-0.11.0.1目录下

 sh bin/kafka-server-start.sh config/server.properties 

该命令也需要指定Kafka配置文件的正确位置,如上命令中指向了解压目录包含的默认配置。若在测试时,使用外部集中环境的ZooKeeper的话,我们可以在该配置文件中通过zookeeper.connect参数来设置ZooKeeper的地址和端口,它默认会连接本地2181端口的ZooKeeper;如果需要设置多个ZooKeeper节点,可以为这个参数配置多个ZooKeeper地址,并用逗号分割。比如:zookeeper.connect=127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002

第五步,创建topic,切换到/kafka_2.12-0.11.0.1目录下

 sh bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sports_test

第六步,创建消息生产者

sh bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sports_test

然后输入:
> welcome to Shanghai

第七步,创建消息消费者, 打开另外一个命令窗口,切换到/kafka_2.12-0.11.0.1目录下

 sh bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic sports_test --from-beginning

就打印出:

welcome to Shanghai

到此,kafka的消息服务已经实现,接下来就是整合spring cloud bus,集成到spring cloud config server 和spring cloud config client项目中,可以使用命令
sh bin/kafka-topics.sh --list --zookeeper localhost:2181
查看消息服务列表。

第八步,在config-server工程中,application.properties中添加如下kafka配置:

#kafka
spring.cloud.stream.kafka.binder.zk-nodes=localhost:2181
spring.cloud.stream.kafka.binder.brokers=localhost:9092

备注:

#Kafka的服务端列表
spring.cloud.stream.kafka.binder.brokers=localhost
#Kafka服务端的默认端口,当brokers属性中没有配置端口信息时,就会使用这个默认端口	
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092
#Kafka服务端连接的ZooKeeper节点列表
spring.cloud.stream.kafka.binder.zkNodes=localhost
#ZooKeeper节点的默认端口,当zkNodes属性中没有配置端口信息时,就会使用这个默认端口
spring.cloud.stream.kafka.binder.defaultZkPort=2181

更多系列文章推荐:

  1. Spring Cloud (20) | Spring Cloud Bus 使用kafka消息总线、gitlab添加webhooks实现自动刷新配置
  2. Spring Cloud (19) | Eureka Server 高可用服务注册中心
  3. Spring Cloud (18) | 给Eureka Server加上安全验证
  4. Spring Cloud (15) | Spring Boot、HikariCP、Mybatis和MySQL 配置HikariCP数据库连接池
  5. Spring Cloud (14) | 微服务不能从git/github/gitlab中获取数据库信息 can’t load properties from git/github/gitlab
  6. Spring Cloud (12) | Spring Cloud Zuul网关调用微服务,request请求参数是application/json
  7. Spring Cloud (11) | healthcheck开启健康检查
  8. Spring Cloud (10) | Eureka 各项参数详解
  9. Spring Cloud (8) | 把Spring Boot项目改造成tomcat容器启动
  10. Spring Cloud (7) | Mongodb 微服务
  11. Spring Cloud (6) | spring cloud zuul 跨域问题No ‘Access-Control-Allow-Origin’ header
  12. Spring Cloud (5) | 配置中心 Config Server 配置git or github or gitlab
  13. Spring Cloud (3) | spring cloud bus 消息总线kafka应用
  14. Spring Cloud (1) | java.net.UnknownHostException: eureka-server


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部