diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkValidationHandler.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkValidationHandler.java index 788f1f6d0f89b..cc7ef3b2a5611 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkValidationHandler.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkValidationHandler.java @@ -43,10 +43,13 @@ public void handle(ConnectorServiceProto.ValidateSinkRequest request) { ConnectorServiceProto.ValidateSinkResponse.newBuilder() .setError( ConnectorServiceProto.ValidationError.newBuilder() - .setErrorMessage(e.toString()) + .setErrorMessage(e.getMessage()) .build()) .build()); responseObserver.onCompleted(); } + + responseObserver.onNext(ConnectorServiceProto.ValidateSinkResponse.newBuilder().build()); + responseObserver.onCompleted(); } } diff --git a/src/common/src/util/addr.rs b/src/common/src/util/addr.rs index a3050a8f66149..6cbd448405107 100644 --- a/src/common/src/util/addr.rs +++ b/src/common/src/util/addr.rs @@ -15,10 +15,9 @@ use std::net::SocketAddr; use std::str::FromStr; +use anyhow::anyhow; use risingwave_pb::common::HostAddress as ProstHostAddress; -use crate::error::{internal_error, Result}; - /// General host address and port. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct HostAddr { @@ -41,33 +40,33 @@ impl From for HostAddr { } impl TryFrom<&str> for HostAddr { - type Error = crate::error::RwError; + type Error = anyhow::Error; - fn try_from(s: &str) -> Result { - let addr = url::Url::parse(&format!("http://{}", s)) - .map_err(|e| internal_error(format!("{}: {}", e, s)))?; + fn try_from(s: &str) -> Result { + let addr = + url::Url::parse(&format!("http://{}", s)).map_err(|e| anyhow!("{}: {}", e, s))?; Ok(HostAddr { host: addr .host() - .ok_or_else(|| internal_error("invalid host"))? + .ok_or_else(|| anyhow!("invalid host"))? .to_string(), - port: addr.port().ok_or_else(|| internal_error("invalid port"))?, + port: addr.port().ok_or_else(|| anyhow!("invalid port"))?, }) } } impl TryFrom<&String> for HostAddr { - type Error = crate::error::RwError; + type Error = anyhow::Error; - fn try_from(s: &String) -> Result { + fn try_from(s: &String) -> Result { Self::try_from(s.as_str()) } } impl FromStr for HostAddr { - type Err = crate::error::RwError; + type Err = anyhow::Error; - fn from_str(s: &str) -> Result { + fn from_str(s: &str) -> Result { Self::try_from(s) } } diff --git a/src/connector/src/sink/catalog/mod.rs b/src/connector/src/sink/catalog/mod.rs index d34e9c612f1a7..dbcca4ca5a50f 100644 --- a/src/connector/src/sink/catalog/mod.rs +++ b/src/connector/src/sink/catalog/mod.rs @@ -17,7 +17,9 @@ pub mod desc; use std::collections::HashMap; use itertools::Itertools; -use risingwave_common::catalog::{ColumnCatalog, DatabaseId, SchemaId, TableId, UserId}; +use risingwave_common::catalog::{ + ColumnCatalog, DatabaseId, Field, Schema, SchemaId, TableId, UserId, +}; use risingwave_common::util::sort_util::ColumnOrder; use risingwave_pb::catalog::{Sink as ProstSink, SinkType as ProstSinkType}; @@ -165,6 +167,19 @@ impl SinkCatalog { sink_type: self.sink_type.to_proto() as i32, } } + + pub fn schema(&self) -> Schema { + let fields = self + .columns + .iter() + .map(|column| Field::from(column.column_desc.clone())) + .collect_vec(); + Schema { fields } + } + + pub fn pk_indices(&self) -> Vec { + self.pk.iter().map(|k| k.column_index).collect_vec() + } } impl From for SinkCatalog { diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 012d3fce5d278..57fb456de880f 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -31,7 +31,7 @@ use serde::{Deserialize, Serialize}; use thiserror::Error; pub use tracing; -use self::catalog::SinkType; +use self::catalog::{SinkCatalog, SinkType}; use crate::sink::console::{ConsoleConfig, ConsoleSink, CONSOLE_SINK}; use crate::sink::kafka::{KafkaConfig, KafkaSink, KAFKA_SINK}; use crate::sink::redis::{RedisConfig, RedisSink}; @@ -160,6 +160,37 @@ impl SinkImpl { SinkConfig::BlackHole => SinkImpl::Blackhole, }) } + + pub async fn validate( + cfg: SinkConfig, + sink_catalog: SinkCatalog, + connector_rpc_endpoint: Option, + ) -> Result<()> { + match cfg { + SinkConfig::Redis(cfg) => RedisSink::new(cfg, sink_catalog.schema()).map(|_| ()), + SinkConfig::Kafka(cfg) => { + // We simply call `KafkaSink::new` here to validate a Kafka sink. + if sink_catalog.sink_type.is_append_only() { + KafkaSink::::new(*cfg, sink_catalog.schema(), sink_catalog.pk_indices()) + .await + .map(|_| ()) + } else { + KafkaSink::::new(*cfg, sink_catalog.schema(), sink_catalog.pk_indices()) + .await + .map(|_| ()) + } + } + SinkConfig::Remote(cfg) => { + if sink_catalog.sink_type.is_append_only() { + RemoteSink::::validate(cfg, sink_catalog, connector_rpc_endpoint).await + } else { + RemoteSink::::validate(cfg, sink_catalog, connector_rpc_endpoint).await + } + } + SinkConfig::Console(_) => Ok(()), + SinkConfig::BlackHole => Ok(()), + } + } } macro_rules! impl_sink { @@ -222,7 +253,7 @@ pub enum SinkError { impl From for SinkError { fn from(value: RpcError) -> Self { - SinkError::Remote(format!("{:?}", value)) + SinkError::Remote(format!("{}", value)) } } diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index 266a2425622d0..198d7081a2d58 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -13,65 +13,65 @@ // limitations under the License. use std::collections::HashMap; -use std::time::Duration; use anyhow::anyhow; use async_trait::async_trait; +use itertools::Itertools; use risingwave_common::array::StreamChunk; #[cfg(test)] use risingwave_common::catalog::Field; use risingwave_common::catalog::Schema; -use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, STREAM_WINDOW_SIZE}; use risingwave_common::row::Row; #[cfg(test)] use risingwave_common::types::DataType; use risingwave_common::types::{DatumRef, ScalarRefImpl}; +use risingwave_common::util::addr::HostAddr; use risingwave_common::util::iter_util::ZipEqFast; -use risingwave_pb::connector_service::connector_service_client::ConnectorServiceClient; use risingwave_pb::connector_service::sink_stream_request::write_batch::json_payload::RowOp; use risingwave_pb::connector_service::sink_stream_request::write_batch::{JsonPayload, Payload}; use risingwave_pb::connector_service::sink_stream_request::{ - Request as SinkRequest, StartEpoch, StartSink, SyncBatch, WriteBatch, + Request as SinkRequest, StartEpoch, SyncBatch, WriteBatch, }; use risingwave_pb::connector_service::table_schema::Column; -use risingwave_pb::connector_service::{SinkConfig, SinkResponse, SinkStreamRequest, TableSchema}; +use risingwave_pb::connector_service::{SinkResponse, SinkStreamRequest, TableSchema}; +use risingwave_rpc_client::ConnectorClient; use serde_json::Value; use serde_json::Value::Number; -use tokio::sync::mpsc; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; -use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_stream::StreamExt; -use tonic::transport::{Channel, Endpoint}; -use tonic::{Request, Status, Streaming}; +use tonic::{Status, Streaming}; +use super::catalog::SinkCatalog; use crate::sink::{Result, Sink, SinkError}; use crate::ConnectorParams; pub const VALID_REMOTE_SINKS: [&str; 3] = ["jdbc", "file", "iceberg"]; -pub fn is_valid_remote_sink(sink_type: &str) -> bool { - VALID_REMOTE_SINKS.contains(&sink_type) +pub fn is_valid_remote_sink(connector_type: &str) -> bool { + VALID_REMOTE_SINKS.contains(&connector_type) } #[derive(Clone, Debug)] pub struct RemoteConfig { - pub sink_type: String, + pub connector_type: String, pub properties: HashMap, } impl RemoteConfig { pub fn from_hashmap(values: HashMap) -> Result { - let sink_type = values + let connector_type = values .get("connector") .expect("sink type must be specified") .to_string(); - if !is_valid_remote_sink(sink_type.as_str()) { - return Err(SinkError::Config(anyhow!("invalid sink type: {sink_type}"))); + if !is_valid_remote_sink(connector_type.as_str()) { + return Err(SinkError::Config(anyhow!( + "invalid connector type: {connector_type}" + ))); } Ok(RemoteConfig { - sink_type, + connector_type, properties: values, }) } @@ -102,12 +102,12 @@ impl ResponseStreamImpl { #[derive(Debug)] pub struct RemoteSink { - pub sink_type: String, + pub connector_type: String, properties: HashMap, epoch: Option, batch_id: u64, schema: Schema, - _client: Option>, + _client: Option, request_sender: Option>, response_stream: ResponseStreamImpl, } @@ -119,69 +119,40 @@ impl RemoteSink { pk_indices: Vec, connector_params: ConnectorParams, ) -> Result { - let address = format!( - "http://{}", - connector_params - .connector_rpc_endpoint - .ok_or_else(|| SinkError::Remote( - "connector sink endpoint not specified".parse().unwrap() - ))? - ); - let channel = Endpoint::from_shared(address.clone()) - .map_err(|e| { - SinkError::Remote(format!( - "invalid connector endpoint `{}`: {:?}", - &address, e - )) - })? - .initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE) - .initial_stream_window_size(STREAM_WINDOW_SIZE) - .tcp_nodelay(true) - .connect_timeout(Duration::from_secs(5)) - .connect() - .await - .map_err(|e| { - SinkError::Remote(format!( - "failed to connect to connector endpoint `{}`: {:?}", - &address, e - )) - })?; // create client and start sink - let mut client = ConnectorServiceClient::new(channel); - - let (request_sender, request_receiver) = mpsc::unbounded_channel::(); - - // send initial request in case of the blocking receive call from creating streaming request - request_sender - .send(SinkStreamRequest { - request: Some(SinkRequest::Start(StartSink { - sink_config: Some(SinkConfig { - sink_type: config.sink_type.clone(), - properties: config.properties.clone(), - table_schema: Some(TableSchema { - columns: schema - .fields() - .iter() - .map(|c| Column { - name: c.name.clone(), - data_type: c.data_type().to_protobuf().type_name, - }) - .collect(), - pk_indices: pk_indices.iter().map(|i| *i as u32).collect(), - }), - }), - })), - }) - .map_err(|e| SinkError::Remote(e.to_string()))?; + let address = connector_params.connector_rpc_endpoint.ok_or_else(|| { + SinkError::Remote("connector sink endpoint not specified".parse().unwrap()) + })?; + let host_addr = HostAddr::try_from(&address).map_err(SinkError::from)?; + let client = ConnectorClient::new(host_addr).await.map_err(|err| { + SinkError::Remote(format!( + "failed to connect to connector endpoint `{}`: {:?}", + &address, err + )) + })?; - let mut response = client - .sink_stream(Request::new(UnboundedReceiverStream::new(request_receiver))) + let table_schema = Some(TableSchema { + columns: schema + .fields() + .iter() + .map(|c| Column { + name: c.name.clone(), + data_type: c.data_type().to_protobuf().type_name, + }) + .collect(), + pk_indices: pk_indices.iter().map(|i| *i as u32).collect(), + }); + let (request_sender, mut response) = client + .start_sink_stream( + config.connector_type.clone(), + config.properties.clone(), + table_schema, + ) .await - .map_err(|e| SinkError::Remote(format!("failed to start sink: {:?}", e)))? - .into_inner(); + .map_err(SinkError::from)?; let _ = response.next().await.unwrap(); Ok(RemoteSink { - sink_type: config.sink_type, + connector_type: config.connector_type, properties: config.properties, epoch: None, batch_id: 0, @@ -192,6 +163,45 @@ impl RemoteSink { }) } + pub async fn validate( + config: RemoteConfig, + sink_catalog: SinkCatalog, + connector_rpc_endpoint: Option, + ) -> Result<()> { + let address = connector_rpc_endpoint.ok_or_else(|| { + SinkError::Remote("connector sink endpoint not specified".parse().unwrap()) + })?; + let host_addr = HostAddr::try_from(&address).map_err(SinkError::from)?; + let client = ConnectorClient::new(host_addr).await.map_err(|err| { + SinkError::Remote(format!( + "failed to connect to connector endpoint `{}`: {:?}", + &address, err + )) + })?; + + let columns = sink_catalog + .columns + .iter() + .map(|column| Column { + name: column.column_desc.name.clone(), + data_type: column.column_desc.data_type.to_protobuf().type_name, + }) + .collect_vec(); + let table_schema = TableSchema { + columns, + pk_indices: sink_catalog + .pk_indices() + .iter() + .map(|i| *i as _) + .collect_vec(), + }; + + client + .validate_sink_properties(config.connector_type, config.properties, Some(table_schema)) + .await + .map_err(SinkError::from) + } + fn on_sender_alive(&mut self) -> Result<&UnboundedSender> { self.request_sender .as_ref() @@ -203,7 +213,7 @@ impl RemoteSink { response_receiver: UnboundedReceiver, request_sender: UnboundedSender, ) -> Self { - let properties = HashMap::from([("output_path".to_string(), "/tmp/rw".to_string())]); + let properties = HashMap::from([("output.path".to_string(), "/tmp/rw".to_string())]); let schema = Schema::new(vec![ Field { @@ -221,7 +231,7 @@ impl RemoteSink { ]); Self { - sink_type: "file".to_string(), + connector_type: "file".to_string(), properties, epoch: None, batch_id: 0, diff --git a/src/connector/src/source/cdc/enumerator/mod.rs b/src/connector/src/source/cdc/enumerator/mod.rs index 6858337875813..ec2227b02e147 100644 --- a/src/connector/src/source/cdc/enumerator/mod.rs +++ b/src/connector/src/source/cdc/enumerator/mod.rs @@ -38,7 +38,7 @@ impl SplitEnumerator for DebeziumSplitEnumerator { // validate connector properties cdc_client - .validate_properties( + .validate_source_properties( props.source_id as u64, props.source_type_enum()?, props.props, diff --git a/src/meta/src/manager/streaming_job.rs b/src/meta/src/manager/streaming_job.rs index 386714290931a..c03d9944dec58 100644 --- a/src/meta/src/manager/streaming_job.rs +++ b/src/meta/src/manager/streaming_job.rs @@ -151,16 +151,4 @@ impl StreamingJob { None } } - - /// Returns the optional [`Source`] if this is a `Table` streaming job. - /// - /// Only used for registering sources for creating tables with connectors. - pub fn source(&self) -> Option<&Source> { - match self { - Self::MaterializedView(_) => None, - Self::Sink(_) => None, - Self::Table(source, _) => source.as_ref(), - Self::Index(_, _) => None, - } - } } diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 1568b08b5e6ad..e4d9c1b6b6391 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -27,7 +27,7 @@ use crate::manager::{ use crate::model::{StreamEnvironment, TableFragments}; use crate::storage::MetaStore; use crate::stream::{ - ActorGraphBuildResult, ActorGraphBuilder, CompleteStreamFragmentGraph, + validate_sink, ActorGraphBuildResult, ActorGraphBuilder, CompleteStreamFragmentGraph, CreateStreamingJobContext, GlobalStreamManagerRef, ReplaceTableContext, SourceManagerRef, StreamFragmentGraph, }; @@ -237,9 +237,18 @@ where internal_tables = ctx.internal_tables(); - if let Some(source) = stream_job.source() { - self.source_manager.register_source(source).await?; + match &stream_job { + StreamingJob::Table(Some(source), _) => { + // Register the source on the connector node. + self.source_manager.register_source(source).await?; + } + StreamingJob::Sink(sink) => { + // Validate the sink on the connector node. + validate_sink(sink, self.env.opts.connector_rpc_endpoint.clone()).await?; + } + _ => {} } + self.stream_manager .create_streaming_job(table_fragments, ctx) .await?; diff --git a/src/meta/src/rpc/service/meta_member_service.rs b/src/meta/src/rpc/service/meta_member_service.rs index cbc66d40752aa..7c82e322ccb3f 100644 --- a/src/meta/src/rpc/service/meta_member_service.rs +++ b/src/meta/src/rpc/service/meta_member_service.rs @@ -47,7 +47,10 @@ impl MetaMemberService for MetaMemberServiceImpl { Either::Left(election_client) => { let mut members = vec![]; for member in election_client.get_members().await? { - let host_addr = member.id.parse::()?; + let host_addr = member + .id + .parse::() + .map_err(|err| Status::from_error(err.into()))?; members.push(MetaMember { address: Some(HostAddress { host: host_addr.host, @@ -60,7 +63,10 @@ impl MetaMemberService for MetaMemberServiceImpl { members } Either::Right(self_as_leader) => { - let host_addr = self_as_leader.advertise_addr.parse::()?; + let host_addr = self_as_leader + .advertise_addr + .parse::() + .map_err(|err| Status::from_error(err.into()))?; vec![MetaMember { address: Some(HostAddress { host: host_addr.host, diff --git a/src/meta/src/stream/mod.rs b/src/meta/src/stream/mod.rs index 7497ae9de7b3c..79389ae477803 100644 --- a/src/meta/src/stream/mod.rs +++ b/src/meta/src/stream/mod.rs @@ -13,6 +13,7 @@ // limitations under the License. mod scale; +mod sink; mod source_manager; mod stream_graph; mod stream_manager; @@ -21,6 +22,7 @@ mod test_fragmenter; mod test_scale; pub use scale::*; +pub use sink::*; pub use source_manager::*; pub use stream_graph::*; pub use stream_manager::*; diff --git a/src/meta/src/stream/sink.rs b/src/meta/src/stream/sink.rs new file mode 100644 index 0000000000000..d4ad160c7e0ab --- /dev/null +++ b/src/meta/src/stream/sink.rs @@ -0,0 +1,36 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use anyhow::anyhow; +use risingwave_connector::sink::catalog::SinkCatalog; +use risingwave_connector::sink::{SinkConfig, SinkImpl}; +use risingwave_pb::catalog::Sink as ProstSinkCatalog; + +use crate::{MetaError, MetaResult}; + +pub async fn validate_sink( + prost_sink_catalog: &ProstSinkCatalog, + connector_rpc_endpoint: Option, +) -> MetaResult<()> { + let sink_catalog = SinkCatalog::from(prost_sink_catalog); + let mut properties = sink_catalog.properties.clone(); + // Insert a value as the `identifier` field to get parsed by serde. + properties.insert("identifier".to_string(), u64::MAX.to_string()); + let sink_config = SinkConfig::from_hashmap(properties) + .map_err(|err| MetaError::from(anyhow!(err.to_string())))?; + + SinkImpl::validate(sink_config, sink_catalog, connector_rpc_endpoint) + .await + .map_err(|err| MetaError::from(anyhow!(err.to_string()))) +} diff --git a/src/rpc_client/src/connector_client.rs b/src/rpc_client/src/connector_client.rs index 44eb525a44ee3..b10992e3e4e3b 100644 --- a/src/rpc_client/src/connector_client.rs +++ b/src/rpc_client/src/connector_client.rs @@ -15,27 +15,39 @@ use std::collections::HashMap; use std::time::Duration; +use anyhow::anyhow; use async_trait::async_trait; -use risingwave_common::config::MAX_CONNECTION_WINDOW_SIZE; +use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, STREAM_WINDOW_SIZE}; use risingwave_common::util::addr::HostAddr; use risingwave_pb::connector_service::connector_service_client::ConnectorServiceClient; use risingwave_pb::connector_service::get_event_stream_request::{ - Request, StartSource, ValidateProperties, + Request as SourceRequest, StartSource, ValidateProperties, }; +use risingwave_pb::connector_service::sink_stream_request::{Request as SinkRequest, StartSink}; use risingwave_pb::connector_service::*; +use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; +use tokio_stream::wrappers::UnboundedReceiverStream; use tonic::transport::{Channel, Endpoint}; -use tonic::Streaming; +use tonic::{Request, Streaming}; -use crate::error::Result; +use crate::error::{Result, RpcError}; use crate::RpcClient; -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct ConnectorClient(ConnectorServiceClient); impl ConnectorClient { pub async fn new(host_addr: HostAddr) -> Result { - let channel = Endpoint::from_shared(format!("http://{}", &host_addr))? + let channel = Endpoint::from_shared(format!("http://{}", &host_addr)) + .map_err(|e| { + RpcError::Internal(anyhow!(format!( + "invalid connector endpoint `{}`: {:?}", + &host_addr, e + ))) + })? .initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE) + .initial_stream_window_size(STREAM_WINDOW_SIZE) + .tcp_nodelay(true) .connect_timeout(Duration::from_secs(5)) .connect() .await?; @@ -54,7 +66,7 @@ impl ConnectorClient { .0 .to_owned() .get_event_stream(GetEventStreamRequest { - request: Some(Request::Start(StartSource { + request: Some(SourceRequest::Start(StartSource { source_id, source_type: source_type as _, start_offset: start_offset.unwrap_or_default(), @@ -73,7 +85,7 @@ impl ConnectorClient { } /// Validate source properties - pub async fn validate_properties( + pub async fn validate_source_properties( &self, source_id: u64, source_type: SourceType, @@ -84,7 +96,7 @@ impl ConnectorClient { .0 .to_owned() .get_event_stream(GetEventStreamRequest { - request: Some(Request::Validate(ValidateProperties { + request: Some(SourceRequest::Validate(ValidateProperties { source_id, source_type: source_type as _, properties, @@ -101,6 +113,70 @@ impl ConnectorClient { })? .into_inner()) } + + pub async fn start_sink_stream( + &self, + sink_type: String, + properties: HashMap, + table_schema: Option, + ) -> Result<(UnboundedSender, Streaming)> { + let (request_sender, request_receiver) = unbounded_channel::(); + + // Send initial request in case of the blocking receive call from creating streaming request + request_sender + .send(SinkStreamRequest { + request: Some(SinkRequest::Start(StartSink { + sink_config: Some(SinkConfig { + sink_type, + properties, + table_schema, + }), + })), + }) + .map_err(|err| RpcError::Internal(anyhow!(err.to_string())))?; + + let response = self + .0 + .to_owned() + .sink_stream(Request::new(UnboundedReceiverStream::new(request_receiver))) + .await + .map_err(RpcError::GrpcStatus)? + .into_inner(); + + Ok((request_sender, response)) + } + + pub async fn validate_sink_properties( + &self, + connector_type: String, + properties: HashMap, + table_schema: Option, + ) -> Result<()> { + let response = self + .0 + .to_owned() + .validate_sink(ValidateSinkRequest { + sink_config: Some(SinkConfig { + sink_type: connector_type, + properties, + table_schema, + }), + }) + .await + .inspect_err(|err| { + tracing::error!("failed to validate sink properties: {}", err.message()) + })? + .into_inner(); + response.error.map_or_else( + || Ok(()), // If there is no error message, return Ok here. + |err| { + Err(RpcError::Internal(anyhow!(format!( + "sink cannot pass validation: {}", + err.error_message + )))) + }, + ) + } } #[async_trait]