本文环境如下:
操作系统:CentOS 6 32位
JDK版本:1.8.0_77 32位
Kafka版本:0.9.0.1(Scala 2.11)
1. maven依赖包
1
2
3
4
5
|
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.1</version>
</dependency>
2. 生产者代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
package com.lnho.example.kafka; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class KafkaProducerExample { public static void main(String[] args) { Properties props = new Properties(); props.put( "bootstrap.servers" , "master:9092" ); props.put( "acks" , "all" ); props.put( "retries" , 0 ); props.put( "batch.size" , 16384 ); props.put( "linger.ms" , 1 ); props.put( "buffer.memory" , 33554432 ); props.put( "key.serializer" , "org.apache.kafka.common.serialization.StringSerializer" ); props.put( "value.serializer" , "org.apache.kafka.common.serialization.StringSerializer" ); Producer<String, String> producer = new KafkaProducer<>(props); for ( int i = 0 ; i < 100 ; i++) producer.send( new ProducerRecord<>( "topic1" , Integer.toString(i), Integer.toString(i))); producer.close(); } } |
3. 消费者代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
package com.lnho.example.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Properties; public class KafkaConsumerExample { public static void main(String[] args) { Properties props = new Properties(); props.put( "bootstrap.servers" , "master:9092" ); props.put( "group.id" , "test" ); props.put( "enable.auto.commit" , "true" ); props.put( "auto.commit.interval.ms" , "1000" ); props.put( "session.timeout.ms" , "30000" ); props.put( "key.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer" ); props.put( "value.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer" ); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList( "topic1" )); while ( true ) { ConsumerRecords<String, String> records = consumer.poll( 100 ); for (ConsumerRecord<String, String> record : records) System.out.printf( "offset = %d, key = %s, value = %s\n" , record.offset(), record.key(), record.value()); } } } |
4. 执行程序
lib底下需要有:kafka-clients-0.9.0.1.jar log4j-1.2.17.jar slf4j-api-1.7.6.jar slf4j-log4j12-1.7.6.jar
生产者:
java -classpath kafka-example-1.0-SNAPSHOT.jar:lib/* com.lnho.example.kafka.KafkaProducerExample
消费者:
java -classpath kafka-example-1.0-SNAPSHOT.jar:lib/* com.lnho.example.kafka.KafkaConsumerExample
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:http://blog.csdn.net/lnho2015/article/details/51353936