mirror of
https://github.com/vicanso/pingap.git
synced 2025-04-20 14:23:44 +08:00
refactor: merge the state module with the core module
This commit is contained in:
parent
7eba5788b2
commit
459ba7db0e
31
Cargo.lock
generated
31
Cargo.lock
generated
@ -2879,7 +2879,6 @@ dependencies = [
|
||||
"pingap-pyroscope",
|
||||
"pingap-sentry",
|
||||
"pingap-service",
|
||||
"pingap-state",
|
||||
"pingap-upstream",
|
||||
"pingap-util",
|
||||
"pingap-webhook",
|
||||
@ -2961,6 +2960,7 @@ dependencies = [
|
||||
"nanoid",
|
||||
"once_cell",
|
||||
"pingap-config",
|
||||
"pingap-core",
|
||||
"pingap-service",
|
||||
"pingap-util",
|
||||
"pingap-webhook",
|
||||
@ -3022,6 +3022,7 @@ dependencies = [
|
||||
"ahash",
|
||||
"async-trait",
|
||||
"bytes",
|
||||
"hostname 0.4.0",
|
||||
"http 1.2.0",
|
||||
"itoa",
|
||||
"once_cell",
|
||||
@ -3035,6 +3036,7 @@ dependencies = [
|
||||
"strum",
|
||||
"tempfile",
|
||||
"tokio",
|
||||
"tokio-test",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
@ -3047,6 +3049,7 @@ dependencies = [
|
||||
"futures",
|
||||
"hickory-resolver",
|
||||
"http 1.2.0",
|
||||
"pingap-core",
|
||||
"pingap-util",
|
||||
"pingap-webhook",
|
||||
"pingora",
|
||||
@ -3065,8 +3068,6 @@ dependencies = [
|
||||
"async-trait",
|
||||
"http 1.2.0",
|
||||
"humantime",
|
||||
"pingap-util",
|
||||
"pingap-webhook",
|
||||
"pingora",
|
||||
"pretty_assertions",
|
||||
"snafu",
|
||||
@ -3218,7 +3219,6 @@ dependencies = [
|
||||
"pingap-cache",
|
||||
"pingap-config",
|
||||
"pingap-core",
|
||||
"pingap-state",
|
||||
"pingap-util",
|
||||
"pingora",
|
||||
"pingora-limits",
|
||||
@ -3273,21 +3273,6 @@ dependencies = [
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pingap-state"
|
||||
version = "0.9.10"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"hostname 0.4.0",
|
||||
"http 1.2.0",
|
||||
"once_cell",
|
||||
"pingap-core",
|
||||
"pingora",
|
||||
"pretty_assertions",
|
||||
"tokio",
|
||||
"tokio-test",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pingap-upstream"
|
||||
version = "0.9.10"
|
||||
@ -3301,6 +3286,7 @@ dependencies = [
|
||||
"futures-util",
|
||||
"once_cell",
|
||||
"pingap-config",
|
||||
"pingap-core",
|
||||
"pingap-discovery",
|
||||
"pingap-health",
|
||||
"pingap-service",
|
||||
@ -3322,11 +3308,9 @@ dependencies = [
|
||||
"base64 0.22.1",
|
||||
"bytes",
|
||||
"dirs",
|
||||
"hostname 0.4.0",
|
||||
"http 1.2.0",
|
||||
"ipnet",
|
||||
"itoa",
|
||||
"local-ip-address 0.6.3",
|
||||
"once_cell",
|
||||
"path-absolutize",
|
||||
"pingora",
|
||||
@ -3346,9 +3330,10 @@ name = "pingap-webhook"
|
||||
version = "0.9.10"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"hostname 0.4.0",
|
||||
"local-ip-address 0.6.3",
|
||||
"once_cell",
|
||||
"pingap-util",
|
||||
"pingora",
|
||||
"pingap-core",
|
||||
"reqwest 0.12.12",
|
||||
"serde_json",
|
||||
"strum",
|
||||
|
@ -81,7 +81,6 @@ pingap-cache = { path = "pingap-cache" }
|
||||
pingap-upstream = { path = "pingap-upstream" }
|
||||
pingap-certificate = { path = "pingap-certificate" }
|
||||
pingap-location = { path = "pingap-location" }
|
||||
pingap-state = { path = "pingap-state" }
|
||||
pingap-performance = { path = "pingap-performance" }
|
||||
pingap-logger = { path = "pingap-logger" }
|
||||
pingap-acme = { path = "pingap-acme" }
|
||||
@ -145,7 +144,6 @@ members = [
|
||||
"pingap-upstream",
|
||||
"pingap-certificate",
|
||||
"pingap-location",
|
||||
"pingap-state",
|
||||
"pingap-performance",
|
||||
"pingap-logger",
|
||||
"pingap-otel",
|
||||
|
@ -14,6 +14,7 @@ graph TD
|
||||
cache --> util
|
||||
|
||||
certificate --> config
|
||||
certificate --> core
|
||||
certificate --> service
|
||||
certificate --> util
|
||||
certificate --> webhook
|
||||
@ -22,12 +23,10 @@ graph TD
|
||||
config --> discovery
|
||||
config --> util
|
||||
|
||||
discovery --> core
|
||||
discovery --> util
|
||||
discovery --> webhook
|
||||
|
||||
health --> util
|
||||
health --> webhook
|
||||
|
||||
limit --> util
|
||||
|
||||
location --> config
|
||||
@ -49,18 +48,16 @@ graph TD
|
||||
plugin --> cache
|
||||
plugin --> config
|
||||
plugin --> core
|
||||
plugin --> state
|
||||
plugin --> util
|
||||
|
||||
state --> core
|
||||
|
||||
upstream --> config
|
||||
upstream --> core
|
||||
upstream --> discovery
|
||||
upstream --> health
|
||||
upstream --> service
|
||||
upstream --> util
|
||||
|
||||
webhook --> util
|
||||
webhook --> core
|
||||
|
||||
pingap --> acme
|
||||
pingap --> cache
|
||||
@ -78,7 +75,6 @@ graph TD
|
||||
pingap --> pyroscope
|
||||
pingap --> sentry
|
||||
pingap --> service
|
||||
pingap --> state
|
||||
pingap --> upstream
|
||||
pingap --> util
|
||||
pingap --> webhook
|
||||
|
@ -155,26 +155,23 @@ async fn handle_successful_renewal(domains: &[String], conf: &PingapConf) {
|
||||
"renew certificate success"
|
||||
);
|
||||
|
||||
pingap_webhook::send_notification(pingap_webhook::SendNotificationParams {
|
||||
category: pingap_webhook::NotificationCategory::LetsEncrypt,
|
||||
msg: "Generate new cert from lets encrypt".to_string(),
|
||||
remark: Some(format!("Domains: {domains:?}")),
|
||||
..Default::default()
|
||||
pingap_webhook::send_notification(pingap_core::NotificationData {
|
||||
category: "lets_encrypt".to_string(),
|
||||
level: pingap_core::NotificationLevel::Info,
|
||||
title: "Generate new cert from lets encrypt".to_string(),
|
||||
message: format!("Domains: {domains:?}"),
|
||||
})
|
||||
.await;
|
||||
|
||||
let (_, errors) = try_update_certificates(&conf.certificates);
|
||||
if !errors.is_empty() {
|
||||
error!(error = errors, "parse certificate fail");
|
||||
pingap_webhook::send_notification(
|
||||
pingap_webhook::SendNotificationParams {
|
||||
category:
|
||||
pingap_webhook::NotificationCategory::ParseCertificateFail,
|
||||
level: pingap_webhook::NotificationLevel::Error,
|
||||
msg: errors,
|
||||
remark: None,
|
||||
},
|
||||
)
|
||||
pingap_webhook::send_notification(pingap_core::NotificationData {
|
||||
category: "parse_certificate_fail".to_string(),
|
||||
level: pingap_core::NotificationLevel::Error,
|
||||
message: errors,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
@ -30,6 +30,8 @@ pingap-util = { path = "../pingap-util" }
|
||||
pingap-service = { path = "../pingap-service" }
|
||||
pingap-config = { path = "../pingap-config" }
|
||||
pingap-webhook = { path = "../pingap-webhook" }
|
||||
pingap-core = { path = "../pingap-core" }
|
||||
|
||||
|
||||
[dev-dependencies]
|
||||
pretty_assertions = "1.4.0"
|
||||
|
@ -104,14 +104,12 @@ async fn do_validity_check(count: u32) -> Result<bool, ServiceError> {
|
||||
|
||||
if let Err(err) = validity_check(&certificate_info_list, time_offset) {
|
||||
error!(category = LOG_CATEGORY, task = "validityChecker", error = %err);
|
||||
pingap_webhook::send_notification(
|
||||
pingap_webhook::SendNotificationParams {
|
||||
level: pingap_webhook::NotificationLevel::Warn,
|
||||
category: pingap_webhook::NotificationCategory::TlsValidity,
|
||||
msg: err.to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
pingap_webhook::send_notification(pingap_core::NotificationData {
|
||||
level: pingap_core::NotificationLevel::Warn,
|
||||
category: "tls_validity".to_string(),
|
||||
message: err.to_string(),
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
}
|
||||
Ok(true)
|
||||
|
@ -24,6 +24,7 @@ serde = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
hostname = "0.4.0"
|
||||
opentelemetry = { version = "0.27.1", default-features = false, features = [
|
||||
"trace",
|
||||
], optional = true }
|
||||
@ -35,3 +36,4 @@ full = ["opentelemetry"]
|
||||
[dev-dependencies]
|
||||
pretty_assertions = "1.4.0"
|
||||
tempfile = "3.16.0"
|
||||
tokio-test = "0.4.4"
|
||||
|
@ -124,7 +124,7 @@ pub struct Ctx {
|
||||
pub connection_time: u64,
|
||||
/// Indicates if this connection is being reused
|
||||
pub connection_reused: bool,
|
||||
/// The location configuration handling this request
|
||||
/// The location handling this request
|
||||
pub location: String,
|
||||
/// Address of the upstream server
|
||||
pub upstream_address: String,
|
||||
@ -154,6 +154,7 @@ pub struct Ctx {
|
||||
pub cache_lock_time: Option<u64>,
|
||||
/// Maximum time-to-live for cache entries
|
||||
pub cache_max_ttl: Option<Duration>,
|
||||
/// The upstream server
|
||||
pub upstream: String,
|
||||
/// Indicates if the upstream connection is being reused
|
||||
pub upstream_reused: bool,
|
||||
|
@ -12,13 +12,32 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use super::{get_hostname, Ctx};
|
||||
use bytes::BytesMut;
|
||||
use http::header;
|
||||
use http::{HeaderName, HeaderValue};
|
||||
use once_cell::sync::Lazy;
|
||||
use pingora::http::RequestHeader;
|
||||
use pingora::proxy::Session;
|
||||
use snafu::{ResultExt, Snafu};
|
||||
use std::str::FromStr;
|
||||
|
||||
pub const HOST_NAME_TAG: &[u8] = b"$hostname";
|
||||
const HOST_TAG: &[u8] = b"$host";
|
||||
const SCHEME_TAG: &[u8] = b"$scheme";
|
||||
const REMOTE_ADDR_TAG: &[u8] = b"$remote_addr";
|
||||
const REMOTE_PORT_TAG: &[u8] = b"$remote_port";
|
||||
const SERVER_ADDR_TAG: &[u8] = b"$server_addr";
|
||||
const SERVER_PORT_TAG: &[u8] = b"$server_port";
|
||||
const PROXY_ADD_FORWARDED_TAG: &[u8] = b"$proxy_add_x_forwarded_for";
|
||||
const UPSTREAM_ADDR_TAG: &[u8] = b"$upstream_addr";
|
||||
|
||||
static SCHEME_HTTPS: HeaderValue = HeaderValue::from_static("https");
|
||||
static SCHEME_HTTP: HeaderValue = HeaderValue::from_static("http");
|
||||
|
||||
static HTTP_HEADER_X_FORWARDED_FOR: Lazy<http::HeaderName> =
|
||||
Lazy::new(|| HeaderName::from_str("X-Forwarded-For").unwrap());
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum Error {
|
||||
#[snafu(display("Invalid header value: {value} - {source}"))]
|
||||
@ -36,6 +55,21 @@ type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
||||
pub type HttpHeader = (HeaderName, HeaderValue);
|
||||
|
||||
/// Get request host in this order of precedence:
|
||||
/// host name from the request line,
|
||||
/// or host name from the "Host" request header field
|
||||
fn get_host(header: &RequestHeader) -> Option<&str> {
|
||||
if let Some(host) = header.uri.host() {
|
||||
return Some(host);
|
||||
}
|
||||
if let Some(host) = header.headers.get("Host") {
|
||||
if let Ok(value) = host.to_str().map(|host| host.split(':').next()) {
|
||||
return value;
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
/// Converts a string in "name: value" format into an HTTP header tuple.
|
||||
/// Returns None if the input string doesn't contain a colon separator.
|
||||
///
|
||||
@ -120,10 +154,140 @@ pub static HTTP_HEADER_TRANSFER_CHUNKED: Lazy<HttpHeader> = Lazy::new(|| {
|
||||
pub static HTTP_HEADER_NAME_X_REQUEST_ID: Lazy<HeaderName> =
|
||||
Lazy::new(|| HeaderName::from_str("X-Request-Id").unwrap());
|
||||
|
||||
/// Processes special header values that contain dynamic variables.
|
||||
/// Supports variables like $host, $scheme, $remote_addr etc.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `value` - The header value to process
|
||||
/// * `session` - The HTTP session context
|
||||
/// * `ctx` - The application state
|
||||
///
|
||||
/// # Returns
|
||||
/// * `Option<HeaderValue>` - The processed header value or None if no special handling needed
|
||||
#[inline]
|
||||
pub fn convert_header_value(
|
||||
value: &HeaderValue,
|
||||
session: &Session,
|
||||
ctx: &Ctx,
|
||||
) -> Option<HeaderValue> {
|
||||
let buf = value.as_bytes();
|
||||
|
||||
// Early return if not a special header (moved this check earlier)
|
||||
if buf.is_empty() || !(buf[0] == b'$' || buf[0] == b':') {
|
||||
return None;
|
||||
}
|
||||
|
||||
// Helper closure to convert string to HeaderValue
|
||||
let to_header_value = |s: &str| HeaderValue::from_str(s).ok();
|
||||
|
||||
match buf {
|
||||
HOST_TAG => get_host(session.req_header()).and_then(to_header_value),
|
||||
SCHEME_TAG => Some(if ctx.tls_version.is_some() {
|
||||
SCHEME_HTTPS.clone()
|
||||
} else {
|
||||
SCHEME_HTTP.clone()
|
||||
}),
|
||||
HOST_NAME_TAG => to_header_value(get_hostname()),
|
||||
REMOTE_ADDR_TAG => ctx.remote_addr.as_deref().and_then(to_header_value),
|
||||
REMOTE_PORT_TAG => ctx
|
||||
.remote_port
|
||||
.map(|p| p.to_string())
|
||||
.and_then(|s| to_header_value(&s)),
|
||||
SERVER_ADDR_TAG => ctx.server_addr.as_deref().and_then(to_header_value),
|
||||
SERVER_PORT_TAG => ctx
|
||||
.server_port
|
||||
.map(|p| p.to_string())
|
||||
.and_then(|s| to_header_value(&s)),
|
||||
UPSTREAM_ADDR_TAG => {
|
||||
if !ctx.upstream_address.is_empty() {
|
||||
to_header_value(&ctx.upstream_address)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
},
|
||||
PROXY_ADD_FORWARDED_TAG => {
|
||||
ctx.remote_addr.as_deref().and_then(|remote_addr| {
|
||||
let value = match session
|
||||
.get_header(HTTP_HEADER_X_FORWARDED_FOR.clone())
|
||||
{
|
||||
Some(existing) => format!(
|
||||
"{}, {}",
|
||||
existing.to_str().unwrap_or_default(),
|
||||
remote_addr
|
||||
),
|
||||
None => remote_addr.to_string(),
|
||||
};
|
||||
to_header_value(&value)
|
||||
})
|
||||
},
|
||||
_ => handle_special_headers(buf, session, ctx),
|
||||
}
|
||||
}
|
||||
|
||||
const HTTP_HEADER_PREFIX: &[u8] = b"$http_";
|
||||
const HTTP_HEADER_PREFIX_LEN: usize = HTTP_HEADER_PREFIX.len();
|
||||
|
||||
#[inline]
|
||||
fn handle_special_headers(
|
||||
buf: &[u8],
|
||||
session: &Session,
|
||||
ctx: &Ctx,
|
||||
) -> Option<HeaderValue> {
|
||||
// Handle headers that reference other HTTP headers (e.g., $http_origin)
|
||||
if buf.starts_with(HTTP_HEADER_PREFIX) {
|
||||
return handle_http_header(buf, session);
|
||||
}
|
||||
// Handle environment variable references (e.g., $HOME)
|
||||
if buf.starts_with(b"$") {
|
||||
return handle_env_var(buf);
|
||||
}
|
||||
// Handle context value references (e.g., :connection_id)
|
||||
if buf.starts_with(b":") {
|
||||
return handle_context_value(buf, ctx);
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn handle_http_header(buf: &[u8], session: &Session) -> Option<HeaderValue> {
|
||||
// Skip the "$http_" prefix (6 bytes) and convert remaining bytes to header key
|
||||
let key = std::str::from_utf8(&buf[HTTP_HEADER_PREFIX_LEN..]).ok()?;
|
||||
// Look up and clone the header value from the session
|
||||
session.get_header(key).cloned()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn handle_env_var(buf: &[u8]) -> Option<HeaderValue> {
|
||||
// Skip the "$" prefix and convert to environment variable name
|
||||
let var_name = std::str::from_utf8(&buf[1..]).ok()?;
|
||||
// Look up environment variable and convert to HeaderValue if found
|
||||
std::env::var(var_name)
|
||||
.ok()
|
||||
.and_then(|v| HeaderValue::from_str(&v).ok())
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn handle_context_value(buf: &[u8], ctx: &Ctx) -> Option<HeaderValue> {
|
||||
// Skip the ":" prefix and convert to context key
|
||||
let key = std::str::from_utf8(&buf[1..]).ok()?;
|
||||
// Pre-allocate buffer for value
|
||||
let mut value = BytesMut::with_capacity(20);
|
||||
// Append context value to buffer
|
||||
value = ctx.append_value(value, key);
|
||||
// Convert to HeaderValue if buffer is not empty
|
||||
if !value.is_empty() {
|
||||
HeaderValue::from_bytes(&value).ok()
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tokio_test::io::Builder;
|
||||
|
||||
#[test]
|
||||
fn test_convert_headers() {
|
||||
let headers = convert_headers(&[
|
||||
@ -202,4 +366,206 @@ mod tests {
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_convert_header_value() {
|
||||
let headers = ["Host: pingap.io"].join("\r\n");
|
||||
let input_header =
|
||||
format!("GET /vicanso/pingap?size=1 HTTP/1.1\r\n{headers}\r\n\r\n");
|
||||
let mock_io = Builder::new().read(input_header.as_bytes()).build();
|
||||
let mut session = Session::new_h1(Box::new(mock_io));
|
||||
session.read_request().await.unwrap();
|
||||
let default_state = Ctx {
|
||||
tls_version: Some("tls1.3".to_string()),
|
||||
remote_addr: Some("10.1.1.1".to_string()),
|
||||
remote_port: Some(6000),
|
||||
server_addr: Some("10.1.1.2".to_string()),
|
||||
server_port: Some(6001),
|
||||
upstream_address: "10.1.1.3:4123".to_string(),
|
||||
connection_id: 102,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let value = convert_header_value(
|
||||
&HeaderValue::from_str("$host").unwrap(),
|
||||
&session,
|
||||
&Ctx {
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
assert_eq!(true, value.is_some());
|
||||
assert_eq!("pingap.io", value.unwrap().to_str().unwrap());
|
||||
|
||||
let value = convert_header_value(
|
||||
&HeaderValue::from_str("$scheme").unwrap(),
|
||||
&session,
|
||||
&Ctx {
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
assert_eq!(true, value.is_some());
|
||||
assert_eq!("http", value.unwrap().to_str().unwrap());
|
||||
let value = convert_header_value(
|
||||
&HeaderValue::from_str("$scheme").unwrap(),
|
||||
&session,
|
||||
&default_state,
|
||||
);
|
||||
assert_eq!(true, value.is_some());
|
||||
assert_eq!("https", value.unwrap().to_str().unwrap());
|
||||
|
||||
let value = convert_header_value(
|
||||
&HeaderValue::from_str("$remote_addr").unwrap(),
|
||||
&session,
|
||||
&default_state,
|
||||
);
|
||||
assert_eq!(true, value.is_some());
|
||||
assert_eq!("10.1.1.1", value.unwrap().to_str().unwrap());
|
||||
|
||||
let value = convert_header_value(
|
||||
&HeaderValue::from_str("$remote_port").unwrap(),
|
||||
&session,
|
||||
&default_state,
|
||||
);
|
||||
assert_eq!(true, value.is_some());
|
||||
assert_eq!("6000", value.unwrap().to_str().unwrap());
|
||||
|
||||
let value = convert_header_value(
|
||||
&HeaderValue::from_str("$server_addr").unwrap(),
|
||||
&session,
|
||||
&default_state,
|
||||
);
|
||||
assert_eq!(true, value.is_some());
|
||||
assert_eq!("10.1.1.2", value.unwrap().to_str().unwrap());
|
||||
|
||||
let value = convert_header_value(
|
||||
&HeaderValue::from_str("$server_port").unwrap(),
|
||||
&session,
|
||||
&default_state,
|
||||
);
|
||||
assert_eq!(true, value.is_some());
|
||||
assert_eq!("6001", value.unwrap().to_str().unwrap());
|
||||
|
||||
let value = convert_header_value(
|
||||
&HeaderValue::from_str("$upstream_addr").unwrap(),
|
||||
&session,
|
||||
&default_state,
|
||||
);
|
||||
assert_eq!(true, value.is_some());
|
||||
assert_eq!("10.1.1.3:4123", value.unwrap().to_str().unwrap());
|
||||
|
||||
let value = convert_header_value(
|
||||
&HeaderValue::from_str(":connection_id").unwrap(),
|
||||
&session,
|
||||
&default_state,
|
||||
);
|
||||
assert_eq!(true, value.is_some());
|
||||
assert_eq!("102", value.unwrap().to_str().unwrap());
|
||||
|
||||
let headers = ["X-Forwarded-For: 1.1.1.1, 2.2.2.2"].join("\r\n");
|
||||
let input_header =
|
||||
format!("GET /vicanso/pingap?size=1 HTTP/1.1\r\n{headers}\r\n\r\n");
|
||||
let mock_io = Builder::new().read(input_header.as_bytes()).build();
|
||||
let mut session = Session::new_h1(Box::new(mock_io));
|
||||
session.read_request().await.unwrap();
|
||||
let value = convert_header_value(
|
||||
&HeaderValue::from_str("$proxy_add_x_forwarded_for").unwrap(),
|
||||
&session,
|
||||
&Ctx {
|
||||
remote_addr: Some("10.1.1.1".to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
assert_eq!(true, value.is_some());
|
||||
assert_eq!(
|
||||
"1.1.1.1, 2.2.2.2, 10.1.1.1",
|
||||
value.unwrap().to_str().unwrap()
|
||||
);
|
||||
|
||||
let headers = [""].join("\r\n");
|
||||
let input_header =
|
||||
format!("GET /vicanso/pingap?size=1 HTTP/1.1\r\n{headers}\r\n\r\n");
|
||||
let mock_io = Builder::new().read(input_header.as_bytes()).build();
|
||||
let mut session = Session::new_h1(Box::new(mock_io));
|
||||
session.read_request().await.unwrap();
|
||||
let value = convert_header_value(
|
||||
&HeaderValue::from_str("$proxy_add_x_forwarded_for").unwrap(),
|
||||
&session,
|
||||
&Ctx {
|
||||
remote_addr: Some("10.1.1.1".to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
assert_eq!(true, value.is_some());
|
||||
assert_eq!("10.1.1.1", value.unwrap().to_str().unwrap());
|
||||
|
||||
let headers = [""].join("\r\n");
|
||||
let input_header =
|
||||
format!("GET /vicanso/pingap?size=1 HTTP/1.1\r\n{headers}\r\n\r\n");
|
||||
let mock_io = Builder::new().read(input_header.as_bytes()).build();
|
||||
let mut session = Session::new_h1(Box::new(mock_io));
|
||||
session.read_request().await.unwrap();
|
||||
let value = convert_header_value(
|
||||
&HeaderValue::from_str("$upstream_addr").unwrap(),
|
||||
&session,
|
||||
&Ctx {
|
||||
upstream_address: "10.1.1.1:8001".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
assert_eq!(true, value.is_some());
|
||||
assert_eq!("10.1.1.1:8001", value.unwrap().to_str().unwrap());
|
||||
|
||||
let headers = ["Origin: https://github.com"].join("\r\n");
|
||||
let input_header =
|
||||
format!("GET /vicanso/pingap?size=1 HTTP/1.1\r\n{headers}\r\n\r\n");
|
||||
let mock_io = Builder::new().read(input_header.as_bytes()).build();
|
||||
let mut session = Session::new_h1(Box::new(mock_io));
|
||||
session.read_request().await.unwrap();
|
||||
let value = convert_header_value(
|
||||
&HeaderValue::from_str("$http_origin").unwrap(),
|
||||
&session,
|
||||
&Ctx::default(),
|
||||
);
|
||||
assert_eq!(true, value.is_some());
|
||||
assert_eq!("https://github.com", value.unwrap().to_str().unwrap());
|
||||
|
||||
let headers = ["Origin: https://github.com"].join("\r\n");
|
||||
let input_header =
|
||||
format!("GET /vicanso/pingap?size=1 HTTP/1.1\r\n{headers}\r\n\r\n");
|
||||
let mock_io = Builder::new().read(input_header.as_bytes()).build();
|
||||
let mut session = Session::new_h1(Box::new(mock_io));
|
||||
session.read_request().await.unwrap();
|
||||
let value = convert_header_value(
|
||||
&HeaderValue::from_str("$hostname").unwrap(),
|
||||
&session,
|
||||
&Ctx::default(),
|
||||
);
|
||||
assert_eq!(true, value.is_some());
|
||||
|
||||
let headers = ["Origin: https://github.com"].join("\r\n");
|
||||
let input_header =
|
||||
format!("GET /vicanso/pingap?size=1 HTTP/1.1\r\n{headers}\r\n\r\n");
|
||||
let mock_io = Builder::new().read(input_header.as_bytes()).build();
|
||||
let mut session = Session::new_h1(Box::new(mock_io));
|
||||
session.read_request().await.unwrap();
|
||||
let value = convert_header_value(
|
||||
&HeaderValue::from_str("$HOME").unwrap(),
|
||||
&session,
|
||||
&Ctx::default(),
|
||||
);
|
||||
assert_eq!(true, value.is_some());
|
||||
|
||||
let headers = ["Origin: https://github.com"].join("\r\n");
|
||||
let input_header =
|
||||
format!("GET /vicanso/pingap?size=1 HTTP/1.1\r\n{headers}\r\n\r\n");
|
||||
let mock_io = Builder::new().read(input_header.as_bytes()).build();
|
||||
let mut session = Session::new_h1(Box::new(mock_io));
|
||||
session.read_request().await.unwrap();
|
||||
let value = convert_header_value(
|
||||
&HeaderValue::from_str("UUID").unwrap(),
|
||||
&session,
|
||||
&Ctx::default(),
|
||||
);
|
||||
assert_eq!(false, value.is_some());
|
||||
}
|
||||
}
|
||||
|
@ -49,14 +49,32 @@ pub fn get_super_ts() -> u32 {
|
||||
}
|
||||
}
|
||||
|
||||
static HOST_NAME: Lazy<String> = Lazy::new(|| {
|
||||
hostname::get()
|
||||
.unwrap_or_default()
|
||||
.to_str()
|
||||
.unwrap_or_default()
|
||||
.to_string()
|
||||
});
|
||||
|
||||
/// Returns the system hostname.
|
||||
///
|
||||
/// Returns:
|
||||
/// * `&'static str` - The system's hostname as a string slice
|
||||
pub fn get_hostname() -> &'static str {
|
||||
HOST_NAME.as_str()
|
||||
}
|
||||
|
||||
mod ctx;
|
||||
mod http_header;
|
||||
mod http_response;
|
||||
mod notification;
|
||||
mod plugin;
|
||||
|
||||
pub use ctx::*;
|
||||
pub use http_header::*;
|
||||
pub use http_response::*;
|
||||
pub use notification::*;
|
||||
pub use plugin::*;
|
||||
|
||||
#[cfg(test)]
|
||||
|
50
pingap-core/src/notification.rs
Normal file
50
pingap-core/src/notification.rs
Normal file
@ -0,0 +1,50 @@
|
||||
// Copyright 2024-2025 Tree xie.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use async_trait::async_trait;
|
||||
use std::fmt::Display;
|
||||
|
||||
#[derive(Default)]
|
||||
pub enum NotificationLevel {
|
||||
#[default]
|
||||
Info,
|
||||
Warn,
|
||||
Error,
|
||||
}
|
||||
|
||||
impl Display for NotificationLevel {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
let msg = match self {
|
||||
NotificationLevel::Error => "error",
|
||||
NotificationLevel::Warn => "warn",
|
||||
_ => "info",
|
||||
};
|
||||
write!(f, "{msg}")
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct NotificationData {
|
||||
pub category: String,
|
||||
pub level: NotificationLevel,
|
||||
pub title: String,
|
||||
pub message: String,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait Notification {
|
||||
async fn notify(&self, data: NotificationData);
|
||||
}
|
||||
|
||||
pub type NotificationSender = Box<dyn Notification + Send + Sync>;
|
@ -23,6 +23,7 @@ hickory-resolver = "0.24.1"
|
||||
bollard = "0.18.1"
|
||||
pingap-webhook = { path = "../pingap-webhook" }
|
||||
pingap-util = { path = "../pingap-util" }
|
||||
pingap-core = { path = "../pingap-core" }
|
||||
snafu = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
|
@ -180,13 +180,17 @@ impl ServiceDiscovery for Dns {
|
||||
),
|
||||
"dns discover fail"
|
||||
);
|
||||
pingap_webhook::send_notification(pingap_webhook::SendNotificationParams {
|
||||
category:
|
||||
pingap_webhook::NotificationCategory::ServiceDiscoverFail,
|
||||
level: pingap_webhook::NotificationLevel::Warn,
|
||||
msg: format!("dns discovery {:?}, error: {e}", self.hosts),
|
||||
remark: None,
|
||||
})
|
||||
pingap_webhook::send_notification(
|
||||
pingap_core::NotificationData {
|
||||
category: "service_discover_fail".to_string(),
|
||||
level: pingap_core::NotificationLevel::Warn,
|
||||
message: format!(
|
||||
"dns discovery {:?}, error: {e}",
|
||||
self.hosts
|
||||
),
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await;
|
||||
Err(e.into())
|
||||
},
|
||||
|
@ -248,16 +248,17 @@ impl ServiceDiscovery for Docker {
|
||||
),
|
||||
"docker discover fail"
|
||||
);
|
||||
pingap_webhook::send_notification(pingap_webhook::SendNotificationParams {
|
||||
category:
|
||||
pingap_webhook::NotificationCategory::ServiceDiscoverFail,
|
||||
level: pingap_webhook::NotificationLevel::Warn,
|
||||
msg: format!(
|
||||
"docker discovery {:?}, error: {e}",
|
||||
self.labels(),
|
||||
),
|
||||
remark: None,
|
||||
})
|
||||
pingap_webhook::send_notification(
|
||||
pingap_core::NotificationData {
|
||||
category: "service_discover_fail".to_string(),
|
||||
level: pingap_core::NotificationLevel::Warn,
|
||||
message: format!(
|
||||
"docker discovery {:?}, error: {e}",
|
||||
self.labels(),
|
||||
),
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await;
|
||||
return Err(e.into());
|
||||
},
|
||||
|
@ -23,8 +23,6 @@ url = { workspace = true }
|
||||
strum = { workspace = true }
|
||||
tonic-health = "0.12.3"
|
||||
tonic = "0.12.3"
|
||||
pingap-webhook = { path = "../pingap-webhook" }
|
||||
pingap-util = { path = "../pingap-util" }
|
||||
|
||||
[dev-dependencies]
|
||||
pretty_assertions = "1.4.0"
|
||||
|
@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use super::{Error, HealthCheckConf};
|
||||
use super::{new_internal_error, Error, HealthCheckConf};
|
||||
use async_trait::async_trait;
|
||||
use http::uri::InvalidUri;
|
||||
use http::Uri;
|
||||
@ -40,7 +40,11 @@ pub struct GrpcHealthCheck {
|
||||
}
|
||||
|
||||
impl GrpcHealthCheck {
|
||||
pub fn new(name: &str, conf: &HealthCheckConf) -> Result<Self> {
|
||||
pub fn new(
|
||||
_name: &str,
|
||||
conf: &HealthCheckConf,
|
||||
health_changed_callback: Option<HealthObserveCallback>,
|
||||
) -> Result<Self> {
|
||||
let scheme = if conf.tls {
|
||||
"https".to_string()
|
||||
} else {
|
||||
@ -60,9 +64,7 @@ impl GrpcHealthCheck {
|
||||
consecutive_success: conf.consecutive_success,
|
||||
consecutive_failure: conf.consecutive_failure,
|
||||
connection_timeout: conf.connection_timeout,
|
||||
health_changed_callback: Some(
|
||||
pingap_webhook::new_backend_observe_notification(name),
|
||||
),
|
||||
health_changed_callback,
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -73,20 +75,20 @@ impl HealthCheck for GrpcHealthCheck {
|
||||
let uri = format!("{}://{}", self.scheme, target.addr);
|
||||
|
||||
let conn = tonic::transport::Endpoint::from_shared(uri)
|
||||
.map_err(|e| pingap_util::new_internal_error(500, e.to_string()))?
|
||||
.map_err(|e| new_internal_error(500, e.to_string()))?
|
||||
.origin(self.origin.clone())
|
||||
.connect_timeout(self.connection_timeout)
|
||||
.connect()
|
||||
.await
|
||||
.map_err(|e| pingap_util::new_internal_error(500, e.to_string()))?;
|
||||
.map_err(|e| new_internal_error(500, e.to_string()))?;
|
||||
let resp = HealthClient::new(conn)
|
||||
.check(HealthCheckRequest {
|
||||
service: self.service.clone(),
|
||||
})
|
||||
.await
|
||||
.map_err(|e| pingap_util::new_internal_error(500, e.to_string()))?;
|
||||
.map_err(|e| new_internal_error(500, e.to_string()))?;
|
||||
if resp.get_ref().status() != ServingStatus::Serving.into() {
|
||||
return Err(pingap_util::new_internal_error(
|
||||
return Err(new_internal_error(
|
||||
500,
|
||||
"grpc server is not serving".to_string(),
|
||||
));
|
||||
@ -131,7 +133,7 @@ mod tests {
|
||||
r###"HealthCheckConf { schema: Grpc, host: "upstreamname", path: "/ping?from=nginx", connection_timeout: 3s, read_timeout: 3s, check_frequency: 10s, reuse_connection: true, consecutive_success: 2, consecutive_failure: 1, service: "grpc", tls: true }"###,
|
||||
format!("{grpc_check:?}")
|
||||
);
|
||||
let grpc_check = GrpcHealthCheck::new("", &grpc_check).unwrap();
|
||||
let grpc_check = GrpcHealthCheck::new("", &grpc_check, None).unwrap();
|
||||
assert_eq!(2, grpc_check.health_threshold(true));
|
||||
assert_eq!(1, grpc_check.health_threshold(false));
|
||||
}
|
||||
|
@ -19,7 +19,7 @@ use super::{
|
||||
};
|
||||
use humantime::parse_duration;
|
||||
use pingora::http::RequestHeader;
|
||||
use pingora::lb::health_check::HttpHealthCheck;
|
||||
use pingora::lb::health_check::{HealthObserveCallback, HttpHealthCheck};
|
||||
use std::time::Duration;
|
||||
use tracing::error;
|
||||
use url::Url;
|
||||
@ -29,6 +29,7 @@ type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
pub(crate) fn new_http_health_check(
|
||||
name: &str,
|
||||
conf: &HealthCheckConf,
|
||||
health_changed_callback: Option<HealthObserveCallback>,
|
||||
) -> HttpHealthCheck {
|
||||
let mut check = HttpHealthCheck::new(
|
||||
&conf.host,
|
||||
@ -40,8 +41,7 @@ pub(crate) fn new_http_health_check(
|
||||
check.consecutive_success = conf.consecutive_success;
|
||||
check.consecutive_failure = conf.consecutive_failure;
|
||||
check.reuse_connection = conf.reuse_connection;
|
||||
check.health_changed_callback =
|
||||
Some(pingap_webhook::new_backend_observe_notification(name));
|
||||
check.health_changed_callback = health_changed_callback;
|
||||
// create http get request
|
||||
match RequestHeader::build("GET", conf.path.as_bytes(), None) {
|
||||
Ok(mut req) => {
|
||||
@ -49,6 +49,7 @@ pub(crate) fn new_http_health_check(
|
||||
if let Err(e) = req.append_header("Host", &conf.host) {
|
||||
error!(
|
||||
category = LOG_CATEGORY,
|
||||
name,
|
||||
error = e.to_string(),
|
||||
host = conf.host,
|
||||
"http health check append host fail"
|
||||
@ -187,7 +188,7 @@ mod tests {
|
||||
r###"HealthCheckConf { schema: Https, host: "upstreamname", path: "/ping?from=nginx", connection_timeout: 3s, read_timeout: 1s, check_frequency: 10s, reuse_connection: true, consecutive_success: 2, consecutive_failure: 1, service: "grpc", tls: true }"###,
|
||||
format!("{http_check:?}")
|
||||
);
|
||||
let http_check = new_http_health_check("", &http_check);
|
||||
let http_check = new_http_health_check("", &http_check, None);
|
||||
assert_eq!(1, http_check.consecutive_failure);
|
||||
assert_eq!(2, http_check.consecutive_success);
|
||||
assert_eq!(true, http_check.reuse_connection);
|
||||
|
@ -13,7 +13,9 @@
|
||||
// limitations under the License.
|
||||
|
||||
use humantime::format_duration;
|
||||
use pingora::lb::health_check::{HealthCheck, TcpHealthCheck};
|
||||
use pingora::lb::health_check::{
|
||||
HealthCheck, HealthObserveCallback, TcpHealthCheck,
|
||||
};
|
||||
use pingora::upstreams::peer::PeerOptions;
|
||||
use snafu::Snafu;
|
||||
use std::time::Duration;
|
||||
@ -26,6 +28,15 @@ mod http;
|
||||
pub use grpc::GrpcHealthCheck;
|
||||
pub use http::HealthCheckConf;
|
||||
|
||||
/// Creates a new internal error
|
||||
pub fn new_internal_error(status: u16, message: String) -> pingora::BError {
|
||||
pingora::Error::because(
|
||||
pingora::ErrorType::HTTPStatus(status),
|
||||
message,
|
||||
pingora::Error::new(pingora::ErrorType::InternalError),
|
||||
)
|
||||
}
|
||||
|
||||
// Add constants for default values
|
||||
const DEFAULT_CONNECTION_TIMEOUT: Duration = Duration::from_secs(3);
|
||||
const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(3);
|
||||
@ -62,14 +73,17 @@ fn update_peer_options(
|
||||
options
|
||||
}
|
||||
|
||||
fn new_tcp_health_check(name: &str, conf: &HealthCheckConf) -> TcpHealthCheck {
|
||||
fn new_tcp_health_check(
|
||||
_name: &str,
|
||||
conf: &HealthCheckConf,
|
||||
health_changed_callback: Option<HealthObserveCallback>,
|
||||
) -> TcpHealthCheck {
|
||||
let mut check = TcpHealthCheck::default();
|
||||
check.peer_template.options =
|
||||
update_peer_options(conf, check.peer_template.options.clone());
|
||||
check.consecutive_success = conf.consecutive_success;
|
||||
check.consecutive_failure = conf.consecutive_failure;
|
||||
check.health_changed_callback =
|
||||
Some(pingap_webhook::new_backend_observe_notification(name));
|
||||
check.health_changed_callback = health_changed_callback;
|
||||
|
||||
check
|
||||
}
|
||||
@ -77,14 +91,14 @@ fn new_tcp_health_check(name: &str, conf: &HealthCheckConf) -> TcpHealthCheck {
|
||||
pub fn new_health_check(
|
||||
name: &str,
|
||||
health_check: &str,
|
||||
health_changed_callback: Option<HealthObserveCallback>,
|
||||
) -> Result<(Box<dyn HealthCheck + Send + Sync + 'static>, Duration)> {
|
||||
let mut health_check_frequency = Duration::from_secs(10);
|
||||
let hc: Box<dyn HealthCheck + Send + Sync + 'static> = if health_check
|
||||
.is_empty()
|
||||
{
|
||||
let mut check = TcpHealthCheck::new();
|
||||
check.health_changed_callback =
|
||||
Some(pingap_webhook::new_backend_observe_notification(name));
|
||||
check.health_changed_callback = health_changed_callback;
|
||||
check.peer_template.options.connection_timeout =
|
||||
Some(Duration::from_secs(3));
|
||||
info!(
|
||||
@ -116,13 +130,25 @@ pub fn new_health_check(
|
||||
);
|
||||
match health_check_conf.schema {
|
||||
HealthCheckSchema::Http | HealthCheckSchema::Https => {
|
||||
Box::new(http::new_http_health_check(name, &health_check_conf))
|
||||
Box::new(http::new_http_health_check(
|
||||
name,
|
||||
&health_check_conf,
|
||||
health_changed_callback,
|
||||
))
|
||||
},
|
||||
HealthCheckSchema::Grpc => {
|
||||
let check = GrpcHealthCheck::new(name, &health_check_conf)?;
|
||||
let check = GrpcHealthCheck::new(
|
||||
name,
|
||||
&health_check_conf,
|
||||
health_changed_callback,
|
||||
)?;
|
||||
Box::new(check)
|
||||
},
|
||||
_ => Box::new(new_tcp_health_check(name, &health_check_conf)),
|
||||
_ => Box::new(new_tcp_health_check(
|
||||
name,
|
||||
&health_check_conf,
|
||||
health_changed_callback,
|
||||
)),
|
||||
}
|
||||
};
|
||||
Ok((hc, health_check_frequency))
|
||||
@ -154,7 +180,7 @@ mod tests {
|
||||
r###"HealthCheckConf { schema: Tcp, host: "upstreamname", path: "", connection_timeout: 3s, read_timeout: 3s, check_frequency: 10s, reuse_connection: false, consecutive_success: 2, consecutive_failure: 1, service: "", tls: false }"###,
|
||||
format!("{tcp_check:?}")
|
||||
);
|
||||
let tcp_check = new_tcp_health_check("", &tcp_check);
|
||||
let tcp_check = new_tcp_health_check("", &tcp_check, None);
|
||||
assert_eq!(1, tcp_check.consecutive_failure);
|
||||
assert_eq!(2, tcp_check.consecutive_success);
|
||||
assert_eq!(
|
||||
@ -164,7 +190,7 @@ mod tests {
|
||||
}
|
||||
#[test]
|
||||
fn test_new_health_check() {
|
||||
let (_, frequency) = new_health_check("upstreamname", "https://upstreamname/ping?connection_timeout=3s&read_timeout=1s&success=2&failure=1&check_frequency=10s&from=nginx&reuse").unwrap();
|
||||
let (_, frequency) = new_health_check("upstreamname", "https://upstreamname/ping?connection_timeout=3s&read_timeout=1s&success=2&failure=1&check_frequency=10s&from=nginx&reuse", None).unwrap();
|
||||
assert_eq!(Duration::from_secs(10), frequency);
|
||||
}
|
||||
}
|
||||
|
@ -13,15 +13,13 @@
|
||||
// limitations under the License.
|
||||
|
||||
use bytes::BytesMut;
|
||||
use pingap_core::Ctx;
|
||||
use pingap_util::{format_byte_size, format_duration, get_hostname};
|
||||
use pingap_core::{get_hostname, Ctx, HOST_NAME_TAG};
|
||||
use pingap_util::{format_byte_size, format_duration};
|
||||
use pingora::http::ResponseHeader;
|
||||
use pingora::proxy::Session;
|
||||
use regex::Regex;
|
||||
use substring::Substring;
|
||||
|
||||
pub const HOST_NAME_TAG: &[u8] = b"$hostname";
|
||||
|
||||
// Enum representing different types of log tags that can be used in the logging format
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub enum TagCategory {
|
||||
|
@ -16,10 +16,9 @@ use super::{get_process_system_info, Error, Result, LOG_CATEGORY};
|
||||
use humantime::parse_duration;
|
||||
#[cfg(feature = "full")]
|
||||
use pingap_cache::{CACHE_READING_TIME, CACHE_WRITING_TIME};
|
||||
use pingap_core::Ctx;
|
||||
use pingap_core::{get_hostname, Ctx};
|
||||
use pingap_service::Error as ServiceError;
|
||||
use pingap_service::SimpleServiceTaskFuture;
|
||||
use pingap_util::get_hostname;
|
||||
use pingora::proxy::Session;
|
||||
use prometheus::core::Collector;
|
||||
use prometheus::{
|
||||
|
@ -54,7 +54,6 @@ toml = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
http = { workspace = true }
|
||||
pingap-state = { path = "../pingap-state" }
|
||||
pingap-config = { path = "../pingap-config" }
|
||||
pingap-util = { path = "../pingap-util" }
|
||||
pingap-cache = { path = "../pingap-cache" }
|
||||
|
@ -20,8 +20,9 @@ use ctor::ctor;
|
||||
use http::{header, HeaderValue};
|
||||
use humantime::parse_duration;
|
||||
use pingap_config::{PluginCategory, PluginConf};
|
||||
use pingap_core::{Ctx, HttpHeader, HttpResponse, Plugin, PluginStep};
|
||||
use pingap_state::convert_header_value;
|
||||
use pingap_core::{
|
||||
convert_header_value, Ctx, HttpHeader, HttpResponse, Plugin, PluginStep,
|
||||
};
|
||||
use pingora::http::ResponseHeader;
|
||||
use pingora::proxy::Session;
|
||||
use regex::Regex;
|
||||
|
@ -16,8 +16,9 @@ use async_trait::async_trait;
|
||||
use ctor::ctor;
|
||||
use http::header::HeaderName;
|
||||
use pingap_config::{PluginCategory, PluginConf};
|
||||
use pingap_core::{convert_header, Ctx, HttpHeader, Plugin, PluginStep};
|
||||
use pingap_state::convert_header_value;
|
||||
use pingap_core::{
|
||||
convert_header, convert_header_value, Ctx, HttpHeader, Plugin, PluginStep,
|
||||
};
|
||||
use pingora::http::ResponseHeader;
|
||||
use pingora::proxy::Session;
|
||||
use std::str::FromStr;
|
||||
|
@ -1,27 +0,0 @@
|
||||
[package]
|
||||
name = "pingap-state"
|
||||
version = "0.9.10"
|
||||
edition = "2021"
|
||||
authors = ["Tree Xie <tree.xie@outlook.com>"]
|
||||
license = "Apache-2.0"
|
||||
homepage = "https://github.com/vicanso/pingap"
|
||||
repository = "https://github.com/vicanso/pingap"
|
||||
keywords = ["pingap", "state"]
|
||||
|
||||
[lib]
|
||||
name = "pingap_state"
|
||||
path = "src/lib.rs"
|
||||
|
||||
[dependencies]
|
||||
bytes = { workspace = true }
|
||||
http = { workspace = true }
|
||||
once_cell = { workspace = true }
|
||||
hostname = "0.4.0"
|
||||
pingora = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
pingap-core = { path = "../pingap-core" }
|
||||
|
||||
|
||||
[dev-dependencies]
|
||||
pretty_assertions = "1.4.0"
|
||||
tokio-test = "0.4.4"
|
@ -1,392 +0,0 @@
|
||||
// Copyright 2024-2025 Tree xie.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use super::get_hostname;
|
||||
use bytes::BytesMut;
|
||||
use http::{HeaderName, HeaderValue};
|
||||
use once_cell::sync::Lazy;
|
||||
use pingap_core::Ctx;
|
||||
use pingora::http::RequestHeader;
|
||||
use pingora::proxy::Session;
|
||||
use std::str::FromStr;
|
||||
|
||||
pub const HOST_NAME_TAG: &[u8] = b"$hostname";
|
||||
const HOST_TAG: &[u8] = b"$host";
|
||||
const SCHEME_TAG: &[u8] = b"$scheme";
|
||||
const REMOTE_ADDR_TAG: &[u8] = b"$remote_addr";
|
||||
const REMOTE_PORT_TAG: &[u8] = b"$remote_port";
|
||||
const SERVER_ADDR_TAG: &[u8] = b"$server_addr";
|
||||
const SERVER_PORT_TAG: &[u8] = b"$server_port";
|
||||
const PROXY_ADD_FORWARDED_TAG: &[u8] = b"$proxy_add_x_forwarded_for";
|
||||
const UPSTREAM_ADDR_TAG: &[u8] = b"$upstream_addr";
|
||||
|
||||
static SCHEME_HTTPS: HeaderValue = HeaderValue::from_static("https");
|
||||
static SCHEME_HTTP: HeaderValue = HeaderValue::from_static("http");
|
||||
|
||||
static HTTP_HEADER_X_FORWARDED_FOR: Lazy<http::HeaderName> =
|
||||
Lazy::new(|| HeaderName::from_str("X-Forwarded-For").unwrap());
|
||||
|
||||
/// Get request host in this order of precedence:
|
||||
/// host name from the request line,
|
||||
/// or host name from the "Host" request header field
|
||||
fn get_host(header: &RequestHeader) -> Option<&str> {
|
||||
if let Some(host) = header.uri.host() {
|
||||
return Some(host);
|
||||
}
|
||||
if let Some(host) = header.headers.get("Host") {
|
||||
if let Ok(value) = host.to_str().map(|host| host.split(':').next()) {
|
||||
return value;
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
/// Processes special header values that contain dynamic variables.
|
||||
/// Supports variables like $host, $scheme, $remote_addr etc.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `value` - The header value to process
|
||||
/// * `session` - The HTTP session context
|
||||
/// * `ctx` - The application state
|
||||
///
|
||||
/// # Returns
|
||||
/// * `Option<HeaderValue>` - The processed header value or None if no special handling needed
|
||||
#[inline]
|
||||
pub fn convert_header_value(
|
||||
value: &HeaderValue,
|
||||
session: &Session,
|
||||
ctx: &Ctx,
|
||||
) -> Option<HeaderValue> {
|
||||
let buf = value.as_bytes();
|
||||
|
||||
// Early return if not a special header (moved this check earlier)
|
||||
if buf.is_empty() || !(buf[0] == b'$' || buf[0] == b':') {
|
||||
return None;
|
||||
}
|
||||
|
||||
// Helper closure to convert string to HeaderValue
|
||||
let to_header_value = |s: &str| HeaderValue::from_str(s).ok();
|
||||
|
||||
match buf {
|
||||
HOST_TAG => get_host(session.req_header()).and_then(to_header_value),
|
||||
SCHEME_TAG => Some(if ctx.tls_version.is_some() {
|
||||
SCHEME_HTTPS.clone()
|
||||
} else {
|
||||
SCHEME_HTTP.clone()
|
||||
}),
|
||||
HOST_NAME_TAG => to_header_value(get_hostname()),
|
||||
REMOTE_ADDR_TAG => ctx.remote_addr.as_deref().and_then(to_header_value),
|
||||
REMOTE_PORT_TAG => ctx
|
||||
.remote_port
|
||||
.map(|p| p.to_string())
|
||||
.and_then(|s| to_header_value(&s)),
|
||||
SERVER_ADDR_TAG => ctx.server_addr.as_deref().and_then(to_header_value),
|
||||
SERVER_PORT_TAG => ctx
|
||||
.server_port
|
||||
.map(|p| p.to_string())
|
||||
.and_then(|s| to_header_value(&s)),
|
||||
UPSTREAM_ADDR_TAG => {
|
||||
if !ctx.upstream_address.is_empty() {
|
||||
to_header_value(&ctx.upstream_address)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
},
|
||||
PROXY_ADD_FORWARDED_TAG => {
|
||||
ctx.remote_addr.as_deref().and_then(|remote_addr| {
|
||||
let value = match session
|
||||
.get_header(HTTP_HEADER_X_FORWARDED_FOR.clone())
|
||||
{
|
||||
Some(existing) => format!(
|
||||
"{}, {}",
|
||||
existing.to_str().unwrap_or_default(),
|
||||
remote_addr
|
||||
),
|
||||
None => remote_addr.to_string(),
|
||||
};
|
||||
to_header_value(&value)
|
||||
})
|
||||
},
|
||||
_ => handle_special_headers(buf, session, ctx),
|
||||
}
|
||||
}
|
||||
|
||||
const HTTP_HEADER_PREFIX: &[u8] = b"$http_";
|
||||
const HTTP_HEADER_PREFIX_LEN: usize = HTTP_HEADER_PREFIX.len();
|
||||
|
||||
#[inline]
|
||||
fn handle_special_headers(
|
||||
buf: &[u8],
|
||||
session: &Session,
|
||||
ctx: &Ctx,
|
||||
) -> Option<HeaderValue> {
|
||||
// Handle headers that reference other HTTP headers (e.g., $http_origin)
|
||||
if buf.starts_with(HTTP_HEADER_PREFIX) {
|
||||
return handle_http_header(buf, session);
|
||||
}
|
||||
// Handle environment variable references (e.g., $HOME)
|
||||
if buf.starts_with(b"$") {
|
||||
return handle_env_var(buf);
|
||||
}
|
||||
// Handle context value references (e.g., :connection_id)
|
||||
if buf.starts_with(b":") {
|
||||
return handle_context_value(buf, ctx);
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn handle_http_header(buf: &[u8], session: &Session) -> Option<HeaderValue> {
|
||||
// Skip the "$http_" prefix (6 bytes) and convert remaining bytes to header key
|
||||
let key = std::str::from_utf8(&buf[HTTP_HEADER_PREFIX_LEN..]).ok()?;
|
||||
// Look up and clone the header value from the session
|
||||
session.get_header(key).cloned()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn handle_env_var(buf: &[u8]) -> Option<HeaderValue> {
|
||||
// Skip the "$" prefix and convert to environment variable name
|
||||
let var_name = std::str::from_utf8(&buf[1..]).ok()?;
|
||||
// Look up environment variable and convert to HeaderValue if found
|
||||
std::env::var(var_name)
|
||||
.ok()
|
||||
.and_then(|v| HeaderValue::from_str(&v).ok())
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn handle_context_value(buf: &[u8], ctx: &Ctx) -> Option<HeaderValue> {
|
||||
// Skip the ":" prefix and convert to context key
|
||||
let key = std::str::from_utf8(&buf[1..]).ok()?;
|
||||
// Pre-allocate buffer for value
|
||||
let mut value = BytesMut::with_capacity(20);
|
||||
// Append context value to buffer
|
||||
value = ctx.append_value(value, key);
|
||||
// Convert to HeaderValue if buffer is not empty
|
||||
if !value.is_empty() {
|
||||
HeaderValue::from_bytes(&value).ok()
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use http::HeaderValue;
|
||||
use pingora::proxy::Session;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tokio_test::io::Builder;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_convert_header_value() {
|
||||
let headers = ["Host: pingap.io"].join("\r\n");
|
||||
let input_header =
|
||||
format!("GET /vicanso/pingap?size=1 HTTP/1.1\r\n{headers}\r\n\r\n");
|
||||
let mock_io = Builder::new().read(input_header.as_bytes()).build();
|
||||
let mut session = Session::new_h1(Box::new(mock_io));
|
||||
session.read_request().await.unwrap();
|
||||
let default_state = Ctx {
|
||||
tls_version: Some("tls1.3".to_string()),
|
||||
remote_addr: Some("10.1.1.1".to_string()),
|
||||
remote_port: Some(6000),
|
||||
server_addr: Some("10.1.1.2".to_string()),
|
||||
server_port: Some(6001),
|
||||
upstream_address: "10.1.1.3:4123".to_string(),
|
||||
connection_id: 102,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let value = convert_header_value(
|
||||
&HeaderValue::from_str("$host").unwrap(),
|
||||
&session,
|
||||
&Ctx {
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
assert_eq!(true, value.is_some());
|
||||
assert_eq!("pingap.io", value.unwrap().to_str().unwrap());
|
||||
|
||||
let value = convert_header_value(
|
||||
&HeaderValue::from_str("$scheme").unwrap(),
|
||||
&session,
|
||||
&Ctx {
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
assert_eq!(true, value.is_some());
|
||||
assert_eq!("http", value.unwrap().to_str().unwrap());
|
||||
let value = convert_header_value(
|
||||
&HeaderValue::from_str("$scheme").unwrap(),
|
||||
&session,
|
||||
&default_state,
|
||||
);
|
||||
assert_eq!(true, value.is_some());
|
||||
assert_eq!("https", value.unwrap().to_str().unwrap());
|
||||
|
||||
let value = convert_header_value(
|
||||
&HeaderValue::from_str("$remote_addr").unwrap(),
|
||||
&session,
|
||||
&default_state,
|
||||
);
|
||||
assert_eq!(true, value.is_some());
|
||||
assert_eq!("10.1.1.1", value.unwrap().to_str().unwrap());
|
||||
|
||||
let value = convert_header_value(
|
||||
&HeaderValue::from_str("$remote_port").unwrap(),
|
||||
&session,
|
||||
&default_state,
|
||||
);
|
||||
assert_eq!(true, value.is_some());
|
||||
assert_eq!("6000", value.unwrap().to_str().unwrap());
|
||||
|
||||
let value = convert_header_value(
|
||||
&HeaderValue::from_str("$server_addr").unwrap(),
|
||||
&session,
|
||||
&default_state,
|
||||
);
|
||||
assert_eq!(true, value.is_some());
|
||||
assert_eq!("10.1.1.2", value.unwrap().to_str().unwrap());
|
||||
|
||||
let value = convert_header_value(
|
||||
&HeaderValue::from_str("$server_port").unwrap(),
|
||||
&session,
|
||||
&default_state,
|
||||
);
|
||||
assert_eq!(true, value.is_some());
|
||||
assert_eq!("6001", value.unwrap().to_str().unwrap());
|
||||
|
||||
let value = convert_header_value(
|
||||
&HeaderValue::from_str("$upstream_addr").unwrap(),
|
||||
&session,
|
||||
&default_state,
|
||||
);
|
||||
assert_eq!(true, value.is_some());
|
||||
assert_eq!("10.1.1.3:4123", value.unwrap().to_str().unwrap());
|
||||
|
||||
let value = convert_header_value(
|
||||
&HeaderValue::from_str(":connection_id").unwrap(),
|
||||
&session,
|
||||
&default_state,
|
||||
);
|
||||
assert_eq!(true, value.is_some());
|
||||
assert_eq!("102", value.unwrap().to_str().unwrap());
|
||||
|
||||
let headers = ["X-Forwarded-For: 1.1.1.1, 2.2.2.2"].join("\r\n");
|
||||
let input_header =
|
||||
format!("GET /vicanso/pingap?size=1 HTTP/1.1\r\n{headers}\r\n\r\n");
|
||||
let mock_io = Builder::new().read(input_header.as_bytes()).build();
|
||||
let mut session = Session::new_h1(Box::new(mock_io));
|
||||
session.read_request().await.unwrap();
|
||||
let value = convert_header_value(
|
||||
&HeaderValue::from_str("$proxy_add_x_forwarded_for").unwrap(),
|
||||
&session,
|
||||
&Ctx {
|
||||
remote_addr: Some("10.1.1.1".to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
assert_eq!(true, value.is_some());
|
||||
assert_eq!(
|
||||
"1.1.1.1, 2.2.2.2, 10.1.1.1",
|
||||
value.unwrap().to_str().unwrap()
|
||||
);
|
||||
|
||||
let headers = [""].join("\r\n");
|
||||
let input_header =
|
||||
format!("GET /vicanso/pingap?size=1 HTTP/1.1\r\n{headers}\r\n\r\n");
|
||||
let mock_io = Builder::new().read(input_header.as_bytes()).build();
|
||||
let mut session = Session::new_h1(Box::new(mock_io));
|
||||
session.read_request().await.unwrap();
|
||||
let value = convert_header_value(
|
||||
&HeaderValue::from_str("$proxy_add_x_forwarded_for").unwrap(),
|
||||
&session,
|
||||
&Ctx {
|
||||
remote_addr: Some("10.1.1.1".to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
assert_eq!(true, value.is_some());
|
||||
assert_eq!("10.1.1.1", value.unwrap().to_str().unwrap());
|
||||
|
||||
let headers = [""].join("\r\n");
|
||||
let input_header =
|
||||
format!("GET /vicanso/pingap?size=1 HTTP/1.1\r\n{headers}\r\n\r\n");
|
||||
let mock_io = Builder::new().read(input_header.as_bytes()).build();
|
||||
let mut session = Session::new_h1(Box::new(mock_io));
|
||||
session.read_request().await.unwrap();
|
||||
let value = convert_header_value(
|
||||
&HeaderValue::from_str("$upstream_addr").unwrap(),
|
||||
&session,
|
||||
&Ctx {
|
||||
upstream_address: "10.1.1.1:8001".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
assert_eq!(true, value.is_some());
|
||||
assert_eq!("10.1.1.1:8001", value.unwrap().to_str().unwrap());
|
||||
|
||||
let headers = ["Origin: https://github.com"].join("\r\n");
|
||||
let input_header =
|
||||
format!("GET /vicanso/pingap?size=1 HTTP/1.1\r\n{headers}\r\n\r\n");
|
||||
let mock_io = Builder::new().read(input_header.as_bytes()).build();
|
||||
let mut session = Session::new_h1(Box::new(mock_io));
|
||||
session.read_request().await.unwrap();
|
||||
let value = convert_header_value(
|
||||
&HeaderValue::from_str("$http_origin").unwrap(),
|
||||
&session,
|
||||
&Ctx::default(),
|
||||
);
|
||||
assert_eq!(true, value.is_some());
|
||||
assert_eq!("https://github.com", value.unwrap().to_str().unwrap());
|
||||
|
||||
let headers = ["Origin: https://github.com"].join("\r\n");
|
||||
let input_header =
|
||||
format!("GET /vicanso/pingap?size=1 HTTP/1.1\r\n{headers}\r\n\r\n");
|
||||
let mock_io = Builder::new().read(input_header.as_bytes()).build();
|
||||
let mut session = Session::new_h1(Box::new(mock_io));
|
||||
session.read_request().await.unwrap();
|
||||
let value = convert_header_value(
|
||||
&HeaderValue::from_str("$hostname").unwrap(),
|
||||
&session,
|
||||
&Ctx::default(),
|
||||
);
|
||||
assert_eq!(true, value.is_some());
|
||||
|
||||
let headers = ["Origin: https://github.com"].join("\r\n");
|
||||
let input_header =
|
||||
format!("GET /vicanso/pingap?size=1 HTTP/1.1\r\n{headers}\r\n\r\n");
|
||||
let mock_io = Builder::new().read(input_header.as_bytes()).build();
|
||||
let mut session = Session::new_h1(Box::new(mock_io));
|
||||
session.read_request().await.unwrap();
|
||||
let value = convert_header_value(
|
||||
&HeaderValue::from_str("$HOME").unwrap(),
|
||||
&session,
|
||||
&Ctx::default(),
|
||||
);
|
||||
assert_eq!(true, value.is_some());
|
||||
|
||||
let headers = ["Origin: https://github.com"].join("\r\n");
|
||||
let input_header =
|
||||
format!("GET /vicanso/pingap?size=1 HTTP/1.1\r\n{headers}\r\n\r\n");
|
||||
let mock_io = Builder::new().read(input_header.as_bytes()).build();
|
||||
let mut session = Session::new_h1(Box::new(mock_io));
|
||||
session.read_request().await.unwrap();
|
||||
let value = convert_header_value(
|
||||
&HeaderValue::from_str("UUID").unwrap(),
|
||||
&session,
|
||||
&Ctx::default(),
|
||||
);
|
||||
assert_eq!(false, value.is_some());
|
||||
}
|
||||
}
|
@ -1,35 +0,0 @@
|
||||
// Copyright 2024-2025 Tree xie.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
mod http_header;
|
||||
|
||||
static HOST_NAME: Lazy<String> = Lazy::new(|| {
|
||||
hostname::get()
|
||||
.unwrap_or_default()
|
||||
.to_str()
|
||||
.unwrap_or_default()
|
||||
.to_string()
|
||||
});
|
||||
|
||||
/// Returns the system hostname.
|
||||
///
|
||||
/// Returns:
|
||||
/// * `&'static str` - The system's hostname as a string slice
|
||||
pub fn get_hostname() -> &'static str {
|
||||
HOST_NAME.as_str()
|
||||
}
|
||||
|
||||
pub use http_header::*;
|
@ -31,6 +31,7 @@ pingap-discovery = { path = "../pingap-discovery" }
|
||||
pingap-health = { path = "../pingap-health" }
|
||||
pingap-service = { path = "../pingap-service" }
|
||||
pingap-util = { path = "../pingap-util" }
|
||||
pingap-core = { path = "../pingap-core" }
|
||||
|
||||
|
||||
[dev-dependencies]
|
||||
|
@ -19,6 +19,7 @@ use derive_more::Debug;
|
||||
use futures_util::FutureExt;
|
||||
use once_cell::sync::Lazy;
|
||||
use pingap_config::UpstreamConf;
|
||||
use pingap_core::{NotificationData, NotificationLevel, NotificationSender};
|
||||
use pingap_discovery::{
|
||||
is_dns_discovery, is_docker_discovery, is_static_discovery,
|
||||
new_dns_discover_backends, new_docker_discover_backends,
|
||||
@ -26,9 +27,11 @@ use pingap_discovery::{
|
||||
};
|
||||
use pingap_health::new_health_check;
|
||||
use pingap_service::{CommonServiceTask, ServiceTask};
|
||||
use pingora::lb::health_check::{HealthObserve, HealthObserveCallback};
|
||||
use pingora::lb::selection::{
|
||||
BackendIter, BackendSelection, Consistent, RoundRobin,
|
||||
};
|
||||
use pingora::lb::Backend;
|
||||
use pingora::lb::{Backends, LoadBalancer};
|
||||
use pingora::protocols::l4::ext::TcpKeepalive;
|
||||
use pingora::protocols::ALPN;
|
||||
@ -50,6 +53,47 @@ pub enum Error {
|
||||
}
|
||||
type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
||||
pub struct BackendObserveNotification {
|
||||
name: String,
|
||||
sender: Arc<NotificationSender>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl HealthObserve for BackendObserveNotification {
|
||||
async fn observe(&self, backend: &Backend, healthy: bool) {
|
||||
let addr = backend.addr.to_string();
|
||||
let template = format!("upstream {}({addr}) becomes ", self.name);
|
||||
let info = if healthy {
|
||||
(NotificationLevel::Info, template + "healthy")
|
||||
} else {
|
||||
(NotificationLevel::Error, template + "unhealthy")
|
||||
};
|
||||
|
||||
self.sender
|
||||
.notify(NotificationData {
|
||||
category: "backend_status".to_string(),
|
||||
level: info.0,
|
||||
title: "Upstream backend status changed".to_string(),
|
||||
message: info.1,
|
||||
})
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
fn new_observe(
|
||||
name: &str,
|
||||
sender: Option<Arc<NotificationSender>>,
|
||||
) -> Option<HealthObserveCallback> {
|
||||
if let Some(sender) = sender {
|
||||
Some(Box::new(BackendObserveNotification {
|
||||
name: name.to_string(),
|
||||
sender: sender.clone(),
|
||||
}))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
// SelectionLb represents different load balancing strategies:
|
||||
// - RoundRobin: Distributes requests evenly across backends
|
||||
// - Consistent: Uses consistent hashing to map requests to backends
|
||||
@ -242,6 +286,7 @@ fn update_health_check_params<S>(
|
||||
mut lb: LoadBalancer<S>,
|
||||
name: &str,
|
||||
conf: &UpstreamConf,
|
||||
sender: Option<Arc<NotificationSender>>,
|
||||
) -> Result<LoadBalancer<S>>
|
||||
where
|
||||
S: BackendSelection + 'static,
|
||||
@ -256,12 +301,15 @@ where
|
||||
}
|
||||
|
||||
// Set up health checking for the backends
|
||||
let (hc, health_check_frequency) =
|
||||
new_health_check(name, &conf.health_check.clone().unwrap_or_default())
|
||||
.map_err(|e| Error::Common {
|
||||
message: e.to_string(),
|
||||
category: "health".to_string(),
|
||||
})?;
|
||||
let (hc, health_check_frequency) = new_health_check(
|
||||
name,
|
||||
&conf.health_check.clone().unwrap_or_default(),
|
||||
new_observe(name, sender),
|
||||
)
|
||||
.map_err(|e| Error::Common {
|
||||
message: e.to_string(),
|
||||
category: "health".to_string(),
|
||||
})?;
|
||||
// Configure health checking
|
||||
lb.parallel_health_check = true;
|
||||
lb.set_health_check(hc);
|
||||
@ -281,6 +329,7 @@ where
|
||||
fn new_load_balancer(
|
||||
name: &str,
|
||||
conf: &UpstreamConf,
|
||||
sender: Option<Arc<NotificationSender>>,
|
||||
) -> Result<(SelectionLb, String, String)> {
|
||||
// Validate that addresses are provided
|
||||
if conf.addrs.is_empty() {
|
||||
@ -334,6 +383,7 @@ fn new_load_balancer(
|
||||
LoadBalancer::<Consistent>::from_backends(backends),
|
||||
name,
|
||||
conf,
|
||||
sender,
|
||||
)?;
|
||||
|
||||
SelectionLb::Consistent(Arc::new(lb))
|
||||
@ -344,6 +394,7 @@ fn new_load_balancer(
|
||||
LoadBalancer::<RoundRobin>::from_backends(backends),
|
||||
name,
|
||||
conf,
|
||||
sender,
|
||||
)?;
|
||||
|
||||
SelectionLb::RoundRobin(Arc::new(lb))
|
||||
@ -361,8 +412,12 @@ impl Upstream {
|
||||
///
|
||||
/// # Returns
|
||||
/// * `Result<Self>` - New Upstream instance or error if creation fails
|
||||
pub fn new(name: &str, conf: &UpstreamConf) -> Result<Self> {
|
||||
let (lb, hash, hash_key) = new_load_balancer(name, conf)?;
|
||||
pub fn new(
|
||||
name: &str,
|
||||
conf: &UpstreamConf,
|
||||
sender: Option<Arc<NotificationSender>>,
|
||||
) -> Result<Self> {
|
||||
let (lb, hash, hash_key) = new_load_balancer(name, conf, sender)?;
|
||||
let key = conf.hash_key();
|
||||
let sni = conf.sni.clone().unwrap_or_default();
|
||||
let tls = !sni.is_empty();
|
||||
@ -583,6 +638,7 @@ pub fn get_upstreams_processing_connected(
|
||||
|
||||
fn new_ahash_upstreams(
|
||||
upstream_configs: &HashMap<String, UpstreamConf>,
|
||||
sender: Option<Arc<NotificationSender>>,
|
||||
) -> Result<(Upstreams, Vec<String>)> {
|
||||
let mut upstreams = AHashMap::new();
|
||||
let mut updated_upstreams = vec![];
|
||||
@ -595,7 +651,7 @@ fn new_ahash_upstreams(
|
||||
continue;
|
||||
}
|
||||
}
|
||||
let up = Arc::new(Upstream::new(name, conf)?);
|
||||
let up = Arc::new(Upstream::new(name, conf, sender.clone())?);
|
||||
upstreams.insert(name.to_string(), up);
|
||||
updated_upstreams.push(name.to_string());
|
||||
}
|
||||
@ -604,8 +660,9 @@ fn new_ahash_upstreams(
|
||||
|
||||
pub fn try_init_upstreams(
|
||||
upstream_configs: &HashMap<String, UpstreamConf>,
|
||||
sender: Option<Arc<NotificationSender>>,
|
||||
) -> Result<()> {
|
||||
let (upstreams, _) = new_ahash_upstreams(upstream_configs)?;
|
||||
let (upstreams, _) = new_ahash_upstreams(upstream_configs, sender)?;
|
||||
UPSTREAM_MAP.store(Arc::new(upstreams));
|
||||
Ok(())
|
||||
}
|
||||
@ -633,8 +690,10 @@ async fn run_health_check(up: &Arc<Upstream>) -> Result<()> {
|
||||
|
||||
pub async fn try_update_upstreams(
|
||||
upstream_configs: &HashMap<String, UpstreamConf>,
|
||||
sender: Option<Arc<NotificationSender>>,
|
||||
) -> Result<Vec<String>> {
|
||||
let (upstreams, updated_upstreams) = new_ahash_upstreams(upstream_configs)?;
|
||||
let (upstreams, updated_upstreams) =
|
||||
new_ahash_upstreams(upstream_configs, sender)?;
|
||||
for (name, up) in upstreams.iter() {
|
||||
// no need to run health check if not new upstream
|
||||
if !updated_upstreams.contains(name) {
|
||||
@ -831,6 +890,7 @@ mod tests {
|
||||
&UpstreamConf {
|
||||
..Default::default()
|
||||
},
|
||||
None,
|
||||
);
|
||||
assert_eq!(
|
||||
"Common error, category: new_upstream, upstream addrs is empty",
|
||||
@ -854,6 +914,7 @@ mod tests {
|
||||
tcp_recv_buf: Some(bytesize::ByteSize(1024)),
|
||||
..Default::default()
|
||||
},
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -938,6 +999,7 @@ mod tests {
|
||||
addrs: vec!["192.168.1.1:8001".to_string()],
|
||||
..Default::default()
|
||||
},
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(true, up.new_http_peer(&session, &None,).is_some());
|
||||
|
@ -23,9 +23,7 @@ ipnet = "2.11.0"
|
||||
base64 = "0.22.1"
|
||||
path-absolutize = "3.1.1"
|
||||
rustc_version_runtime = "0.3.0"
|
||||
local-ip-address = "0.6.3"
|
||||
dirs = "6.0.0"
|
||||
hostname = "0.4.0"
|
||||
once_cell = { workspace = true }
|
||||
bytes = { workspace = true }
|
||||
http = { workspace = true }
|
||||
|
@ -13,7 +13,6 @@
|
||||
// limitations under the License.
|
||||
|
||||
use base64::{engine::general_purpose::STANDARD, Engine};
|
||||
use once_cell::sync::Lazy;
|
||||
use path_absolutize::*;
|
||||
use snafu::Snafu;
|
||||
use std::path::Path;
|
||||
@ -101,27 +100,6 @@ pub fn is_pem(value: &str) -> bool {
|
||||
value.starts_with("-----")
|
||||
}
|
||||
|
||||
/// Returns a list of non-loopback IP addresses (both IPv4 and IPv6) for the local machine
|
||||
///
|
||||
/// # Returns
|
||||
/// A vector of IP addresses as strings
|
||||
pub fn local_ip_list() -> Vec<String> {
|
||||
let mut ip_list = vec![];
|
||||
|
||||
if let Ok(value) = local_ip_address::local_ip() {
|
||||
ip_list.push(value);
|
||||
}
|
||||
if let Ok(value) = local_ip_address::local_ipv6() {
|
||||
ip_list.push(value);
|
||||
}
|
||||
|
||||
ip_list
|
||||
.iter()
|
||||
.filter(|item| !item.is_loopback())
|
||||
.map(|item| item.to_string())
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Converts various certificate/key formats into bytes.
|
||||
/// Supports PEM format, file paths, and base64-encoded data.
|
||||
///
|
||||
@ -221,22 +199,6 @@ pub fn path_join(value1: &str, value2: &str) -> String {
|
||||
}
|
||||
}
|
||||
|
||||
static HOST_NAME: Lazy<String> = Lazy::new(|| {
|
||||
hostname::get()
|
||||
.unwrap_or_default()
|
||||
.to_str()
|
||||
.unwrap_or_default()
|
||||
.to_string()
|
||||
});
|
||||
|
||||
/// Returns the system hostname.
|
||||
///
|
||||
/// Returns:
|
||||
/// * `&'static str` - The system's hostname as a string slice
|
||||
pub fn get_hostname() -> &'static str {
|
||||
HOST_NAME.as_str()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@ -257,12 +219,6 @@ mod tests {
|
||||
resolve_path("~/")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_local_ip_list() {
|
||||
assert_eq!(false, local_ip_list().is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_rustc_version() {
|
||||
assert_eq!(false, get_rustc_version().is_empty());
|
||||
|
@ -11,10 +11,11 @@ keywords = ["pingap", "webhook"]
|
||||
|
||||
[dependencies]
|
||||
async-trait = { workspace = true }
|
||||
pingora = { workspace = true }
|
||||
once_cell = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
strum = { workspace = true }
|
||||
reqwest = { workspace = true }
|
||||
pingap-util = { path = "../pingap-util" }
|
||||
local-ip-address = "0.6.3"
|
||||
hostname = "0.4.0"
|
||||
pingap-core = { path = "../pingap-core" }
|
||||
|
@ -13,11 +13,13 @@
|
||||
// limitations under the License.
|
||||
|
||||
use async_trait::async_trait;
|
||||
use once_cell::sync::OnceCell;
|
||||
use pingora::lb::health_check::HealthObserve;
|
||||
use pingora::lb::Backend;
|
||||
use once_cell::sync::{Lazy, OnceCell};
|
||||
use pingap_core::{
|
||||
Notification, NotificationData, NotificationLevel, NotificationSender,
|
||||
};
|
||||
use serde_json::{Map, Value};
|
||||
use std::{fmt::Display, time::Duration};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use strum::EnumString;
|
||||
use tracing::{error, info};
|
||||
|
||||
@ -39,14 +41,21 @@ pub fn set_web_hook(url: &str, category: &str, notifications: &[String]) {
|
||||
WEBHOOK_NOTIFICATIONS.get_or_init(|| notifications.to_owned());
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub enum NotificationLevel {
|
||||
#[default]
|
||||
Info,
|
||||
Warn,
|
||||
Error,
|
||||
static WEBHOOK_NOTIFICATION_SENDER: Lazy<Arc<NotificationSender>> =
|
||||
Lazy::new(|| Arc::new(Box::new(WebhookNotificationSender {})));
|
||||
|
||||
pub fn get_webhook_notification_sender() -> Arc<NotificationSender> {
|
||||
WEBHOOK_NOTIFICATION_SENDER.clone()
|
||||
}
|
||||
|
||||
pub struct WebhookNotificationSender {}
|
||||
|
||||
#[async_trait]
|
||||
impl Notification for WebhookNotificationSender {
|
||||
async fn notify(&self, data: NotificationData) {
|
||||
send_notification(data).await;
|
||||
}
|
||||
}
|
||||
#[derive(PartialEq, Debug, Clone, EnumString, strum::Display, Default)]
|
||||
#[strum(serialize_all = "snake_case")]
|
||||
pub enum NotificationCategory {
|
||||
@ -63,62 +72,6 @@ pub enum NotificationCategory {
|
||||
ServiceDiscoverFail,
|
||||
}
|
||||
|
||||
impl Display for NotificationLevel {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
let msg = match self {
|
||||
NotificationLevel::Error => "error",
|
||||
NotificationLevel::Warn => "warn",
|
||||
_ => "info",
|
||||
};
|
||||
write!(f, "{msg}")
|
||||
}
|
||||
}
|
||||
|
||||
pub struct BackendObserveNotification {
|
||||
name: String,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl HealthObserve for BackendObserveNotification {
|
||||
async fn observe(&self, backend: &Backend, healthy: bool) {
|
||||
let addr = backend.addr.to_string();
|
||||
let template = format!("upstream {}({addr}) becomes ", self.name);
|
||||
let info = if healthy {
|
||||
(NotificationLevel::Info, template + "healthy")
|
||||
} else {
|
||||
(NotificationLevel::Error, template + "unhealthy")
|
||||
};
|
||||
send_notification(SendNotificationParams {
|
||||
category: NotificationCategory::BackendStatus,
|
||||
level: info.0,
|
||||
msg: info.1,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a new backend health observation notification handler
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `name` - Name of the backend service to monitor
|
||||
pub fn new_backend_observe_notification(
|
||||
name: &str,
|
||||
) -> Box<BackendObserveNotification> {
|
||||
Box::new(BackendObserveNotification {
|
||||
name: name.to_string(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Parameters for sending a notification
|
||||
#[derive(Default)]
|
||||
pub struct SendNotificationParams {
|
||||
pub category: NotificationCategory,
|
||||
pub level: NotificationLevel,
|
||||
pub msg: String,
|
||||
pub remark: Option<String>,
|
||||
}
|
||||
|
||||
/// Sends a notification via configured webhook
|
||||
///
|
||||
/// Formats and sends the notification based on the webhook type (wecom, dingtalk, etc).
|
||||
@ -126,11 +79,12 @@ pub struct SendNotificationParams {
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `params` - The notification parameters including category, level, message and optional remark
|
||||
pub async fn send_notification(params: SendNotificationParams) {
|
||||
pub async fn send_notification(params: NotificationData) {
|
||||
info!(
|
||||
category = LOG_CATEGORY,
|
||||
notification = params.category.to_string(),
|
||||
message = params.msg,
|
||||
notification = params.category,
|
||||
title = params.title,
|
||||
message = params.message,
|
||||
"webhook notification"
|
||||
);
|
||||
let webhook_type = if let Some(value) = WEBHOOK_CATEGORY.get() {
|
||||
@ -154,12 +108,15 @@ pub async fn send_notification(params: SendNotificationParams) {
|
||||
}
|
||||
let category = params.category.to_string();
|
||||
let level = params.level;
|
||||
let ip = pingap_util::local_ip_list().join(";");
|
||||
let remark = params.remark.unwrap_or_default();
|
||||
let ip = local_ip_list().join(";");
|
||||
|
||||
let client = reqwest::Client::new();
|
||||
let mut data = serde_json::Map::new();
|
||||
let hostname = pingap_util::get_hostname();
|
||||
let hostname = hostname::get()
|
||||
.unwrap_or_default()
|
||||
.to_str()
|
||||
.unwrap_or_default()
|
||||
.to_string();
|
||||
// TODO get app name from config
|
||||
let name = "pingap".to_string();
|
||||
let color_type = match level {
|
||||
@ -172,9 +129,8 @@ pub async fn send_notification(params: SendNotificationParams) {
|
||||
>hostname: {hostname}
|
||||
>ip: {ip}
|
||||
>category: {category}
|
||||
>message: {}
|
||||
>remark: {remark}"###,
|
||||
params.msg
|
||||
>message: {}"###,
|
||||
params.message
|
||||
);
|
||||
match webhook_type.to_lowercase().as_str() {
|
||||
"wecom" => {
|
||||
@ -208,7 +164,7 @@ pub async fn send_notification(params: SendNotificationParams) {
|
||||
);
|
||||
data.insert("ip".to_string(), Value::String(ip));
|
||||
data.insert("category".to_string(), Value::String(category));
|
||||
data.insert("message".to_string(), Value::String(params.msg));
|
||||
data.insert("message".to_string(), Value::String(params.message));
|
||||
},
|
||||
}
|
||||
|
||||
@ -239,3 +195,24 @@ pub async fn send_notification(params: SendNotificationParams) {
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
/// Returns a list of non-loopback IP addresses (both IPv4 and IPv6) for the local machine
|
||||
///
|
||||
/// # Returns
|
||||
/// A vector of IP addresses as strings
|
||||
fn local_ip_list() -> Vec<String> {
|
||||
let mut ip_list = vec![];
|
||||
|
||||
if let Ok(value) = local_ip_address::local_ip() {
|
||||
ip_list.push(value);
|
||||
}
|
||||
if let Ok(value) = local_ip_address::local_ipv6() {
|
||||
ip_list.push(value);
|
||||
}
|
||||
|
||||
ip_list
|
||||
.iter()
|
||||
.filter(|item| !item.is_loopback())
|
||||
.map(|item| item.to_string())
|
||||
.collect()
|
||||
}
|
||||
|
18
src/main.rs
18
src/main.rs
@ -210,10 +210,10 @@ fn run_admin_node(args: Args) -> Result<(), Box<dyn Error>> {
|
||||
let (server_conf, name, proxy_plugin_info) =
|
||||
plugin::parse_admin_plugin(&args.admin.unwrap_or_default())?;
|
||||
|
||||
if let Err(e) =
|
||||
plugin::try_init_plugins(&HashMap::from([(name, proxy_plugin_info)]))
|
||||
{
|
||||
error!(error = e.to_string(), "init plugins fail",);
|
||||
let (_, error) =
|
||||
plugin::try_init_plugins(&HashMap::from([(name, proxy_plugin_info)]));
|
||||
if !error.is_empty() {
|
||||
error!(error, "init plugins fail",);
|
||||
}
|
||||
pingap_config::try_init_config_storage(&args.conf)?;
|
||||
// config::set_config_path(&args.conf);
|
||||
@ -410,7 +410,10 @@ fn run() -> Result<(), Box<dyn Error>> {
|
||||
process::set_restart_process_command(cmd);
|
||||
}
|
||||
|
||||
try_init_upstreams(&conf.upstreams)?;
|
||||
try_init_upstreams(
|
||||
&conf.upstreams,
|
||||
Some(pingap_webhook::get_webhook_notification_sender()),
|
||||
)?;
|
||||
try_init_locations(&conf.locations)?;
|
||||
proxy::try_init_server_locations(&conf.servers, &conf.locations)?;
|
||||
let certificates = conf.certificates.clone();
|
||||
@ -466,8 +469,9 @@ fn run() -> Result<(), Box<dyn Error>> {
|
||||
"plugins" = get_plugin_factory().supported_plugins().join(","),
|
||||
"plugins are registered"
|
||||
);
|
||||
if let Err(e) = plugin::try_init_plugins(&conf.plugins) {
|
||||
error!(error = e.to_string(), "init plugins fail",);
|
||||
let (_, error) = plugin::try_init_plugins(&conf.plugins);
|
||||
if !error.is_empty() {
|
||||
error!(error, "init plugins fail",);
|
||||
}
|
||||
|
||||
let mut server_conf_list: Vec<ServerConf> = proxy::parse_from_conf(conf);
|
||||
|
@ -155,6 +155,7 @@ struct BasicInfo {
|
||||
fd_count: usize,
|
||||
tcp_count: usize,
|
||||
tcp6_count: usize,
|
||||
supported_plugins: Vec<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
@ -624,6 +625,7 @@ impl Plugin for AdminServe {
|
||||
fd_count: info.fd_count,
|
||||
tcp_count: info.tcp_count,
|
||||
tcp6_count: info.tcp6_count,
|
||||
supported_plugins: get_plugin_factory().supported_plugins(),
|
||||
})
|
||||
.unwrap_or(HttpResponse::unknown_error("Json serde fail".into()))
|
||||
} else if path == "/restart" && method == Method::POST {
|
||||
|
@ -25,7 +25,7 @@ use snafu::Snafu;
|
||||
use std::collections::HashMap;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use tracing::info;
|
||||
use tracing::{error, info};
|
||||
|
||||
mod admin;
|
||||
mod stats;
|
||||
@ -216,16 +216,19 @@ static PLUGINS: Lazy<ArcSwap<Plugins>> =
|
||||
/// Parses plugin configurations and instantiates plugin instances.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `confs` - Vector of (name, config) tuples for plugins to initialize
|
||||
/// * `configs` - Vector of (name, config) tuples for plugins to initialize
|
||||
///
|
||||
/// # Returns
|
||||
/// HashMap mapping plugin names to initialized plugin instances
|
||||
///
|
||||
/// # Errors
|
||||
/// Returns Error if plugin initialization fails
|
||||
pub fn parse_plugins(confs: Vec<(String, PluginConf)>) -> Result<Plugins> {
|
||||
pub fn parse_plugins(
|
||||
configs: Vec<(String, PluginConf)>,
|
||||
) -> (Plugins, Vec<Error>) {
|
||||
let mut plugins: Plugins = AHashMap::new();
|
||||
for (name, conf) in confs.iter() {
|
||||
let mut errors: Vec<Error> = vec![];
|
||||
for (name, conf) in configs.iter() {
|
||||
let name = name.to_string();
|
||||
let category = if let Some(value) = conf.get("category") {
|
||||
value.as_str().unwrap_or_default().to_string()
|
||||
@ -233,23 +236,37 @@ pub fn parse_plugins(confs: Vec<(String, PluginConf)>) -> Result<Plugins> {
|
||||
"".to_string()
|
||||
};
|
||||
if category.is_empty() {
|
||||
return Err(Error::Invalid {
|
||||
errors.push(Error::Invalid {
|
||||
category: "".to_string(),
|
||||
message: "Category can not be empty".to_string(),
|
||||
message: format!("category of {name} can not be empty"),
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
let plugin =
|
||||
get_plugin_factory()
|
||||
.create(conf)
|
||||
.map_err(|e| Error::Invalid {
|
||||
match get_plugin_factory().create(conf) {
|
||||
Ok(plugin) => {
|
||||
plugins.insert(name.clone(), plugin.clone());
|
||||
},
|
||||
Err(e) => {
|
||||
errors.push(Error::Invalid {
|
||||
category,
|
||||
message: e.to_string(),
|
||||
})?;
|
||||
plugins.insert(name.clone(), plugin.clone());
|
||||
message: format!("create plugin {name} failed, {}", e),
|
||||
});
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
Ok(plugins)
|
||||
// let plugin =
|
||||
// get_plugin_factory()
|
||||
// .create(conf)
|
||||
// .map_err(|e| Error::Invalid {
|
||||
// category,
|
||||
// message: format!("create plugin {name} failed, {}", e),
|
||||
// })?;
|
||||
// plugins.insert(name.clone(), plugin.clone());
|
||||
// }
|
||||
|
||||
(plugins, errors)
|
||||
}
|
||||
|
||||
/// Initializes or updates plugins based on configuration.
|
||||
@ -264,23 +281,30 @@ pub fn parse_plugins(confs: Vec<(String, PluginConf)>) -> Result<Plugins> {
|
||||
/// Returns Error if plugin initialization fails
|
||||
pub fn try_init_plugins(
|
||||
plugins: &HashMap<String, PluginConf>,
|
||||
) -> Result<Vec<String>> {
|
||||
let mut plugin_confs: Vec<(String, PluginConf)> = plugins
|
||||
) -> (Vec<String>, String) {
|
||||
let mut plugin_configs: Vec<(String, PluginConf)> = plugins
|
||||
.iter()
|
||||
.map(|(name, value)| (name.to_string(), value.clone()))
|
||||
.collect();
|
||||
|
||||
// add admin plugin
|
||||
let mut errors = vec![];
|
||||
if let Some(addr) = &get_admin_addr() {
|
||||
let (_, name, proxy_plugin_info) = parse_admin_plugin(addr)?;
|
||||
plugin_confs.push((name, proxy_plugin_info));
|
||||
match parse_admin_plugin(addr) {
|
||||
Ok((_, name, proxy_plugin_info)) => {
|
||||
plugin_configs.push((name, proxy_plugin_info));
|
||||
},
|
||||
Err(e) => {
|
||||
errors.push(e);
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
plugin_confs.extend(get_builtin_proxy_plugins());
|
||||
plugin_configs.extend(get_builtin_proxy_plugins());
|
||||
|
||||
let mut updated_plugins = vec![];
|
||||
let mut plugins = AHashMap::new();
|
||||
let plugin_confs: Vec<(String, PluginConf)> = plugin_confs
|
||||
let plugin_configs: Vec<(String, PluginConf)> = plugin_configs
|
||||
.into_iter()
|
||||
.filter(|(name, conf)| {
|
||||
let conf_hash_key = get_hash_key(conf);
|
||||
@ -308,10 +332,23 @@ pub fn try_init_plugins(
|
||||
true
|
||||
})
|
||||
.collect();
|
||||
plugins.extend(parse_plugins(plugin_confs)?);
|
||||
let (new_plugins, new_errors) = parse_plugins(plugin_configs);
|
||||
plugins.extend(new_plugins);
|
||||
errors.extend(new_errors);
|
||||
PLUGINS.store(Arc::new(plugins));
|
||||
let error = if !errors.is_empty() {
|
||||
let error = errors
|
||||
.iter()
|
||||
.map(|e| e.to_string())
|
||||
.collect::<Vec<_>>()
|
||||
.join(";");
|
||||
error!(error, "parse plugins failed");
|
||||
error
|
||||
} else {
|
||||
"".to_string()
|
||||
};
|
||||
|
||||
Ok(updated_plugins)
|
||||
(updated_plugins, error)
|
||||
}
|
||||
|
||||
pub fn get_plugin(name: &str) -> Option<Arc<dyn Plugin>> {
|
||||
@ -395,5 +432,6 @@ remove_headers = [
|
||||
.unwrap(),
|
||||
),
|
||||
]);
|
||||
try_init_plugins(&plugins).unwrap();
|
||||
let (_, error) = try_init_plugins(&plugins);
|
||||
assert!(error.is_empty());
|
||||
}
|
||||
|
@ -18,10 +18,9 @@ use async_trait::async_trait;
|
||||
use bytes::Bytes;
|
||||
use ctor::ctor;
|
||||
use pingap_config::{PluginCategory, PluginConf};
|
||||
use pingap_core::{Ctx, HttpResponse, PluginStep};
|
||||
use pingap_core::{get_hostname, Ctx, HttpResponse, PluginStep};
|
||||
use pingap_performance::{get_process_system_info, get_processing_accepted};
|
||||
use pingap_plugin::{get_plugin_factory, Error};
|
||||
use pingap_util::get_hostname;
|
||||
use pingora::proxy::Session;
|
||||
use serde::Serialize;
|
||||
use std::sync::Arc;
|
||||
|
@ -136,7 +136,12 @@ async fn diff_and_update_config(
|
||||
};
|
||||
|
||||
if should_reload_upstream {
|
||||
match try_update_upstreams(&new_config.upstreams).await {
|
||||
match try_update_upstreams(
|
||||
&new_config.upstreams,
|
||||
Some(pingap_webhook::get_webhook_notification_sender()),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Err(e) => {
|
||||
let error = e.to_string();
|
||||
reload_fail_messages
|
||||
@ -149,11 +154,13 @@ async fn diff_and_update_config(
|
||||
Ok(updated_upstreams) => {
|
||||
info!(category = LOG_CATEGORY, "reload upstream success");
|
||||
pingap_webhook::send_notification(
|
||||
pingap_webhook::SendNotificationParams {
|
||||
category:
|
||||
pingap_webhook::NotificationCategory::ReloadConfig,
|
||||
level: pingap_webhook::NotificationLevel::Info,
|
||||
msg: format_message("Upstream", updated_upstreams),
|
||||
pingap_core::NotificationData {
|
||||
category: "reload_config".to_string(),
|
||||
level: pingap_core::NotificationLevel::Info,
|
||||
message: format_message(
|
||||
"Upstream",
|
||||
updated_upstreams,
|
||||
),
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
@ -175,11 +182,13 @@ async fn diff_and_update_config(
|
||||
Ok(updated_locations) => {
|
||||
info!(category = LOG_CATEGORY, "reload location success");
|
||||
pingap_webhook::send_notification(
|
||||
pingap_webhook::SendNotificationParams {
|
||||
category:
|
||||
pingap_webhook::NotificationCategory::ReloadConfig,
|
||||
level: pingap_webhook::NotificationLevel::Info,
|
||||
msg: format_message("Location", updated_locations),
|
||||
pingap_core::NotificationData {
|
||||
category: "reload_config".to_string(),
|
||||
level: pingap_core::NotificationLevel::Info,
|
||||
message: format_message(
|
||||
"Location",
|
||||
updated_locations,
|
||||
),
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
@ -188,30 +197,32 @@ async fn diff_and_update_config(
|
||||
};
|
||||
}
|
||||
if should_reload_plugin {
|
||||
match plugin::try_init_plugins(&new_config.plugins) {
|
||||
Err(e) => {
|
||||
let error = e.to_string();
|
||||
reload_fail_messages
|
||||
.push(format!("plugin reload fail: {error}"));
|
||||
error!(
|
||||
category = LOG_CATEGORY,
|
||||
error, "reload plugin fail"
|
||||
);
|
||||
},
|
||||
Ok(updated_plugins) => {
|
||||
info!(category = LOG_CATEGORY, "reload plugin success");
|
||||
pingap_webhook::send_notification(
|
||||
pingap_webhook::SendNotificationParams {
|
||||
category:
|
||||
pingap_webhook::NotificationCategory::ReloadConfig,
|
||||
level: pingap_webhook::NotificationLevel::Info,
|
||||
msg: format_message("Plugin", updated_plugins),
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await;
|
||||
},
|
||||
};
|
||||
let (updated_plugins, error) =
|
||||
plugin::try_init_plugins(&new_config.plugins);
|
||||
if !updated_plugins.is_empty() {
|
||||
info!(category = LOG_CATEGORY, "reload plugin success");
|
||||
pingap_webhook::send_notification(
|
||||
pingap_core::NotificationData {
|
||||
category: "reload_config".to_string(),
|
||||
level: pingap_core::NotificationLevel::Info,
|
||||
message: format_message("Plugin", updated_plugins),
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await;
|
||||
}
|
||||
if !error.is_empty() {
|
||||
error!(category = LOG_CATEGORY, error, "reload plugin fail");
|
||||
pingap_webhook::send_notification(
|
||||
pingap_core::NotificationData {
|
||||
category: "reload_config_fail".to_string(),
|
||||
level: pingap_core::NotificationLevel::Error,
|
||||
message: error,
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
if should_reload_certificate {
|
||||
let (updated_certificates, errors) =
|
||||
@ -219,15 +230,12 @@ async fn diff_and_update_config(
|
||||
&new_config.certificates,
|
||||
);
|
||||
info!(category = LOG_CATEGORY, "reload certificate success");
|
||||
pingap_webhook::send_notification(
|
||||
pingap_webhook::SendNotificationParams {
|
||||
category:
|
||||
pingap_webhook::NotificationCategory::ReloadConfig,
|
||||
level: pingap_webhook::NotificationLevel::Info,
|
||||
msg: format_message("Certificate", updated_certificates),
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
pingap_webhook::send_notification(pingap_core::NotificationData {
|
||||
category: "reload_config".to_string(),
|
||||
level: pingap_core::NotificationLevel::Info,
|
||||
message: format_message("Certificate", updated_certificates),
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
if !errors.is_empty() {
|
||||
error!(
|
||||
@ -235,13 +243,14 @@ async fn diff_and_update_config(
|
||||
error = errors,
|
||||
"parse certificate fail"
|
||||
);
|
||||
pingap_webhook::send_notification(pingap_webhook::SendNotificationParams {
|
||||
category:
|
||||
pingap_webhook::NotificationCategory::ParseCertificateFail,
|
||||
level: pingap_webhook::NotificationLevel::Error,
|
||||
msg: errors,
|
||||
remark: None,
|
||||
})
|
||||
pingap_webhook::send_notification(
|
||||
pingap_core::NotificationData {
|
||||
category: "parse_certificate_fail".to_string(),
|
||||
level: pingap_core::NotificationLevel::Error,
|
||||
message: errors,
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
@ -265,11 +274,10 @@ async fn diff_and_update_config(
|
||||
"reload server location success"
|
||||
);
|
||||
pingap_webhook::send_notification(
|
||||
pingap_webhook::SendNotificationParams {
|
||||
category:
|
||||
pingap_webhook::NotificationCategory::ReloadConfig,
|
||||
level: pingap_webhook::NotificationLevel::Info,
|
||||
msg: format_message(
|
||||
pingap_core::NotificationData {
|
||||
category: "reload_config".to_string(),
|
||||
level: pingap_core::NotificationLevel::Info,
|
||||
message: format_message(
|
||||
"Server Location",
|
||||
updated_servers,
|
||||
),
|
||||
@ -300,21 +308,20 @@ async fn diff_and_update_config(
|
||||
// update current config to be hot reload config
|
||||
set_current_config(&hot_reload_config);
|
||||
if !original_diff_result.is_empty() {
|
||||
pingap_webhook::send_notification(
|
||||
pingap_webhook::SendNotificationParams {
|
||||
category: pingap_webhook::NotificationCategory::DiffConfig,
|
||||
msg: original_diff_result.join("\n").trim().to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
pingap_webhook::send_notification(pingap_core::NotificationData {
|
||||
category: "diff_config".to_string(),
|
||||
message: original_diff_result.join("\n").trim().to_string(),
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
if !reload_fail_message.is_empty() {
|
||||
pingap_webhook::send_notification(pingap_webhook::SendNotificationParams {
|
||||
category: pingap_webhook::NotificationCategory::ReloadConfigFail,
|
||||
msg: reload_fail_message.clone(),
|
||||
remark: Some("reload config fail".to_string()),
|
||||
..Default::default()
|
||||
})
|
||||
pingap_webhook::send_notification(
|
||||
pingap_core::NotificationData {
|
||||
category: "reload_config_fail".to_string(),
|
||||
message: reload_fail_message.clone(),
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
@ -339,24 +346,18 @@ async fn diff_and_update_config(
|
||||
}
|
||||
|
||||
if !original_diff_result.is_empty() {
|
||||
pingap_webhook::send_notification(
|
||||
pingap_webhook::SendNotificationParams {
|
||||
category: pingap_webhook::NotificationCategory::DiffConfig,
|
||||
msg: original_diff_result.join("\n").trim().to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
pingap_webhook::send_notification(pingap_core::NotificationData {
|
||||
category: "diff_config".to_string(),
|
||||
message: original_diff_result.join("\n").trim().to_string(),
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
if !reload_fail_message.is_empty() {
|
||||
pingap_webhook::send_notification(
|
||||
pingap_webhook::SendNotificationParams {
|
||||
category:
|
||||
pingap_webhook::NotificationCategory::ReloadConfigFail,
|
||||
msg: reload_fail_message.clone(),
|
||||
remark: Some("reload config fail".to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
pingap_webhook::send_notification(pingap_core::NotificationData {
|
||||
category: "reload_config_fail".to_string(),
|
||||
message: reload_fail_message.clone(),
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
@ -110,9 +110,9 @@ pub async fn restart_now() -> io::Result<process::Output> {
|
||||
));
|
||||
}
|
||||
info!(category = LOG_CATEGORY, "pingap will restart");
|
||||
pingap_webhook::send_notification(pingap_webhook::SendNotificationParams {
|
||||
category: pingap_webhook::NotificationCategory::Restart,
|
||||
msg: format!("Restart now, pid:{}", std::process::id()),
|
||||
pingap_webhook::send_notification(pingap_core::NotificationData {
|
||||
category: "restart".to_string(),
|
||||
message: format!("Restart now, pid:{}", std::process::id()),
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
@ -160,12 +160,11 @@ pub async fn restart() {
|
||||
"restart fail"
|
||||
);
|
||||
pingap_webhook::send_notification(
|
||||
pingap_webhook::SendNotificationParams {
|
||||
level: pingap_webhook::NotificationLevel::Error,
|
||||
category:
|
||||
pingap_webhook::NotificationCategory::RestartFail,
|
||||
msg: e.to_string(),
|
||||
remark: None,
|
||||
pingap_core::NotificationData {
|
||||
level: pingap_core::NotificationLevel::Error,
|
||||
category: "restart_fail".to_string(),
|
||||
message: e.to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
@ -34,7 +34,7 @@ use pingap_certificate::{GlobalCertificate, TlsSettingParams};
|
||||
use pingap_config::get_config_storage;
|
||||
#[cfg(feature = "full")]
|
||||
use pingap_core::OtelTracer;
|
||||
use pingap_core::{convert_headers, HttpHeader};
|
||||
use pingap_core::{convert_header_value, convert_headers, HttpHeader};
|
||||
use pingap_core::{get_cache_key, CompressionStat, Ctx, PluginStep};
|
||||
use pingap_core::{HttpResponse, HTTP_HEADER_NAME_X_REQUEST_ID};
|
||||
use pingap_location::{get_location, Location};
|
||||
@ -45,7 +45,6 @@ use pingap_performance::{
|
||||
new_prometheus, new_prometheus_push_service, Prometheus,
|
||||
};
|
||||
use pingap_service::SimpleServiceTaskFuture;
|
||||
use pingap_state::convert_header_value;
|
||||
use pingap_upstream::get_upstream;
|
||||
use pingora::apps::HttpServerOptions;
|
||||
use pingora::cache::cache_control::CacheControl;
|
||||
@ -1580,7 +1579,7 @@ category = "config"
|
||||
value = 'proxy_set_headers = ["name:value"]'
|
||||
"###;
|
||||
let pingap_conf = PingapConf::new(toml_data.as_ref(), false).unwrap();
|
||||
try_init_upstreams(&pingap_conf.upstreams).unwrap();
|
||||
try_init_upstreams(&pingap_conf.upstreams, None).unwrap();
|
||||
try_init_locations(&pingap_conf.locations).unwrap();
|
||||
try_init_server_locations(&pingap_conf.servers, &pingap_conf.locations)
|
||||
.unwrap();
|
||||
|
@ -185,6 +185,7 @@ export default function Basic() {
|
||||
"reload_config",
|
||||
"reload_config_fail",
|
||||
"tls_validity",
|
||||
"parse_certificate_fail",
|
||||
"service_discover_fail",
|
||||
].sort(),
|
||||
true,
|
||||
|
@ -48,7 +48,7 @@ export default function Home() {
|
||||
if (day.length === 1) {
|
||||
day = `0${day}`;
|
||||
}
|
||||
return `${month}-${day}`;
|
||||
return `${date.getFullYear()}-${month}-${day}`;
|
||||
};
|
||||
const results = {} as Record<string, string>;
|
||||
Object.keys(infos).forEach((name) => {
|
||||
|
@ -14,7 +14,7 @@ import {
|
||||
import { useSearchParams } from "react-router-dom";
|
||||
import { useEffect } from "react";
|
||||
import { useShallow } from "zustand/react/shallow";
|
||||
|
||||
import useBasicState from "@/states/basic";
|
||||
function getPluginConfig(
|
||||
name: string,
|
||||
plugins?: Record<string, Record<string, unknown>>,
|
||||
@ -38,6 +38,8 @@ export default function Plugins() {
|
||||
]),
|
||||
);
|
||||
|
||||
const [basicInfo] = useBasicState(useShallow((state) => [state.data]));
|
||||
|
||||
const newPlugin = "*";
|
||||
const plugins = Object.keys(config.plugins || {});
|
||||
plugins.sort();
|
||||
@ -48,7 +50,7 @@ export default function Plugins() {
|
||||
);
|
||||
const pluginConfig = getPluginConfig(currentPlugin, config.plugins);
|
||||
const [currentCategory, setCurrentCategory] = React.useState(
|
||||
(pluginConfig.catregory as string) || "",
|
||||
(pluginConfig.category as string) || "",
|
||||
);
|
||||
|
||||
useEffect(() => {
|
||||
@ -81,37 +83,7 @@ export default function Plugins() {
|
||||
category: ExFormItemCategory.RADIOS,
|
||||
span: 6,
|
||||
options: newStringOptions(
|
||||
[
|
||||
PluginCategory.STATS,
|
||||
PluginCategory.PING,
|
||||
PluginCategory.ADMIN,
|
||||
PluginCategory.DIRECTORY,
|
||||
PluginCategory.MOCK,
|
||||
PluginCategory.REDIRECT,
|
||||
PluginCategory.CACHE,
|
||||
|
||||
PluginCategory.REQUEST_ID,
|
||||
PluginCategory.COMPRESSION,
|
||||
PluginCategory.ACCEPT_ENCODING,
|
||||
|
||||
// auth
|
||||
PluginCategory.KEY_AUTH,
|
||||
PluginCategory.BASIC_AUTH,
|
||||
PluginCategory.JWT,
|
||||
PluginCategory.COMBINED_AUTH,
|
||||
|
||||
// limit
|
||||
PluginCategory.LIMIT,
|
||||
PluginCategory.IP_RESTRICTION,
|
||||
PluginCategory.UA_RESTRICTION,
|
||||
PluginCategory.REFERER_RESTRICTION,
|
||||
PluginCategory.CSRF,
|
||||
PluginCategory.CORS,
|
||||
|
||||
// response
|
||||
PluginCategory.RESPONSE_HEADERS,
|
||||
PluginCategory.SUB_FILTER,
|
||||
],
|
||||
basicInfo.supported_plugins,
|
||||
true,
|
||||
),
|
||||
},
|
||||
|
@ -24,6 +24,7 @@ interface Basic {
|
||||
fd_count: number;
|
||||
tcp_count: number;
|
||||
tcp6_count: number;
|
||||
supported_plugins: string[];
|
||||
}
|
||||
|
||||
interface ConfigState {
|
||||
@ -56,6 +57,7 @@ const useBasicState = create<ConfigState>()((set) => ({
|
||||
fd_count: 0,
|
||||
tcp_count: 0,
|
||||
tcp6_count: 0,
|
||||
supported_plugins: [],
|
||||
},
|
||||
initialized: false,
|
||||
fetch: async () => {
|
||||
|
Loading…
x
Reference in New Issue
Block a user