From 86ac808477ab66e3abf6fd5e2aa770d4437b1a42 Mon Sep 17 00:00:00 2001
From: baojinri <baojinri@gmail.com>
Date: Mon, 13 May 2024 14:46:50 +0800
Subject: [PATCH 1/8] impl basic auth

---
 src/proxy/src/auth/auth_with_file.rs       | 77 ++++++++++++++++++++++
 src/proxy/src/auth/mod.rs                  | 76 +++++++++++++++++++++
 src/proxy/src/context.rs                   | 18 +++++
 src/proxy/src/forward.rs                   | 25 ++++++-
 src/proxy/src/grpc/prom_query.rs           | 15 +++++
 src/proxy/src/grpc/route.rs                | 55 ++++++++++++----
 src/proxy/src/grpc/sql_query.rs            | 30 +++++++++
 src/proxy/src/http/prom.rs                 | 34 ++++++----
 src/proxy/src/http/sql.rs                  |  7 +-
 src/proxy/src/influxdb/mod.rs              |  2 +-
 src/proxy/src/lib.rs                       | 20 +++++-
 src/proxy/src/opentsdb/mod.rs              |  2 +-
 src/proxy/src/read.rs                      |  2 +
 src/proxy/src/write.rs                     | 16 +++++
 src/server/src/config.rs                   |  6 +-
 src/server/src/consts.rs                   |  2 -
 src/server/src/grpc/storage_service/mod.rs | 67 ++++++++++++++++---
 src/server/src/http.rs                     | 11 +++-
 src/server/src/mysql/worker.rs             |  3 +-
 src/server/src/postgresql/handler.rs       |  3 +-
 src/server/src/server.rs                   | 23 ++++++-
 21 files changed, 447 insertions(+), 47 deletions(-)
 create mode 100644 src/proxy/src/auth/auth_with_file.rs
 create mode 100644 src/proxy/src/auth/mod.rs

diff --git a/src/proxy/src/auth/auth_with_file.rs b/src/proxy/src/auth/auth_with_file.rs
new file mode 100644
index 0000000000..ba2074215c
--- /dev/null
+++ b/src/proxy/src/auth/auth_with_file.rs
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! The proxy module provides features such as forwarding and authentication,
+//! adapts to different protocols.
+
+use std::{collections::HashMap, fs::File, io, io::BufRead, path::Path};
+
+use snafu::ResultExt;
+
+use crate::auth::{Auth, FileNotExisted, OpenFile, ReadLine, Result, ADMIN_TENANT};
+
+pub struct AuthWithFile {
+    file_path: String,
+    users: HashMap<String, String>,
+}
+
+impl AuthWithFile {
+    pub fn new(file_path: String) -> Self {
+        Self {
+            file_path,
+            users: HashMap::new(),
+        }
+    }
+}
+
+impl Auth for AuthWithFile {
+    /// Load credential from file
+    fn load_credential(&mut self) -> Result<()> {
+        let path = Path::new(&self.file_path);
+        if !path.exists() {
+            return FileNotExisted {
+                path: self.file_path.clone(),
+            }
+            .fail();
+        }
+
+        let file = File::open(path).context(OpenFile)?;
+        let reader = io::BufReader::new(file);
+
+        for line in reader.lines() {
+            let line = line.context(ReadLine)?;
+            if let Some((value, key)) = line.split_once(':') {
+                self.users.insert(key.to_string(), value.to_string());
+            }
+        }
+
+        Ok(())
+    }
+
+    fn identify(&self, tenant: Option<String>, token: Option<String>) -> bool {
+        if let Some(tenant) = tenant {
+            if tenant == ADMIN_TENANT {
+                return true;
+            }
+        }
+
+        match token {
+            Some(token) => self.users.contains_key(&token),
+            None => false,
+        }
+    }
+}
diff --git a/src/proxy/src/auth/mod.rs b/src/proxy/src/auth/mod.rs
new file mode 100644
index 0000000000..78552c818a
--- /dev/null
+++ b/src/proxy/src/auth/mod.rs
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! The proxy module provides features such as forwarding and authentication,
+//! adapts to different protocols.
+
+use std::sync::{Arc, Mutex};
+
+use macros::define_result;
+use serde::{Deserialize, Serialize};
+use snafu::Snafu;
+
+pub mod auth_with_file;
+
+#[derive(Debug, Snafu)]
+pub enum Error {
+    #[snafu(display("Failed to open file, err:{}.", source))]
+    OpenFile { source: std::io::Error },
+
+    #[snafu(display("Failed to read line, err:{}.", source))]
+    ReadLine { source: std::io::Error },
+
+    #[snafu(display("File not existed, file path:{}", path))]
+    FileNotExisted { path: String },
+}
+
+define_result!(Error);
+
+pub type AuthRef = Arc<Mutex<dyn Auth>>;
+
+/// Header of tenant name
+pub const TENANT_HEADER: &str = "x-horaedb-access-tenant";
+/// Header of tenant name
+pub const TENANT_TOKEN_HEADER: &str = "x-horaedb-access-token";
+
+/// Admin tenant name
+pub const ADMIN_TENANT: &str = "admin";
+
+#[derive(Debug, Clone, Deserialize, Serialize, Default)]
+pub struct Config {
+    pub enable: bool,
+    pub auth_type: String,
+    pub source: String,
+}
+
+pub trait Auth: Send + Sync {
+    fn load_credential(&mut self) -> Result<()>;
+    fn identify(&self, tenant: Option<String>, token: Option<String>) -> bool;
+}
+
+#[derive(Default)]
+pub struct AuthBase;
+
+impl Auth for AuthBase {
+    fn load_credential(&mut self) -> Result<()> {
+        Ok(())
+    }
+
+    fn identify(&self, _tenant: Option<String>, _token: Option<String>) -> bool {
+        true
+    }
+}
diff --git a/src/proxy/src/context.rs b/src/proxy/src/context.rs
index cb2fcb6763..fb2c752f58 100644
--- a/src/proxy/src/context.rs
+++ b/src/proxy/src/context.rs
@@ -56,6 +56,10 @@ pub struct RequestContext {
     pub timeout: Option<Duration>,
     /// Request id
     pub request_id: RequestId,
+    /// Tenant
+    pub tenant: Option<String>,
+    /// Access token
+    pub access_token: Option<String>,
 }
 
 impl RequestContext {
@@ -69,6 +73,8 @@ pub struct Builder {
     catalog: String,
     schema: String,
     timeout: Option<Duration>,
+    tenant: Option<String>,
+    access_token: Option<String>,
 }
 
 impl Builder {
@@ -87,6 +93,16 @@ impl Builder {
         self
     }
 
+    pub fn tenant(mut self, tenant: Option<String>) -> Self {
+        self.tenant = tenant;
+        self
+    }
+
+    pub fn access_token(mut self, access_token: Option<String>) -> Self {
+        self.access_token = access_token;
+        self
+    }
+
     pub fn build(self) -> Result<RequestContext> {
         ensure!(!self.catalog.is_empty(), MissingCatalog);
         ensure!(!self.schema.is_empty(), MissingSchema);
@@ -96,6 +112,8 @@ impl Builder {
             schema: self.schema,
             timeout: self.timeout,
             request_id: RequestId::next_id(),
+            tenant: self.tenant,
+            access_token: self.access_token,
         })
     }
 }
diff --git a/src/proxy/src/forward.rs b/src/proxy/src/forward.rs
index 0d8a856f79..bc73ed5ff4 100644
--- a/src/proxy/src/forward.rs
+++ b/src/proxy/src/forward.rs
@@ -37,7 +37,10 @@ use tonic::{
     transport::{self, Channel},
 };
 
