1. Maven 依赖包
1
2
3
4
5
|
<!-- Kafka Java client library used by the producer and consumer examples. -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.1</version>
</dependency>
2. 生产者代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
package com.lnho.example.kafka; import org.apache.kafka.clients.producer.kafkaproducer; import org.apache.kafka.clients.producer.producer; import org.apache.kafka.clients.producer.producerrecord; import java.util.properties; public class kafkaproducerexample { public static void main(string[] args) { properties props = new properties(); props.put( "bootstrap.servers" , "master:9092" ); props.put( "acks" , "all" ); props.put( "retries" , 0 ); props.put( "batch.size" , 16384 ); props.put( "linger.ms" , 1 ); props.put( "buffer.memory" , 33554432 ); props.put( "key.serializer" , "org.apache.kafka.common.serialization.stringserializer" ); props.put( "value.serializer" , "org.apache.kafka.common.serialization.stringserializer" ); producer<string, string> producer = new kafkaproducer<>(props); for ( int i = 0 ; i < 100 ; i++) producer.send( new producerrecord<>( "topic1" , integer.tostring(i), integer.tostring(i))); producer.close(); } } |
3. 消费者代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
package com.lnho.example.kafka; import org.apache.kafka.clients.consumer.consumerrecord; import org.apache.kafka.clients.consumer.consumerrecords; import org.apache.kafka.clients.consumer.kafkaconsumer; import java.util.arrays; import java.util.properties; public class kafkaconsumerexample { public static void main(string[] args) { properties props = new properties(); props.put( "bootstrap.servers" , "master:9092" ); props.put( "group.id" , "test" ); props.put( "enable.auto.commit" , "true" ); props.put( "auto.commit.interval.ms" , "1000" ); props.put( "session.timeout.ms" , "30000" ); props.put( "key.deserializer" , "org.apache.kafka.common.serialization.stringdeserializer" ); props.put( "value.deserializer" , "org.apache.kafka.common.serialization.stringdeserializer" ); kafkaconsumer<string, string> consumer = new kafkaconsumer<>(props); consumer.subscribe(arrays.aslist( "topic1" )); while ( true ) { consumerrecords<string, string> records = consumer.poll( 100 ); for (consumerrecord<string, string> record : records) system.out.printf( "offset = %d, key = %s, value = %s\n" , record.offset(), record.key(), record.value()); } } } |
以上就是本文的全部内容，希望对大家的学习有所帮助，也希望大家多多支持服务器之家。
原文链接:https://blog.csdn.net/u012129558/article/details/80065817