diff --git a/Cargo.lock b/Cargo.lock index 57b3f9674..584fb54aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1910,6 +1910,18 @@ dependencies = [ "syn 1.0.99", ] +[[package]] +name = "dashmap" +version = "5.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3495912c9c1ccf2e18976439f4443f3fee0fd61f424ff99fde6a66b15ecb448f" +dependencies = [ + "cfg-if 1.0.0", + "hashbrown 0.12.1", + "lock_api", + "parking_lot_core 0.9.3", +] + [[package]] name = "data-encoding" version = "2.3.2" @@ -3665,66 +3677,95 @@ dependencies = [ [[package]] name = "opentelemetry" -version = "0.17.0" +version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6105e89802af13fdf48c49d7646d3b533a70e536d818aae7e78ba0433d01acb8" +checksum = "69d6c3d7288a106c0a363e4b0e8d308058d56902adefb16f4936f417ffef086e" dependencies = [ - "async-trait", - "crossbeam-channel", - "futures-channel", - "futures-executor", - "futures-util", - "js-sys", - "lazy_static", - "percent-encoding", - "pin-project", - "rand 0.8.5", - "thiserror", - "tokio", - "tokio-stream", + "opentelemetry_api", + "opentelemetry_sdk", ] [[package]] name = "opentelemetry-datadog" -version = "0.5.0" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "457462dc4cd365992c574c79181ff11ee6f66c5cbfb15a352217b4e0b35eac34" +checksum = "171770efa142d2a19455b7e985037f560b2e75461f822dd1688bfd83c14856f6" dependencies = [ "async-trait", + "futures-core", "http 0.2.8", "indexmap", "itertools", - "lazy_static", + "once_cell", "opentelemetry", "opentelemetry-http", "opentelemetry-semantic-conventions", "reqwest", "rmp", "thiserror", + "url", ] [[package]] name = "opentelemetry-http" -version = "0.6.0" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "449048140ee61e28f57abe6e9975eedc1f3a29855c7407bd6c12b18578863379" +checksum = "1edc79add46364183ece1a4542592ca593e6421c60807232f5b8f7a31703825d" dependencies = [ "async-trait", "bytes 1.1.0", "http 0.2.8", - "opentelemetry", + "opentelemetry_api", "reqwest", ] [[package]] name = "opentelemetry-semantic-conventions" -version = "0.9.0" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "985cc35d832d412224b2cffe2f9194b1b89b6aa5d0bef76d080dce09d90e62bd" +checksum = "9b02e0230abb0ab6636d18e2ba8fa02903ea63772281340ccac18e0af3ec9eeb" dependencies = [ "opentelemetry", ] +[[package]] +name = "opentelemetry_api" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c24f96e21e7acc813c7a8394ee94978929db2bcc46cf6b5014fc612bf7760c22" +dependencies = [ + "fnv", + "futures-channel", + "futures-util", + "indexmap", + "js-sys", + "once_cell", + "pin-project-lite 0.2.9", + "thiserror", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ca41c4933371b61c2a2f214bf16931499af4ec90543604ec828f7a625c09113" +dependencies = [ + "async-trait", + "crossbeam-channel", + "dashmap", + "fnv", + "futures-channel", + "futures-executor", + "futures-util", + "once_cell", + "opentelemetry_api", + "percent-encoding", + "rand 0.8.5", + "thiserror", + "tokio", + "tokio-stream", +] + [[package]] name = "ordered-float" version = "2.10.0" @@ -5314,6 +5355,7 @@ dependencies = [ "once_cell", "opentelemetry", "opentelemetry-datadog", + "opentelemetry-http", "pipe", "portpicker", "rand 0.8.5", @@ -5358,6 +5400,7 @@ dependencies = [ "once_cell", "opentelemetry", "opentelemetry-datadog", + "opentelemetry-http", "portpicker", "rand 0.8.5", "regex", @@ -6620,9 +6663,9 @@ dependencies = [ [[package]] name = "tracing-opentelemetry" -version = "0.17.4" +version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbbe89715c1dbbb790059e2565353978564924ee85017b5fff365c872ff6721f" +checksum = "21ebb87a95ea13271332df069020513ab70bdb5637ca42d6e492dc3bbbad48de" dependencies = [ "once_cell", "opentelemetry", diff --git a/Makefile b/Makefile index 4cdfa35a9..b73642ca0 100644 --- a/Makefile +++ b/Makefile @@ -39,12 +39,14 @@ STACK=shuttle-prod APPS_FQDN=shuttleapp.rs DB_FQDN=db.shuttle.rs CONTAINER_REGISTRY=public.ecr.aws/shuttle +DD_ENV=production else DOCKER_COMPOSE_FILES=-f docker-compose.yml -f docker-compose.dev.yml STACK=shuttle-dev APPS_FQDN=unstable.shuttleapp.rs DB_FQDN=db.unstable.shuttle.rs CONTAINER_REGISTRY=public.ecr.aws/shuttle-dev +DD_ENV=unstable endif POSTGRES_EXTRA_PATH?=./extras/postgres @@ -52,7 +54,7 @@ POSTGRES_TAG?=14 RUST_LOG?=debug -DOCKER_COMPOSE_ENV=STACK=$(STACK) BACKEND_TAG=$(TAG) PROVISIONER_TAG=$(TAG) POSTGRES_TAG=${POSTGRES_TAG} APPS_FQDN=$(APPS_FQDN) DB_FQDN=$(DB_FQDN) POSTGRES_PASSWORD=$(POSTGRES_PASSWORD) RUST_LOG=$(RUST_LOG) CONTAINER_REGISTRY=$(CONTAINER_REGISTRY) MONGO_INITDB_ROOT_USERNAME=$(MONGO_INITDB_ROOT_USERNAME) MONGO_INITDB_ROOT_PASSWORD=$(MONGO_INITDB_ROOT_PASSWORD) +DOCKER_COMPOSE_ENV=STACK=$(STACK) BACKEND_TAG=$(TAG) PROVISIONER_TAG=$(TAG) POSTGRES_TAG=${POSTGRES_TAG} APPS_FQDN=$(APPS_FQDN) DB_FQDN=$(DB_FQDN) POSTGRES_PASSWORD=$(POSTGRES_PASSWORD) RUST_LOG=$(RUST_LOG) CONTAINER_REGISTRY=$(CONTAINER_REGISTRY) MONGO_INITDB_ROOT_USERNAME=$(MONGO_INITDB_ROOT_USERNAME) MONGO_INITDB_ROOT_PASSWORD=$(MONGO_INITDB_ROOT_PASSWORD) DD_ENV=$(DD_ENV) .PHONY: images clean src up down deploy shuttle-% postgres docker-compose.rendered.yml test bump-% deploy-examples publish publish-% --validate-version diff --git a/deployer/Cargo.toml b/deployer/Cargo.toml index 3ca9ad942..a26f0e134 100644 --- a/deployer/Cargo.toml +++ b/deployer/Cargo.toml @@ -21,8 +21,9 @@ hyper = { version = "0.14.20", features = ["client", "http1", "http2", "tcp" ] } # not great, but waiting for WebSocket changes to be merged hyper-reverse-proxy = { git = "https://github.com/chesedo/hyper-reverse-proxy", branch = "master" } once_cell = "1.14.0" -opentelemetry = { version = "0.17.0", features = ["rt-tokio"] } -opentelemetry-datadog = { version = "0.5.0", features = ["reqwest-client"] } +opentelemetry = { version = "0.18.0", features = ["rt-tokio"] } +opentelemetry-datadog = { version = "0.6.0", features = ["reqwest-client"] } +opentelemetry-http = "0.7.0" pipe = "0.4.0" portpicker = "0.1.1" serde = "1.0.137" @@ -37,7 +38,7 @@ tonic = "0.8.0" tower = { version = "0.4.12", features = ["make"] } tower-http = { version = "0.3.4", features = ["auth", "trace"] } tracing = "0.1.35" -tracing-opentelemetry = "0.17.4" +tracing-opentelemetry = "0.18.0" tracing-subscriber = { version = "0.3.11", features = ["env-filter"] } uuid = { version = "1.1.2", features = ["v4"] } diff --git a/deployer/src/deployment/deploy_layer.rs b/deployer/src/deployment/deploy_layer.rs index c8951d450..08f4593b9 100644 --- a/deployer/src/deployment/deploy_layer.rs +++ b/deployer/src/deployment/deploy_layer.rs @@ -889,6 +889,7 @@ mod tests { id, service_name: "run-test".to_string(), service_id: Uuid::new_v4(), + tracing_context: Default::default(), }) .await; @@ -945,6 +946,7 @@ mod tests { service_id: Uuid::new_v4(), data: Bytes::from("violets are red").to_vec(), will_run_tests: false, + tracing_context: Default::default(), }) .await; @@ -988,6 +990,7 @@ mod tests { service_id: Uuid::new_v4(), data: bytes, will_run_tests: false, + tracing_context: Default::default(), } } } diff --git a/deployer/src/deployment/mod.rs b/deployer/src/deployment/mod.rs index 735b004d3..0d2c74071 100644 --- a/deployer/src/deployment/mod.rs +++ b/deployer/src/deployment/mod.rs @@ -8,7 +8,8 @@ use std::path::PathBuf; pub use queue::Queued; pub use run::{ActiveDeploymentsGetter, Built}; -use tracing::instrument; +use tracing::{instrument, Span}; +use tracing_opentelemetry::OpenTelemetrySpanExt; use crate::persistence::{SecretRecorder, State}; use tokio::sync::{broadcast, mpsc}; @@ -54,7 +55,13 @@ impl DeploymentManager { } #[instrument(skip(self), fields(id = %queued.id, state = %State::Queued))] - pub async fn queue_push(&self, queued: Queued) { + pub async fn queue_push(&self, mut queued: Queued) { + let cx = Span::current().context(); + + opentelemetry::global::get_text_map_propagator(|propagator| { + propagator.inject_context(&cx, &mut queued.tracing_context); + }); + self.pipeline.queue_send.send(queued).await.unwrap(); } diff --git a/deployer/src/deployment/queue.rs b/deployer/src/deployment/queue.rs index 2c559f102..2eb3c3ea9 100644 --- a/deployer/src/deployment/queue.rs +++ b/deployer/src/deployment/queue.rs @@ -6,12 +6,14 @@ use crate::persistence::{LogLevel, SecretRecorder}; use cargo_metadata::Message; use chrono::Utc; use crossbeam_channel::Sender; +use opentelemetry::global; use serde_json::json; use shuttle_service::loader::{build_crate, get_config}; -use tracing::{debug, error, info, instrument, trace}; +use tracing::{debug, debug_span, error, info, instrument, trace, Instrument, Span}; +use tracing_opentelemetry::OpenTelemetrySpanExt; use uuid::Uuid; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use std::fmt; use std::fs::remove_file; use std::io::Read; @@ -58,27 +60,43 @@ pub async fn task( let libs_path = libs_path.clone(); tokio::spawn(async move { - match queued - .handle(builds_path, libs_path, log_recorder, secret_recorder) - .await - { - Ok(built) => promote_to_run(built, run_send_cloned).await, - Err(err) => build_failed(&id, err), + let parent_cx = global::get_text_map_propagator(|propagator| { + propagator.extract(&queued.tracing_context) + }); + let span = debug_span!("builder"); + span.set_parent(parent_cx); + + async move { + match queued + .handle(builds_path, libs_path, log_recorder, secret_recorder) + .await + { + Ok(built) => promote_to_run(built, run_send_cloned).await, + Err(err) => build_failed(&id, err), + } } + .instrument(span) + .await }); } } -#[instrument(fields(id = %_id, state = %State::Crashed))] -fn build_failed(_id: &Uuid, err: impl std::error::Error + 'static) { +#[instrument(skip(_id), fields(id = %_id, state = %State::Crashed))] +fn build_failed(_id: &Uuid, error: impl std::error::Error + 'static) { error!( - error = &err as &dyn std::error::Error, + error = &error as &dyn std::error::Error, "service build encountered an error" ); } #[instrument(fields(id = %built.id, state = %State::Built))] -async fn promote_to_run(built: Built, run_send: RunSender) { +async fn promote_to_run(mut built: Built, run_send: RunSender) { + let cx = Span::current().context(); + + opentelemetry::global::get_text_map_propagator(|propagator| { + propagator.inject_context(&cx, &mut built.tracing_context); + }); + if let Err(err) = run_send.send(built.clone()).await { build_failed(&built.id, err); } @@ -90,10 +108,11 @@ pub struct Queued { pub service_id: Uuid, pub data: Vec, pub will_run_tests: bool, + pub tracing_context: HashMap, } impl Queued { - #[instrument(name = "queued_handle", skip(self, builds_path, libs_path, log_recorder, secret_recorder), fields(id = %self.id, state = %State::Building))] + #[instrument(skip(self, builds_path, libs_path, log_recorder, secret_recorder), fields(id = %self.id, state = %State::Building))] async fn handle( self, builds_path: PathBuf, @@ -169,6 +188,7 @@ impl Queued { id: self.id, service_name: self.service_name, service_id: self.service_id, + tracing_context: Default::default(), }; Ok(built) diff --git a/deployer/src/deployment/run.rs b/deployer/src/deployment/run.rs index eb323586d..2679ab2cc 100644 --- a/deployer/src/deployment/run.rs +++ b/deployer/src/deployment/run.rs @@ -1,10 +1,12 @@ use std::{ + collections::HashMap, net::{Ipv4Addr, SocketAddr}, path::PathBuf, str::FromStr, }; use async_trait::async_trait; +use opentelemetry::global; use portpicker::pick_unused_port; use shuttle_common::project::ProjectName as ServiceName; use shuttle_service::{ @@ -12,7 +14,8 @@ use shuttle_service::{ Factory, Logger, }; use tokio::task::JoinError; -use tracing::{debug, error, info, instrument, trace}; +use tracing::{debug, debug_span, error, info, instrument, trace, Instrument}; +use tracing_opentelemetry::OpenTelemetrySpanExt; use uuid::Uuid; use super::{provisioner_factory, runtime_logger, KillReceiver, KillSender, RunReceiver, State}; @@ -95,22 +98,32 @@ pub async fn task( let libs_path = libs_path.clone(); tokio::spawn(async move { - if let Err(err) = built - .handle( - addr, - libs_path, - &mut factory, - logger, - kill_recv, - old_deployments_killer, - cleanup, - ) - .await - { - start_crashed_cleanup(&id, err) + let parent_cx = global::get_text_map_propagator(|propagator| { + propagator.extract(&built.tracing_context) + }); + let span = debug_span!("runner"); + span.set_parent(parent_cx); + + async move { + if let Err(err) = built + .handle( + addr, + libs_path, + &mut factory, + logger, + kill_recv, + old_deployments_killer, + cleanup, + ) + .await + { + start_crashed_cleanup(&id, err) + } + + info!("deployment done"); } - - info!("deployment done"); + .instrument(span) + .await }); } } @@ -139,28 +152,28 @@ async fn kill_old_deployments( Ok(()) } -#[instrument(fields(id = %_id, state = %State::Completed))] +#[instrument(skip(_id), fields(id = %_id, state = %State::Completed))] fn completed_cleanup(_id: &Uuid) { info!("service finished all on its own"); } -#[instrument(fields(id = %_id, state = %State::Stopped))] +#[instrument(skip(_id), fields(id = %_id, state = %State::Stopped))] fn stopped_cleanup(_id: &Uuid) { info!("service was stopped by the user"); } -#[instrument(fields(id = %_id, state = %State::Crashed))] -fn crashed_cleanup(_id: &Uuid, err: impl std::error::Error + 'static) { +#[instrument(skip(_id), fields(id = %_id, state = %State::Crashed))] +fn crashed_cleanup(_id: &Uuid, error: impl std::error::Error + 'static) { error!( - error = &err as &dyn std::error::Error, + error = &error as &dyn std::error::Error, "service encountered an error" ); } -#[instrument(fields(id = %_id, state = %State::Crashed))] -fn start_crashed_cleanup(_id: &Uuid, err: impl std::error::Error + 'static) { +#[instrument(skip(_id), fields(id = %_id, state = %State::Crashed))] +fn start_crashed_cleanup(_id: &Uuid, error: impl std::error::Error + 'static) { error!( - error = &err as &dyn std::error::Error, + error = &error as &dyn std::error::Error, "service startup encountered an error" ); } @@ -180,10 +193,11 @@ pub struct Built { pub id: Uuid, pub service_name: String, pub service_id: Uuid, + pub tracing_context: HashMap, } impl Built { - #[instrument(name = "built_handle", skip(self, libs_path, factory, logger, kill_recv, kill_old_deployments, cleanup), fields(id = %self.id, state = %State::Loading))] + #[instrument(skip(self, libs_path, factory, logger, kill_recv, kill_old_deployments, cleanup), fields(id = %self.id, state = %State::Loading))] #[allow(clippy::too_many_arguments)] async fn handle( self, @@ -493,6 +507,7 @@ mod tests { id: Uuid::new_v4(), service_name: "test".to_string(), service_id: Uuid::new_v4(), + tracing_context: Default::default(), }; let (_kill_send, kill_recv) = broadcast::channel(1); @@ -555,6 +570,7 @@ mod tests { id, service_name: crate_name.to_string(), service_id: Uuid::new_v4(), + tracing_context: Default::default(), } } } diff --git a/deployer/src/handlers/mod.rs b/deployer/src/handlers/mod.rs index 7c48b9f8c..daf56eea1 100644 --- a/deployer/src/handlers/mod.rs +++ b/deployer/src/handlers/mod.rs @@ -10,11 +10,14 @@ use bytes::BufMut; use chrono::{TimeZone, Utc}; use fqdn::FQDN; use futures::StreamExt; +use opentelemetry::global; +use opentelemetry_http::HeaderExtractor; use shuttle_common::models::secret; use shuttle_common::LogItem; use tower_http::auth::RequireAuthorizationLayer; use tower_http::trace::TraceLayer; use tracing::{debug, debug_span, error, field, trace, Span}; +use tracing_opentelemetry::OpenTelemetrySpanExt; use uuid::Uuid; use crate::deployment::{DeploymentManager, Queued}; @@ -60,7 +63,13 @@ pub fn make_router( .layer( TraceLayer::new_for_http() .make_span_with(|request: &Request| { - debug_span!("request", http.uri = %request.uri(), http.method = %request.method(), http.status_code = field::Empty, api_key = field::Empty) + let span = debug_span!("request", http.uri = %request.uri(), http.method = %request.method(), http.status_code = field::Empty); + let parent_context = global::get_text_map_propagator(|propagator| { + propagator.extract(&HeaderExtractor(request.headers())) + }); + span.set_parent(parent_context); + + span }) .on_response( |response: &Response, latency: Duration, span: &Span| { @@ -185,6 +194,7 @@ async fn post_service( service_id: service.id, data, will_run_tests: !params.contains_key("no-test"), + tracing_context: Default::default(), }; deployment_manager.queue_push(queued).await; diff --git a/deployer/src/lib.rs b/deployer/src/lib.rs index 30b40a42e..265554402 100644 --- a/deployer/src/lib.rs +++ b/deployer/src/lib.rs @@ -42,6 +42,7 @@ pub async fn start( id: existing_deployment.id, service_name: existing_deployment.service_name, service_id: existing_deployment.service_id, + tracing_context: Default::default(), }; deployment_manager.run_push(built).await; } diff --git a/deployer/src/main.rs b/deployer/src/main.rs index 230d16b58..5e1afa68a 100644 --- a/deployer/src/main.rs +++ b/deployer/src/main.rs @@ -1,4 +1,5 @@ use clap::Parser; +use opentelemetry::global; use shuttle_deployer::{ start, start_proxy, AbstractProvisionerFactory, Args, DeployLayer, Persistence, RuntimeLoggerFactory, @@ -17,6 +18,8 @@ async fn main() { trace!(args = ?args, "parsed args"); + global::set_text_map_propagator(opentelemetry_datadog::DatadogPropagator::new()); + let fmt_layer = fmt::layer(); let filter_layer = EnvFilter::try_from_default_env() .or_else(|_| EnvFilter::try_new("info")) diff --git a/deployer/src/persistence/mod.rs b/deployer/src/persistence/mod.rs index 28be90397..f3715b072 100644 --- a/deployer/src/persistence/mod.rs +++ b/deployer/src/persistence/mod.rs @@ -77,7 +77,7 @@ impl Persistence { let (log_send, log_recv): (crossbeam_channel::Sender, _) = crossbeam_channel::bounded(0); - let (stream_log_send, _) = broadcast::channel(32); + let (stream_log_send, _) = broadcast::channel(1); let stream_log_send_clone = stream_log_send.clone(); let pool_cloned = pool.clone(); diff --git a/deployer/src/proxy.rs b/deployer/src/proxy.rs index 72de43909..5fd290147 100644 --- a/deployer/src/proxy.rs +++ b/deployer/src/proxy.rs @@ -11,7 +11,10 @@ use hyper::{ }; use hyper_reverse_proxy::{ProxyError, ReverseProxy}; use once_cell::sync::Lazy; +use opentelemetry::global; +use opentelemetry_http::HeaderExtractor; use tracing::{error, field, instrument, trace, Span}; +use tracing_opentelemetry::OpenTelemetrySpanExt; static PROXY_CLIENT: Lazy>> = Lazy::new(|| ReverseProxy::new(Client::new())); @@ -24,6 +27,12 @@ pub async fn handle( req: Request, address_getter: impl AddressGetter, ) -> Result, Infallible> { + let span = Span::current(); + let parent_context = global::get_text_map_propagator(|propagator| { + propagator.extract(&HeaderExtractor(req.headers())) + }); + span.set_parent(parent_context); + let host = match req.headers().get(HOST) { Some(host) => host.to_str().unwrap_or_default().to_owned(), None => { @@ -48,7 +57,7 @@ pub async fn handle( }; // Record current service for tracing purposes - Span::current().record("service", &service); + span.record("service", &service); let proxy_address = match address_getter.get_address_for_service(service).await { Ok(Some(address)) => address, diff --git a/docker-compose.yml b/docker-compose.yml index 63960ad78..e9e0fb672 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -131,6 +131,8 @@ services: - DD_APM_NON_LOCAL_TRAFFIC=true - DD_SITE=datadoghq.eu - DD_API_KEY=${DD_API_KEY} + - DD_ENV=${DD_ENV} + - DD_CONTAINER_LABELS_AS_TAGS={"project.name":"project_name"} deploy: placement: constraints: diff --git a/gateway/Cargo.toml b/gateway/Cargo.toml index e4b171d61..0c8e54807 100644 --- a/gateway/Cargo.toml +++ b/gateway/Cargo.toml @@ -18,8 +18,9 @@ hyper = { version = "0.14.19", features = [ "stream" ] } # not great, but waiting for WebSocket changes to be merged hyper-reverse-proxy = { git = "https://github.com/chesedo/hyper-reverse-proxy", branch = "bug/host_header" } once_cell = "1.14.0" -opentelemetry = { version = "0.17.0", features = ["rt-tokio"] } -opentelemetry-datadog = { version = "0.5.0", features = ["reqwest-client"] } +opentelemetry = { version = "0.18.0", features = ["rt-tokio"] } +opentelemetry-datadog = { version = "0.6.0", features = ["reqwest-client"] } +opentelemetry-http = "0.7.0" rand = "0.8.5" regex = "1.5.5" serde = { version = "1.0.137", features = [ "derive" ] } @@ -29,7 +30,7 @@ tokio = { version = "1.17", features = [ "full" ] } tower = { version = "0.4.13", features = [ "steer" ] } tower-http = { version = "0.3.4", features = ["trace"] } tracing = "0.1.35" -tracing-opentelemetry = "0.17.4" +tracing-opentelemetry = "0.18.0" tracing-subscriber = { version = "0.3.11", features = ["env-filter"] } [dependencies.shuttle-common] diff --git a/gateway/src/api/latest.rs b/gateway/src/api/latest.rs index 524bb42a2..1942e16ba 100644 --- a/gateway/src/api/latest.rs +++ b/gateway/src/api/latest.rs @@ -158,7 +158,7 @@ pub fn make_api(service: Arc, sender: Sender) -> Router| { - debug_span!("request", http.uri = %request.uri(), http.method = %request.method(), http.status_code = field::Empty, api_key = field::Empty) + debug_span!("request", http.uri = %request.uri(), http.method = %request.method(), http.status_code = field::Empty, account.name = field::Empty, account.project = field::Empty) }) .on_response( |response: &Response, latency: Duration, span: &Span| { diff --git a/gateway/src/auth.rs b/gateway/src/auth.rs index 5c66cceee..c5ea0b6ae 100644 --- a/gateway/src/auth.rs +++ b/gateway/src/auth.rs @@ -7,6 +7,7 @@ use axum::headers::authorization::Bearer; use axum::headers::Authorization; use rand::distributions::{Alphanumeric, DistString}; use serde::{Deserialize, Serialize}; +use tracing::Span; use crate::service::GatewayService; use crate::{AccountName, Error, ErrorKind, ProjectName}; @@ -92,6 +93,10 @@ where .await // Absord any error into `Unauthorized` .map_err(|e| Error::source(ErrorKind::Unauthorized, e))?; + + // Record current account name for tracing purposes + Span::current().record("account.name", &user.name.to_string()); + Ok(user) } } @@ -136,6 +141,9 @@ where .map(|Path((p, _))| p) .unwrap(), }; + + // Record current project for tracing purposes + Span::current().record("account.project", &scope.to_string()); if user.super_user || user.projects.contains(&scope) { Ok(Self { user, scope }) } else { diff --git a/gateway/src/main.rs b/gateway/src/main.rs index 24df027fc..0696bad8e 100644 --- a/gateway/src/main.rs +++ b/gateway/src/main.rs @@ -1,5 +1,6 @@ use clap::Parser; use futures::prelude::*; +use opentelemetry::global; use shuttle_gateway::args::{Args, Commands, ExecCmd, ExecCmds, InitArgs}; use shuttle_gateway::auth::Key; use shuttle_gateway::proxy::make_proxy; @@ -21,6 +22,8 @@ async fn main() -> io::Result<()> { trace!(args = ?args, "parsed args"); + global::set_text_map_propagator(opentelemetry_datadog::DatadogPropagator::new()); + let fmt_layer = fmt::layer(); let filter_layer = EnvFilter::try_from_default_env() .or_else(|_| EnvFilter::try_new("info")) diff --git a/gateway/src/project.rs b/gateway/src/project.rs index e11a62ae0..5e9ec6ecb 100644 --- a/gateway/src/project.rs +++ b/gateway/src/project.rs @@ -371,6 +371,9 @@ impl ProjectCreating { "Env": [ "RUST_LOG=debug", ], + "Labels": { + "project.name": project_name, + } }); let mut config = Config::::from(container_config); diff --git a/gateway/src/proxy.rs b/gateway/src/proxy.rs index 012604ed4..bbe497773 100644 --- a/gateway/src/proxy.rs +++ b/gateway/src/proxy.rs @@ -14,8 +14,11 @@ use hyper::server::conn::AddrStream; use hyper::{Client, Request}; use hyper_reverse_proxy::ReverseProxy; use once_cell::sync::Lazy; +use opentelemetry::global; +use opentelemetry_http::HeaderInjector; use tower::Service; -use tracing::debug; +use tracing::{debug, debug_span, field}; +use tracing_opentelemetry::OpenTelemetrySpanExt; use crate::service::GatewayService; use crate::{Error, ErrorKind, ProjectName}; @@ -39,12 +42,14 @@ impl Service> for ProxyService { Poll::Ready(Ok(())) } - fn call(&mut self, req: Request) -> Self::Future { + fn call(&mut self, mut req: Request) -> Self::Future { let remote_addr = self.remote_addr.ip(); let gateway = Arc::clone(&self.gateway); let fqdn = self.fqdn.clone(); + Box::pin( async move { + let span = debug_span!("proxy", http.method = %req.method(), http.uri = %req.uri(), http.status_code = field::Empty, project = field::Empty); let project_str = req .headers() .get("Host") @@ -58,12 +63,21 @@ impl Service> for ProxyService { let project = gateway.find_project(&project_name).await?; + // Record current project for tracing purposes + span.record("project", &project_name.to_string()); + let target_ip = project .target_ip()? .ok_or_else(|| Error::from_kind(ErrorKind::ProjectNotReady))?; let target_url = format!("http://{}:{}", target_ip, 8000); + let cx = span.context(); + + global::get_text_map_propagator(|propagator| { + propagator.inject_context(&cx, &mut HeaderInjector(req.headers_mut())) + }); + let proxy = PROXY_CLIENT .call(remote_addr, &target_url, req) .await @@ -71,6 +85,9 @@ impl Service> for ProxyService { let (parts, body) = proxy.into_parts(); let body = ::map_err(body, axum::Error::new).boxed_unsync(); + + span.record("http.status_code", &parts.status.as_u16()); + Ok(Response::from_parts(parts, body)) } .or_else(|err: Error| future::ready(Ok(err.into_response()))), diff --git a/gateway/src/service.rs b/gateway/src/service.rs index d8a84f712..54d0065e8 100644 --- a/gateway/src/service.rs +++ b/gateway/src/service.rs @@ -12,13 +12,16 @@ use hyper::client::HttpConnector; use hyper::Client; use hyper_reverse_proxy::ReverseProxy; use once_cell::sync::Lazy; +use opentelemetry::global; +use opentelemetry_http::HeaderInjector; use rand::distributions::{Alphanumeric, DistString}; use sqlx::error::DatabaseError; use sqlx::migrate::Migrator; use sqlx::sqlite::SqlitePool; use sqlx::types::Json as SqlxJson; use sqlx::{query, Error as SqlxError, Row}; -use tracing::debug; +use tracing::{debug, Span}; +use tracing_opentelemetry::OpenTelemetrySpanExt; use crate::args::ContextArgs; use crate::auth::{Key, User}; @@ -218,6 +221,11 @@ impl GatewayService { debug!(target_url, "routing control"); + let cx = Span::current().context(); + global::get_text_map_propagator(|propagator| { + propagator.inject_context(&cx, &mut HeaderInjector(req.headers_mut())) + }); + let resp = PROXY_CLIENT .call("127.0.0.1".parse().unwrap(), &target_url, req) .await