第1步:生成我们的项目: spring initializr 来生成我们的项目。我们的项目将提供spring mvc / web支持和apache kafka支持。
第2步:发布/读取kafka主题中的消息:
1
2
3
4
5
6
7
8
|
<b> public </b> <b> class </b> user { <b> private </b> string name; <b> private </b> <b> int </b> age; <b> public </b> user(string name, <b> int </b> age) { <b> this </b>.name = name; <b> this </b>.age = age; } } |
第3步:通过application.yml
配置文件配置kafka:
我们需要创建配置文件。我们需要以某种方式配置我们的kafka生产者和消费者,以便能够发布和读取与主题相关的消息。相比建立一个使用@configuration
标注的java类,我们可以直接使用配置文件application.properties或application.yml。spring boot让我们避免像过去一样编写的所有样板代码,同时为我们提供了更加智能的配置应用程序的方法,如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
server: port: 9000 spring: kafka: consumer: bootstrap: localhost: 9092 group-id: group_id auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.stringdeserializer value-deserializer: org.apache.kafka.common.serialization.stringdeserializer producer: bootstrap: localhost: 9092 key-serializer: org.apache.kafka.common.serialization.stringserializer value-serializer: org.apache.kafka.common.serialization.stringserializer |
第4步:创建一个生产者,创建生产者会将我们的消息写入该主题。
1
2
3
4
5
6
7
8
9
10
11
|
<b> public </b> <b> class </b> producer { <b> private </b> <b> static </b> <b> final </b> logger logger = loggerfactory.getlogger(producer.<b> class </b>); <b> private </b> <b> static </b> <b> final </b> string topic = <font> "users" </font><font>; @autowired <b> private </b> kafkatemplate<string, string> kafkatemplate; <b> public </b> <b> void </b> sendmessage(string message) { logger.info(string.format(</font><font> "#### -> producing message -> %s" </font><font>, message)); <b> this </b>.kafkatemplate.send(topic, message); } } </font> |
自动连接autowire
到 kafkatemplate
,使用它将消息发布到主题 - 这就是消息的生产者!
第5步:创建一个消费者,消费者是负责根据您自己的业务逻辑的需求阅读处理消息的消息的服务。要进行设置,请输入以下内容:
1
2
3
4
5
6
7
8
9
10
11
|
@service <b> public </b> <b> class </b> consumer { <b> private </b> <b> final </b> logger logger = loggerfactory.getlogger(producer.<b> class </b>); @kafkalistener (topics = <font> "users" </font><font>, groupid = </font><font> "group_id" </font><font>) <b> public </b> <b> void </b> consume(string message) throws ioexception { logger.info(string.format(</font><font> "#### -> consumed message -> %s" </font><font>, message)); } } </font> |
在这里,我们告诉我们的方法void consume(string message)
订阅用户的主题,并将每条消息发送到应用程序日志。在您的实际应用程序中,您可以按照业务需要的方式处理消息。
第6步:创建rest控制器,们已经拥有了能够消费kafka消息所需的全部内容。
为了充分展示我们创建的所有内容的工作原理,我们需要创建一个具有单一端点的控制器。消息将发布到此端点,然后由我们的生产者处理。然后,我们的消费者将通过登录到控制台来捕获并处理它。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
@restcontroller @requestmapping (value = <font> "/kafka" </font><font>) <b> public </b> <b> class </b> kafkacontroller { <b> private </b> <b> final </b> producer producer; @autowired kafkacontroller(producer producer) { <b> this </b>.producer = producer; } @postmapping (value = </font><font> "/publish" </font><font>) <b> public </b> <b> void </b> sendmessagetokafkatopic( @requestparam (</font><font> "message" </font><font>) string message) { <b> this </b>.producer.sendmessage(message); } } </font> |
让我们使用curl将消息发送给kafka:
curl -x post -f 'message=test' http://localhost:9000/kafka/publish
基本上就是这样!在不到10个步骤中,您了解了将apache kafka添加到spring boot项目是多么容易。如果您遵循本指南,您现在知道如何将kafka集成到spring boot项目中,并且您已准备好使用这个超级工具!
总结
以上所述是小编给大家介绍的在spring boot应用程序中使用apache kafka的方法步骤详解,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对服务器之家网站的支持!
原文链接:https://www.jdon.com/50613