Skip to content

feat(replica_stats): adding implementation #1598

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 132 additions & 57 deletions io-engine/src/bin/io-engine-client/v1/stats_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ pub fn subcommands() -> Command {
.help("Volume target/nexus name"),
);

let replica = Command::new("replica").about("Get Replica IO Stats").arg(
Arg::new("name")
.required(false)
.index(1)
.help("Replica name"),
);

let reset = Command::new("reset").about("Reset all resource IO Stats");

Command::new("stats")
Expand All @@ -32,13 +39,15 @@ pub fn subcommands() -> Command {
.about("Resource IOStats")
.subcommand(pool)
.subcommand(nexus)
.subcommand(replica)
.subcommand(reset)
}

pub async fn handler(ctx: Context, matches: &ArgMatches) -> crate::Result<()> {
match matches.subcommand().unwrap() {
("pool", args) => pool(ctx, args).await,
("nexus", args) => nexus(ctx, args).await,
("replica", args) => replica(ctx, args).await,
("reset", _) => reset(ctx).await,
(cmd, _) => {
Err(Status::not_found(format!("command {cmd} does not exist")))
Expand Down Expand Up @@ -84,36 +93,26 @@ async fn pool(mut ctx: Context, matches: &ArgMatches) -> crate::Result<()> {

let table = stats
.iter()
.map(|p| {
let read_latency =
ticks_to_time(p.read_latency_ticks, p.tick_rate);
let write_latency =
ticks_to_time(p.write_latency_ticks, p.tick_rate);
let unmap_latency =
ticks_to_time(p.unmap_latency_ticks, p.tick_rate);
let max_read_latency =
ticks_to_time(p.max_read_latency_ticks, p.tick_rate);
let min_read_latency =
ticks_to_time(p.min_read_latency_ticks, p.tick_rate);
let max_write_latency =
ticks_to_time(p.max_write_latency_ticks, p.tick_rate);
let min_write_latency =
ticks_to_time(p.min_write_latency_ticks, p.tick_rate);
.map(|stats| {
let tick_rate = stats.tick_rate;
let ticks_time = |ticks| -> String {
ticks_to_time(ticks, tick_rate).to_string()
};
vec![
p.name.clone(),
p.num_read_ops.to_string(),
adjust_bytes(p.bytes_read),
p.num_write_ops.to_string(),
adjust_bytes(p.bytes_written),
p.num_unmap_ops.to_string(),
adjust_bytes(p.bytes_unmapped),
read_latency.to_string(),
write_latency.to_string(),
unmap_latency.to_string(),
max_read_latency.to_string(),
min_read_latency.to_string(),
max_write_latency.to_string(),
min_write_latency.to_string(),
stats.name.clone(),
stats.num_read_ops.to_string(),
adjust_bytes(stats.bytes_read),
stats.num_write_ops.to_string(),
adjust_bytes(stats.bytes_written),
stats.num_unmap_ops.to_string(),
adjust_bytes(stats.bytes_unmapped),
ticks_time(stats.read_latency_ticks),
ticks_time(stats.write_latency_ticks),
ticks_time(stats.unmap_latency_ticks),
ticks_time(stats.max_read_latency_ticks),
ticks_time(stats.min_read_latency_ticks),
ticks_time(stats.max_write_latency_ticks),
ticks_time(stats.min_write_latency_ticks),
]
})
.collect();
Expand Down Expand Up @@ -176,38 +175,114 @@ async fn nexus(mut ctx: Context, matches: &ArgMatches) -> crate::Result<()> {
return Ok(());
}

let table = stats
.iter()
.map(|stats| {
let tick_rate = stats.tick_rate;
let ticks_time = |ticks| -> String {
ticks_to_time(ticks, tick_rate).to_string()
};
vec![
stats.name.clone(),
stats.num_read_ops.to_string(),
adjust_bytes(stats.bytes_read),
stats.num_write_ops.to_string(),
adjust_bytes(stats.bytes_written),
stats.num_unmap_ops.to_string(),
adjust_bytes(stats.bytes_unmapped),
ticks_time(stats.read_latency_ticks),
ticks_time(stats.write_latency_ticks),
ticks_time(stats.unmap_latency_ticks),
ticks_time(stats.max_read_latency_ticks),
ticks_time(stats.min_read_latency_ticks),
ticks_time(stats.max_write_latency_ticks),
ticks_time(stats.min_write_latency_ticks),
]
})
.collect();
ctx.print_list(
vec![
"NAME",
"NUM_RD_OPS",
"TOTAL_RD",
"NUM_WR_OPS",
"TOTAL_WR",
"NUM_UNMAP_OPS",
"TOTAL_UNMAPPED",
"RD_LAT",
"WR_LAT",
"UNMAP_LATENCY",
"MAX_RD_LAT",
"MIN_RD_LAT",
"MAX_WR_LAT",
"MIN_WR_LAT",
],
table,
);
}
};
Ok(())
}

async fn replica(mut ctx: Context, matches: &ArgMatches) -> crate::Result<()> {
ctx.v2("Requesting Replica metrics");
let replica_name = matches.get_one::<String>("name");
let response = ctx
.v1
.stats
.get_replica_io_stats(v1rpc::stats::ListStatsOption {
name: replica_name.cloned(),
})
.await
.context(GrpcStatus)?;
match ctx.output {
OutputFormat::Json => {
println!(
"{}",
serde_json::to_string_pretty(response.get_ref())
.unwrap()
.to_colored_json_auto()
.unwrap()
);
}
OutputFormat::Default => {
let stats: &Vec<v1rpc::stats::ReplicaIoStats> =
&response.get_ref().stats;
if stats.is_empty() {
if let Some(name) = replica_name {
ctx.v1(&format!(
"No IoStats found for {}, Check if device exists",
name
));
} else {
ctx.v1("No Replica IoStats found");
}
return Ok(());
}

let table = stats
.iter()
.map(|p| {
let read_latency =
ticks_to_time(p.read_latency_ticks, p.tick_rate);
let write_latency =
ticks_to_time(p.write_latency_ticks, p.tick_rate);
let unmap_latency =
ticks_to_time(p.unmap_latency_ticks, p.tick_rate);
let max_read_latency =
ticks_to_time(p.max_read_latency_ticks, p.tick_rate);
let min_read_latency =
ticks_to_time(p.min_read_latency_ticks, p.tick_rate);
let max_write_latency =
ticks_to_time(p.max_write_latency_ticks, p.tick_rate);
let min_write_latency =
ticks_to_time(p.min_write_latency_ticks, p.tick_rate);
let io_stat = p.stats.as_ref().unwrap();
let tick_rate = io_stat.tick_rate;
let ticks_time = |ticks| -> String {
ticks_to_time(ticks, tick_rate).to_string()
};
vec![
p.name.clone(),
p.num_read_ops.to_string(),
adjust_bytes(p.bytes_read),
p.num_write_ops.to_string(),
adjust_bytes(p.bytes_written),
p.num_unmap_ops.to_string(),
adjust_bytes(p.bytes_unmapped),
read_latency.to_string(),
write_latency.to_string(),
unmap_latency.to_string(),
max_read_latency.to_string(),
min_read_latency.to_string(),
max_write_latency.to_string(),
min_write_latency.to_string(),
io_stat.name.clone(),
io_stat.num_read_ops.to_string(),
adjust_bytes(io_stat.bytes_read),
io_stat.num_write_ops.to_string(),
adjust_bytes(io_stat.bytes_written),
io_stat.num_unmap_ops.to_string(),
adjust_bytes(io_stat.bytes_unmapped),
ticks_time(io_stat.read_latency_ticks),
ticks_time(io_stat.write_latency_ticks),
ticks_time(io_stat.unmap_latency_ticks),
ticks_time(io_stat.max_read_latency_ticks),
ticks_time(io_stat.min_read_latency_ticks),
ticks_time(io_stat.max_write_latency_ticks),
ticks_time(io_stat.min_write_latency_ticks),
]
})
.collect();
Expand Down
3 changes: 3 additions & 0 deletions io-engine/src/core/logical_volume.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ pub trait LogicalVolume {
/// Returns the pool uuid of the Logical Volume
fn pool_uuid(&self) -> String;

/// Returns entity id of the Logical Volume.
fn entity_id(&self) -> Option<String>;

/// Returns a boolean indicating if the Logical Volume is thin provisioned
fn is_thin(&self) -> bool;

Expand Down
2 changes: 1 addition & 1 deletion io-engine/src/grpc/v1/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ impl From<Lvol> for Replica {
l.blob_checked(),
CloneXattrs::SourceUuid.name(),
),
entity_id: Lvol::get_blob_xattr(l.blob_checked(), "entity_id"),
entity_id: l.entity_id(),
}
}
}
Expand Down
62 changes: 59 additions & 3 deletions io-engine/src/grpc/v1/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ use crate::{
};
use futures::{future::join_all, FutureExt};
use io_engine_api::v1::stats::*;
use std::fmt::Debug;
use std::{convert::TryFrom, fmt::Debug, panic::AssertUnwindSafe};
use tonic::{Request, Response, Status};

use crate::{
bdev::{nexus, Nexus},
core::{BlockDeviceIoStats, CoreError, UntypedBdev},
core::{BlockDeviceIoStats, CoreError, LogicalVolume, UntypedBdev},
lvs::{Lvol, LvsLvol},
};
use ::function_name::named;
use std::panic::AssertUnwindSafe;

/// RPC service for Resource IoStats.
#[derive(Debug)]
Expand Down Expand Up @@ -216,6 +216,50 @@ impl StatsRpc for StatsService {
.await
}

async fn get_replica_io_stats(
&self,
request: Request<ListStatsOption>,
) -> GrpcResult<ReplicaIoStatsResponse> {
self.shared(self.replica_svc.rw_lock().await, async move {
let args = request.into_inner();
let rx = rpc_submit::<_, _, CoreError>(async move {
let replica_stats_future: Vec<_> = if let Some(name) = args.name
{
UntypedBdev::bdev_first()
.and_then(|bdev| {
bdev.into_iter().find(|b| {
b.driver() == "lvol" && b.name() == name
})
})
.and_then(|b| Lvol::try_from(b).ok())
.map(|lvol| vec![replica_stats(lvol)])
.unwrap_or_default()
} else {
let mut lvols = Vec::new();
if let Some(bdev) = UntypedBdev::bdev_first() {
lvols = bdev
.into_iter()
.filter(|b| b.driver() == "lvol")
.map(|b| Lvol::try_from(b).unwrap())
.collect();
}
lvols.into_iter().map(replica_stats).collect()
};
let replica_stats: Result<Vec<_>, _> =
join_all(replica_stats_future).await.into_iter().collect();
let replica_stats = replica_stats?;
Ok(ReplicaIoStatsResponse {
stats: replica_stats,
})
})?;
rx.await
.map_err(|_| Status::cancelled("cancelled"))?
.map_err(Status::from)
.map(Response::new)
})
.await
}

#[named]
async fn reset_io_stats(&self, request: Request<()>) -> GrpcResult<()> {
self.locked(
Expand Down Expand Up @@ -278,6 +322,18 @@ async fn get_stats(
Ok(IoStats::from(ExternalType((name, uuid, stats))))
}

/// Builds the `ReplicaIoStats` of an Lvol (Replica): the IoStats of its
/// underlying bdev plus the Lvol's entity id.
///
/// Fails with the `CoreError` returned by the bdev stats query.
async fn replica_stats(lvol: Lvol) -> Result<ReplicaIoStats, CoreError> {
    let bdev_stats = lvol.as_bdev().stats_async().await?;
    Ok(ReplicaIoStats {
        stats: Some(IoStats::from(ExternalType((
            lvol.name(),
            lvol.uuid(),
            bdev_stats,
        )))),
        entity_id: lvol.entity_id(),
    })
}

/// Returns IoStats for a given Nexus.
async fn nexus_stats(
name: String,
Expand Down
5 changes: 5 additions & 0 deletions io-engine/src/lvs/lvs_lvol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,11 @@ impl LogicalVolume for Lvol {
self.lvs().uuid()
}

/// Returns entity id of the Logical Volume.
fn entity_id(&self) -> Option<String> {
Lvol::get_blob_xattr(self.blob_checked(), "entity_id")
}

/// Returns a boolean indicating if the Logical Volume is thin provisioned.
fn is_thin(&self) -> bool {
unsafe { spdk_blob_is_thin_provisioned(self.blob_checked()) }
Expand Down
2 changes: 1 addition & 1 deletion utils/dependencies