-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathConsumerDemo.java
52 lines (40 loc) · 1.95 KB
/
ConsumerDemo.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package kafka.tutorial1;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class ConsumerDemo {
public static void main(String[] args) {
Logger logger = LoggerFactory.getLogger(ConsumerDemo.class.getName());
String bootstrapServers = "127.0.0.1:9092";
String groupId = "my-fourth-application";
String topic = "first_topic";
//create consumer properties or configs
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
//create consumer
KafkaConsumer<String, String> consumer =
new KafkaConsumer<String, String>(properties);
//subscribe consumer to our topic(s)
consumer.subscribe(Arrays.asList(topic));
//poll for new data
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord record : records){
logger.info("Key: " + record.key() + ", Value: " + record.value());
logger.info("Partition: " + record.partition() + ", Offset: " + record.offset());
}
}
}
}