-
Notifications
You must be signed in to change notification settings - Fork 293
/
Copy pathmetadata.rs
125 lines (110 loc) · 3.83 KB
/
metadata.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
use std::time::Duration;
use clap::{value_t, App, Arg};
use log::trace;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, Consumer};
use crate::example_utils::setup_logger;
mod example_utils;
/// Prints a summary of the Kafka cluster metadata to standard output.
///
/// A [`BaseConsumer`] is created against `brokers`, the cluster metadata is
/// fetched, and then the cluster summary, the broker list and every returned
/// topic with its partitions are printed.
///
/// # Arguments
///
/// * `brokers` - value used for the `bootstrap.servers` client setting.
/// * `topic` - forwarded unchanged to `fetch_metadata`; presumably `None`
///   requests every topic (confirm against the rdkafka documentation).
/// * `timeout` - time limit forwarded to the metadata request.
/// * `fetch_offsets` - when `true`, the low/high watermarks of each partition
///   are queried and printed, followed by a per-topic message total.
///
/// # Panics
///
/// Panics if the consumer cannot be created or if the metadata request fails.
fn print_metadata(brokers: &str, topic: Option<&str>, timeout: Duration, fetch_offsets: bool) {
    let consumer: BaseConsumer = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .create()
        .expect("Consumer creation failed");

    trace!("Consumer created");

    let metadata = consumer
        .fetch_metadata(topic, timeout)
        .expect("Failed to fetch metadata");

    println!("Cluster information:");
    println!(" Broker count: {}", metadata.brokers().len());
    println!(" Topics count: {}", metadata.topics().len());
    println!(" Metadata broker name: {}", metadata.orig_broker_name());
    println!(" Metadata broker id: {}\n", metadata.orig_broker_id());

    println!("Brokers:");
    for broker in metadata.brokers() {
        println!(
            " Id: {} Host: {}:{} ",
            broker.id(),
            broker.host(),
            broker.port()
        );
    }

    println!("\nTopics:");
    for topic in metadata.topics() {
        println!(" Topic: {} Err: {:?}", topic.name(), topic.error());

        // The total is printed once per topic, so the counter must start from
        // zero for each topic; otherwise later topics would also include the
        // messages of every topic printed before them.
        let mut message_count = 0;

        for partition in topic.partitions() {
            println!(
                " Partition: {} Leader: {} Replicas: {:?} ISR: {:?} Err: {:?}",
                partition.id(),
                partition.leader(),
                partition.replicas(),
                partition.isr(),
                partition.error()
            );
            if fetch_offsets {
                // Best effort: a failed watermark query is shown as (-1, -1),
                // whose difference is 0 and therefore does not affect the total.
                let (low, high) = consumer
                    .fetch_watermarks(topic.name(), partition.id(), Duration::from_secs(1))
                    .unwrap_or((-1, -1));
                println!(
                    " Low watermark: {} High watermark: {} (difference: {})",
                    low,
                    high,
                    high - low
                );
                message_count += high - low;
            }
        }
        if fetch_offsets {
            println!(" Total message count: {}", message_count);
        }
    }
}
/// Entry point: parses the command line, configures logging and prints the
/// cluster metadata.
///
/// Recognised options: `--brokers`/`-b` (default `localhost:9092`),
/// `--offsets`, `--topic`, `--log-conf` and `--timeout` (milliseconds,
/// default `60000`).
fn main() {
    let matches = App::new("metadata fetch example")
        .version(option_env!("CARGO_PKG_VERSION").unwrap_or(""))
        .about("Fetch and print the cluster metadata")
        .arg(
            Arg::with_name("brokers")
                .short("b")
                .long("brokers")
                .help("Broker list in kafka format")
                .takes_value(true)
                .default_value("localhost:9092"),
        )
        .arg(
            Arg::with_name("offsets")
                .long("offsets")
                .help("Enables offset fetching"),
        )
        .arg(
            Arg::with_name("topic")
                .long("topic")
                .help("Only fetch the metadata of the specified topic")
                .takes_value(true),
        )
        .arg(
            Arg::with_name("log-conf")
                .long("log-conf")
                .help("Configure the logging format (example: 'rdkafka=trace')")
                .takes_value(true),
        )
        .arg(
            Arg::with_name("timeout")
                .long("timeout")
                .help("Metadata fetch timeout in milliseconds")
                .takes_value(true)
                .default_value("60000"),
        )
        .get_matches();

    setup_logger(true, matches.value_of("log-conf"));

    let brokers = matches
        .value_of("brokers")
        .expect("`brokers` always has a value because it declares a default");
    // The timeout is user input: report a malformed value as a regular
    // command-line error and exit instead of panicking.
    let timeout = value_t!(matches, "timeout", u64).unwrap_or_else(|e| e.exit());
    let topic = matches.value_of("topic");
    let fetch_offsets = matches.is_present("offsets");

    print_metadata(
        brokers,
        topic,
        Duration::from_millis(timeout),
        fetch_offsets,
    );
}