Skip to content

Commit 30fc472

Browse files
refactor(backend): split replica and pool svc
This tidies up the impl of the different backend types a bit and hopefully will make it easier to add other backend types. Co-authored-by: Akhil Mohan <akhil.mohan@mayadata.io> Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com>
1 parent a12a6f2 commit 30fc472

File tree

7 files changed

+1168
-868
lines changed

7 files changed

+1168
-868
lines changed

io-engine/src/grpc/mod.rs

+8
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,14 @@ pub mod v1 {
8585
pub mod snapshot_rebuild;
8686
pub mod stats;
8787
pub mod test;
88+
pub mod lvm {
89+
pub mod pool;
90+
pub mod replica;
91+
}
92+
pub mod lvs {
93+
pub mod pool;
94+
pub mod replica;
95+
}
8896
}
8997

9098
/// Default timeout for gRPC calls, in seconds. Should be enforced in case

io-engine/src/grpc/v1/lvm/pool.rs

+178
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
use crate::{
2+
grpc::{lvm_enabled, GrpcResult},
3+
lvm::{CmnQueryArgs, Error as LvmError, VolumeGroup},
4+
lvs::Lvs,
5+
pool_backend::PoolArgs,
6+
};
7+
use io_engine_api::v1::pool::*;
8+
use std::{convert::TryFrom, fmt::Debug};
9+
use tonic::{Request, Response, Status};
10+
11+
#[derive(Debug)]
12+
struct UnixStream(tokio::net::UnixStream);
13+
14+
#[derive(Debug, Clone)]
15+
pub(crate) struct PoolService {}
16+
17+
impl PoolService {
18+
pub(crate) fn new() -> Self {
19+
Self {}
20+
}
21+
pub(crate) async fn list_svc_pools(
22+
&self,
23+
args: &ListPoolOptions,
24+
) -> Result<Vec<Pool>, tonic::Status> {
25+
lvm_enabled()?;
26+
27+
let pools = VolumeGroup::list(
28+
&CmnQueryArgs::ours()
29+
.named_opt(&args.name)
30+
.uuid_opt(&args.uuid),
31+
)
32+
.await?;
33+
Ok(pools.into_iter().map(Into::into).collect())
34+
}
35+
}
36+
37+
#[tonic::async_trait]
38+
impl PoolRpc for PoolService {
39+
async fn create_pool(
40+
&self,
41+
request: Request<CreatePoolRequest>,
42+
) -> GrpcResult<Pool> {
43+
lvm_enabled()?;
44+
45+
let args = PoolArgs::try_from(request.into_inner())?;
46+
47+
// bail if an lvs pool already exists with the same name
48+
if let Some(_pool) = Lvs::lookup(args.name.as_str()) {
49+
return Err(Status::invalid_argument(
50+
"lvs pool with the same name already exists",
51+
));
52+
};
53+
// check if the disks are used by existing lvs pool
54+
if Lvs::iter()
55+
.map(|l| l.base_bdev().name().to_string())
56+
.any(|d| args.disks.contains(&d))
57+
{
58+
return Err(Status::invalid_argument(
59+
"an lvs pool already uses the disk",
60+
));
61+
};
62+
VolumeGroup::import_or_create(args)
63+
.await
64+
.map_err(Status::from)
65+
.map(Pool::from)
66+
.map(Response::new)
67+
}
68+
69+
async fn destroy_pool(
70+
&self,
71+
request: Request<DestroyPoolRequest>,
72+
) -> GrpcResult<()> {
73+
lvm_enabled()?;
74+
75+
let args = request.into_inner();
76+
let pool = VolumeGroup::lookup(
77+
CmnQueryArgs::ours().named(&args.name).uuid_opt(&args.uuid),
78+
)
79+
.await?;
80+
pool.destroy()
81+
.await
82+
.map_err(Status::from)
83+
.map(Response::new)
84+
}
85+
86+
async fn export_pool(
87+
&self,
88+
request: Request<ExportPoolRequest>,
89+
) -> GrpcResult<()> {
90+
lvm_enabled()?;
91+
92+
let args = request.into_inner();
93+
let mut pool = VolumeGroup::lookup(
94+
CmnQueryArgs::ours().named(&args.name).uuid_opt(&args.uuid),
95+
)
96+
.await?;
97+
pool.export().await?;
98+
return Ok(Response::new(()));
99+
}
100+
101+
async fn import_pool(
102+
&self,
103+
request: Request<ImportPoolRequest>,
104+
) -> GrpcResult<Pool> {
105+
lvm_enabled()?;
106+
107+
let args = PoolArgs::try_from(request.into_inner())?;
108+
109+
// bail if an lvs pool already exists with the same name
110+
if let Some(_pool) = Lvs::lookup(args.name.as_str()) {
111+
return Err(Status::invalid_argument(
112+
"lvs pool with the same name already exists",
113+
));
114+
};
115+
// check if the disks are used by existing lvs pool
116+
if Lvs::iter()
117+
.map(|l| l.base_bdev().name().to_string())
118+
.any(|d| args.disks.contains(&d))
119+
{
120+
return Err(Status::invalid_argument(
121+
"an lvs pool already uses the disk",
122+
));
123+
};
124+
VolumeGroup::import(args)
125+
.await
126+
.map_err(Status::from)
127+
.map(Pool::from)
128+
.map(Response::new)
129+
}
130+
131+
async fn list_pools(
132+
&self,
133+
_request: Request<ListPoolOptions>,
134+
) -> GrpcResult<ListPoolsResponse> {
135+
unimplemented!("Request is not cloneable, so we have to use another fn")
136+
}
137+
}
138+
139+
impl From<LvmError> for Status {
140+
fn from(e: LvmError) -> Self {
141+
match e {
142+
LvmError::InvalidPoolType {
143+
..
144+
}
145+
| LvmError::VgUuidSet {
146+
..
147+
}
148+
| LvmError::DisksMismatch {
149+
..
150+
} => Status::invalid_argument(e.to_string()),
151+
LvmError::NotFound {
152+
..
153+
}
154+
| LvmError::LvNotFound {
155+
..
156+
} => Status::not_found(e.to_string()),
157+
LvmError::NoSpace {
158+
..
159+
} => Status::resource_exhausted(e.to_string()),
160+
_ => Status::internal(e.to_string()),
161+
}
162+
}
163+
}
164+
impl From<VolumeGroup> for Pool {
165+
fn from(v: VolumeGroup) -> Self {
166+
Self {
167+
uuid: v.uuid().to_string(),
168+
name: v.name().into(),
169+
disks: v.disks(),
170+
state: PoolState::PoolOnline.into(),
171+
capacity: v.capacity(),
172+
used: v.used(),
173+
pooltype: PoolType::Lvm as i32,
174+
committed: v.committed(),
175+
cluster_size: v.cluster_size() as u32,
176+
}
177+
}
178+
}

0 commit comments

Comments
 (0)