你可能坐过Bus,但是你用过Spring Cloud Bus没?实战来袭
目录
- 第一步,首先在项目中pom.xml添加如下:
- 第二步,下载kafka并安装,地址:http://kafka.apache.org/downloads.html
- 第三步,启动zookeeper, 由于Kafka的设计中依赖了ZooKeeper
- 第四步,启动kafka, 切换到/kafka_2.12-0.11.0.1目录下
- 第五步,创建topic,切换到/kafka_2.12-0.11.0.1目录下
- 第六步,创建消息生产者
- 第七步,创建消息消费者, 打开另外一个命令窗口,切换到/kafka_2.12-0.11.0.1目录下
- 第八步,在config-server工程中,application.properties中添加如下kafka配置:
Spring Cloud Bus 使用kafka实现消息总线
第一步,首先在项目中pom.xml添加如下:
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-bus-kafka</artifactId></dependency>
第二步,下载kafka并安装,地址:http://kafka.apache.org/downloads.html
选择Binary downloads中的
Scala 2.12 - kafka_2.12-1.0.0.tgz (asc, sha512)
这个版本,并解压:
tar xf kafka_2.12-0.11.0.1.tgz
第三步,启动zookeeper, 由于Kafka的设计中依赖了ZooKeeper
所以我们可以在bin和config目录中除了看到Kafka相关的内容之外,还有ZooKeeper相关的内容。其中bin目录存放了Kafka和ZooKeeper的命令行工具,bin根目录下是适用于Linux/Unix的shell,而bin/windows下的则是适用于windows下的bat.
切换到/kafka_2.12-0.11.0.1目录下:
sh bin/zookeeper-server-start.sh config/zookeeper.properties
第四步,启动kafka, 切换到/kafka_2.12-0.11.0.1目录下
sh bin/kafka-server-start.sh config/server.properties
该命令也需要指定Kafka配置文件的正确位置,如上命令中指向了解压目录包含的默认配置。若在测试时,使用外部集中环境的ZooKeeper的话,我们可以在该配置文件中通过zookeeper.connect参数来设置ZooKeeper的地址和端口,它默认会连接本地2181端口的ZooKeeper;如果需要设置多个ZooKeeper节点,可以为这个参数配置多个ZooKeeper地址,并用逗号分割。比如:zookeeper.connect=127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002
第五步,创建topic,切换到/kafka_2.12-0.11.0.1目录下
sh bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sports_test
第六步,创建消息生产者
sh bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sports_test
然后输入:
> welcome to Shanghai
第七步,创建消息消费者, 打开另外一个命令窗口,切换到/kafka_2.12-0.11.0.1目录下
sh bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic sports_test --from-beginning
就打印出:
welcome to Shanghai
到此,kafka的消息服务已经实现,接下来就是整合spring cloud bus,集成到spring cloud config server 和spring cloud config client项目中,可以使用命令
sh bin/kafka-topics.sh --list --zookeeper localhost:2181
查看消息服务列表。
第八步,在config-server工程中,application.properties中添加如下kafka配置:
#kafka
spring.cloud.stream.kafka.binder.zk-nodes=localhost:2181
spring.cloud.stream.kafka.binder.brokers=localhost:9092
备注:
#Kafka的服务端列表
spring.cloud.stream.kafka.binder.brokers=localhost
#Kafka服务端的默认端口,当brokers属性中没有配置端口信息时,就会使用这个默认端口
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092
#Kafka服务端连接的ZooKeeper节点列表
spring.cloud.stream.kafka.binder.zkNodes=localhost
#ZooKeeper节点的默认端口,当zkNodes属性中没有配置端口信息时,就会使用这个默认端口
spring.cloud.stream.kafka.binder.defaultZkPort=2181
更多系列文章推荐:
- Spring Cloud (20) | Spring Cloud Bus 使用kafka消息总线、gitlab添加webhooks实现自动刷新配置
- Spring Cloud (19) | Eureka Server 高可用服务注册中心
- Spring Cloud (18) | 给Eureka Server加上安全验证
- Spring Cloud (15) | Spring Boot、HikariCP、Mybatis和MySQL 配置HikariCP数据库连接池
- Spring Cloud (14) | 微服务不能从git/github/gitlab中获取数据库信息 can’t load properties from git/github/gitlab
- Spring Cloud (12) | Spring Cloud Zuul网关调用微服务,request请求参数是application/json
- Spring Cloud (11) | healthcheck开启健康检查
- Spring Cloud (10) | Eureka 各项参数详解
- Spring Cloud (8) | 把Spring Boot项目改造成tomcat容器启动
- Spring Cloud (7) | Mongodb 微服务
- Spring Cloud (6) | spring cloud zuul 跨域问题No ‘Access-Control-Allow-Origin’ header
- Spring Cloud (5) | 配置中心 Config Server 配置git or github or gitlab
- Spring Cloud (3) | spring cloud bus 消息总线kafka应用
- Spring Cloud (1) | java.net.UnknownHostException: eureka-server
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!