-use crate::FORWARDED_FROM;
+use crate::{
+    auth::{TENANT_HEADER, TENANT_TOKEN_HEADER},
+    FORWARDED_FROM,
+};
 
 #[derive(Debug, Snafu)]
 pub enum Error {
@@ -206,6 +209,8 @@ pub struct ForwardRequest<Req> {
     pub table: String,
     pub req: tonic::Request<Req>,
     pub forwarded_from: Option<String>,
+    pub tenant: Option<String>,
+    pub access_token: Option<String>,
 }
 
 impl Forwarder<DefaultClientBuilder> {
@@ -283,6 +288,8 @@ impl<B: ClientBuilder> Forwarder<B> {
             table,
             req,
             forwarded_from,
+            tenant,
+            access_token,
         } = forward_req;
 
         let req_pb = RouteRequestPb {
@@ -309,7 +316,7 @@ impl<B: ClientBuilder> Forwarder<B> {
             }
         };
 
-        self.forward_with_endpoint(endpoint, req, forwarded_from, do_rpc)
+        self.forward_with_endpoint(endpoint, req, forwarded_from, tenant, access_token, do_rpc)
             .await
     }
 
@@ -318,6 +325,8 @@ impl<B: ClientBuilder> Forwarder<B> {
         endpoint: Endpoint,
         mut req: tonic::Request<Req>,
         forwarded_from: Option<String>,
+        tenant: Option<String>,
+        access_token: Option<String>,
         do_rpc: F,
     ) -> Result<ForwardResult<Resp, Err>>
     where
@@ -351,6 +360,16 @@ impl<B: ClientBuilder> Forwarder<B> {
             self.local_endpoint.to_string().parse().unwrap(),
         );
 
+        if let Some(tenant) = tenant {
+            req.metadata_mut()
+                .insert(TENANT_HEADER, tenant.parse().unwrap());
+        }
+
+        if let Some(access_token) = access_token {
+            req.metadata_mut()
+                .insert(TENANT_TOKEN_HEADER, access_token.parse().unwrap());
+        }
+
         let client = self.get_or_create_client(&endpoint).await?;
         match do_rpc(client, req, &endpoint).await {
             Err(e) => {
@@ -503,6 +522,8 @@ mod tests {
                 table: table.to_string(),
                 req: query_request.into_request(),
                 forwarded_from: None,
+                tenant: None,
+                access_token: None,
             }
         };
 
diff --git a/src/proxy/src/grpc/prom_query.rs b/src/proxy/src/grpc/prom_query.rs
index 673b6131a5..0a189eb920 100644
--- a/src/proxy/src/grpc/prom_query.rs
+++ b/src/proxy/src/grpc/prom_query.rs
@@ -81,6 +81,21 @@ impl Proxy {
             msg: "Missing context",
             code: StatusCode::BAD_REQUEST,
         })?;
+
+        // Check if the tenant is authorized to access the database.
+        if !self
+            .auth
+            .lock()
+            .unwrap()
+            .identify(ctx.tenant.clone(), ctx.access_token)
+        {
+            return ErrNoCause {
+                msg: format!("tenant: {:?} unauthorized", ctx.tenant),
+                code: StatusCode::UNAUTHORIZED,
+            }
+            .fail();
+        }
+
         let schema = req_ctx.database;
         let catalog = self.instance.catalog_manager.default_catalog_name();
 
diff --git a/src/proxy/src/grpc/route.rs b/src/proxy/src/grpc/route.rs
index 0955cec2bd..1ee085bed1 100644
--- a/src/proxy/src/grpc/route.rs
+++ b/src/proxy/src/grpc/route.rs
@@ -16,30 +16,59 @@
 // under the License.
 
 use horaedbproto::storage::{RouteRequest as RouteRequestPb, RouteResponse};
+use http::StatusCode;
 use router::RouteRequest;
 
-use crate::{error, metrics::GRPC_HANDLER_COUNTER_VEC, Context, Proxy};
+use crate::{
+    error,
+    error::{ErrNoCause, Result},
+    metrics::GRPC_HANDLER_COUNTER_VEC,
+    Context, Proxy,
+};
 
 impl Proxy {
-    pub async fn handle_route(&self, _ctx: Context, req: RouteRequestPb) -> RouteResponse {
+    pub async fn handle_route(&self, ctx: Context, req: RouteRequestPb) -> RouteResponse {
         let request = RouteRequest::new(req, true);
-        let routes = self.route(request).await;
 
-        let mut resp = RouteResponse::default();
-        match routes {
+        match self.handle_route_internal(ctx, request).await {
+            Ok(v) => {
+                GRPC_HANDLER_COUNTER_VEC.route_succeeded.inc();
+                v
+            }
             Err(e) => {
-                GRPC_HANDLER_COUNTER_VEC.route_failed.inc();
-
                 error!("Failed to handle route, err:{e}");
-                resp.header = Some(error::build_err_header(e));
+                GRPC_HANDLER_COUNTER_VEC.route_failed.inc();
+                RouteResponse {
+                    header: Some(error::build_err_header(e)),
+                    ..Default::default()
+                }
             }
-            Ok(v) => {
-                GRPC_HANDLER_COUNTER_VEC.route_succeeded.inc();
+        }
+    }
 
-                resp.header = Some(error::build_ok_header());
-                resp.routes = v;
+    async fn handle_route_internal(
+        &self,
+        ctx: Context,
+        req: RouteRequest,
+    ) -> Result<RouteResponse> {
+        // Check if the tenant is authorized to access the database.
+        if !self
+            .auth
+            .lock()
+            .unwrap()
+            .identify(ctx.tenant.clone(), ctx.access_token.clone())
+        {
+            return ErrNoCause {
+                msg: format!("tenant: {:?} unauthorized", ctx.tenant),
+                code: StatusCode::UNAUTHORIZED,
             }
+            .fail();
         }
-        resp
+
+        let routes = self.route(req).await?;
+        Ok(RouteResponse {
+            header: Some(error::build_ok_header()),
+            routes,
+        })
     }
 }
diff --git a/src/proxy/src/grpc/sql_query.rs b/src/proxy/src/grpc/sql_query.rs
index 4a2a5d8080..cc2332ced5 100644
--- a/src/proxy/src/grpc/sql_query.rs
+++ b/src/proxy/src/grpc/sql_query.rs
@@ -87,6 +87,20 @@ impl Proxy {
             .fail();
         }
 
+        // Check if the tenant is authorized to access the database.
+        if !self
+            .auth
+            .lock()
+            .unwrap()
+            .identify(ctx.tenant.clone(), ctx.access_token.clone())
+        {
+            return ErrNoCause {
+                msg: format!("tenant: {:?} unauthorized", ctx.tenant),
+                code: StatusCode::UNAUTHORIZED,
+            }
+            .fail();
+        }
+
         let req_context = req.context.as_ref().unwrap();
         let schema = &req_context.database;
 
@@ -156,6 +170,20 @@ impl Proxy {
             .fail();
         }
 
+        // Check if the tenant is authorized to access the database.
+        if !self
+            .auth
+            .lock()
+            .unwrap()
+            .identify(ctx.tenant.clone(), ctx.access_token.clone())
+        {
+            return ErrNoCause {
+                msg: format!("tenant: {:?} unauthorized", ctx.tenant),
+                code: StatusCode::UNAUTHORIZED,
+            }
+            .fail();
+        }
+
         let req_context = req.context.as_ref().unwrap();
         let schema = &req_context.database;
         let req = match self.clone().maybe_forward_stream_sql_query(ctx, req).await {
@@ -227,6 +255,8 @@ impl Proxy {
             table: req.tables[0].clone(),
             req: req.clone().into_request(),
             forwarded_from: ctx.forwarded_from.clone(),
+            tenant: ctx.tenant.clone(),
+            access_token: ctx.access_token.clone(),
         };
         let do_query = |mut client: StorageServiceClient<Channel>,
                         request: tonic::Request<SqlQueryRequest>,
diff --git a/src/proxy/src/http/prom.rs b/src/proxy/src/http/prom.rs
index c113b3e705..43ea5b3c8e 100644
--- a/src/proxy/src/http/prom.rs
+++ b/src/proxy/src/http/prom.rs
@@ -19,11 +19,7 @@
 //! It converts write request to gRPC write request, and
 //! translates query request to SQL for execution.
 
-use std::{
-    collections::HashMap,
-    result::Result as StdResult,
-    time::{Duration, Instant},
-};
+use std::{collections::HashMap, result::Result as StdResult, time::Instant};
 
 use async_trait::async_trait;
 use catalog::consts::DEFAULT_CATALOG;
@@ -83,7 +79,7 @@ impl Proxy {
             }),
             table_requests: write_table_requests,
         };
-        let ctx = ProxyContext::new(ctx.timeout, None);
+        let ctx = ProxyContext::new(ctx.timeout, None, ctx.tenant, ctx.access_token);
 
         match self.handle_write_internal(ctx, table_request).await {
             Ok(result) => {
@@ -120,6 +116,20 @@ impl Proxy {
         metric: String,
         query: Query,
     ) -> Result<QueryResult> {
+        // Check if the tenant is authorized to access the database.
+        if !self
+            .auth
+            .lock()
+            .unwrap()
+            .identify(ctx.tenant.clone(), ctx.access_token.clone())
+        {
+            return ErrNoCause {
+                msg: format!("tenant: {:?} unauthorized", ctx.tenant),
+                code: StatusCode::UNAUTHORIZED,
+            }
+            .fail();
+        }
+
         let request_id = &ctx.request_id;
         let begin_instant = Instant::now();
         let deadline = ctx.timeout.map(|t| begin_instant + t);
@@ -178,14 +188,14 @@ impl Proxy {
     /// another HoraeDB instance.
     pub async fn handle_prom_grpc_query(
         &self,
-        timeout: Option<Duration>,
+        ctx: ProxyContext,
         req: PrometheusRemoteQueryRequest,
     ) -> Result<PrometheusRemoteQueryResponse> {
-        let ctx = req.context.context(ErrNoCause {
+        let req_ctx = req.context.context(ErrNoCause {
             code: StatusCode::BAD_REQUEST,
             msg: "request context is missing",
         })?;
-        let database = ctx.database.to_string();
+        let database = req_ctx.database.to_string();
         let query = Query::decode(req.query.as_ref())
             .box_err()
             .context(Internal {
@@ -193,7 +203,9 @@ impl Proxy {
             })?;
         let metric = find_metric(&query.matchers)?;
         let builder = RequestContext::builder()
-            .timeout(timeout)
+            .timeout(ctx.timeout)
+            .tenant(ctx.tenant)
+            .access_token(ctx.access_token)
             .schema(database)
             // TODO: support different catalog
             .catalog(DEFAULT_CATALOG.to_string());
@@ -235,7 +247,7 @@ impl RemoteStorage for Proxy {
                 query: query.encode_to_vec(),
             };
             if let Some(resp) = self
-                .maybe_forward_prom_remote_query(metric.clone(), remote_req)
+                .maybe_forward_prom_remote_query(ctx, metric.clone(), remote_req)
                 .await
                 .map_err(|e| {
                     error!("Forward prom remote query failed, err:{e}");
diff --git a/src/proxy/src/http/sql.rs b/src/proxy/src/http/sql.rs
index 1b1fffdc96..a2c96959ae 100644
--- a/src/proxy/src/http/sql.rs
+++ b/src/proxy/src/http/sql.rs
@@ -50,7 +50,12 @@ impl Proxy {
         req: Request,
     ) -> Result<Output> {
         let schema = &ctx.schema;
-        let ctx = Context::new(ctx.timeout, None);
+        let ctx = Context::new(
+            ctx.timeout,
+            None,
+            ctx.tenant.clone(),
+            ctx.access_token.clone(),
+        );
 
         let query_res = self
             .handle_sql(
diff --git a/src/proxy/src/influxdb/mod.rs b/src/proxy/src/influxdb/mod.rs
index 8c8346dfc6..49b79668ab 100644
--- a/src/proxy/src/influxdb/mod.rs
+++ b/src/proxy/src/influxdb/mod.rs
@@ -81,7 +81,7 @@ impl Proxy {
             }),
             table_requests: write_table_requests,
         };
-        let proxy_context = Context::new(ctx.timeout, None);
+        let proxy_context = Context::new(ctx.timeout, None, ctx.tenant, ctx.access_token);
 
         match self
             .handle_write_internal(proxy_context, table_request)
diff --git a/src/proxy/src/lib.rs b/src/proxy/src/lib.rs
index de07fc2df4..a78722b2d4 100644
--- a/src/proxy/src/lib.rs
+++ b/src/proxy/src/lib.rs
@@ -20,6 +20,7 @@
 
 #![feature(trait_alias)]
 
+pub mod auth;
 pub mod context;
 pub mod error;
 mod error_util;
@@ -80,6 +81,8 @@ use table_engine::{
 use tonic::{transport::Channel, IntoRequest};
 
 use crate::{
+    auth::AuthRef,
+    context::RequestContext,
     error::{ErrNoCause, ErrWithCause, Error, Internal, Result},
     forward::{ForwardRequest, ForwardResult, Forwarder, ForwarderRef},
     hotspot::HotspotRecorder,
@@ -105,6 +108,7 @@ impl Default for SubTableAccessPerm {
 }
 
 pub struct Proxy {
+    auth: AuthRef,
     router: Arc<dyn Router + Send + Sync>,
     forwarder: ForwarderRef,
     instance: InstanceRef,
@@ -122,6 +126,7 @@ pub struct Proxy {
 impl Proxy {
     #[allow(clippy::too_many_arguments)]
     pub fn new(
+        auth: AuthRef,
         router: Arc<dyn Router + Send + Sync>,
         instance: InstanceRef,
         forward_config: forward::Config,
@@ -143,6 +148,7 @@ impl Proxy {
         ));
 
         Self {
+            auth,
             router,
             instance,
             forwarder,
@@ -168,6 +174,7 @@ impl Proxy {
 
     async fn maybe_forward_prom_remote_query(
         &self,
+        ctx: &RequestContext,
         metric: String,
         req: PrometheusRemoteQueryRequest,
     ) -> Result<Option<ForwardResult<PrometheusRemoteQueryResponse, Error>>> {
@@ -177,6 +184,8 @@ impl Proxy {
             table: metric,
             req: req.into_request(),
             forwarded_from: None,
+            tenant: ctx.tenant.clone(),
+            access_token: ctx.access_token.clone(),
         };
         let do_query = |mut client: StorageServiceClient<Channel>,
                         request: tonic::Request<PrometheusRemoteQueryRequest>,
@@ -536,14 +545,23 @@ pub struct Context {
     request_id: RequestId,
     timeout: Option<Duration>,
     forwarded_from: Option<String>,
+    tenant: Option<String>,
+    access_token: Option<String>,
 }
 
 impl Context {
-    pub fn new(timeout: Option<Duration>, forwarded_from: Option<String>) -> Self {
+    pub fn new(
+        timeout: Option<Duration>,
+        forwarded_from: Option<String>,
+        tenant: Option<String>,
+        access_token: Option<String>,
+    ) -> Self {
         Self {
             request_id: RequestId::next_id(),
             timeout,
             forwarded_from,
+            tenant,
+            access_token,
         }
     }
 }
diff --git a/src/proxy/src/opentsdb/mod.rs b/src/proxy/src/opentsdb/mod.rs
index aae4a4a2d6..9d5ed02f81 100644
--- a/src/proxy/src/opentsdb/mod.rs
+++ b/src/proxy/src/opentsdb/mod.rs
@@ -69,7 +69,7 @@ impl Proxy {
             }),
             table_requests: write_table_requests,
         };
-        let proxy_context = Context::new(ctx.timeout, None);
+        let proxy_context = Context::new(ctx.timeout, None, ctx.tenant, ctx.access_token);
 
         match self
             .handle_write_internal(proxy_context, table_request)
diff --git a/src/proxy/src/read.rs b/src/proxy/src/read.rs
index a34875d13b..ea7eb39bb9 100644
--- a/src/proxy/src/read.rs
+++ b/src/proxy/src/read.rs
@@ -316,6 +316,8 @@ impl Proxy {
             table: table_name.unwrap(),
             req: sql_request.into_request(),
             forwarded_from: ctx.forwarded_from,
+            tenant: ctx.tenant,
+            access_token: ctx.access_token,
         };
         let do_query = |mut client: StorageServiceClient<Channel>,
                         request: tonic::Request<SqlQueryRequest>,
diff --git a/src/proxy/src/write.rs b/src/proxy/src/write.rs
index 1e5ae3564a..24e8e71498 100644
--- a/src/proxy/src/write.rs
+++ b/src/proxy/src/write.rs
@@ -87,6 +87,20 @@ impl Proxy {
         ctx: Context,
         req: WriteRequest,
     ) -> Result<WriteResponse> {
+        // Check if the tenant is authorized to access the database.
+        if !self
+            .auth
+            .lock()
+            .unwrap()
+            .identify(ctx.tenant.clone(), ctx.access_token.clone())
+        {
+            return ErrNoCause {
+                msg: format!("tenant: {:?} unauthorized", ctx.tenant),
+                code: StatusCode::UNAUTHORIZED,
+            }
+            .fail();
+        }
+
         let write_context = req.context.clone();
         let resp = if self.cluster_with_meta {
             self.handle_write_with_meta(ctx, req).await?
@@ -453,6 +467,8 @@ impl Proxy {
                 endpoint,
                 tonic::Request::new(table_write_request),
                 ctx.forwarded_from,
+                ctx.tenant,
+                ctx.access_token,
                 do_write,
             )
             .await;
diff --git a/src/server/src/config.rs b/src/server/src/config.rs
index 054b15329d..b07cef9595 100644
--- a/src/server/src/config.rs
+++ b/src/server/src/config.rs
@@ -25,7 +25,7 @@ use std::{
 use cluster::config::SchemaConfig;
 use common_types::schema::TIMESTAMP_COLUMN;
 use meta_client::types::ShardId;
-use proxy::{forward, hotspot, SubTableAccessPerm};
+use proxy::{auth, forward, hotspot, SubTableAccessPerm};
 use router::{
     endpoint::Endpoint,
     rule_based::{ClusterView, RuleList},
@@ -141,6 +141,9 @@ pub struct ServerConfig {
     /// The minimum length of the response body to compress.
     pub resp_compress_min_length: ReadableSize,
 
+    /// Auth config
+    pub auth: auth::Config,
+
     /// Config for forwarding
     pub forward: forward::Config,
 
@@ -178,6 +181,7 @@ impl Default for ServerConfig {
             http_max_body_size: ReadableSize::mb(64),
             grpc_server_cq_count: 20,
             resp_compress_min_length: ReadableSize::mb(4),
+            auth: auth::Config::default(),
             forward: forward::Config::default(),
             auto_create_table: true,
             default_schema_config: Default::default(),
diff --git a/src/server/src/consts.rs b/src/server/src/consts.rs
index b7b9831002..680e607833 100644
--- a/src/server/src/consts.rs
+++ b/src/server/src/consts.rs
@@ -21,8 +21,6 @@
 pub const CATALOG_HEADER: &str = "x-horaedb-catalog";
 /// Header of schema name
 pub const SCHEMA_HEADER: &str = "x-horaedb-schema";
-/// Header of tenant name
-pub const TENANT_HEADER: &str = "x-horaedb-access-tenant";
 /// Header of content encoding type
 pub const CONTENT_ENCODING_HEADER: &str = "content-encoding";
 
diff --git a/src/server/src/grpc/storage_service/mod.rs b/src/server/src/grpc/storage_service/mod.rs
index 142c5e23ed..1230b80d6b 100644
--- a/src/server/src/grpc/storage_service/mod.rs
+++ b/src/server/src/grpc/storage_service/mod.rs
@@ -35,7 +35,7 @@ use horaedbproto::{
     },
 };
 use http::StatusCode;
-use proxy::{Context, Proxy, FORWARDED_FROM};
+use proxy::{auth::TENANT_TOKEN_HEADER, Context, Proxy, FORWARDED_FROM};
 use table_engine::engine::EngineRuntimes;
 use time_ext::InstantExt;
 
@@ -148,7 +148,12 @@ impl StorageService for StorageServiceImpl {
     ) -> Result<tonic::Response<Self::StreamSqlQueryStream>, tonic::Status> {
         let begin_instant = Instant::now();
         let proxy = self.proxy.clone();
-        let ctx = Context::new(self.timeout, get_forwarded_from(&req));
+        let ctx = Context::new(
+            self.timeout,
+            get_forwarded_from(&req),
+            get_tenant(&req),
+            get_access_token(&req),
+        );
 
         let stream = self.stream_sql_query_internal(ctx, proxy, req).await;
 
@@ -166,13 +171,30 @@ fn get_forwarded_from<T>(req: &tonic::Request<T>) -> Option<String> {
         .map(|value| value.to_str().unwrap().to_string())
 }
 
+fn get_tenant<T>(req: &tonic::Request<T>) -> Option<String> {
+    req.metadata()
+        .get(TENANT_TOKEN_HEADER)
+        .map(|value| value.to_str().unwrap().to_string())
+}
+
+fn get_access_token<T>(req: &tonic::Request<T>) -> Option<String> {
+    req.metadata()
+        .get(TENANT_TOKEN_HEADER)
+        .map(|value| value.to_str().unwrap().to_string())
+}
+
 // TODO: Use macros to simplify duplicate code
 impl StorageServiceImpl {
     async fn route_internal(
         &self,
         req: tonic::Request<RouteRequest>,
     ) -> Result<tonic::Response<RouteResponse>, tonic::Status> {
-        let ctx = Context::new(self.timeout, get_forwarded_from(&req));
+        let ctx = Context::new(
+            self.timeout,
+            get_forwarded_from(&req),
+            get_tenant(&req),
+            get_access_token(&req),
+        );
         let req = req.into_inner();
         let proxy = self.proxy.clone();
 
@@ -199,7 +221,12 @@ impl StorageServiceImpl {
         &self,
         req: tonic::Request<WriteRequest>,
     ) -> Result<tonic::Response<WriteResponse>, tonic::Status> {
-        let ctx = Context::new(self.timeout, get_forwarded_from(&req));
+        let ctx = Context::new(
+            self.timeout,
+            get_forwarded_from(&req),
+            get_tenant(&req),
+            get_access_token(&req),
+        );
 
         let req = req.into_inner();
         let proxy = self.proxy.clone();
@@ -236,7 +263,12 @@ impl StorageServiceImpl {
         &self,
         req: tonic::Request<SqlQueryRequest>,
     ) -> Result<tonic::Response<SqlQueryResponse>, tonic::Status> {
-        let ctx = Context::new(self.timeout, get_forwarded_from(&req));
+        let ctx = Context::new(
+            self.timeout,
+            get_forwarded_from(&req),
+            get_tenant(&req),
+            get_access_token(&req),
+        );
         let proxy = self.proxy.clone();
 
         let join_handle = self
@@ -262,11 +294,18 @@ impl StorageServiceImpl {
         &self,
         req: tonic::Request<PrometheusRemoteQueryRequest>,
     ) -> Result<tonic::Response<PrometheusRemoteQueryResponse>, tonic::Status> {
+        let ctx = Context::new(
+            self.timeout,
+            get_forwarded_from(&req),
+            get_tenant(&req),
+            get_access_token(&req),
+        );
+
         let req = req.into_inner();
         let proxy = self.proxy.clone();
-        let timeout = self.timeout;
+
         let join_handle = self.runtimes.read_runtime.spawn(async move {
-            match proxy.handle_prom_grpc_query(timeout, req).await {
+            match proxy.handle_prom_grpc_query(ctx, req).await {
                 Ok(v) => v,
                 Err(e) => PrometheusRemoteQueryResponse {
                     header: Some(error::build_err_header(
@@ -295,7 +334,12 @@ impl StorageServiceImpl {
         &self,
         req: tonic::Request<PrometheusQueryRequest>,
     ) -> Result<tonic::Response<PrometheusQueryResponse>, tonic::Status> {
-        let ctx = Context::new(self.timeout, get_forwarded_from(&req));
+        let ctx = Context::new(
+            self.timeout,
+            get_forwarded_from(&req),
+            get_tenant(&req),
+            get_access_token(&req),
+        );
 
         let req = req.into_inner();
         let proxy = self.proxy.clone();
@@ -331,7 +375,12 @@ impl StorageServiceImpl {
         &self,
         req: tonic::Request<tonic::Streaming<WriteRequest>>,
     ) -> Result<tonic::Response<WriteResponse>, tonic::Status> {
-        let ctx = Context::new(self.timeout, get_forwarded_from(&req));
+        let ctx = Context::new(
+            self.timeout,
+            get_forwarded_from(&req),
+            get_tenant(&req),
+            get_access_token(&req),
+        );
         let mut stream = req.into_inner();
         let proxy = self.proxy.clone();
 
diff --git a/src/server/src/http.rs b/src/server/src/http.rs
index 95ce7a1889..c0200db288 100644
--- a/src/server/src/http.rs
+++ b/src/server/src/http.rs
@@ -37,6 +37,7 @@ use macros::define_result;
 use profile::Profiler;
 use prom_remote_api::web;
 use proxy::{
+    auth::{ADMIN_TENANT, TENANT_HEADER, TENANT_TOKEN_HEADER},
     context::RequestContext,
     handlers::{self},
     http::sql::{convert_output, Request},
@@ -732,9 +733,13 @@ impl Service {
 
         header::optional::<String>(consts::CATALOG_HEADER)
             .and(header::optional::<String>(consts::SCHEMA_HEADER))
-            .and(header::optional::<String>(consts::TENANT_HEADER))
+            .and(header::optional::<String>(TENANT_HEADER))
+            .and(header::optional::<String>(TENANT_TOKEN_HEADER))
             .and_then(
-                move |catalog: Option<_>, schema: Option<_>, _tenant: Option<_>| {
+                move |catalog: Option<_>,
+                      schema: Option<_>,
+                      _tenant: Option<_>,
+                      access_token: Option<_>| {
                     // Clone the captured variables
                     let default_catalog = default_catalog.clone();
                     let schema = schema.unwrap_or_else(|| default_schema.clone());
@@ -743,6 +748,8 @@ impl Service {
                             .catalog(catalog.unwrap_or(default_catalog))
                             .schema(schema)
                             .timeout(timeout)
+                            .tenant(Some(ADMIN_TENANT.to_string()))
+                            .access_token(access_token)
                             .build()
                             .context(CreateContext)
                             .map_err(reject::custom)
diff --git a/src/server/src/mysql/worker.rs b/src/server/src/mysql/worker.rs
index b25e756bc5..37d9f676ab 100644
--- a/src/server/src/mysql/worker.rs
+++ b/src/server/src/mysql/worker.rs
@@ -23,7 +23,7 @@ use logger::{error, info};
 use opensrv_mysql::{
     AsyncMysqlShim, ErrorKind, InitWriter, QueryResultWriter, StatementMetaWriter,
 };
-use proxy::{context::RequestContext, http::sql::Request, Proxy};
+use proxy::{auth::ADMIN_TENANT, context::RequestContext, http::sql::Request, Proxy};
 use snafu::ResultExt;
 
 use crate::{
@@ -152,6 +152,7 @@ where
             .catalog(session.catalog().to_string())
             .schema(session.schema().to_string())
             .timeout(self.timeout)
+            .tenant(Some(ADMIN_TENANT.to_string()))
             .build()
             .context(CreateContext)
     }
diff --git a/src/server/src/postgresql/handler.rs b/src/server/src/postgresql/handler.rs
index feecbca8f7..4ed80e795c 100644
--- a/src/server/src/postgresql/handler.rs
+++ b/src/server/src/postgresql/handler.rs
@@ -31,7 +31,7 @@ use pgwire::{
     },
     error::{PgWireError, PgWireResult},
 };
-use proxy::{context::RequestContext, http::sql::Request, Proxy};
+use proxy::{auth::ADMIN_TENANT, context::RequestContext, http::sql::Request, Proxy};
 use snafu::ResultExt;
 
 use crate::postgresql::error::{CreateContext, Result};
@@ -89,6 +89,7 @@ impl PostgresqlHandler {
             .catalog(default_catalog)
             .schema(default_schema)
             .timeout(self.timeout)
+            .tenant(Some(ADMIN_TENANT.to_string()))
             .build()
             .context(CreateContext)
     }
diff --git a/src/server/src/server.rs b/src/server/src/server.rs
index ddc151c809..7da3f276ba 100644
--- a/src/server/src/server.rs
+++ b/src/server/src/server.rs
@@ -17,7 +17,7 @@
 
 //! Server
 
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 
 use catalog::manager::ManagerRef;
 use cluster::ClusterRef;
@@ -29,6 +29,7 @@ use macros::define_result;
 use notifier::notifier::RequestNotifiers;
 use partition_table_engine::PartitionTableEngine;
 use proxy::{
+    auth::{auth_with_file::AuthWithFile, AuthBase, AuthRef},
     hotspot::HotspotRecorder,
     instance::{DynamicConfig, Instance, InstanceRef},
     limiter::Limiter,
@@ -135,6 +136,9 @@ pub enum Error {
 
     #[snafu(display("Failed to build query engine, err:{source}"))]
     BuildQueryEngine { source: query_engine::error::Error },
+
+    #[snafu(display("Failed to load auth credential, err:{source}"))]
+    LoadCredential { source: proxy::auth::Error },
 }
 
 define_result!(Error);
@@ -451,7 +455,24 @@ impl Builder {
             .enable
             .then(|| Arc::new(RequestNotifiers::default()));
 
+        // Build auth
+        let auth: AuthRef =
+            if self.server_config.auth.enable && self.server_config.auth.auth_type == "file" {
+                Arc::new(Mutex::new(AuthWithFile::new(
+                    self.server_config.auth.source.clone(),
+                )))
+            } else {
+                Arc::new(Mutex::new(AuthBase))
+            };
+
+        // Load auth credential
+        auth.lock()
+            .unwrap()
+            .load_credential()
+            .context(LoadCredential)?;
+
         let proxy = Arc::new(Proxy::new(
+            auth,
             router.clone(),
             instance.clone(),
             self.server_config.forward,

From 94bb684733218d64f9acdd6dd1de5332c9af3538 Mon Sep 17 00:00:00 2001
From: baojinri <baojinri@gmail.com>
Date: Tue, 14 May 2024 12:29:33 +0800
Subject: [PATCH 2/8] use interceptor to auth

---
 Cargo.lock                                 |  1 +
 src/proxy/Cargo.toml                       |  1 +
 src/proxy/src/auth/auth_with_file.rs       | 60 +++++++++++++++-------
 src/proxy/src/auth/mod.rs                  | 31 ++---------
 src/proxy/src/context.rs                   | 21 +++-----
 src/proxy/src/forward.rs                   | 28 +++-------
 src/proxy/src/grpc/prom_query.rs           | 14 -----
 src/proxy/src/grpc/route.rs                | 55 +++++---------------
 src/proxy/src/grpc/sql_query.rs            | 31 +----------
 src/proxy/src/http/prom.rs                 | 19 +------
 src/proxy/src/http/sql.rs                  |  7 +--
 src/proxy/src/influxdb/mod.rs              |  2 +-
 src/proxy/src/lib.rs                       | 16 ++----
 src/proxy/src/opentsdb/mod.rs              |  2 +-
 src/proxy/src/read.rs                      |  3 +-
 src/proxy/src/write.rs                     | 17 +-----
 src/server/src/consts.rs                   |  2 +
 src/server/src/grpc/mod.rs                 | 18 +++++--
 src/server/src/grpc/storage_service/mod.rs | 33 ++++--------
 src/server/src/http.rs                     | 11 +---
 src/server/src/mysql/worker.rs             |  3 +-
 src/server/src/postgresql/handler.rs       |  3 +-
 src/server/src/server.rs                   | 26 ++++------
 23 files changed, 127 insertions(+), 277 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index c8e331b0f7..a0c6dd8bfa 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5428,6 +5428,7 @@ dependencies = [
  "arrow 49.0.0",
  "arrow_ext",
  "async-trait",
+ "base64 0.13.1",
  "bytes",
  "catalog",
  "clru",
diff --git a/src/proxy/Cargo.toml b/src/proxy/Cargo.toml
index 1f66b131da..c6831b3682 100644
--- a/src/proxy/Cargo.toml
+++ b/src/proxy/Cargo.toml
@@ -34,6 +34,7 @@ workspace = true
 arrow = { workspace = true }
 arrow_ext = { workspace = true }
 async-trait = { workspace = true }
+base64 = { workspace = true }
 bytes = { workspace = true }
 catalog = { workspace = true }
 clru = { workspace = true }
diff --git a/src/proxy/src/auth/auth_with_file.rs b/src/proxy/src/auth/auth_with_file.rs
index ba2074215c..fe977f19a5 100644
--- a/src/proxy/src/auth/auth_with_file.rs
+++ b/src/proxy/src/auth/auth_with_file.rs
@@ -18,29 +18,35 @@
 //! The proxy module provides features such as forwarding and authentication,
 //! adapts to different protocols.
 
-use std::{collections::HashMap, fs::File, io, io::BufRead, path::Path};
+use std::{collections::HashSet, fs::File, io, io::BufRead, path::Path};
 
+use base64::encode;
 use snafu::ResultExt;
+use tonic::service::Interceptor;
 
-use crate::auth::{Auth, FileNotExisted, OpenFile, ReadLine, Result, ADMIN_TENANT};
+use crate::auth::{FileNotExisted, OpenFile, ReadLine, Result, AUTHORIZATION};
 
+#[derive(Debug, Clone, Default)]
 pub struct AuthWithFile {
+    enable: bool,
     file_path: String,
-    users: HashMap<String, String>,
+    auth: HashSet<String>,
 }
 
 impl AuthWithFile {
-    pub fn new(file_path: String) -> Self {
+    pub fn new(enable: bool, file_path: String) -> Self {
         Self {
+            enable,
             file_path,
-            users: HashMap::new(),
+            auth: HashSet::new(),
         }
     }
-}
 
-impl Auth for AuthWithFile {
-    /// Load credential from file
-    fn load_credential(&mut self) -> Result<()> {
+    pub fn load_credential(&mut self) -> Result<()> {
+        if !self.enable {
+            return Ok(());
+        }
+
         let path = Path::new(&self.file_path);
         if !path.exists() {
             return FileNotExisted {
@@ -54,24 +60,40 @@ impl Auth for AuthWithFile {
 
         for line in reader.lines() {
             let line = line.context(ReadLine)?;
-            if let Some((value, key)) = line.split_once(':') {
-                self.users.insert(key.to_string(), value.to_string());
-            }
+            let mut buf = Vec::with_capacity(line.len() + 1);
+            buf.extend_from_slice(line.as_bytes());
+            let auth = encode(&buf);
+            self.auth.insert(format!("Basic {}", auth));
         }
 
         Ok(())
     }
 
-    fn identify(&self, tenant: Option<String>, token: Option<String>) -> bool {
-        if let Some(tenant) = tenant {
-            if tenant == ADMIN_TENANT {
-                return true;
-            }
+    fn identify(&self, authorization: Option<String>) -> bool {
+        if !self.enable {
+            return true;
         }
 
-        match token {
-            Some(token) => self.users.contains_key(&token),
+        match authorization {
+            Some(auth) => self.auth.contains(&auth),
             None => false,
         }
     }
 }
+
+impl Interceptor for AuthWithFile {
+    fn call(
+        &mut self,
+        request: tonic::Request<()>,
+    ) -> std::result::Result<tonic::Request<()>, tonic::Status> {
+        let metadata = request.metadata();
+        let authorization = metadata
+            .get(AUTHORIZATION)
+            .map(|v| v.to_str().unwrap().to_string());
+        if self.identify(authorization) {
+            Ok(request)
+        } else {
+            Err(tonic::Status::unauthenticated("unauthenticated"))
+        }
+    }
+}
diff --git a/src/proxy/src/auth/mod.rs b/src/proxy/src/auth/mod.rs
index 78552c818a..735e0a3287 100644
--- a/src/proxy/src/auth/mod.rs
+++ b/src/proxy/src/auth/mod.rs
@@ -18,8 +18,6 @@
 //! The proxy module provides features such as forwarding and authentication,
 //! adapts to different protocols.
 
-use std::sync::{Arc, Mutex};
-
 use macros::define_result;
 use serde::{Deserialize, Serialize};
 use snafu::Snafu;
@@ -40,15 +38,10 @@ pub enum Error {
 
 define_result!(Error);
 
-pub type AuthRef = Arc<Mutex<dyn Auth>>;
-
-/// Header of tenant name
-pub const TENANT_HEADER: &str = "x-horaedb-access-tenant";
-/// Header of tenant name
-pub const TENANT_TOKEN_HEADER: &str = "x-horaedb-access-token";
+/// Header of authorization
+pub const AUTHORIZATION: &str = "authorization";
 
-/// Admin tenant name
-pub const ADMIN_TENANT: &str = "admin";
+pub const DEFAULT_AUTH_TYPE: &str = "file";
 
 #[derive(Debug, Clone, Deserialize, Serialize, Default)]
 pub struct Config {
@@ -56,21 +49,3 @@ pub struct Config {
     pub auth_type: String,
     pub source: String,
 }
-
-pub trait Auth: Send + Sync {
-    fn load_credential(&mut self) -> Result<()>;
-    fn identify(&self, tenant: Option<String>, token: Option<String>) -> bool;
-}
-
-#[derive(Default)]
-pub struct AuthBase;
-
-impl Auth for AuthBase {
-    fn load_credential(&mut self) -> Result<()> {
-        Ok(())
-    }
-
-    fn identify(&self, _tenant: Option<String>, _token: Option<String>) -> bool {
-        true
-    }
-}
diff --git a/src/proxy/src/context.rs b/src/proxy/src/context.rs
index fb2c752f58..b4452609cc 100644
--- a/src/proxy/src/context.rs
+++ b/src/proxy/src/context.rs
@@ -56,10 +56,8 @@ pub struct RequestContext {
     pub timeout: Option<Duration>,
     /// Request id
     pub request_id: RequestId,
-    /// Tenant
-    pub tenant: Option<String>,
-    /// Access token
-    pub access_token: Option<String>,
+    /// authorization
+    pub authorization: Option<String>,
 }
 
 impl RequestContext {
@@ -73,8 +71,7 @@ pub struct Builder {
     catalog: String,
     schema: String,
     timeout: Option<Duration>,
-    tenant: Option<String>,
-    access_token: Option<String>,
+    authorization: Option<String>,
 }
 
 impl Builder {
@@ -93,13 +90,8 @@ impl Builder {
         self
     }
 
-    pub fn tenant(mut self, tenant: Option<String>) -> Self {
-        self.tenant = tenant;
-        self
-    }
-
-    pub fn access_token(mut self, access_token: Option<String>) -> Self {
-        self.access_token = access_token;
+    pub fn authorization(mut self, tenant: Option<String>) -> Self {
+        self.authorization = tenant;
         self
     }
 
@@ -112,8 +104,7 @@ impl Builder {
             schema: self.schema,
             timeout: self.timeout,
             request_id: RequestId::next_id(),
-            tenant: self.tenant,
-            access_token: self.access_token,
+            authorization: self.authorization,
         })
     }
 }
diff --git a/src/proxy/src/forward.rs b/src/proxy/src/forward.rs
index bc73ed5ff4..93d0dae96b 100644
--- a/src/proxy/src/forward.rs
+++ b/src/proxy/src/forward.rs
@@ -37,10 +37,7 @@ use tonic::{
     transport::{self, Channel},
 };
 
-use crate::{
-    auth::{TENANT_HEADER, TENANT_TOKEN_HEADER},
-    FORWARDED_FROM,
-};
+use crate::{auth::AUTHORIZATION, FORWARDED_FROM};
 
 #[derive(Debug, Snafu)]
 pub enum Error {
@@ -209,8 +206,7 @@ pub struct ForwardRequest<Req> {
     pub table: String,
     pub req: tonic::Request<Req>,
     pub forwarded_from: Option<String>,
-    pub tenant: Option<String>,
-    pub access_token: Option<String>,
+    pub authorization: Option<String>,
 }
 
 impl Forwarder<DefaultClientBuilder> {
@@ -288,8 +284,7 @@ impl<B: ClientBuilder> Forwarder<B> {
             table,
             req,
             forwarded_from,
-            tenant,
-            access_token,
+            authorization,
         } = forward_req;
 
         let req_pb = RouteRequestPb {
@@ -316,7 +311,7 @@ impl<B: ClientBuilder> Forwarder<B> {
             }
         };
 
-        self.forward_with_endpoint(endpoint, req, forwarded_from, tenant, access_token, do_rpc)
+        self.forward_with_endpoint(endpoint, req, forwarded_from, authorization, do_rpc)
             .await
     }
 
@@ -325,8 +320,7 @@ impl<B: ClientBuilder> Forwarder<B> {
         endpoint: Endpoint,
         mut req: tonic::Request<Req>,
         forwarded_from: Option<String>,
-        tenant: Option<String>,
-        access_token: Option<String>,
+        authorization: Option<String>,
         do_rpc: F,
     ) -> Result<ForwardResult<Resp, Err>>
     where
@@ -360,14 +354,9 @@ impl<B: ClientBuilder> Forwarder<B> {
             self.local_endpoint.to_string().parse().unwrap(),
         );
 
-        if let Some(tenant) = tenant {
-            req.metadata_mut()
-                .insert(TENANT_HEADER, tenant.parse().unwrap());
-        }
-
-        if let Some(access_token) = access_token {
+        if let Some(authorization) = authorization {
             req.metadata_mut()
-                .insert(TENANT_TOKEN_HEADER, access_token.parse().unwrap());
+                .insert(AUTHORIZATION, authorization.parse().unwrap());
         }
 
         let client = self.get_or_create_client(&endpoint).await?;
@@ -522,8 +511,7 @@ mod tests {
                 table: table.to_string(),
                 req: query_request.into_request(),
                 forwarded_from: None,
-                tenant: None,
-                access_token: None,
+                authorization: None,
             }
         };
 
diff --git a/src/proxy/src/grpc/prom_query.rs b/src/proxy/src/grpc/prom_query.rs
index 0a189eb920..e596f61a1d 100644
--- a/src/proxy/src/grpc/prom_query.rs
+++ b/src/proxy/src/grpc/prom_query.rs
@@ -82,20 +82,6 @@ impl Proxy {
             code: StatusCode::BAD_REQUEST,
         })?;
 
-        // Check if the tenant is authorized to access the database.
-        if !self
-            .auth
-            .lock()
-            .unwrap()
-            .identify(ctx.tenant.clone(), ctx.access_token)
-        {
-            return ErrNoCause {
-                msg: format!("tenant: {:?} unauthorized", ctx.tenant),
-                code: StatusCode::UNAUTHORIZED,
-            }
-            .fail();
-        }
-
         let schema = req_ctx.database;
         let catalog = self.instance.catalog_manager.default_catalog_name();
 
diff --git a/src/proxy/src/grpc/route.rs b/src/proxy/src/grpc/route.rs
index 1ee085bed1..0955cec2bd 100644
--- a/src/proxy/src/grpc/route.rs
+++ b/src/proxy/src/grpc/route.rs
@@ -16,59 +16,30 @@
 // under the License.
 
 use horaedbproto::storage::{RouteRequest as RouteRequestPb, RouteResponse};
-use http::StatusCode;
 use router::RouteRequest;
 
-use crate::{
-    error,
-    error::{ErrNoCause, Result},
-    metrics::GRPC_HANDLER_COUNTER_VEC,
-    Context, Proxy,
-};
+use crate::{error, metrics::GRPC_HANDLER_COUNTER_VEC, Context, Proxy};
 
 impl Proxy {
-    pub async fn handle_route(&self, ctx: Context, req: RouteRequestPb) -> RouteResponse {
+    pub async fn handle_route(&self, _ctx: Context, req: RouteRequestPb) -> RouteResponse {
         let request = RouteRequest::new(req, true);
+        let routes = self.route(request).await;
 
-        match self.handle_route_internal(ctx, request).await {
-            Ok(v) => {
-                GRPC_HANDLER_COUNTER_VEC.route_succeeded.inc();
-                v
-            }
+        let mut resp = RouteResponse::default();
+        match routes {
             Err(e) => {
-                error!("Failed to handle route, err:{e}");
                 GRPC_HANDLER_COUNTER_VEC.route_failed.inc();
-                RouteResponse {
-                    header: Some(error::build_err_header(e)),
-                    ..Default::default()
-                }
+
+                error!("Failed to handle route, err:{e}");
+                resp.header = Some(error::build_err_header(e));
             }
-        }
-    }
+            Ok(v) => {
+                GRPC_HANDLER_COUNTER_VEC.route_succeeded.inc();
 
-    async fn handle_route_internal(
-        &self,
-        ctx: Context,
-        req: RouteRequest,
-    ) -> Result<RouteResponse> {
-        // Check if the tenant is authorized to access the database.
-        if !self
-            .auth
-            .lock()
-            .unwrap()
-            .identify(ctx.tenant.clone(), ctx.access_token.clone())
-        {
-            return ErrNoCause {
-                msg: format!("tenant: {:?} unauthorized", ctx.tenant),
-                code: StatusCode::UNAUTHORIZED,
+                resp.header = Some(error::build_ok_header());
+                resp.routes = v;
             }
-            .fail();
         }
-
-        let routes = self.route(req).await?;
-        Ok(RouteResponse {
-            header: Some(error::build_ok_header()),
-            routes,
-        })
+        resp
     }
 }
diff --git a/src/proxy/src/grpc/sql_query.rs b/src/proxy/src/grpc/sql_query.rs
index cc2332ced5..16c6201703 100644
--- a/src/proxy/src/grpc/sql_query.rs
+++ b/src/proxy/src/grpc/sql_query.rs
@@ -87,20 +87,6 @@ impl Proxy {
             .fail();
         }
 
-        // Check if the tenant is authorized to access the database.
-        if !self
-            .auth
-            .lock()
-            .unwrap()
-            .identify(ctx.tenant.clone(), ctx.access_token.clone())
-        {
-            return ErrNoCause {
-                msg: format!("tenant: {:?} unauthorized", ctx.tenant),
-                code: StatusCode::UNAUTHORIZED,
-            }
-            .fail();
-        }
-
         let req_context = req.context.as_ref().unwrap();
         let schema = &req_context.database;
 
@@ -170,20 +156,6 @@ impl Proxy {
             .fail();
         }
 
-        // Check if the tenant is authorized to access the database.
-        if !self
-            .auth
-            .lock()
-            .unwrap()
-            .identify(ctx.tenant.clone(), ctx.access_token.clone())
-        {
-            return ErrNoCause {
-                msg: format!("tenant: {:?} unauthorized", ctx.tenant),
-                code: StatusCode::UNAUTHORIZED,
-            }
-            .fail();
-        }
-
         let req_context = req.context.as_ref().unwrap();
         let schema = &req_context.database;
         let req = match self.clone().maybe_forward_stream_sql_query(ctx, req).await {
@@ -255,8 +227,7 @@ impl Proxy {
             table: req.tables[0].clone(),
             req: req.clone().into_request(),
             forwarded_from: ctx.forwarded_from.clone(),
-            tenant: ctx.tenant.clone(),
-            access_token: ctx.access_token.clone(),
+            authorization: ctx.authorization.clone(),
         };
         let do_query = |mut client: StorageServiceClient<Channel>,
                         request: tonic::Request<SqlQueryRequest>,
diff --git a/src/proxy/src/http/prom.rs b/src/proxy/src/http/prom.rs
index 43ea5b3c8e..847da23d0a 100644
--- a/src/proxy/src/http/prom.rs
+++ b/src/proxy/src/http/prom.rs
@@ -79,7 +79,7 @@ impl Proxy {
             }),
             table_requests: write_table_requests,
         };
-        let ctx = ProxyContext::new(ctx.timeout, None, ctx.tenant, ctx.access_token);
+        let ctx = ProxyContext::new(ctx.timeout, None, ctx.authorization);
 
         match self.handle_write_internal(ctx, table_request).await {
             Ok(result) => {
@@ -116,20 +116,6 @@ impl Proxy {
         metric: String,
         query: Query,
     ) -> Result<QueryResult> {
-        // Check if the tenant is authorized to access the database.
-        if !self
-            .auth
-            .lock()
-            .unwrap()
-            .identify(ctx.tenant.clone(), ctx.access_token.clone())
-        {
-            return ErrNoCause {
-                msg: format!("tenant: {:?} unauthorized", ctx.tenant),
-                code: StatusCode::UNAUTHORIZED,
-            }
-            .fail();
-        }
-
         let request_id = &ctx.request_id;
         let begin_instant = Instant::now();
         let deadline = ctx.timeout.map(|t| begin_instant + t);
@@ -204,8 +190,7 @@ impl Proxy {
         let metric = find_metric(&query.matchers)?;
         let builder = RequestContext::builder()
             .timeout(ctx.timeout)
-            .tenant(ctx.tenant)
-            .access_token(ctx.access_token)
+            .authorization(ctx.authorization)
             .schema(database)
             // TODO: support different catalog
             .catalog(DEFAULT_CATALOG.to_string());
diff --git a/src/proxy/src/http/sql.rs b/src/proxy/src/http/sql.rs
index a2c96959ae..8f61039eac 100644
--- a/src/proxy/src/http/sql.rs
+++ b/src/proxy/src/http/sql.rs
@@ -50,12 +50,7 @@ impl Proxy {
         req: Request,
     ) -> Result<Output> {
         let schema = &ctx.schema;
-        let ctx = Context::new(
-            ctx.timeout,
-            None,
-            ctx.tenant.clone(),
-            ctx.access_token.clone(),
-        );
+        let ctx = Context::new(ctx.timeout, None, ctx.authorization.clone());
 
         let query_res = self
             .handle_sql(
diff --git a/src/proxy/src/influxdb/mod.rs b/src/proxy/src/influxdb/mod.rs
index 49b79668ab..3e24478684 100644
--- a/src/proxy/src/influxdb/mod.rs
+++ b/src/proxy/src/influxdb/mod.rs
@@ -81,7 +81,7 @@ impl Proxy {
             }),
             table_requests: write_table_requests,
         };
-        let proxy_context = Context::new(ctx.timeout, None, ctx.tenant, ctx.access_token);
+        let proxy_context = Context::new(ctx.timeout, None, ctx.authorization);
 
         match self
             .handle_write_internal(proxy_context, table_request)
diff --git a/src/proxy/src/lib.rs b/src/proxy/src/lib.rs
index a78722b2d4..f2c36b13c7 100644
--- a/src/proxy/src/lib.rs
+++ b/src/proxy/src/lib.rs
@@ -81,7 +81,6 @@ use table_engine::{
 use tonic::{transport::Channel, IntoRequest};
 
 use crate::{
-    auth::AuthRef,
     context::RequestContext,
     error::{ErrNoCause, ErrWithCause, Error, Internal, Result},
     forward::{ForwardRequest, ForwardResult, Forwarder, ForwarderRef},
@@ -108,7 +107,6 @@ impl Default for SubTableAccessPerm {
 }
 
 pub struct Proxy {
-    auth: AuthRef,
     router: Arc<dyn Router + Send + Sync>,
     forwarder: ForwarderRef,
     instance: InstanceRef,
@@ -126,7 +124,6 @@ pub struct Proxy {
 impl Proxy {
     #[allow(clippy::too_many_arguments)]
     pub fn new(
-        auth: AuthRef,
         router: Arc<dyn Router + Send + Sync>,
         instance: InstanceRef,
         forward_config: forward::Config,
@@ -148,7 +145,6 @@ impl Proxy {
         ));
 
         Self {
-            auth,
             router,
             instance,
             forwarder,
@@ -184,8 +180,7 @@ impl Proxy {
             table: metric,
             req: req.into_request(),
             forwarded_from: None,
-            tenant: ctx.tenant.clone(),
-            access_token: ctx.access_token.clone(),
+            authorization: ctx.authorization.clone(),
         };
         let do_query = |mut client: StorageServiceClient<Channel>,
                         request: tonic::Request<PrometheusRemoteQueryRequest>,
@@ -545,23 +540,20 @@ pub struct Context {
     request_id: RequestId,
     timeout: Option<Duration>,
     forwarded_from: Option<String>,
-    tenant: Option<String>,
-    access_token: Option<String>,
+    authorization: Option<String>,
 }
 
 impl Context {
     pub fn new(
         timeout: Option<Duration>,
         forwarded_from: Option<String>,
-        tenant: Option<String>,
-        access_token: Option<String>,
+        authorization: Option<String>,
     ) -> Self {
         Self {
             request_id: RequestId::next_id(),
             timeout,
             forwarded_from,
-            tenant,
-            access_token,
+            authorization,
         }
     }
 }
diff --git a/src/proxy/src/opentsdb/mod.rs b/src/proxy/src/opentsdb/mod.rs
index 9d5ed02f81..80affd0392 100644
--- a/src/proxy/src/opentsdb/mod.rs
+++ b/src/proxy/src/opentsdb/mod.rs
@@ -69,7 +69,7 @@ impl Proxy {
             }),
             table_requests: write_table_requests,
         };
-        let proxy_context = Context::new(ctx.timeout, None, ctx.tenant, ctx.access_token);
+        let proxy_context = Context::new(ctx.timeout, None, ctx.authorization);
 
         match self
             .handle_write_internal(proxy_context, table_request)
diff --git a/src/proxy/src/read.rs b/src/proxy/src/read.rs
index ea7eb39bb9..67e1473d71 100644
--- a/src/proxy/src/read.rs
+++ b/src/proxy/src/read.rs
@@ -316,8 +316,7 @@ impl Proxy {
             table: table_name.unwrap(),
             req: sql_request.into_request(),
             forwarded_from: ctx.forwarded_from,
-            tenant: ctx.tenant,
-            access_token: ctx.access_token,
+            authorization: ctx.authorization,
         };
         let do_query = |mut client: StorageServiceClient<Channel>,
                         request: tonic::Request<SqlQueryRequest>,
diff --git a/src/proxy/src/write.rs b/src/proxy/src/write.rs
index 24e8e71498..fd8e54bd5b 100644
--- a/src/proxy/src/write.rs
+++ b/src/proxy/src/write.rs
@@ -87,20 +87,6 @@ impl Proxy {
         ctx: Context,
         req: WriteRequest,
     ) -> Result<WriteResponse> {
-        // Check if the tenant is authorized to access the database.
-        if !self
-            .auth
-            .lock()
-            .unwrap()
-            .identify(ctx.tenant.clone(), ctx.access_token.clone())
-        {
-            return ErrNoCause {
-                msg: format!("tenant: {:?} unauthorized", ctx.tenant),
-                code: StatusCode::UNAUTHORIZED,
-            }
-            .fail();
-        }
-
         let write_context = req.context.clone();
         let resp = if self.cluster_with_meta {
             self.handle_write_with_meta(ctx, req).await?
@@ -467,8 +453,7 @@ impl Proxy {
                 endpoint,
                 tonic::Request::new(table_write_request),
                 ctx.forwarded_from,
-                ctx.tenant,
-                ctx.access_token,
+                ctx.authorization,
                 do_write,
             )
             .await;
diff --git a/src/server/src/consts.rs b/src/server/src/consts.rs
index 680e607833..b7b9831002 100644
--- a/src/server/src/consts.rs
+++ b/src/server/src/consts.rs
@@ -21,6 +21,8 @@
 pub const CATALOG_HEADER: &str = "x-horaedb-catalog";
 /// Header of schema name
 pub const SCHEMA_HEADER: &str = "x-horaedb-schema";
+/// Header of tenant name
+pub const TENANT_HEADER: &str = "x-horaedb-access-tenant";
 /// Header of content encoding type
 pub const CONTENT_ENCODING_HEADER: &str = "content-encoding";
 
diff --git a/src/server/src/grpc/mod.rs b/src/server/src/grpc/mod.rs
index 1c53f205dc..66c45893a9 100644
--- a/src/server/src/grpc/mod.rs
+++ b/src/server/src/grpc/mod.rs
@@ -37,6 +37,7 @@ use logger::{info, warn};
 use macros::define_result;
 use notifier::notifier::RequestNotifiers;
 use proxy::{
+    auth::auth_with_file::AuthWithFile,
     forward,
     hotspot::HotspotRecorder,
     instance::InstanceRef,
@@ -47,7 +48,7 @@ use runtime::{JoinHandle, Runtime};
 use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
 use table_engine::engine::EngineRuntimes;
 use tokio::sync::oneshot::{self, Sender};
-use tonic::transport::Server;
+use tonic::{codegen::InterceptedService, transport::Server};
 use wal::manager::OpenedWals;
 
 use self::remote_engine_service::QueryDedup;
@@ -113,6 +114,9 @@ pub enum Error {
     #[snafu(display("Missing HotspotRecorder.\nBacktrace:\n{}", backtrace))]
     MissingHotspotRecorder { backtrace: Backtrace },
 
+    #[snafu(display("Missing auth.\nBacktrace:\n{}", backtrace))]
+    MissingAuth { backtrace: Backtrace },
+
     #[snafu(display("Catalog name is not utf8.\nBacktrace:\n{}", backtrace))]
     ParseCatalogName {
         source: std::string::FromUtf8Error,
@@ -158,7 +162,7 @@ define_result!(Error);
 /// Rpc services manages all grpc services of the server.
 pub struct RpcServices {
     serve_addr: SocketAddr,
-    rpc_server: StorageServiceServer<StorageServiceImpl>,
+    rpc_server: InterceptedService<StorageServiceServer<StorageServiceImpl>, AuthWithFile>,
     meta_rpc_server: Option<MetaEventServiceServer<MetaServiceImpl>>,
     remote_engine_server: RemoteEngineServiceServer<RemoteEngineServiceImpl>,
     runtime: Arc<Runtime>,
@@ -212,6 +216,7 @@ impl RpcServices {
 }
 
 pub struct Builder {
+    auth: Option<AuthWithFile>,
     endpoint: String,
     timeout: Option<Duration>,
     runtimes: Option<Arc<EngineRuntimes>>,
@@ -226,6 +231,7 @@ pub struct Builder {
 impl Builder {
     pub fn new() -> Self {
         Self {
+            auth: None,
             endpoint: "0.0.0.0:8381".to_string(),
             timeout: None,
             runtimes: None,
@@ -238,6 +244,11 @@ impl Builder {
         }
     }
 
+    pub fn auth(mut self, auth: AuthWithFile) -> Self {
+        self.auth = Some(auth);
+        self
+    }
+
     pub fn endpoint(mut self, endpoint: String) -> Self {
         self.endpoint = endpoint;
         self
@@ -287,6 +298,7 @@ impl Builder {
 
 impl Builder {
     pub fn build(self) -> Result<RpcServices> {
+        let auth = self.auth.context(MissingAuth)?;
         let runtimes = self.runtimes.context(MissingRuntimes)?;
         let instance = self.instance.context(MissingInstance)?;
         let opened_wals = self.opened_wals.context(MissingWals)?;
@@ -330,7 +342,7 @@ impl Builder {
             runtimes,
             timeout: self.timeout,
         };
-        let rpc_server = StorageServiceServer::new(storage_service);
+        let rpc_server = StorageServiceServer::with_interceptor(storage_service, auth);
 
         let serve_addr = self.endpoint.parse().context(InvalidRpcServeAddr)?;
 
diff --git a/src/server/src/grpc/storage_service/mod.rs b/src/server/src/grpc/storage_service/mod.rs
index 1230b80d6b..228c1742c9 100644
--- a/src/server/src/grpc/storage_service/mod.rs
+++ b/src/server/src/grpc/storage_service/mod.rs
@@ -35,7 +35,7 @@ use horaedbproto::{
     },
 };
 use http::StatusCode;
-use proxy::{auth::TENANT_TOKEN_HEADER, Context, Proxy, FORWARDED_FROM};
+use proxy::{auth::AUTHORIZATION, Context, Proxy, FORWARDED_FROM};
 use table_engine::engine::EngineRuntimes;
 use time_ext::InstantExt;
 
@@ -151,8 +151,7 @@ impl StorageService for StorageServiceImpl {
         let ctx = Context::new(
             self.timeout,
             get_forwarded_from(&req),
-            get_tenant(&req),
-            get_access_token(&req),
+            get_authorization(&req),
         );
 
         let stream = self.stream_sql_query_internal(ctx, proxy, req).await;
@@ -171,15 +170,9 @@ fn get_forwarded_from<T>(req: &tonic::Request<T>) -> Option<String> {
         .map(|value| value.to_str().unwrap().to_string())
 }
 
-fn get_tenant<T>(req: &tonic::Request<T>) -> Option<String> {
+fn get_authorization<T>(req: &tonic::Request<T>) -> Option<String> {
     req.metadata()
-        .get(TENANT_TOKEN_HEADER)
-        .map(|value| value.to_str().unwrap().to_string())
-}
-
-fn get_access_token<T>(req: &tonic::Request<T>) -> Option<String> {
-    req.metadata()
-        .get(TENANT_TOKEN_HEADER)
+        .get(AUTHORIZATION)
         .map(|value| value.to_str().unwrap().to_string())
 }
 
@@ -192,8 +185,7 @@ impl StorageServiceImpl {
         let ctx = Context::new(
             self.timeout,
             get_forwarded_from(&req),
-            get_tenant(&req),
-            get_access_token(&req),
+            get_authorization(&req),
         );
         let req = req.into_inner();
         let proxy = self.proxy.clone();
@@ -224,8 +216,7 @@ impl StorageServiceImpl {
         let ctx = Context::new(
             self.timeout,
             get_forwarded_from(&req),
-            get_tenant(&req),
-            get_access_token(&req),
+            get_authorization(&req),
         );
 
         let req = req.into_inner();
@@ -266,8 +257,7 @@ impl StorageServiceImpl {
         let ctx = Context::new(
             self.timeout,
             get_forwarded_from(&req),
-            get_tenant(&req),
-            get_access_token(&req),
+            get_authorization(&req),
         );
         let proxy = self.proxy.clone();
 
@@ -297,8 +287,7 @@ impl StorageServiceImpl {
         let ctx = Context::new(
             self.timeout,
             get_forwarded_from(&req),
-            get_tenant(&req),
-            get_access_token(&req),
+            get_authorization(&req),
         );
 
         let req = req.into_inner();
@@ -337,8 +326,7 @@ impl StorageServiceImpl {
         let ctx = Context::new(
             self.timeout,
             get_forwarded_from(&req),
-            get_tenant(&req),
-            get_access_token(&req),
+            get_authorization(&req),
         );
 
         let req = req.into_inner();
@@ -378,8 +366,7 @@ impl StorageServiceImpl {
         let ctx = Context::new(
             self.timeout,
             get_forwarded_from(&req),
-            get_tenant(&req),
-            get_access_token(&req),
+            get_authorization(&req),
         );
         let mut stream = req.into_inner();
         let proxy = self.proxy.clone();
diff --git a/src/server/src/http.rs b/src/server/src/http.rs
index c0200db288..95ce7a1889 100644
--- a/src/server/src/http.rs
+++ b/src/server/src/http.rs
@@ -37,7 +37,6 @@ use macros::define_result;
 use profile::Profiler;
 use prom_remote_api::web;
 use proxy::{
-    auth::{ADMIN_TENANT, TENANT_HEADER, TENANT_TOKEN_HEADER},
     context::RequestContext,
     handlers::{self},
     http::sql::{convert_output, Request},
@@ -733,13 +732,9 @@ impl Service {
 
         header::optional::<String>(consts::CATALOG_HEADER)
             .and(header::optional::<String>(consts::SCHEMA_HEADER))
-            .and(header::optional::<String>(TENANT_HEADER))
-            .and(header::optional::<String>(TENANT_TOKEN_HEADER))
+            .and(header::optional::<String>(consts::TENANT_HEADER))
             .and_then(
-                move |catalog: Option<_>,
-                      schema: Option<_>,
-                      _tenant: Option<_>,
-                      access_token: Option<_>| {
+                move |catalog: Option<_>, schema: Option<_>, _tenant: Option<_>| {
                     // Clone the captured variables
                     let default_catalog = default_catalog.clone();
                     let schema = schema.unwrap_or_else(|| default_schema.clone());
@@ -748,8 +743,6 @@ impl Service {
                             .catalog(catalog.unwrap_or(default_catalog))
                             .schema(schema)
                             .timeout(timeout)
-                            .tenant(Some(ADMIN_TENANT.to_string()))
-                            .access_token(access_token)
                             .build()
                             .context(CreateContext)
                             .map_err(reject::custom)
diff --git a/src/server/src/mysql/worker.rs b/src/server/src/mysql/worker.rs
index 37d9f676ab..b25e756bc5 100644
--- a/src/server/src/mysql/worker.rs
+++ b/src/server/src/mysql/worker.rs
@@ -23,7 +23,7 @@ use logger::{error, info};
 use opensrv_mysql::{
     AsyncMysqlShim, ErrorKind, InitWriter, QueryResultWriter, StatementMetaWriter,
 };
-use proxy::{auth::ADMIN_TENANT, context::RequestContext, http::sql::Request, Proxy};
+use proxy::{context::RequestContext, http::sql::Request, Proxy};
 use snafu::ResultExt;
 
 use crate::{
@@ -152,7 +152,6 @@ where
             .catalog(session.catalog().to_string())
             .schema(session.schema().to_string())
             .timeout(self.timeout)
-            .tenant(Some(ADMIN_TENANT.to_string()))
             .build()
             .context(CreateContext)
     }
diff --git a/src/server/src/postgresql/handler.rs b/src/server/src/postgresql/handler.rs
index 4ed80e795c..feecbca8f7 100644
--- a/src/server/src/postgresql/handler.rs
+++ b/src/server/src/postgresql/handler.rs
@@ -31,7 +31,7 @@ use pgwire::{
     },
     error::{PgWireError, PgWireResult},
 };
-use proxy::{auth::ADMIN_TENANT, context::RequestContext, http::sql::Request, Proxy};
+use proxy::{context::RequestContext, http::sql::Request, Proxy};
 use snafu::ResultExt;
 
 use crate::postgresql::error::{CreateContext, Result};
@@ -89,7 +89,6 @@ impl PostgresqlHandler {
             .catalog(default_catalog)
             .schema(default_schema)
             .timeout(self.timeout)
-            .tenant(Some(ADMIN_TENANT.to_string()))
             .build()
             .context(CreateContext)
     }
diff --git a/src/server/src/server.rs b/src/server/src/server.rs
index 7da3f276ba..9c3383f7d1 100644
--- a/src/server/src/server.rs
+++ b/src/server/src/server.rs
@@ -17,7 +17,7 @@
 
 //! Server
 
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
 
 use catalog::manager::ManagerRef;
 use cluster::ClusterRef;
@@ -29,7 +29,7 @@ use macros::define_result;
 use notifier::notifier::RequestNotifiers;
 use partition_table_engine::PartitionTableEngine;
 use proxy::{
-    auth::{auth_with_file::AuthWithFile, AuthBase, AuthRef},
+    auth::{auth_with_file::AuthWithFile, DEFAULT_AUTH_TYPE},
     hotspot::HotspotRecorder,
     instance::{DynamicConfig, Instance, InstanceRef},
     limiter::Limiter,
@@ -456,23 +456,18 @@ impl Builder {
             .then(|| Arc::new(RequestNotifiers::default()));
 
         // Build auth
-        let auth: AuthRef =
-            if self.server_config.auth.enable && self.server_config.auth.auth_type == "file" {
-                Arc::new(Mutex::new(AuthWithFile::new(
-                    self.server_config.auth.source.clone(),
-                )))
-            } else {
-                Arc::new(Mutex::new(AuthBase))
-            };
+        let mut auth = if self.server_config.auth.enable
+            && self.server_config.auth.auth_type == DEFAULT_AUTH_TYPE
+        {
+            AuthWithFile::new(true, self.server_config.auth.source.clone())
+        } else {
+            AuthWithFile::default()
+        };
 
         // Load auth credential
-        auth.lock()
-            .unwrap()
-            .load_credential()
-            .context(LoadCredential)?;
+        auth.load_credential().context(LoadCredential)?;
 
         let proxy = Arc::new(Proxy::new(
-            auth,
             router.clone(),
             instance.clone(),
             self.server_config.forward,
@@ -521,6 +516,7 @@ impl Builder {
             .context(BuildPostgresqlService)?;
 
         let rpc_services = grpc::Builder::new()
+            .auth(auth)
             .endpoint(grpc_endpoint.to_string())
             .runtimes(engine_runtimes)
             .instance(instance.clone())

From 48e87902e085cef1733070f1dde8957ad616e2e8 Mon Sep 17 00:00:00 2001
From: baojinri <baojinri@gmail.com>
Date: Tue, 14 May 2024 19:26:25 +0800
Subject: [PATCH 3/8] auth in http

---
 src/proxy/src/auth/auth_with_file.rs |  2 +-
 src/proxy/src/context.rs             |  4 ++--
 src/server/src/http.rs               | 32 +++++++++++++++++++++++++++-
 src/server/src/server.rs             |  1 +
 4 files changed, 35 insertions(+), 4 deletions(-)

diff --git a/src/proxy/src/auth/auth_with_file.rs b/src/proxy/src/auth/auth_with_file.rs
index fe977f19a5..1652a829fd 100644
--- a/src/proxy/src/auth/auth_with_file.rs
+++ b/src/proxy/src/auth/auth_with_file.rs
@@ -69,7 +69,7 @@ impl AuthWithFile {
         Ok(())
     }
 
-    fn identify(&self, authorization: Option<String>) -> bool {
+    pub fn identify(&self, authorization: Option<String>) -> bool {
         if !self.enable {
             return true;
         }
diff --git a/src/proxy/src/context.rs b/src/proxy/src/context.rs
index b4452609cc..02aa48c9b2 100644
--- a/src/proxy/src/context.rs
+++ b/src/proxy/src/context.rs
@@ -90,8 +90,8 @@ impl Builder {
         self
     }
 
-    pub fn authorization(mut self, tenant: Option<String>) -> Self {
-        self.authorization = tenant;
+    pub fn authorization(mut self, authorization: Option<String>) -> Self {
+        self.authorization = authorization;
         self
     }
 
diff --git a/src/server/src/http.rs b/src/server/src/http.rs
index 95ce7a1889..35a12926bb 100644
--- a/src/server/src/http.rs
+++ b/src/server/src/http.rs
@@ -37,6 +37,7 @@ use macros::define_result;
 use profile::Profiler;
 use prom_remote_api::web;
 use proxy::{
+    auth::{auth_with_file::AuthWithFile, AUTHORIZATION},
     context::RequestContext,
     handlers::{self},
     http::sql::{convert_output, Request},
@@ -92,6 +93,9 @@ pub enum Error {
     #[snafu(display("Missing proxy.\nBacktrace:\n{}", backtrace))]
     MissingProxy { backtrace: Backtrace },
 
+    #[snafu(display("Missing auth.\nBacktrace:\n{}", backtrace))]
+    MissingAuth { backtrace: Backtrace },
+
     #[snafu(display(
         "Fail to do heap profiling, err:{}.\nBacktrace:\n{}",
         source,
@@ -148,6 +152,9 @@ pub enum Error {
 
     #[snafu(display("Querying shards is only supported in cluster mode"))]
     QueryShards {},
+
+    #[snafu(display("unauthenticated.\nBacktrace:\n{}", backtrace))]
+    UnAuthenticated { backtrace: Backtrace },
 }
 
 define_result!(Error);
@@ -176,6 +183,7 @@ impl TryFrom<&str> for ContentEncodingType {
 /// Endpoints beginning with /debug are for internal use, and may subject to
 /// breaking changes.
 pub struct Service {
+    auth: AuthWithFile,
     // In cluster mode, cluster is valid, while in stand-alone mode, cluster is None
     cluster: Option<ClusterRef>,
     proxy: Arc<Proxy>,
@@ -729,16 +737,27 @@ impl Service {
             .default_schema_name()
             .to_string();
         let timeout = self.config.timeout;
+        let auth = self.auth.clone();
 
         header::optional::<String>(consts::CATALOG_HEADER)
             .and(header::optional::<String>(consts::SCHEMA_HEADER))
             .and(header::optional::<String>(consts::TENANT_HEADER))
+            .and(header::optional::<String>(AUTHORIZATION))
             .and_then(
-                move |catalog: Option<_>, schema: Option<_>, _tenant: Option<_>| {
+                move |catalog: Option<_>,
+                      schema: Option<_>,
+                      _tenant: Option<_>,
+                      authorization: Option<_>| {
                     // Clone the captured variables
                     let default_catalog = default_catalog.clone();
                     let schema = schema.unwrap_or_else(|| default_schema.clone());
+                    let auth = auth.clone();
+
                     async move {
+                        if !auth.identify(authorization) {
+                            return UnAuthenticated.fail().map_err(reject::custom);
+                        }
+
                         RequestContext::builder()
                             .catalog(catalog.unwrap_or(default_catalog))
                             .schema(schema)
@@ -800,6 +819,7 @@ impl Service {
 
 /// Service builder
 pub struct Builder {
+    auth: Option<AuthWithFile>,
     config: HttpConfig,
     engine_runtimes: Option<Arc<EngineRuntimes>>,
     log_runtime: Option<Arc<RuntimeLevel>>,
@@ -812,6 +832,7 @@ pub struct Builder {
 impl Builder {
     pub fn new(config: HttpConfig) -> Self {
         Self {
+            auth: None,
             config,
             engine_runtimes: None,
             log_runtime: None,
@@ -822,6 +843,11 @@ impl Builder {
         }
     }
 
+    pub fn auth(mut self, auth: AuthWithFile) -> Self {
+        self.auth = Some(auth);
+        self
+    }
+
     pub fn engine_runtimes(mut self, engine_runtimes: Arc<EngineRuntimes>) -> Self {
         self.engine_runtimes = Some(engine_runtimes);
         self
@@ -860,12 +886,14 @@ impl Builder {
         let log_runtime = self.log_runtime.context(MissingLogRuntime)?;
         let config_content = self.config_content.context(MissingInstance)?;
         let proxy = self.proxy.context(MissingProxy)?;
+        let auth = self.auth.context(MissingAuth)?;
         let cluster = self.cluster;
         let opened_wals = self.opened_wals.context(MissingWal)?;
 
         let (tx, rx) = oneshot::channel();
 
         let service = Service {
+            auth,
             cluster,
             proxy,
             engine_runtimes,
@@ -908,6 +936,7 @@ fn error_to_status_code(err: &Error) -> StatusCode {
         | Error::MissingInstance { .. }
         | Error::MissingSchemaConfigProvider { .. }
         | Error::MissingProxy { .. }
+        | Error::MissingAuth { .. }
         | Error::ParseIpAddr { .. }
         | Error::ProfileHeap { .. }
         | Error::ProfileCPU { .. }
@@ -919,6 +948,7 @@ fn error_to_status_code(err: &Error) -> StatusCode {
         | Error::QueryShards { .. } => StatusCode::BAD_REQUEST,
         Error::HandleUpdateLogLevel { .. } => StatusCode::INTERNAL_SERVER_ERROR,
         Error::QueryMaybeExceedTTL { .. } => StatusCode::OK,
+        Error::UnAuthenticated { .. } => StatusCode::UNAUTHORIZED,
     }
 }
 
diff --git a/src/server/src/server.rs b/src/server/src/server.rs
index 9c3383f7d1..62d996c0a0 100644
--- a/src/server/src/server.rs
+++ b/src/server/src/server.rs
@@ -484,6 +484,7 @@ impl Builder {
         ));
 
         let http_service = http::Builder::new(http_config)
+            .auth(auth.clone())
             .engine_runtimes(engine_runtimes.clone())
             .log_runtime(log_runtime)
             .config_content(config_content)

From aad84faa97b3b9cf65ef7e32cdafb1fed558775c Mon Sep 17 00:00:00 2001
From: baojinri <baojinri@gmail.com>
Date: Tue, 14 May 2024 23:19:36 +0800
Subject: [PATCH 4/8] fix for cr

---
 src/proxy/src/auth/mod.rs                     | 30 +++++--------------
 .../auth/{auth_with_file.rs => with_file.rs}  | 21 ++++++++-----
 src/proxy/src/lib.rs                          |  8 +++++
 src/server/src/grpc/mod.rs                    |  2 +-
 src/server/src/http.rs                        | 22 +++-----------
 src/server/src/server.rs                      | 14 ++++-----
 6 files changed, 41 insertions(+), 56 deletions(-)
 rename src/proxy/src/auth/{auth_with_file.rs => with_file.rs} (85%)

diff --git a/src/proxy/src/auth/mod.rs b/src/proxy/src/auth/mod.rs
index 735e0a3287..b0b5269123 100644
--- a/src/proxy/src/auth/mod.rs
+++ b/src/proxy/src/auth/mod.rs
@@ -15,37 +15,23 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! The proxy module provides features such as forwarding and authentication,
-//! adapts to different protocols.
-
-use macros::define_result;
 use serde::{Deserialize, Serialize};
-use snafu::Snafu;
-
-pub mod auth_with_file;
-
-#[derive(Debug, Snafu)]
-pub enum Error {
-    #[snafu(display("Failed to open file, err:{}.", source))]
-    OpenFile { source: std::io::Error },
-
-    #[snafu(display("Failed to read line, err:{}.", source))]
-    ReadLine { source: std::io::Error },
 
-    #[snafu(display("File not existed, file path:{}", path))]
-    FileNotExisted { path: String },
-}
-
-define_result!(Error);
+pub mod with_file;
 
 /// Header of authorization
 pub const AUTHORIZATION: &str = "authorization";
 
-pub const DEFAULT_AUTH_TYPE: &str = "file";
+#[derive(Debug, Clone, Deserialize, Serialize, Default)]
+pub enum AuthType {
+    #[default]
+    #[serde(rename = "file")]
+    File,
+}
 
 #[derive(Debug, Clone, Deserialize, Serialize, Default)]
 pub struct Config {
     pub enable: bool,
-    pub auth_type: String,
+    pub auth_type: AuthType,
     pub source: String,
 }
diff --git a/src/proxy/src/auth/auth_with_file.rs b/src/proxy/src/auth/with_file.rs
similarity index 85%
rename from src/proxy/src/auth/auth_with_file.rs
rename to src/proxy/src/auth/with_file.rs
index 1652a829fd..96067dfb81 100644
--- a/src/proxy/src/auth/auth_with_file.rs
+++ b/src/proxy/src/auth/with_file.rs
@@ -15,16 +15,17 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! The proxy module provides features such as forwarding and authentication,
-//! adapts to different protocols.
-
 use std::{collections::HashSet, fs::File, io, io::BufRead, path::Path};
 
 use base64::encode;
+use generic_error::BoxError;
 use snafu::ResultExt;
 use tonic::service::Interceptor;
 
-use crate::auth::{FileNotExisted, OpenFile, ReadLine, Result, AUTHORIZATION};
+use crate::{
+    auth::AUTHORIZATION,
+    error::{Internal, InternalNoCause, Result},
+};
 
 #[derive(Debug, Clone, Default)]
 pub struct AuthWithFile {
@@ -49,17 +50,21 @@ impl AuthWithFile {
 
         let path = Path::new(&self.file_path);
         if !path.exists() {
-            return FileNotExisted {
-                path: self.file_path.clone(),
+            return InternalNoCause {
+                msg: format!("file not existed: {:?}", path),
             }
             .fail();
         }
 
-        let file = File::open(path).context(OpenFile)?;
+        let file = File::open(path).box_err().context(Internal {
+            msg: "failed to open file",
+        })?;
         let reader = io::BufReader::new(file);
 
         for line in reader.lines() {
-            let line = line.context(ReadLine)?;
+            let line = line.box_err().context(Internal {
+                msg: "failed to read line",
+            })?;
             let mut buf = Vec::with_capacity(line.len() + 1);
             buf.extend_from_slice(line.as_bytes());
             let auth = encode(&buf);
diff --git a/src/proxy/src/lib.rs b/src/proxy/src/lib.rs
index f2c36b13c7..5488164032 100644
--- a/src/proxy/src/lib.rs
+++ b/src/proxy/src/lib.rs
@@ -81,6 +81,7 @@ use table_engine::{
 use tonic::{transport::Channel, IntoRequest};
 
 use crate::{
+    auth::with_file::AuthWithFile,
     context::RequestContext,
     error::{ErrNoCause, ErrWithCause, Error, Internal, Result},
     forward::{ForwardRequest, ForwardResult, Forwarder, ForwarderRef},
@@ -107,6 +108,7 @@ impl Default for SubTableAccessPerm {
 }
 
 pub struct Proxy {
+    auth: AuthWithFile,
     router: Arc<dyn Router + Send + Sync>,
     forwarder: ForwarderRef,
     instance: InstanceRef,
@@ -124,6 +126,7 @@ pub struct Proxy {
 impl Proxy {
     #[allow(clippy::too_many_arguments)]
     pub fn new(
+        auth: AuthWithFile,
         router: Arc<dyn Router + Send + Sync>,
         instance: InstanceRef,
         forward_config: forward::Config,
@@ -145,6 +148,7 @@ impl Proxy {
         ));
 
         Self {
+            auth,
             router,
             instance,
             forwarder,
@@ -533,6 +537,10 @@ impl Proxy {
             })
         }
     }
+
+    pub fn check_auth(&self, authorization: Option<String>) -> bool {
+        self.auth.identify(authorization)
+    }
 }
 
 #[derive(Clone, Debug)]
diff --git a/src/server/src/grpc/mod.rs b/src/server/src/grpc/mod.rs
index 66c45893a9..7b02a3a2a2 100644
--- a/src/server/src/grpc/mod.rs
+++ b/src/server/src/grpc/mod.rs
@@ -37,7 +37,7 @@ use logger::{info, warn};
 use macros::define_result;
 use notifier::notifier::RequestNotifiers;
 use proxy::{
-    auth::auth_with_file::AuthWithFile,
+    auth::with_file::AuthWithFile,
     forward,
     hotspot::HotspotRecorder,
     instance::InstanceRef,
diff --git a/src/server/src/http.rs b/src/server/src/http.rs
index 35a12926bb..ef23e0dfc4 100644
--- a/src/server/src/http.rs
+++ b/src/server/src/http.rs
@@ -37,7 +37,7 @@ use macros::define_result;
 use profile::Profiler;
 use prom_remote_api::web;
 use proxy::{
-    auth::{auth_with_file::AuthWithFile, AUTHORIZATION},
+    auth::AUTHORIZATION,
     context::RequestContext,
     handlers::{self},
     http::sql::{convert_output, Request},
@@ -93,9 +93,6 @@ pub enum Error {
     #[snafu(display("Missing proxy.\nBacktrace:\n{}", backtrace))]
     MissingProxy { backtrace: Backtrace },
 
-    #[snafu(display("Missing auth.\nBacktrace:\n{}", backtrace))]
-    MissingAuth { backtrace: Backtrace },
-
     #[snafu(display(
         "Fail to do heap profiling, err:{}.\nBacktrace:\n{}",
         source,
@@ -183,7 +180,6 @@ impl TryFrom<&str> for ContentEncodingType {
 /// Endpoints beginning with /debug are for internal use, and may subject to
 /// breaking changes.
 pub struct Service {
-    auth: AuthWithFile,
     // In cluster mode, cluster is valid, while in stand-alone mode, cluster is None
     cluster: Option<ClusterRef>,
     proxy: Arc<Proxy>,
@@ -737,7 +733,7 @@ impl Service {
             .default_schema_name()
             .to_string();
         let timeout = self.config.timeout;
-        let auth = self.auth.clone();
+        let proxy = self.proxy.clone();
 
         header::optional::<String>(consts::CATALOG_HEADER)
             .and(header::optional::<String>(consts::SCHEMA_HEADER))
@@ -751,10 +747,10 @@ impl Service {
                     // Clone the captured variables
                     let default_catalog = default_catalog.clone();
                     let schema = schema.unwrap_or_else(|| default_schema.clone());
-                    let auth = auth.clone();
+                    let proxy = proxy.clone();
 
                     async move {
-                        if !auth.identify(authorization) {
+                        if !proxy.check_auth(authorization) {
                             return UnAuthenticated.fail().map_err(reject::custom);
                         }
 
@@ -819,7 +815,6 @@ impl Service {
 
 /// Service builder
 pub struct Builder {
-    auth: Option<AuthWithFile>,
     config: HttpConfig,
     engine_runtimes: Option<Arc<EngineRuntimes>>,
     log_runtime: Option<Arc<RuntimeLevel>>,
@@ -832,7 +827,6 @@ pub struct Builder {
 impl Builder {
     pub fn new(config: HttpConfig) -> Self {
         Self {
-            auth: None,
             config,
             engine_runtimes: None,
             log_runtime: None,
@@ -843,11 +837,6 @@ impl Builder {
         }
     }
 
-    pub fn auth(mut self, auth: AuthWithFile) -> Self {
-        self.auth = Some(auth);
-        self
-    }
-
     pub fn engine_runtimes(mut self, engine_runtimes: Arc<EngineRuntimes>) -> Self {
         self.engine_runtimes = Some(engine_runtimes);
         self
@@ -886,14 +875,12 @@ impl Builder {
         let log_runtime = self.log_runtime.context(MissingLogRuntime)?;
         let config_content = self.config_content.context(MissingInstance)?;
         let proxy = self.proxy.context(MissingProxy)?;
-        let auth = self.auth.context(MissingAuth)?;
         let cluster = self.cluster;
         let opened_wals = self.opened_wals.context(MissingWal)?;
 
         let (tx, rx) = oneshot::channel();
 
         let service = Service {
-            auth,
             cluster,
             proxy,
             engine_runtimes,
@@ -936,7 +923,6 @@ fn error_to_status_code(err: &Error) -> StatusCode {
         | Error::MissingInstance { .. }
         | Error::MissingSchemaConfigProvider { .. }
         | Error::MissingProxy { .. }
-        | Error::MissingAuth { .. }
         | Error::ParseIpAddr { .. }
         | Error::ProfileHeap { .. }
         | Error::ProfileCPU { .. }
diff --git a/src/server/src/server.rs b/src/server/src/server.rs
index 62d996c0a0..f7cd72ec7b 100644
--- a/src/server/src/server.rs
+++ b/src/server/src/server.rs
@@ -29,7 +29,7 @@ use macros::define_result;
 use notifier::notifier::RequestNotifiers;
 use partition_table_engine::PartitionTableEngine;
 use proxy::{
-    auth::{auth_with_file::AuthWithFile, DEFAULT_AUTH_TYPE},
+    auth::{with_file::AuthWithFile, AuthType},
     hotspot::HotspotRecorder,
     instance::{DynamicConfig, Instance, InstanceRef},
     limiter::Limiter,
@@ -138,7 +138,7 @@ pub enum Error {
     BuildQueryEngine { source: query_engine::error::Error },
 
     #[snafu(display("Failed to load auth credential, err:{source}"))]
-    LoadCredential { source: proxy::auth::Error },
+    LoadCredential { source: proxy::error::Error },
 }
 
 define_result!(Error);
@@ -456,10 +456,10 @@ impl Builder {
             .then(|| Arc::new(RequestNotifiers::default()));
 
         // Build auth
-        let mut auth = if self.server_config.auth.enable
-            && self.server_config.auth.auth_type == DEFAULT_AUTH_TYPE
-        {
-            AuthWithFile::new(true, self.server_config.auth.source.clone())
+        let mut auth = if self.server_config.auth.enable {
+            match self.server_config.auth.auth_type {
+                AuthType::File => AuthWithFile::new(true, self.server_config.auth.source.clone()),
+            }
         } else {
             AuthWithFile::default()
         };
@@ -468,6 +468,7 @@ impl Builder {
         auth.load_credential().context(LoadCredential)?;
 
         let proxy = Arc::new(Proxy::new(
+            auth.clone(),
             router.clone(),
             instance.clone(),
             self.server_config.forward,
@@ -484,7 +485,6 @@ impl Builder {
         ));
 
         let http_service = http::Builder::new(http_config)
-            .auth(auth.clone())
             .engine_runtimes(engine_runtimes.clone())
             .log_runtime(log_runtime)
             .config_content(config_content)

From cbb75eccb3b72c28f949930f75d2f777cdb3e9b2 Mon Sep 17 00:00:00 2001
From: baojinri <baojinri@gmail.com>
Date: Tue, 14 May 2024 23:24:13 +0800
Subject: [PATCH 5/8] modify buf len

---
 src/proxy/src/auth/with_file.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/proxy/src/auth/with_file.rs b/src/proxy/src/auth/with_file.rs
index 96067dfb81..869c772c75 100644
--- a/src/proxy/src/auth/with_file.rs
+++ b/src/proxy/src/auth/with_file.rs
@@ -65,7 +65,7 @@ impl AuthWithFile {
             let line = line.box_err().context(Internal {
                 msg: "failed to read line",
             })?;
-            let mut buf = Vec::with_capacity(line.len() + 1);
+            let mut buf = Vec::with_capacity(line.len());
             buf.extend_from_slice(line.as_bytes());
             let auth = encode(&buf);
             self.auth.insert(format!("Basic {}", auth));

From f169ee38322bb33b04c31624cc65b7f59a76ac06 Mon Sep 17 00:00:00 2001
From: baojinri <baojinri@gmail.com>
Date: Tue, 14 May 2024 23:34:59 +0800
Subject: [PATCH 6/8] add authorization to RequestContext

---
 src/server/src/http.rs | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/server/src/http.rs b/src/server/src/http.rs
index ef23e0dfc4..da74ee8472 100644
--- a/src/server/src/http.rs
+++ b/src/server/src/http.rs
@@ -750,7 +750,7 @@ impl Service {
                     let proxy = proxy.clone();
 
                     async move {
-                        if !proxy.check_auth(authorization) {
+                        if !proxy.check_auth(authorization.clone()) {
                             return UnAuthenticated.fail().map_err(reject::custom);
                         }
 
@@ -758,6 +758,7 @@ impl Service {
                             .catalog(catalog.unwrap_or(default_catalog))
                             .schema(schema)
                             .timeout(timeout)
+                            .authorization(authorization)
                             .build()
                             .context(CreateContext)
                             .map_err(reject::custom)

From 6c58696e07e9042e4fa02771cad99931b33b70fd Mon Sep 17 00:00:00 2001
From: jiacai2050 <dev@liujiacai.net>
Date: Wed, 15 May 2024 11:12:28 +0800
Subject: [PATCH 7/8] remove unwrap

---
 src/proxy/src/auth/with_file.rs            | 13 +++++++++----
 src/server/src/grpc/storage_service/mod.rs |  8 +-------
 2 files changed, 10 insertions(+), 11 deletions(-)

diff --git a/src/proxy/src/auth/with_file.rs b/src/proxy/src/auth/with_file.rs
index 869c772c75..a38e669dc1 100644
--- a/src/proxy/src/auth/with_file.rs
+++ b/src/proxy/src/auth/with_file.rs
@@ -74,6 +74,7 @@ impl AuthWithFile {
         Ok(())
     }
 
+    // TODO: currently we only support basic auth
     pub fn identify(&self, authorization: Option<String>) -> bool {
         if !self.enable {
             return true;
@@ -86,15 +87,19 @@ impl AuthWithFile {
     }
 }
 
+pub fn get_authorization<T>(req: &tonic::Request<T>) -> Option<String> {
+    req.metadata()
+        .get(AUTHORIZATION)
+        .and_then(|value| value.to_str().ok().map(String::from))
+}
+
 impl Interceptor for AuthWithFile {
     fn call(
         &mut self,
         request: tonic::Request<()>,
     ) -> std::result::Result<tonic::Request<()>, tonic::Status> {
-        let metadata = request.metadata();
-        let authorization = metadata
-            .get(AUTHORIZATION)
-            .map(|v| v.to_str().unwrap().to_string());
+        // TODO: extract username from request
+        let authorization = get_authorization(&request);
         if self.identify(authorization) {
             Ok(request)
         } else {
diff --git a/src/server/src/grpc/storage_service/mod.rs b/src/server/src/grpc/storage_service/mod.rs
index 228c1742c9..9cf1fa2237 100644
--- a/src/server/src/grpc/storage_service/mod.rs
+++ b/src/server/src/grpc/storage_service/mod.rs
@@ -35,7 +35,7 @@ use horaedbproto::{
     },
 };
 use http::StatusCode;
-use proxy::{auth::AUTHORIZATION, Context, Proxy, FORWARDED_FROM};
+use proxy::{auth::with_file::get_authorization, Context, Proxy, FORWARDED_FROM};
 use table_engine::engine::EngineRuntimes;
 use time_ext::InstantExt;
 
@@ -170,12 +170,6 @@ fn get_forwarded_from<T>(req: &tonic::Request<T>) -> Option<String> {
         .map(|value| value.to_str().unwrap().to_string())
 }
 
-fn get_authorization<T>(req: &tonic::Request<T>) -> Option<String> {
-    req.metadata()
-        .get(AUTHORIZATION)
-        .map(|value| value.to_str().unwrap().to_string())
-}
-
 // TODO: Use macros to simplify duplicate code
 impl StorageServiceImpl {
     async fn route_internal(

From 970ef2abe5cecccaffe65d3c4a8c91bae3a17bab Mon Sep 17 00:00:00 2001
From: jiacai2050 <dev@liujiacai.net>
Date: Wed, 15 May 2024 16:44:34 +0800
Subject: [PATCH 8/8] decode auth

---
 src/proxy/src/auth/with_file.rs | 51 +++++++++++++++++++++++++--------
 1 file changed, 39 insertions(+), 12 deletions(-)

diff --git a/src/proxy/src/auth/with_file.rs b/src/proxy/src/auth/with_file.rs
index a38e669dc1..116005cee4 100644
--- a/src/proxy/src/auth/with_file.rs
+++ b/src/proxy/src/auth/with_file.rs
@@ -15,11 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::{collections::HashSet, fs::File, io, io::BufRead, path::Path};
+use std::{
+    collections::HashMap,
+    fs::File,
+    io::{self, BufRead},
+    path::Path,
+};
 
-use base64::encode;
 use generic_error::BoxError;
-use snafu::ResultExt;
+use snafu::{OptionExt, ResultExt};
 use tonic::service::Interceptor;
 
 use crate::{
@@ -31,7 +35,8 @@ use crate::{
 pub struct AuthWithFile {
     enable: bool,
     file_path: String,
-    auth: HashSet<String>,
+    // name -> password
+    users: HashMap<String, String>,
 }
 
 impl AuthWithFile {
@@ -39,10 +44,11 @@ impl AuthWithFile {
         Self {
             enable,
             file_path,
-            auth: HashSet::new(),
+            users: HashMap::new(),
         }
     }
 
+    // Load a csv format config
     pub fn load_credential(&mut self) -> Result<()> {
         if !self.enable {
             return Ok(());
@@ -65,23 +71,44 @@ impl AuthWithFile {
             let line = line.box_err().context(Internal {
                 msg: "failed to read line",
             })?;
-            let mut buf = Vec::with_capacity(line.len());
-            buf.extend_from_slice(line.as_bytes());
-            let auth = encode(&buf);
-            self.auth.insert(format!("Basic {}", auth));
+            let (username, password) = line.split_once(',').with_context(|| InternalNoCause {
+                msg: format!("invalid line: {:?}", line),
+            })?;
+            self.users
+                .insert(username.to_string(), password.to_string());
         }
 
         Ok(())
     }
 
     // TODO: currently we only support basic auth
-    pub fn identify(&self, authorization: Option<String>) -> bool {
+    // This function should return Result
+    pub fn identify(&self, input: Option<String>) -> bool {
         if !self.enable {
             return true;
         }
 
-        match authorization {
-            Some(auth) => self.auth.contains(&auth),
+        let input = match input {
+            Some(v) => v,
+            None => return false,
+        };
+        let input = match input.split_once("Basic ") {
+            Some((_, encoded)) => match base64::decode(encoded) {
+                Ok(v) => v,
+                Err(_e) => return false,
+            },
+            None => return false,
+        };
+        let input = match std::str::from_utf8(&input) {
+            Ok(v) => v,
+            Err(_e) => return false,
+        };
+        match input.split_once(':') {
+            Some((user, pass)) => self
+                .users
+                .get(user)
+                .map(|expected| expected == pass)
+                .unwrap_or_default(),
             None => false,
         }
     }