# Connection pooling and reuse
When the request to a Peer (upstream server) is finished, the connection to that peer is kept alive and added to a connection pool to be _reused_ by subsequent requests. This happens automatically without any special configuration.
Requests that reuse previously established connections avoid the latency and compute cost of setting up a new connection, improving the Pingora server's overall performance and scalability.
## Same Peer
Only the connections to the exact same Peer can be reused by a request. For correctness and security reasons, two Peers are the same if and only if all of the following attributes are the same:
- IP:port
- scheme
- SNI
- client cert
- verify cert
- verify hostname
- alternative_cn
- proxy settings
## Disable pooling
To disable connection pooling and reuse for a certain Peer, set idle_timeout to 0 seconds on all requests using that Peer.
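For instance, a minimal sketch (the address and SNI are placeholders, and this assumes the pingora facade crate's prelude re-exports HttpPeer):

```rust
use std::time::Duration;
use pingora::prelude::*; // assumed to bring HttpPeer into scope

// Build a peer whose connections are never pooled.
fn no_pool_peer() -> Box<HttpPeer> {
    let mut peer = Box::new(HttpPeer::new(("192.0.2.10", 443), true, "example.com".to_string()));
    // A 0s idle_timeout means the connection is closed as soon as it becomes
    // idle, so it is never handed back out of the connection pool.
    peer.options.idle_timeout = Some(Duration::from_secs(0));
    peer
}
```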
## Failure
A connection is considered not reusable if errors happen during the request.
# Daemonization
When a Pingora server is configured to run as a daemon, it will move itself to the background after bootstrapping and optionally switch to running under the configured user and group. The pid_file option comes in handy in this case, letting the user track the PID of the daemon in the background.
Daemonization also allows the server to perform privileged actions like loading secrets and then switch to an unprivileged user before accepting any requests from the network.
This process happens in the run_forever() call. Because daemonization involves fork(), certain things like threads created before this call are likely lost.
# Configuration
A Pingora configuration file is a list of Pingora settings in yaml format.
Example
```yaml
---
version: 1
threads: 2
pid_file: /run/pingora.pid
upgrade_sock: /tmp/pingora_upgrade.sock
user: nobody
group: webusers
```
## Settings
| Key | Meaning | Value type |
|-----|---------|------------|
| version | the version of the conf, currently it is a constant `1` | number |
| pid_file | the path to the pid file | string |
| daemon | whether to run the server in the background | bool |
| error_log | the path to the error log output file. STDERR is used if not set | string |
| upgrade_sock | the path to the upgrade socket | string |
| threads | number of threads per service | number |
| user | the user the pingora server should run under after daemonization | string |
| group | the group the pingora server should run under after daemonization | string |
| client_bind_to_ipv4 | source IPv4 addresses to bind to when connecting to the server | list of strings |
| client_bind_to_ipv6 | source IPv6 addresses to bind to when connecting to the server | list of strings |
| ca_file | the path to the root CA file | string |
| work_stealing | enable the work stealing runtime (default true). See the Pingora runtime (WIP) section for more info | bool |
| upstream_keepalive_pool_size | the number of total connections to keep in the connection pool | number |
## Extension
Any unknown settings will be ignored. This allows extending the conf file to add and pass user-defined settings. See the User defined configuration section.
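For example, a conf file might carry a user-defined key next to the built-in ones (my_app_log_level below is hypothetical; Pingora itself ignores it):

```yaml
---
version: 1
threads: 2
# a user-defined setting, ignored by Pingora but readable by your own code
my_app_log_level: debug
```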
# Sharing state across phases with CTX
## Using CTX
The custom filters users implement in different phases of the request don't interact with each other directly. In order to share information and state across the filters, users can define a CTX struct. Each request owns a single CTX object. All the filters are able to read and update members of the CTX object. The CTX object will be dropped at the end of the request.
### Example
In the following example, the proxy parses the request header in the request_filter phase and stores a boolean flag in the CTX, which is later used in the upstream_peer phase to decide which server to route traffic to. (Technically, the header could also be parsed in the upstream_peer phase; we do it in an earlier phase just for demonstration.)
```rust
pub struct MyProxy();

pub struct MyCtx {
    beta_user: bool,
}

fn check_beta_user(req: &pingora_http::RequestHeader) -> bool {
    // some simple logic to check if user is beta
    req.headers.get("beta-flag").is_some()
}

#[async_trait]
impl ProxyHttp for MyProxy {
    type CTX = MyCtx;
    fn new_ctx(&self) -> Self::CTX {
        MyCtx { beta_user: false }
    }

    async fn request_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<bool> {
        ctx.beta_user = check_beta_user(session.req_header());
        Ok(false)
    }

    async fn upstream_peer(
        &self,
        _session: &mut Session,
        ctx: &mut Self::CTX,
    ) -> Result<Box<HttpPeer>> {
        let addr = if ctx.beta_user {
            info!("I'm a beta user");
            ("1.0.0.1", 443)
        } else {
            ("1.1.1.1", 443)
        };

        let peer = Box::new(HttpPeer::new(addr, true, "one.one.one.one".to_string()));
        Ok(peer)
    }
}
```
## Sharing state across requests
Sharing state such as a counter, cache and other info across requests is common. There is nothing special needed for sharing resources and data across requests in Pingora. Arc, static or any other mechanism can be used.
### Example
Let's modify the example above to track the number of beta visitors as well as the number of total visitors. The counters can either be defined in the MyProxy struct itself or defined as a global variable. Because the counters can be concurrently accessed, Mutex is used here.
```rust
// global counter
static REQ_COUNTER: Mutex<usize> = Mutex::new(0);

pub struct MyProxy {
    // counter for the service
    beta_counter: Mutex<usize>, // AtomicUsize works too
}

pub struct MyCtx {
    beta_user: bool,
}

fn check_beta_user(req: &pingora_http::RequestHeader) -> bool {
    // some simple logic to check if user is beta
    req.headers.get("beta-flag").is_some()
}

#[async_trait]
impl ProxyHttp for MyProxy {
    type CTX = MyCtx;
    fn new_ctx(&self) -> Self::CTX {
        MyCtx { beta_user: false }
    }

    async fn request_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<bool> {
        ctx.beta_user = check_beta_user(session.req_header());
        Ok(false)
    }

    async fn upstream_peer(
        &self,
        _session: &mut Session,
        ctx: &mut Self::CTX,
    ) -> Result<Box<HttpPeer>> {
        let mut req_counter = REQ_COUNTER.lock().unwrap();
        *req_counter += 1;

        let addr = if ctx.beta_user {
            let mut beta_count = self.beta_counter.lock().unwrap();
            *beta_count += 1;
            info!("I'm a beta user #{beta_count}");
            ("1.0.0.1", 443)
        } else {
            info!("I'm a user #{req_counter}");
            ("1.1.1.1", 443)
        };

        let peer = Box::new(HttpPeer::new(addr, true, "one.one.one.one".to_string()));
        Ok(peer)
    }
}
```
The complete example can be found under [pingora-proxy/examples/ctx.rs](https://github.com/cloudflare/pingora/blob/main/pingora-proxy/examples/ctx.rs). You can run it using cargo:
```
RUST_LOG=INFO cargo run --example ctx
```
# How to return errors
For easy error handling, the pingora-error crate exports a custom Result type used throughout other Pingora crates.
The Error struct used in this Result's error variant is a wrapper around arbitrary error types. It allows the user to tag the source of the underlying error and attach other custom context info.
Users will often need to return errors by propagating an existing error or creating a wholly new one. pingora-error makes this easy with its error building functions.
## Examples
For example, one could return an error when an expected header is not present:
```rust
fn validate_req_header(req: &RequestHeader) -> Result<()> {
    // validate that the `host` header exists
    req.headers
        .get(http::header::HOST)
        .ok_or_else(|| Error::explain(InvalidHTTPHeader, "No host header detected"))
        .map(|_| ())
}

impl MyServer {
    pub async fn handle_request_filter(
        &self,
        http_session: &mut Session,
        ctx: &mut CTX,
    ) -> Result<bool> {
        validate_req_header(http_session.req_header())
            .or_err(HTTPStatus(400), "Missing required headers")?;
        Ok(true)
    }
}
```
validate_req_header returns an Error if the host header is not found, using Error::explain to create a new Error along with an associated type (InvalidHTTPHeader) and helpful context that may be logged in an error log.
This error will eventually propagate to the request filter, where it is returned as a new HTTPStatus error using or_err. (As part of the default pingora-proxy fail_to_proxy() phase, not only will this error be logged, but it will result in sending a 400 Bad Request response downstream.)
Note that the original causing error will be visible in the error logs as well. or_err wraps the original causing error in a new one with additional context, but Error's Display implementation also prints the chain of causing errors.
## Guidelines
An error has a _type_ (e.g. ConnectionClosed), a _source_ (e.g. Upstream, Downstream, Internal), and optionally, a _cause_ (another wrapped error) and a _context_ (arbitrary user-provided string details).
A minimal error can be created using functions like new_in / new_up / new_down, each of which specifies a source and asks the user to provide a type.
Generally speaking:
- To create a new error, without a direct cause but with more context, use Error::explain. You can also use explain_err on a Result to replace the potential error inside it with a new one.
- To wrap a causing error in a new one with more context, use Error::because. You can also use or_err on a Result to replace the potential error inside it by wrapping the original one. (Both styles are sketched below.)
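A small sketch contrasting the two styles (read_conf and require_nonempty are hypothetical helpers):

```rust
use pingora_error::{Error, ErrorType::InternalError, OrErr, Result};

// Wrap a causing error (here the io::Error) in a new one with more context.
fn read_conf(path: &str) -> Result<Vec<u8>> {
    std::fs::read(path).or_err(InternalError, "failed to read conf")
}

// Create a fresh error with context only, no direct cause.
fn require_nonempty(input: &str) -> Result<()> {
    if input.is_empty() {
        return Err(Error::explain(InternalError, "empty input"));
    }
    Ok(())
}
```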
## Retry
Errors can be "retry-able." If the error is retry-able, pingora-proxy will be allowed to retry the upstream request. Some errors are only retry-able on [reused connections](https://github.com/cloudflare/pingora/blob/main/docs/user_guide/pooling.md), e.g. to handle situations where the remote end has dropped a connection we attempted to reuse.
By default a newly created Error either takes on its direct causing error's retry status, or, if left unspecified, is considered not retry-able.
# Error logging
Pingora libraries are built to expect issues like disconnects, timeouts and invalid inputs from the network. A common way to record these issues is to output them to an error log (STDERR or log files).
## Log level guidelines
Pingora adopts the idea behind [log](https://docs.rs/log/latest/log/). There are five log levels:
- error: This level should be used when the error stops the request from being handled correctly. For example when the server we try to connect to is offline.
- warning: This level should be used when an error occurs but the system recovers from it. For example when the primary DNS timed out but the system is able to query the secondary DNS.
- info: Pingora logs when the server is starting up or shutting down.
- debug: Internal details. This log level is not compiled in release builds.
- trace: Fine-grained internal details. This log level is not compiled in release builds.
The pingora-proxy crate has a well-defined interface to log errors, so that users don't have to manually log common proxy errors. See its guide for more details.
# Handling failures and failover
Pingora-proxy allows users to define how to handle failures throughout the life of a proxied request.
When a failure happens before the response header is sent downstream, users have a few options:
1. Send an error page downstream and then give up.
2. Retry the same upstream again.
3. Try another upstream if applicable.
Otherwise, once the response header is already sent downstream, there is nothing the proxy can do other than logging an error and then giving up on the request.
## Retry / Failover
In order to implement retry or failover, fail_to_connect() / error_while_proxy() needs to mark the error as "retry-able." For failover, fail_to_connect() / error_while_proxy() also needs to update the CTX to tell upstream_peer() not to use the same Peer again.
### Safety
In general, idempotent HTTP requests, e.g., GET, are safe to retry. Other requests, e.g., POST, are not safe to retry if the requests have already been sent. When fail_to_connect() is called, pingora-proxy guarantees that nothing was sent upstream. Retrying a non-idempotent request after error_while_proxy() is not recommended unless you know enough about the upstream server to be sure it is safe.
### Example
In the following example we set a tries field on the CTX to track how many connection attempts we've made. When setting our peer in upstream_peer(), we check whether tries is less than one and connect to 192.0.2.1. On connect failure, we increment tries in fail_to_connect() and call e.set_retry(true), which tells Pingora this is a retry-able error. On retry, we enter upstream_peer() again and this time connect to 1.1.1.1. If we're unable to connect to 1.1.1.1, we return a 502, since we only set e.set_retry(true) in fail_to_connect() when tries is zero.
```rust
pub struct MyProxy();

pub struct MyCtx {
    tries: usize,
}

#[async_trait]
impl ProxyHttp for MyProxy {
    type CTX = MyCtx;
    fn new_ctx(&self) -> Self::CTX {
        MyCtx { tries: 0 }
    }

    fn fail_to_connect(
        &self,
        _session: &mut Session,
        _peer: &HttpPeer,
        ctx: &mut Self::CTX,
        mut e: Box<Error>,
    ) -> Box<Error> {
        if ctx.tries > 0 {
            return e;
        }
        ctx.tries += 1;
        e.set_retry(true);
        e
    }

    async fn upstream_peer(
        &self,
        _session: &mut Session,
        ctx: &mut Self::CTX,
    ) -> Result<Box<HttpPeer>> {
        let addr = if ctx.tries < 1 {
            ("192.0.2.1", 443)
        } else {
            ("1.1.1.1", 443)
        };

        let mut peer = Box::new(HttpPeer::new(addr, true, "one.one.one.one".to_string()));
        peer.options.connection_timeout = Some(Duration::from_millis(100));
        Ok(peer)
    }
}
```
# Graceful restart and shutdown
Graceful restart, upgrade, and shutdown mechanisms are very commonly used to avoid errors or downtime when releasing new versions of Pingora servers.
Pingora's graceful upgrade mechanism guarantees the following:
- A request is guaranteed to be handled either by the old server instance or the new one. No request will see connection refused when trying to connect to the server endpoints.
- A request that can finish within the grace period is guaranteed not to be terminated.
## How to graceful upgrade
### Step 0
Configure the upgrade socket. The old and new server need to agree on the same path to this socket. See the configuration manual for details.
### Step 1
Start the new instance with the --upgrade CLI option. The new instance will not try to listen to the service endpoint right away. It will try to acquire the listening socket from the old instance instead.
### Step 2
Send a SIGQUIT signal to the old instance. The old instance will start to transfer the listening socket to the new instance.
Once step 2 is successful, the new instance will start to handle new incoming connections right away. Meanwhile, the old instance will enter its graceful shutdown mode. It waits a short period of time (to give the new instance time to initialize and prepare to handle traffic), after which it will not accept any new connections.
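Putting the steps together, a sketch of a manual upgrade from a shell (the binary, conf, and pid file paths are illustrative):

```bash
OLD_PID=$(cat /run/pingora.pid)           # PID of the currently running instance
/bin/pingora -u -d -c /etc/pingora.conf   # step 1: new instance waits for the sockets
kill -SIGQUIT "$OLD_PID"                  # step 2: old instance hands over sockets, then drains
```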
# Pingora Internals
(Special thanks to [James Munns](https://github.com/jamesmunns) for writing this section)
## Starting the Server
The pingora system starts by spawning a _server_. The server is responsible for starting _services_, and listening for termination events.
```
                               ┌───────────┐
                    ┌─────────>│  Service  │
                    │          └───────────┘
┌────────┐          │          ┌───────────┐
│ Server │──Spawns──┼─────────>│  Service  │
└────────┘          │          └───────────┘
                    │          ┌───────────┐
                    └─────────>│  Service  │
                               └───────────┘
```
After spawning the _services_, the server continues to listen to a termination event, which it will propagate to the created services.
## Services
_Services_ are entities that handle listening to given sockets, and perform the core functionality. A _service_ is tied to a particular protocol and set of options.
> NOTE: there are also "background" services, which just do _stuff_, and aren't necessarily listening to a socket. For now we're just talking about listener services.
Each service has its own threadpool/tokio runtime, with a number of threads based on the configured value. Worker threads are not shared cross-service. Service runtime threadpools may be work-stealing (tokio-default), or non-work-stealing (N isolated single threaded runtimes).
```
┌─────────────────────────┐
│ ┌─────────────────────┐ │
│ │┌─────────┬─────────┐│ │
│ ││  Conn   │  Conn   ││ │
│ │├─────────┼─────────┤│ │
│ ││Endpoint │Endpoint ││ │
│ │├─────────┴─────────┤│ │
│ ││     Listeners     ││ │
│ │├─────────┬─────────┤│ │
│ ││ Worker  │ Worker  ││ │
│ ││ Thread  │ Thread  ││ │
│ │├─────────┴─────────┤│ │
│ ││  Tokio Executor   ││ │
│ │└───────────────────┘│ │
│ └─────────────────────┘ │
│  ┌───────┐              │
└──┤Service├──────────────┘
   └───────┘
```
## Service Listeners
At startup, each Service is assigned a set of downstream endpoints that they listen to. A single service may listen to more than one endpoint. The Server also passes along any relevant configuration, including TLS settings if relevant.
These endpoints are converted into listening sockets, called TransportStacks. Each TransportStack is assigned to an async task within that service's executor.
```
┌───────────────────┐
│┌─────────────────┐│ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
┌─────────┐ ││ TransportStack ││ ┌────────────────────┐│
┌┤Listeners├────────┐ ││ ││ │ │ ││ │
│└─────────┘ │ ││ (Listener, TLS │├──────spawn(run_endpoint())────>│ Service<ServerApp> ││
│┌─────────────────┐│ ││ Acceptor, ││ │ │ ││ │
││ Endpoint ││ ││ UpgradeFDs) ││ └────────────────────┘│
││ addr/ports ││ │├─────────────────┤│ │ │ │
││ + TLS Settings ││ ││ TransportStack ││ ┌────────────────────┐│
│├─────────────────┤│ ││ ││ │ │ ││ │
││ Endpoint ││──build()─> ││ (Listener, TLS │├──────spawn(run_endpoint())────>│ Service<ServerApp> ││
││ addr/ports ││ ││ Acceptor, ││ │ │ ││ │
││ + TLS Settings ││ ││ UpgradeFDs) ││ └────────────────────┘│
│├─────────────────┤│ │├─────────────────┤│ │ │ │
││ Endpoint ││ ││ TransportStack ││ ┌────────────────────┐│
││ addr/ports ││ ││ ││ │ │ ││ │
││ + TLS Settings ││ ││ (Listener, TLS │├──────spawn(run_endpoint())────>│ Service<ServerApp> ││
│└─────────────────┘│ ││ Acceptor, ││ │ │ ││ │
└───────────────────┘ ││ UpgradeFDs) ││ └────────────────────┘│
│└─────────────────┘│ │ ┌───────────────┐ │ │ ┌──────────────┐
└───────────────────┘ ─│start_service()│─ ─ ─ ─│ Worker Tasks ├ ─ ─ ┘
└───────────────┘ └──────────────┘
```
## Downstream connection lifecycle
Each service processes incoming connections by spawning a task-per-connection. These connections are held open as long as there are new events to be handled.
```
┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
│ ┌───────────────┐ ┌────────────────┐ ┌─────────────────┐ ┌─────────────┐ │
┌────────────────────┐ │ UninitStream │ │ Service │ │ App │ │ Task Ends │
│ │ │ │ ::handshake() │──>│::handle_event()│──>│ ::process_new() │──┬>│ │ │
│ Service<ServerApp> │──spawn()──> └───────────────┘ └────────────────┘ └─────────────────┘ │ └─────────────┘
│ │ │ ▲ │ │
└────────────────────┘ │ while
│ └─────────reuse │
┌───────────────────────────┐
└ ─│ Task on Service Runtime │─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
└───────────────────────────┘
```
## What is a proxy then?
Interestingly, the pingora Server itself has no particular notion of a Proxy.
Instead, it only thinks in terms of Services, which are expected to contain a particular implementor of the ServiceApp trait.
For example, this is how an HttpProxy struct, from the pingora-proxy crate, "becomes" a Service spawned by the Server:
```
┌─────────────┐
│  HttpProxy  │
│  (struct)   │
└─────────────┘
       │
  implements        ┌─────────────┐
       │            │HttpServerApp│
       └───────────>│   (trait)   │
                    └─────────────┘
                           │
                      implements     ┌─────────────┐
                           │         │  ServerApp  │
                           └────────>│   (trait)   │
                                     └─────────────┘
                                            │
                                       contained     ┌─────────────────────┐
                                         within      │                     │
                                            └───────>│ Service<ServiceApp> │
                                                     │                     │
                                                     └─────────────────────┘
```
Different functionalities and helpers are provided at different layers in this representation.
```
┌─────────────┐        ┌──────────────────────────────────────┐
│  HttpProxy  │        │Handles high level Proxying workflow, │
│  (struct)   │─ ─ ─ ─ │  customizable via ProxyHttp trait    │
└──────┬──────┘        └──────────────────────────────────────┘
       │
┌──────▼──────┐        ┌──────────────────────────────────────┐
│HttpServerApp│        │ Handles selection of H1 vs H2 stream │
│   (trait)   │─ ─ ─ ─ │     handling, incl H2 handshake      │
└──────┬──────┘        └──────────────────────────────────────┘
       │
┌──────▼──────┐        ┌──────────────────────────────────────┐
│  ServerApp  │        │ Handles dispatching of App instances │
│   (trait)   │─ ─ ─ ─ │   as individual tasks, per Session   │
└──────┬──────┘        └──────────────────────────────────────┘
       │
┌──────▼──────┐        ┌──────────────────────────────────────┐
│ Service<A>  │        │ Handles dispatching of App instances │
│  (struct)   │─ ─ ─ ─ │  as individual tasks, per Listener   │
└─────────────┘        └──────────────────────────────────────┘
```
The HttpProxy struct handles the high level workflow of proxying an HTTP connection.
It uses the ProxyHttp (note the flipped wording order!) **trait** to allow customization at each of the following steps (note: taken from [the phase chart](https://github.com/cloudflare/pingora/blob/main/docs/user_guide/phase_chart.md) doc):
```mermaid
graph TD;
    start("new request")-->request_filter;
    request_filter-->upstream_peer;
    upstream_peer-->Connect{{IO: connect to upstream}};
    Connect--connection success-->connected_to_upstream;
    Connect--connection failure-->fail_to_connect;
    connected_to_upstream-->upstream_request_filter;
    upstream_request_filter --> SendReq{{IO: send request to upstream}};
    SendReq-->RecvResp{{IO: read response from upstream}};
    RecvResp-->upstream_response_filter-->response_filter-->upstream_response_body_filter-->response_body_filter-->logging-->endreq("request done");
    fail_to_connect --can retry-->upstream_peer;
    fail_to_connect --can't retry-->fail_to_proxy--send error response-->logging;
    RecvResp--failure-->IOFailure;
    SendReq--failure-->IOFailure;
    error_while_proxy--can retry-->upstream_peer;
    error_while_proxy--can't retry-->fail_to_proxy;
    request_filter --send response-->logging
    Error>any response filter error]-->error_while_proxy
    IOFailure>IO error]-->error_while_proxy
```
## Zooming out
Before we zoom in, it's probably good to zoom out and remind ourselves how a proxy generally works:
```
┌────────────┐          ┌─────────────┐         ┌────────────┐
│ Downstream │          │    Proxy    │         │  Upstream  │
│   Client   │─────────>│             │────────>│   Server   │
└────────────┘          └─────────────┘         └────────────┘
```
The proxy will be taking connections from the **Downstream** client, and (if everything goes right), establishing a connection with the appropriate **Upstream** server. This selected upstream server is referred to as the **Peer**.
Once the connection is established, the Downstream and Upstream can communicate bidirectionally.
So far, the discussion of Server, Services, and Listeners has focused on the LEFT half of this diagram, handling incoming Downstream connections and getting them TO the proxy component.
Next, we'll look at the RIGHT half of this diagram, connecting to Upstreams.
## Managing the Upstream
Connections to Upstream Peers are made through Connectors. This is not a specific type or trait, but more of a "style".
Connectors are responsible for a few things:
- Establishing a connection with a Peer
- Maintaining a connection pool with the Peer, allowing for connection reuse across:
- Multiple requests from a single downstream client
- Multiple requests from different downstream clients
- Measuring health of connections, for connections like H2, which perform regular pings
- Handling protocols with multiple poolable layers, like H2
- Caching, if relevant to the protocol and enabled
- Compression, if relevant to the protocol and enabled
Now in context, we can see how each end of the Proxy is handled:
```
┌────────────┐ ┌─────────────┐ ┌────────────┐
│ Downstream │ ┌ ─│─ Proxy ┌ ┼ ─ │ Upstream │
│ Client │─────────>│ │ │──┼─────>│ Server │
└────────────┘ │ └───────────┼─┘ └────────────┘
─ ─ ┘ ─ ─ ┘
▲ ▲
┌──┘ └──┐
│ │
┌ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─
Listeners Connectors│
└ ─ ─ ─ ─ ┘ └ ─ ─ ─ ─ ─
```
## What about multiple peers?
Connectors only handle the connection to a single peer, so selecting one of potentially multiple Peers is actually handled one level up, in the upstream_peer() method of the ProxyHttp trait.
# Examples: taking control of the request
In this section we will go through how to route, modify or reject requests.
## Routing
Any information from the request can be used to make routing decisions. Pingora doesn't impose any constraints on how users implement their own routing logic.
In the following example, the proxy sends traffic to 1.0.0.1 only when the request path starts with /family/. All other requests are routed to 1.1.1.1.
```rust
pub struct MyGateway;

#[async_trait]
impl ProxyHttp for MyGateway {
    type CTX = ();
    fn new_ctx(&self) -> Self::CTX {}

    async fn upstream_peer(
        &self,
        session: &mut Session,
        _ctx: &mut Self::CTX,
    ) -> Result<Box<HttpPeer>> {
        let addr = if session.req_header().uri.path().starts_with("/family/") {
            ("1.0.0.1", 443)
        } else {
            ("1.1.1.1", 443)
        };

        info!("connecting to {addr:?}");

        let peer = Box::new(HttpPeer::new(addr, true, "one.one.one.one".to_string()));
        Ok(peer)
    }
}
```
## Modifying headers
Both request and response headers can be added, removed or modified in their corresponding phases. In the following example, we add logic to the response_filter phase to update the Server header and remove the alt-svc header.
```rust
#[async_trait]
impl ProxyHttp for MyGateway {
    ...
    async fn response_filter(
        &self,
        _session: &mut Session,
        upstream_response: &mut ResponseHeader,
        _ctx: &mut Self::CTX,
    ) -> Result<()>
    where
        Self::CTX: Send + Sync,
    {
        // replace existing header if any
        upstream_response
            .insert_header("Server", "MyGateway")
            .unwrap();
        // because we don't support h3
        upstream_response.remove_header("alt-svc");

        Ok(())
    }
}
```
## Return Error pages
Sometimes, under certain conditions such as authentication failure, you might want the proxy to just return an error page instead of proxying the traffic.
```rust
fn check_login(req: &pingora_http::RequestHeader) -> bool {
    // implement your login check logic here
    req.headers.get("Authorization").map(|v| v.as_bytes()) == Some(b"password")
}

#[async_trait]
impl ProxyHttp for MyGateway {
    ...
    async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool> {
        if session.req_header().uri.path().starts_with("/login")
            && !check_login(session.req_header())
        {
            let _ = session.respond_error(403).await;
            // true: tell the proxy that the response is already written
            return Ok(true);
        }
        Ok(false)
    }
}
```
## Logging
Logging logic can be added to the logging phase of Pingora. The logging phase runs on every request right before Pingora proxy finishes processing it. This phase runs for both successful and failed requests.
In the example below, we add a Prometheus metric and access logging to the proxy. In order for the metrics to be scraped, we also start a Prometheus metrics server on a different port.
```rust
pub struct MyGateway {
    req_metric: prometheus::IntCounter,
}

#[async_trait]
impl ProxyHttp for MyGateway {
    ...
    async fn logging(
        &self,
        session: &mut Session,
        _e: Option<&pingora::Error>,
        ctx: &mut Self::CTX,
    ) {
        let response_code = session
            .response_written()
            .map_or(0, |resp| resp.status.as_u16());
        // access log
        info!(
            "{} response code: {response_code}",
            self.request_summary(session, ctx)
        );

        self.req_metric.inc();
    }
}

fn main() {
    ...
    let mut prometheus_service_http =
        pingora::services::listening::Service::prometheus_http_service();
    prometheus_service_http.add_tcp("127.0.0.1:6192");
    my_server.add_service(prometheus_service_http);

    my_server.run_forever();
}
```
# Handling panics
Any panic that happens to particular requests does not affect other ongoing requests or the server's ability to handle other requests. Sockets acquired by the panicking requests are dropped (closed). The panics will be captured by the tokio runtime and then ignored.
In order to monitor the panics, Pingora server has built-in Sentry integration.
```rust
my_server.sentry = Some(
    sentry::ClientOptions {
        dsn: "SENTRY_DSN".into_dsn().unwrap(),
        ..Default::default()
    },
);
```
Even though a panic is not fatal in Pingora, it is still not the preferred way to handle failures like network timeouts. Panics should be reserved for unexpected logic errors.
# Peer: how to connect to upstream
In the upstream_peer() phase the user should return a Peer object which defines how to connect to a certain upstream.
## Peer
An HttpPeer defines which upstream to connect to.

| attribute | meaning |
|-----------|---------|
| `address: SocketAddr` | the IP:port to connect to |
| `scheme: Scheme` | Http or Https |
| `sni: String` | the SNI to use, Https only |
| `proxy: Option<Proxy>` | the setting to proxy the request through a [CONNECT proxy](https://developer.mozilla.org/en-US/docs/Web/HTTP/Methods/CONNECT) |
| `client_cert_key: Option<Arc<CertKey>>` | the client certificate to use in mTLS connections to upstream |
| `options: PeerOptions` | see below |
## PeerOptions
A PeerOptions defines how to connect to the upstream.

| attribute | meaning |
|-----------|---------|
| `bind_to: Option<InetSocketAddr>` | which local address to bind to as the client IP |
| `connection_timeout: Option<Duration>` | how long to wait before giving up _establishing_ a TCP connection |
| `total_connection_timeout: Option<Duration>` | how long to wait before giving up _establishing_ a connection, including TLS handshake time |
| `read_timeout: Option<Duration>` | how long to wait before each individual `read()` from upstream. The timer is reset after each `read()` |
| `idle_timeout: Option<Duration>` | how long to wait before closing an idle connection waiting for connection reuse |
| `write_timeout: Option<Duration>` | how long to wait before a `write()` to upstream finishes |
| `verify_cert: bool` | whether to check if the upstream's server cert is valid and validated |
| `verify_hostname: bool` | whether to check if the upstream server cert's CN matches the SNI |
| `alternative_cn: Option<String>` | accept the cert if the CN matches this name |
| `alpn: ALPN` | which HTTP protocol to advertise during ALPN, http1.1 and/or http2 |
| `ca: Option<Arc<Box<[X509]>>>` | which root CA to use to validate the server's cert |
| `tcp_keepalive: Option<TcpKeepalive>` | TCP keepalive settings to upstream |
## Examples
TBD
# Life of a request: pingora-proxy phases and filters
## Intro
The pingora-proxy HTTP proxy framework supports highly programmable proxy behaviors. This is done by allowing users to inject custom logic into different phases (stages) in the life of a request.
## Life of a proxied HTTP request
1. The life of a proxied HTTP request starts when the proxy reads the request header from the **downstream** (i.e., the client).
2. Then, the proxy connects to the **upstream** (i.e., the remote server). This step is skipped if there is a previously established [connection to reuse](https://github.com/cloudflare/pingora/blob/main/docs/user_guide/pooling.md).
3. The proxy then sends the request header to the upstream.
4. Once the request header is sent, the proxy enters a duplex mode, which simultaneously proxies: a. upstream response (both header and body) to the downstream, and b. downstream request body to upstream (if any).
5. Once the entire request/response finishes, the life of the request is ended. All resources are released. The downstream connections and the upstream connections are recycled to be reused if applicable.
## Pingora-proxy phases and filters
Pingora-proxy allows users to insert arbitrary logic into the life of a request.
```mermaid
graph TD;
    start("new request")-->early_request_filter;
    early_request_filter-->request_filter;
    request_filter-->upstream_peer;
    upstream_peer-->Connect{{IO: connect to upstream}};
    Connect--connection success-->connected_to_upstream;
    Connect--connection failure-->fail_to_connect;
    connected_to_upstream-->upstream_request_filter;
    upstream_request_filter --> request_body_filter;
    request_body_filter --> SendReq{{IO: send request to upstream}};
    SendReq-->RecvResp{{IO: read response from upstream}};
    RecvResp-->upstream_response_filter-->response_filter-->upstream_response_body_filter-->response_body_filter-->logging-->endreq("request done");
    fail_to_connect --can retry-->upstream_peer;
    fail_to_connect --can't retry-->fail_to_proxy--send error response-->logging;
    RecvResp--failure-->IOFailure;
    SendReq--failure-->IOFailure;
    error_while_proxy--can retry-->upstream_peer;
    error_while_proxy--can't retry-->fail_to_proxy;
    request_filter --send response-->logging
    Error>any response filter error]-->error_while_proxy
    IOFailure>IO error]-->error_while_proxy
```
### General filter usage guidelines
- Most filters return a [pingora_error::Result<_>](https://github.com/cloudflare/pingora/blob/main/docs/user_guide/errors.md). When the returned value is Result::Err, fail_to_proxy() will be called and the request will be terminated.
- Most filters are async functions, which allows other async operations such as IO to be performed within the filters.
- A per-request CTX object can be defined to share states across the filters of the same request. All filters have mutable access to this object.
- Most filters are optional.
- Both upstream_response_*_filter() and response_*_filter() exist for HTTP caching integration reasons (still WIP).
### early_request_filter()
This is the first phase of every request.
This function is similar to request_filter() but executes before any other logic, including downstream module logic. The main purpose of this function is to provide finer-grained control of the behavior of the modules.
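For example, a sketch that records the arrival time before any module runs (it assumes a MyProxy whose CTX struct carries a hypothetical start: Option&lt;Instant&gt; field):

```rust
use std::time::Instant;

#[async_trait]
impl ProxyHttp for MyProxy {
    ...
    async fn early_request_filter(
        &self,
        _session: &mut Session,
        ctx: &mut Self::CTX,
    ) -> Result<()> {
        // runs before request_filter and before any downstream module logic
        ctx.start = Some(Instant::now());
        Ok(())
    }
}
```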
### request_filter()
This phase is usually for validating request inputs, rate limiting, and initializing context.
### request_body_filter()
This phase is triggered after a piece of the request body is ready to send to upstream. It will be called every time a piece of the request body is received.
### proxy_upstream_filter()
This phase determines if we should continue to the upstream to serve a response. If we short-circuit, a 502 is returned by default, but a different response can be implemented.
This phase returns a boolean determining if we should continue to the upstream or error.
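A sketch (the maintenance flag on the CTX is hypothetical):

```rust
#[async_trait]
impl ProxyHttp for MyProxy {
    ...
    async fn proxy_upstream_filter(
        &self,
        _session: &mut Session,
        ctx: &mut Self::CTX,
    ) -> Result<bool> {
        // returning false short-circuits the proxy; a 502 is sent by default
        Ok(!ctx.maintenance)
    }
}
```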
### upstream_peer()
This phase decides which upstream to connect to (e.g. with DNS lookup and hashing/round-robin), and how to connect to it.
This phase returns a Peer that defines the upstream to connect to. Implementing this phase is **required**.
### connected_to_upstream()
This phase is executed when upstream is successfully connected.
Usually this phase is for logging purposes. Connection info such as RTT and upstream TLS ciphers are reported in this phase.
### fail_to_connect()
The counterpart of connected_to_upstream(). This phase is called if an error is encountered when connecting to upstream.
In this phase users can report the error in Sentry/Prometheus/error log. Users can also decide if the error is retry-able.
If the error is retry-able, upstream_peer() will be called again, in which case the user can decide whether to retry the same upstream or failover to a secondary one.
If the error is not retry-able, the request will end.
### upstream_request_filter()
This phase is to modify requests before sending to upstream.
### upstream_response_filter()/upstream_response_body_filter()/upstream_response_trailer_filter()
This phase is triggered after an upstream response header/body/trailer is received.
This phase is to modify or process response headers, body, or trailers before sending to downstream. Note that this phase is called _prior_ to HTTP caching and therefore any changes made here will affect the response stored in the HTTP cache.
### response_filter()/response_body_filter()/response_trailer_filter()
This phase is triggered after a response header/body/trailer is ready to send to downstream.
This phase is to modify them before sending to downstream.
### error_while_proxy()
This phase is triggered on proxy errors to upstream that occur after the connection is established.
This phase may decide to retry a request if the connection was re-used and the HTTP method is idempotent.
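A sketch of that decision, treating only GET as idempotent here for brevity (the signature is assumed to follow the ProxyHttp trait, where client_reused reports whether the upstream connection was a reused one):

```rust
fn error_while_proxy(
    &self,
    _peer: &HttpPeer,
    session: &mut Session,
    mut e: Box<Error>,
    _ctx: &mut Self::CTX,
    client_reused: bool,
) -> Box<Error> {
    // only retry when the connection was a reused one and the method is safe to replay
    let idempotent = session.req_header().method == http::Method::GET;
    e.set_retry(client_reused && idempotent);
    e
}
```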
### fail_to_proxy()
This phase is called whenever an error is encountered during any of the phases above.
This phase is usually for error logging and error reporting to downstream.
### logging()
This is the last phase that runs after the request is finished (or errors) and before any of its resources are released. Every request will end up in this final phase.
This phase is usually for logging and post request cleanup.
### request_summary()
This is not a phase, but a commonly used callback.
Every error that reaches fail_to_proxy() will be automatically logged in the error log. request_summary() will be called to dump the info regarding the request when logging the error.
This callback returns a string which allows users to customize what info to dump in the error log to help track and debug the failures.
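For instance, a sketch of a custom summary inside the ProxyHttp impl (the chosen fields are illustrative):

```rust
fn request_summary(&self, session: &Session, _ctx: &Self::CTX) -> String {
    // dump the method, URI and Host header of the failing request
    format!(
        "{} {}, host: {:?}",
        session.req_header().method,
        session.req_header().uri,
        session.req_header().headers.get(http::header::HOST)
    )
}
```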
### suppress_error_log()
This is also not a phase, but another callback.
fail_to_proxy() errors are automatically logged in the error log, but users may not be interested in every error. For example, downstream errors are logged if the client disconnects early, but these errors can become noisy if users are mainly interested in observing upstream issues. This callback can inspect the error and return true or false; if it returns true, the error will not be written to the log.
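A sketch that silences downstream-sourced errors, assuming upstream issues are all we care about (ErrorSource comes from pingora-error):

```rust
fn suppress_error_log(&self, _session: &Session, _ctx: &Self::CTX, error: &Error) -> bool {
    // drop log lines for errors caused by the downstream client
    matches!(error.esource(), ErrorSource::Downstream)
}
```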
### Cache filters
To be documented
# Prometheus
Pingora has a built-in prometheus HTTP metric server for scraping.
```rust
...
let mut prometheus_service_http = Service::prometheus_http_service();
prometheus_service_http.add_tcp("0.0.0.0:1234");
my_server.add_service(prometheus_service_http);
my_server.run_forever();
```
The simplest way to use it is to have [static metrics](https://docs.rs/prometheus/latest/prometheus/#static-metrics).
```rust
static MY_COUNTER: Lazy<IntGauge> = Lazy::new(|| {
    register_int_gauge!("my_counter", "my counter").unwrap()
});
```
This static metric will automatically appear in the Prometheus metric endpoint.
# RateLimiter quickstart
Pingora provides a crate pingora-limits which provides a simple and easy to use rate limiter for your application. Below is an example of how you can use [Rate](https://docs.rs/pingora-limits/latest/pingora_limits/rate/struct.Rate.html) to create an application that uses multiple limiters to restrict the rate at which requests can be made on a per-app basis (determined by a request header).
## Steps
1. Add the following dependencies to your Cargo.toml:
   ```toml
   async-trait = "0.1"
   pingora = { version = "0.3", features = [ "lb" ] }
   pingora-limits = "0.3.0"
   once_cell = "1.19.0"
   ```
2. Declare a global rate limiter map to store the rate limiter for each client. In this example, we use appid.
3. Override the request_filter method in the ProxyHttp trait to implement rate limiting:
   1. Retrieve the client appid from the request header.
   2. Retrieve the current window requests from the rate limiter map. If there is no rate limiter for the client, create a new one and insert it into the map.
   3. If the current window requests exceed the limit, return 429 and set the RateLimiter associated headers.
   4. If the request is not rate limited, return Ok(false) to continue the request.
## Example
```rust
use async_trait::async_trait;
use once_cell::sync::Lazy;
use pingora::http::ResponseHeader;
use pingora::prelude::*;
use pingora_limits::rate::Rate;
use std::sync::Arc;
use std::time::Duration;

fn main() {
    let mut server = Server::new(Some(Opt::default())).unwrap();
    server.bootstrap();

    let mut upstreams = LoadBalancer::try_from_iter(["1.1.1.1:443", "1.0.0.1:443"]).unwrap();
    // Set health check
    let hc = TcpHealthCheck::new();
    upstreams.set_health_check(hc);
    upstreams.health_check_frequency = Some(Duration::from_secs(1));
    // Set background service
    let background = background_service("health check", upstreams);
    let upstreams = background.task();
    // Set load balancer
    let mut lb = http_proxy_service(&server.configuration, LB(upstreams));
    lb.add_tcp("0.0.0.0:6188");

    server.add_service(background);
    server.add_service(lb);
    server.run_forever();
}

pub struct LB(Arc<LoadBalancer<RoundRobin>>);

impl LB {
    pub fn get_request_appid(&self, session: &mut Session) -> Option<String> {
        match session
            .req_header()
            .headers
            .get("appid")
            .map(|v| v.to_str())
        {
            None => None,
            Some(v) => match v {
                Ok(v) => Some(v.to_string()),
                Err(_) => None,
            },
        }
    }
}

// Rate limiter
static RATE_LIMITER: Lazy<Rate> = Lazy::new(|| Rate::new(Duration::from_secs(1)));

// max request per second per client
static MAX_REQ_PER_SEC: isize = 1;

#[async_trait]
impl ProxyHttp for LB {
    type CTX = ();
    fn new_ctx(&self) -> Self::CTX {}

    async fn upstream_peer(
        &self,
        _session: &mut Session,
        _ctx: &mut Self::CTX,
    ) -> Result<Box<HttpPeer>> {
        let upstream = self.0.select(b"", 256).unwrap();
        // Set SNI
        let peer = Box::new(HttpPeer::new(upstream, true, "one.one.one.one".to_string()));
        Ok(peer)
    }

    async fn upstream_request_filter(
        &self,
        _session: &mut Session,
        upstream_request: &mut RequestHeader,
        _ctx: &mut Self::CTX,
    ) -> Result<()>
    where
        Self::CTX: Send + Sync,
    {
        upstream_request
            .insert_header("Host", "one.one.one.one")
            .unwrap();
        Ok(())
    }

    async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool>
    where
        Self::CTX: Send + Sync,
    {
        let appid = match self.get_request_appid(session) {
            None => return Ok(false), // no client appid found, skip rate limiting
            Some(addr) => addr,
        };

        // retrieve the current window requests
        let curr_window_requests = RATE_LIMITER.observe(&appid, 1);
        if curr_window_requests > MAX_REQ_PER_SEC {
            // rate limited, return 429
            let mut header = ResponseHeader::build(429, None).unwrap();
            header
                .insert_header("X-Rate-Limit-Limit", MAX_REQ_PER_SEC.to_string())
                .unwrap();
            header.insert_header("X-Rate-Limit-Remaining", "0").unwrap();
            header.insert_header("X-Rate-Limit-Reset", "1").unwrap();
            session.set_keepalive(None);
            session
                .write_response_header(Box::new(header), true)
                .await?;
            return Ok(true);
        }
        Ok(false)
    }
}
```
## Testing
To use the example above,
1. Run your program with cargo run.
2. Verify the program is working with a few executions of curl localhost:6188 -H "appid:1" -v
- The first request should work and any later requests that arrive within 1s of a previous request should fail with:
```
*   Trying 127.0.0.1:6188...
* Connected to localhost (127.0.0.1) port 6188 (#0)
> GET / HTTP/1.1
> Host: localhost:6188
> User-Agent: curl/7.88.1
> Accept: */*
> appid:1
>
< HTTP/1.1 429 Too Many Requests
< X-Rate-Limit-Limit: 1
< X-Rate-Limit-Remaining: 0
< X-Rate-Limit-Reset: 1
< Date: Sun, 14 Jul 2024 20:29:02 GMT
< Connection: close
<
* Closing connection 0
```
## Complete Example
You can run the pre-made example code in the [pingora-proxy examples folder](https://github.com/cloudflare/pingora/tree/main/pingora-proxy/examples/rate_limiter.rs) with
```
cargo run --example rate_limiter
```
# Starting and stopping Pingora server
A pingora server is a regular unprivileged multithreaded process.
## Start
By default, the server will run in the foreground.
A Pingora server by default takes the following command-line arguments:
| Argument | Effect | Default |
|----------|--------|---------|
| `-d, --daemon` | daemonize the server | false |
| `-t, --test` | test the server conf and then exit (WIP) | false |
| `-c, --conf` | the path to the configuration file | empty string |
| `-u, --upgrade` | this server should gracefully upgrade a running server | false |
## Stop
A Pingora server will listen to the following signals.
### SIGINT: fast shutdown
Upon receiving SIGINT (ctrl + c), the server will exit immediately with no delay. All unfinished requests will be interrupted. This behavior is usually less preferred because it could break requests.
### SIGTERM: graceful shutdown
Upon receiving SIGTERM, the server will notify all its services to shutdown, wait for some preconfigured time and then exit. This behavior gives requests a grace period to finish.
### SIGQUIT: graceful upgrade
Similar to SIGTERM, but the server will also transfer all its listening sockets to a new Pingora server so that there is no downtime during the upgrade. See the [graceful upgrade](https://github.com/cloudflare/pingora/blob/main/docs/user_guide/graceful.md) section for more details.
# Systemd integration
A Pingora server doesn't depend on systemd but it can easily be made into a systemd service.
```ini
[Service]
Type=forking
PIDFile=/run/pingora.pid
ExecStart=/bin/pingora -d -c /etc/pingora.conf
ExecReload=kill -QUIT $MAINPID
ExecReload=/bin/pingora -u -d -c /etc/pingora.conf
```
The example systemd setup integrates Pingora's graceful upgrade into systemd. To upgrade the pingora service, simply install a new version of the binary and then call systemctl reload pingora.service.
## gateway.rs example
// Copyright 2024 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use async_trait::async_trait;
use bytes::Bytes;
use clap::Parser;
use log::info;
use prometheus::register_int_counter;
use pingora_core::server::configuration::Opt;
use pingora_core::server::Server;
use pingora_core::upstreams::peer::HttpPeer;
use pingora_core::Result;
use pingora_http::ResponseHeader;
use pingora_proxy::{ProxyHttp, Session};
fn check_login(req: &pingora_http::RequestHeader) -> bool {
// implement you logic check logic here
req.headers.get("Authorization").map(|v| v.as_bytes()) == Some(b"password")
}
pub struct MyGateway {
req_metric: prometheus::IntCounter,
}
#[async_trait]
impl ProxyHttp for MyGateway {
type CTX = ();
fn new_ctx(&self) -> Self::CTX {}
async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool> {
if session.req_header().uri.path().starts_with("/login")
&& !check_login(session.req_header())
{
let _ = session
.respond_error_with_body(403, Bytes::from_static(b"no way!"))
.await;
// true: early return as the response is already written
return Ok(true);
}
Ok(false)
}
async fn upstream_peer(
&self,
session: &mut Session,
_ctx: &mut Self::CTX,
) -> Result<Box<HttpPeer>> {
let addr = if session.req_header().uri.path().starts_with("/family") {
("1.0.0.1", 443)
} else {
("1.1.1.1", 443)
};
info!("connecting to {addr:?}");
let peer = Box::new(HttpPeer::new(addr, true, "one.one.one.one".to_string()));
Ok(peer)
}
async fn response_filter(
&self,
_session: &mut Session,
upstream_response: &mut ResponseHeader,
_ctx: &mut Self::CTX,
) -> Result<()>
where
Self::CTX: Send + Sync,
{
// replace existing header if any
upstream_response
.insert_header("Server", "MyGateway")
.unwrap();
// because we don't support h3
upstream_response.remove_header("alt-svc");
Ok(())
}
async fn logging(
&self,
session: &mut Session,
_e: Option<&pingora_core::Error>,
ctx: &mut Self::CTX,
) {
let response_code = session
.response_written()
.map_or(0, |resp| resp.status.as_u16());
info!(
"{} response code: {response_code}",
self.request_summary(session, ctx)
);
self.req_metric.inc();
}
}
// RUST_LOG=INFO cargo run --example gateway
// curl 127.0.0.1:6191 -H "Host: one.one.one.one"
// curl 127.0.0.1:6191/family/ -H "Host: one.one.one.one"
// curl 127.0.0.1:6191/login/ -H "Host: one.one.one.one" -I -H "Authorization: password"
// curl 127.0.0.1:6191/login/ -H "Host: one.one.one.one" -I -H "Authorization: bad"
// For metrics
// curl 127.0.0.1:6192/
fn main() {
env_logger::init();
// read command line arguments
let opt = Opt::parse();
let mut my_server = Server::new(Some(opt)).unwrap();
my_server.bootstrap();
let mut my_proxy = pingora_proxy::http_proxy_service(
&my_server.configuration,
MyGateway {
req_metric: register_int_counter!("req_counter", "Number of requests").unwrap(),
},
);
my_proxy.add_tcp("0.0.0.0:6191");
my_server.add_service(my_proxy);
let mut prometheus_service_http =
pingora_core::services::listening::Service::prometheus_http_service();
prometheus_service_http.add_tcp("127.0.0.1:6192");
my_server.add_service(prometheus_service_http);
my_server.run_forever();
}
## ctx.rs
use async_trait::async_trait;
use clap::Parser;
use log::info;
use std::sync::Mutex;
use pingora_core::server::configuration::Opt;
use pingora_core::server::Server;
use pingora_core::upstreams::peer::HttpPeer;
use pingora_core::Result;
use pingora_proxy::{ProxyHttp, Session};
// global counter
static REQ_COUNTER: Mutex<usize> = Mutex::new(0);
pub struct MyProxy {
// counter for the service
beta_counter: Mutex<usize>, // AtomicUsize works too
}
pub struct MyCtx {
beta_user: bool,
}
fn check_beta_user(req: &pingora_http::RequestHeader) -> bool {
// some simple logic to check if user is beta
req.headers.get("beta-flag").is_some()
}
#[async_trait]
impl ProxyHttp for MyProxy {
type CTX = MyCtx;
fn new_ctx(&self) -> Self::CTX {
MyCtx { beta_user: false }
}
async fn request_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<bool> {
ctx.beta_user = check_beta_user(session.req_header());
Ok(false)
}
async fn upstream_peer(
&self,
_session: &mut Session,
ctx: &mut Self::CTX,
) -> Result<Box<HttpPeer>> {
let mut req_counter = REQ_COUNTER.lock().unwrap();
*req_counter += 1;
let addr = if ctx.beta_user {
let mut beta_count = self.beta_counter.lock().unwrap();
*beta_count += 1;
info!("I'm a beta user #{beta_count}");
("1.0.0.1", 443)
} else {
info!("I'm an user #{req_counter}");
("1.1.1.1", 443)
};
let peer = Box::new(HttpPeer::new(addr, true, "one.one.one.one".to_string()));
Ok(peer)
}
}
// RUST_LOG=INFO cargo run --example ctx
// curl 127.0.0.1:6190 -H "Host: one.one.one.one"
// curl 127.0.0.1:6190 -H "Host: one.one.one.one" -H "beta-flag: 1"
fn main() {
env_logger::init();
// read command line arguments
let opt = Opt::parse();
let mut my_server = Server::new(Some(opt)).unwrap();
my_server.bootstrap();
let mut my_proxy = pingora_proxy::http_proxy_service(
&my_server.configuration,
MyProxy {
beta_counter: Mutex::new(0),
},
);
my_proxy.add_tcp("0.0.0.0:6190");
my_server.add_service(my_proxy);
my_server.run_forever();
}
## grpc_web_module.rs
// Copyright 2024 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use async_trait::async_trait;
use clap::Parser;
use pingora_core::server::Server;
use pingora_core::upstreams::peer::HttpPeer;
use pingora_core::Result;
use pingora_core::{
modules::http::{
grpc_web::{GrpcWeb, GrpcWebBridge},
HttpModules,
},
prelude::Opt,
};
use pingora_proxy::{ProxyHttp, Session};
/// This example shows how to use the gRPC-web bridge module
pub struct GrpcWebBridgeProxy;
#[async_trait]
impl ProxyHttp for GrpcWebBridgeProxy {
type CTX = ();
fn new_ctx(&self) -> Self::CTX {}
fn init_downstream_modules(&self, modules: &mut HttpModules) {
// Add the gRPC web module
modules.add_module(Box::new(GrpcWeb))
}
async fn early_request_filter(
&self,
session: &mut Session,
_ctx: &mut Self::CTX,
) -> Result<()> {
let grpc = session
.downstream_modules_ctx
.get_mut::<GrpcWebBridge>()
.expect("GrpcWebBridge module added");
// initialize gRPC module for this request
grpc.init();
Ok(())
}
async fn upstream_peer(
&self,
_session: &mut Session,
_ctx: &mut Self::CTX,
) -> Result<Box<HttpPeer>> {
// this needs to be your gRPC server
let grpc_peer = Box::new(HttpPeer::new(
("1.1.1.1", 443),
true,
"one.one.one.one".to_string(),
));
Ok(grpc_peer)
}
}
// RUST_LOG=INFO cargo run --example grpc_web_module
fn main() {
env_logger::init();
// read command line arguments
let opt = Opt::parse();
let mut my_server = Server::new(Some(opt)).unwrap();
my_server.bootstrap();
let mut my_proxy =
pingora_proxy::http_proxy_service(&my_server.configuration, GrpcWebBridgeProxy);
my_proxy.add_tcp("0.0.0.0:6194");
my_server.add_service(my_proxy);
my_server.run_forever();
}
## load_balancer.rs
// Copyright 2024 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use async_trait::async_trait;
use clap::Parser;
use log::info;
use pingora_core::services::background::background_service;
use std::{sync::Arc, time::Duration};
use pingora_core::server::configuration::Opt;
use pingora_core::server::Server;
use pingora_core::upstreams::peer::HttpPeer;
use pingora_core::Result;
use pingora_load_balancing::{health_check, selection::RoundRobin, LoadBalancer};
use pingora_proxy::{ProxyHttp, Session};
pub struct LB(Arc<LoadBalancer<RoundRobin>>);
#[async_trait]
impl ProxyHttp for LB {
type CTX = ();
fn new_ctx(&self) -> Self::CTX {}
async fn upstream_peer(&self, _session: &mut Session, _ctx: &mut ()) -> Result<Box<HttpPeer>> {
let upstream = self
.0
.select(b"", 256) // hash doesn't matter
.unwrap();
info!("upstream peer is: {:?}", upstream);
let peer = Box::new(HttpPeer::new(upstream, true, "one.one.one.one".to_string()));
Ok(peer)
}
async fn upstream_request_filter(
&self,
_session: &mut Session,
upstream_request: &mut pingora_http::RequestHeader,
_ctx: &mut Self::CTX,
) -> Result<()> {
upstream_request
.insert_header("Host", "one.one.one.one")
.unwrap();
Ok(())
}
}
// RUST_LOG=INFO cargo run --example load_balancer
fn main() {
env_logger::init();
// read command line arguments
let opt = Opt::parse();
let mut my_server = Server::new(Some(opt)).unwrap();
my_server.bootstrap();
// "127.0.0.1:343" is just a bad server
let mut upstreams =
LoadBalancer::try_from_iter(["1.1.1.1:443", "1.0.0.1:443", "127.0.0.1:343"]).unwrap();
// We add health check in the background so that the bad server is never selected.
let hc = health_check::TcpHealthCheck::new();
upstreams.set_health_check(hc);
upstreams.health_check_frequency = Some(Duration::from_secs(1));
let background = background_service("health check", upstreams);
let upstreams = background.task();
let mut lb = pingora_proxy::http_proxy_service(&my_server.configuration, LB(upstreams));
lb.add_tcp("0.0.0.0:6188");
let cert_path = format!("{}/tests/keys/server.crt", env!("CARGO_MANIFEST_DIR"));
let key_path = format!("{}/tests/keys/key.pem", env!("CARGO_MANIFEST_DIR"));
let mut tls_settings =
pingora_core::listeners::tls::TlsSettings::intermediate(&cert_path, &key_path).unwrap();
tls_settings.enable_h2();
lb.add_tls_with_settings("0.0.0.0:6189", None, tls_settings);
my_server.add_service(lb);
my_server.add_service(background);
my_server.run_forever();
}
## modify_response.rs
// Copyright 2024 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use async_trait::async_trait;
use bytes::Bytes;
use clap::Parser;
use serde::{Deserialize, Serialize};
use std::net::ToSocketAddrs;
use pingora_core::server::configuration::Opt;
use pingora_core::server::Server;
use pingora_core::upstreams::peer::HttpPeer;
use pingora_core::Result;
use pingora_http::ResponseHeader;
use pingora_proxy::{ProxyHttp, Session};
const HOST: &str = "ip.jsontest.com";
#[derive(Serialize, Deserialize)]
pub struct Resp {
ip: String,
}
pub struct Json2Yaml {
addr: std::net::SocketAddr,
}
pub struct MyCtx {
buffer: Vec<u8>,
}
#[async_trait]
impl ProxyHttp for Json2Yaml {
type CTX = MyCtx;
fn new_ctx(&self) -> Self::CTX {
MyCtx { buffer: vec![] }
}
async fn upstream_peer(
&self,
_session: &mut Session,
_ctx: &mut Self::CTX,
) -> Result<Box<HttpPeer>> {
let peer = Box::new(HttpPeer::new(self.addr, false, HOST.to_owned()));
Ok(peer)
}
async fn upstream_request_filter(
&self,
_session: &mut Session,
upstream_request: &mut pingora_http::RequestHeader,
_ctx: &mut Self::CTX,
) -> Result<()> {
upstream_request
.insert_header("Host", HOST.to_owned())
.unwrap();
Ok(())
}
async fn response_filter(
&self,
_session: &mut Session,
upstream_response: &mut ResponseHeader,
_ctx: &mut Self::CTX,
) -> Result<()>
where
Self::CTX: Send + Sync,
{
// Remove content-length because the size of the new body is unknown
upstream_response.remove_header("Content-Length");
upstream_response
.insert_header("Transfer-Encoding", "Chunked")
.unwrap();
Ok(())
}
fn response_body_filter(
&self,
_session: &mut Session,
body: &mut Option<Bytes>,
end_of_stream: bool,
ctx: &mut Self::CTX,
) -> Result<Option<std::time::Duration>>
where
Self::CTX: Send + Sync,
{
// buffer the data
if let Some(b) = body {
ctx.buffer.extend(&b[..]);
// drop the body
b.clear();
}
if end_of_stream {
// This is the last chunk, we can process the data now
let json_body: Resp = serde_json::de::from_slice(&ctx.buffer).unwrap();
let yaml_body = serde_yaml::to_string(&json_body).unwrap();
*body = Some(Bytes::copy_from_slice(yaml_body.as_bytes()));
}
Ok(None)
}
}
// RUST_LOG=INFO cargo run --example modify_response
// curl 127.0.0.1:6191
fn main() {
env_logger::init();
let opt = Opt::parse();
let mut my_server = Server::new(Some(opt)).unwrap();
my_server.bootstrap();
let mut my_proxy = pingora_proxy::http_proxy_service(
&my_server.configuration,
Json2Yaml {
// hardcode the IP of ip.jsontest.com for now
addr: ("142.251.2.121", 80)
.to_socket_addrs()
.unwrap()
.next()
.unwrap(),
},
);
my_proxy.add_tcp("127.0.0.1:6191");
my_server.add_service(my_proxy);
my_server.run_forever();
}
## multi_lb.rs
// Copyright 2024 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use async_trait::async_trait;
use std::sync::Arc;
use pingora_core::{prelude::*, services::background::GenBackgroundService};
use pingora_load_balancing::{
health_check::TcpHealthCheck,
selection::{BackendIter, BackendSelection, RoundRobin},
LoadBalancer,
};
use pingora_proxy::{http_proxy_service, ProxyHttp, Session};
struct Router {
cluster_one: Arc<LoadBalancer<RoundRobin>>,
cluster_two: Arc<LoadBalancer<RoundRobin>>,
}
#[async_trait]
impl ProxyHttp for Router {
type CTX = ();
fn new_ctx(&self) {}
async fn upstream_peer(&self, session: &mut Session, _ctx: &mut ()) -> Result<Box<HttpPeer>> {
// determine LB cluster based on request uri
let cluster = if session.req_header().uri.path().starts_with("/one/") {
&self.cluster_one
} else {
&self.cluster_two
};
let upstream = cluster
.select(b"", 256) // hash doesn't matter for round robin
.unwrap();
println!("upstream peer is: {upstream:?}");
// Set SNI to one.one.one.one
let peer = Box::new(HttpPeer::new(upstream, true, "one.one.one.one".to_string()));
Ok(peer)
}
}
fn build_cluster_service<S>(upstreams: &[&str]) -> GenBackgroundService<LoadBalancer<S>>
where
S: BackendSelection + 'static,
S::Iter: BackendIter,
{
let mut cluster = LoadBalancer::try_from_iter(upstreams).unwrap();
cluster.set_health_check(TcpHealthCheck::new());
cluster.health_check_frequency = Some(std::time::Duration::from_secs(1));
background_service("cluster health check", cluster)
}
// RUST_LOG=INFO cargo run --example multi_lb
// curl 127.0.0.1:6188/one/
// curl 127.0.0.1:6188/two/
fn main() {
let mut my_server = Server::new(None).unwrap();
my_server.bootstrap();
// build multiple clusters
let cluster_one = build_cluster_service::<RoundRobin>(&["1.1.1.1:443", "127.0.0.1:343"]);
let cluster_two = build_cluster_service::<RoundRobin>(&["1.0.0.1:443", "127.0.0.2:343"]);
let router = Router {
cluster_one: cluster_one.task(),
cluster_two: cluster_two.task(),
};
let mut router_service = http_proxy_service(&my_server.configuration, router);
router_service.add_tcp("0.0.0.0:6188");
my_server.add_service(router_service);
my_server.add_service(cluster_one);
my_server.add_service(cluster_two);
my_server.run_forever();
}
## Rate limiting example
use async_trait::async_trait;
use once_cell::sync::Lazy;
use pingora_core::prelude::*;
use pingora_http::{RequestHeader, ResponseHeader};
use pingora_limits::rate::Rate;
use pingora_load_balancing::prelude::{RoundRobin, TcpHealthCheck};
use pingora_load_balancing::LoadBalancer;
use pingora_proxy::{http_proxy_service, ProxyHttp, Session};
use std::sync::Arc;
use std::time::Duration;
fn main() {
let mut server = Server::new(Some(Opt::default())).unwrap();
server.bootstrap();
let mut upstreams = LoadBalancer::try_from_iter(["1.1.1.1:443", "1.0.0.1:443"]).unwrap();
// Set health check
let hc = TcpHealthCheck::new();
upstreams.set_health_check(hc);
upstreams.health_check_frequency = Some(Duration::from_secs(1));
// Set background service
let background = background_service("health check", upstreams);
let upstreams = background.task();
// Set load balancer
let mut lb = http_proxy_service(&server.configuration, LB(upstreams));
lb.add_tcp("0.0.0.0:6188");
server.add_service(background);
server.add_service(lb);
server.run_forever();
}
pub struct LB(Arc<LoadBalancer<RoundRobin>>);
impl LB {
pub fn get_request_appid(&self, session: &mut Session) -> Option<String> {
match session
.req_header()
.headers
.get("appid")
.map(|v| v.to_str())
{
None => None,
Some(v) => match v {
Ok(v) => Some(v.to_string()),
Err(_) => None,
},
}
}
}
// Rate limiter
static RATE_LIMITER: Lazy<Rate> = Lazy::new(|| Rate::new(Duration::from_secs(1)));
// max request per second per client
static MAX_REQ_PER_SEC: isize = 1;
#[async_trait]
impl ProxyHttp for LB {
type CTX = ();
fn new_ctx(&self) {}
async fn upstream_peer(
&self,
_session: &mut Session,
_ctx: &mut Self::CTX,
) -> Result<Box<HttpPeer>> {
let upstream = self.0.select(b"", 256).unwrap();
// Set SNI
let peer = Box::new(HttpPeer::new(upstream, true, "one.one.one.one".to_string()));
Ok(peer)
}
async fn upstream_request_filter(
&self,
_session: &mut Session,
upstream_request: &mut RequestHeader,
_ctx: &mut Self::CTX,
) -> Result<()>
where
Self::CTX: Send + Sync,
{
upstream_request
.insert_header("Host", "one.one.one.one")
.unwrap();
Ok(())
}
async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool>
where
Self::CTX: Send + Sync,
{
let appid = match self.get_request_appid(session) {
None => return Ok(false), // no client appid found, skip rate limiting
Some(addr) => addr,
};
// retrieve the current window requests
let curr_window_requests = RATE_LIMITER.observe(&appid, 1);
if curr_window_requests > MAX_REQ_PER_SEC {
// rate limited, return 429
let mut header = ResponseHeader::build(429, None).unwrap();
header
.insert_header("X-Rate-Limit-Limit", MAX_REQ_PER_SEC.to_string())
.unwrap();
header.insert_header("X-Rate-Limit-Remaining", "0").unwrap();
header.insert_header("X-Rate-Limit-Reset", "1").unwrap();
session.set_keepalive(None);
session
.write_response_header(Box::new(header), true)
.await?;
return Ok(true);
}
Ok(false)
}
}
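Unlike the other examples, this one does not carry its run commands inline, so here is a hedged usage sketch. The port and the appid header come from the code above; the example target name is an assumption.

// RUST_LOG=INFO cargo run --example rate_limiter
// Send two requests with the same appid within one second; the second
// one should be rejected with 429 and the X-Rate-Limit-* headers:
// curl 127.0.0.1:6188 -H "appid: client1" -v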
## use_module.rs
// Copyright 2024 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use async_trait::async_trait;
use clap::Parser;
use pingora_core::modules::http::HttpModules;
use pingora_core::server::configuration::Opt;
use pingora_core::server::Server;
use pingora_core::upstreams::peer::HttpPeer;
use pingora_core::Result;
use pingora_http::RequestHeader;
use pingora_proxy::{ProxyHttp, Session};
/// This example shows how to build and import 3rd party modules
/// A simple ACL to check "Authorization: basic $credential" header
mod my_acl {
use super::*;
use pingora_core::modules::http::{HttpModule, HttpModuleBuilder, Module};
use pingora_error::{Error, ErrorType::HTTPStatus};
use std::any::Any;
// This is the struct for per request module context
struct MyAclCtx {
credential_header: String,
}
// Implement how the module would consume and/or modify request and/or response
#[async_trait]
impl HttpModule for MyAclCtx {
async fn request_header_filter(&mut self, req: &mut RequestHeader) -> Result<()> {
let Some(auth) = req.headers.get(http::header::AUTHORIZATION) else {
return Error::e_explain(HTTPStatus(403), "Auth failed, no auth header");
};
if auth.as_bytes() != self.credential_header.as_bytes() {
Error::e_explain(HTTPStatus(403), "Auth failed, credential mismatch")
} else {
Ok(())
}
}
// boilerplate code for all modules
fn as_any(&self) -> &dyn Any {
self
}
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}
}
// This is the singleton object which will be attached to the server
pub struct MyAcl {
pub credential: String,
}
impl HttpModuleBuilder for MyAcl {
// This function defines how to create each Ctx. This function is called when a new request
// arrives
fn init(&self) -> Module {
Box::new(MyAclCtx {
// Make it easier to compare header
// We could also store this value in MyAcl and use Arc to share it with every Ctx.
credential_header: format!("basic {}", self.credential),
})
}
}
}
pub struct MyProxy;
#[async_trait]
impl ProxyHttp for MyProxy {
type CTX = ();
fn new_ctx(&self) -> Self::CTX {}
// This function is only called once when the server starts
fn init_downstream_modules(&self, modules: &mut HttpModules) {
// Add the module to MyProxy
modules.add_module(Box::new(my_acl::MyAcl {
credential: "testcode".into(),
}))
}
async fn upstream_peer(
&self,
_session: &mut Session,
_ctx: &mut Self::CTX,
) -> Result<Box<HttpPeer>> {
let peer = Box::new(HttpPeer::new(
("1.1.1.1", 443),
true,
"one.one.one.one".to_string(),
));
Ok(peer)
}
}
// RUST_LOG=INFO cargo run --example use_module
// curl 127.0.0.1:6193 -H "Host: one.one.one.one" -v
// curl 127.0.0.1:6193 -H "Host: one.one.one.one" -H "Authorization: basic testcode"
// curl 127.0.0.1:6193 -H "Host: one.one.one.one" -H "Authorization: basic wrong" -v
fn main() {
env_logger::init();
// read command line arguments
let opt = Opt::parse();
let mut my_server = Server::new(Some(opt)).unwrap();
my_server.bootstrap();
let mut my_proxy = pingora_proxy::http_proxy_service(&my_server.configuration, MyProxy);
my_proxy.add_tcp("0.0.0.0:6193");
my_server.add_service(my_proxy);
my_server.run_forever();
}
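Nothing in init_downstream_modules limits a proxy to a single module. Below is a hedged sketch combining the ACL module above with the GrpcWeb module from the earlier example; whether both make sense together is application-specific.

fn init_downstream_modules(&self, modules: &mut HttpModules) {
    // every request gets a per-request ctx for each module registered here
    modules.add_module(Box::new(GrpcWeb));
    modules.add_module(Box::new(my_acl::MyAcl {
        credential: "testcode".into(),
    }))
}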
## pingora_proxy/src/lib.rs
// Copyright 2024 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! # pingora-proxy
//!
//! Programmable HTTP proxy built on top of [pingora_core].
//!
//! # Features
//! - HTTP/1.x and HTTP/2 for both downstream and upstream
//! - Connection pooling
//! - TLSv1.3, mutual TLS, customizable CA
//! - Request/Response scanning, modification or rejection
//! - Dynamic upstream selection
//! - Configurable retry and failover
//! - Fully programmable and customizable at any stage of an HTTP request
//!
//! # How to use
//!
//! Users of this crate define their proxy by implementing the [ProxyHttp] trait, which contains the
//! callbacks to be invoked at each stage of an HTTP request.
//!
//! Then the service can be passed into [http_proxy_service()] for a [pingora_core::server::Server] to
//! run it.
//!
//! See examples/load_balancer.rs for a detailed example.
use async_trait::async_trait;
use bytes::Bytes;
use futures::future::FutureExt;
use http::{header, version::Version};
use log::{debug, error, trace, warn};
use once_cell::sync::Lazy;
use pingora_http::{RequestHeader, ResponseHeader};
use std::fmt::Debug;
use std::str;
use std::sync::Arc;
use tokio::sync::{mpsc, Notify};
use tokio::time;
use pingora_cache::NoCacheReason;
use pingora_core::apps::{HttpServerApp, HttpServerOptions};
use pingora_core::connectors::{http::Connector, ConnectorOptions};
use pingora_core::modules::http::compression::ResponseCompressionBuilder;
use pingora_core::modules::http::{HttpModuleCtx, HttpModules};
use pingora_core::protocols::http::client::HttpSession as ClientSession;
use pingora_core::protocols::http::v1::client::HttpSession as HttpSessionV1;
use pingora_core::protocols::http::HttpTask;
use pingora_core::protocols::http::ServerSession as HttpSession;
use pingora_core::protocols::http::SERVER_NAME;
use pingora_core::protocols::Stream;
use pingora_core::protocols::{Digest, UniqueID};
use pingora_core::server::configuration::ServerConf;
use pingora_core::server::ShutdownWatch;
use pingora_core::upstreams::peer::{HttpPeer, Peer};
use pingora_error::{Error, ErrorSource, ErrorType::*, OrErr, Result};
const MAX_RETRIES: usize = 16;
const TASK_BUFFER_SIZE: usize = 4;
mod proxy_cache;
mod proxy_common;
mod proxy_h1;
mod proxy_h2;
mod proxy_purge;
mod proxy_trait;
pub mod subrequest;
use subrequest::Ctx as SubReqCtx;
pub use proxy_cache::range_filter::{range_header_filter, RangeType};
pub use proxy_purge::PurgeStatus;
pub use proxy_trait::ProxyHttp;
pub mod prelude {
pub use crate::{http_proxy_service, ProxyHttp, Session};
}
/// The concrete type that holds the user defined HTTP proxy.
///
/// Users don't need to interact with this object directly.
pub struct HttpProxy<SV> {
inner: SV, // TODO: name it better than inner
client_upstream: Connector,
shutdown: Notify,
pub server_options: Option<HttpServerOptions>,
pub downstream_modules: HttpModules,
}
impl<SV> HttpProxy<SV> {
fn new(inner: SV, conf: Arc<ServerConf>) -> Self {
HttpProxy {
inner,
client_upstream: Connector::new(Some(ConnectorOptions::from_server_conf(&conf))),
shutdown: Notify::new(),
server_options: None,
downstream_modules: HttpModules::new(),
}
}
fn handle_init_modules(&mut self)
where
SV: ProxyHttp,
{
self.inner
.init_downstream_modules(&mut self.downstream_modules);
}
async fn handle_new_request(
&self,
mut downstream_session: Box<HttpSession>,
) -> Option<Box<HttpSession>>
where
SV: ProxyHttp + Send + Sync,
SV::CTX: Send + Sync,
{
// phase 1 read request header
let res = tokio::select! {
biased; // biased select is cheaper, and we don't want to drop already buffered requests
res = downstream_session.read_request() => { res }
_ = self.shutdown.notified() => {
// service shutting down, dropping the connection to stop more req from coming in
return None;
}
};
match res {
Ok(true) => {
// TODO: check n==0
debug!("Successfully get a new request");
}
Ok(false) => {
return None; // TODO: close connection?
}
Err(mut e) => {
e.as_down();
error!("Fail to proxy: {e}");
if matches!(e.etype, InvalidHTTPHeader) {
downstream_session
.respond_error(400)
.await
.unwrap_or_else(|e| {
error!("failed to send error response to downstream: {e}");
});
} // otherwise the connection must be broken, no need to send anything
downstream_session.shutdown().await;
return None;
}
}
trace!(
"Request header: {:?}",
downstream_session.req_header().as_ref()
);
Some(downstream_session)
}
// return bool: server_session can be reused, and error if any
async fn proxy_to_upstream(
&self,
session: &mut Session,
ctx: &mut SV::CTX,
) -> (bool, Option<Box<Error>>)
where
SV: ProxyHttp + Send + Sync,
SV::CTX: Send + Sync,
{
let peer = match self.inner.upstream_peer(session, ctx).await {
Ok(p) => p,
Err(e) => return (false, Some(e)),
};
let client_session = self.client_upstream.get_http_session(&*peer).await;
match client_session {
Ok((client_session, client_reused)) => {
let (server_reused, error) = match client_session {
ClientSession::H1(mut h1) => {
let (server_reused, client_reuse, error) = self
.proxy_to_h1_upstream(session, &mut h1, client_reused, &peer, ctx)
.await;
if client_reuse {
let session = ClientSession::H1(h1);
self.client_upstream
.release_http_session(session, &*peer, peer.idle_timeout())
.await;
}
(server_reused, error)
}
ClientSession::H2(mut h2) => {
let (server_reused, mut error) = self
.proxy_to_h2_upstream(session, &mut h2, client_reused, &peer, ctx)
.await;
let session = ClientSession::H2(h2);
self.client_upstream
.release_http_session(session, &*peer, peer.idle_timeout())
.await;
if let Some(e) = error.as_mut() {
// try to downgrade if A. origin says so or B. origin sends an invalid
// response, which usually means origin h2 is not production ready
if matches!(e.etype, H2Downgrade | InvalidH2) {
if peer
.get_alpn()
.map_or(true, |alpn| alpn.get_min_http_version() == 1)
{
// Add the peer to prefer h1 so that all following requests
// will use h1
self.client_upstream.prefer_h1(&*peer);
} else {
// the peer doesn't allow downgrading to h1 (e.g. gRPC)
e.retry = false.into();
}
}
}
(server_reused, error)
}
};
(
server_reused,
error.map(|e| {
self.inner
.error_while_proxy(&peer, session, e, ctx, client_reused)
}),
)
}
Err(mut e) => {
e.as_up();
let new_err = self.inner.fail_to_connect(session, &peer, ctx, e);
(false, Some(new_err.into_up()))
}
}
}
fn upstream_filter(
&self,
session: &mut Session,
task: &mut HttpTask,
ctx: &mut SV::CTX,
) -> Result<()>
where
SV: ProxyHttp,
{
match task {
HttpTask::Header(header, _eos) => {
self.inner.upstream_response_filter(session, header, ctx)
}
HttpTask::Body(data, eos) => self
.inner
.upstream_response_body_filter(session, data, *eos, ctx)?,
HttpTask::Trailer(Some(trailers)) => self
.inner
.upstream_response_trailer_filter(session, trailers, ctx)?,
_ => {
// task does not support a filter
}
}
Ok(())
}
async fn finish(
&self,
mut session: Session,
ctx: &mut SV::CTX,
reuse: bool,
error: Option<&Error>,
) -> Option<Stream>
where
SV: ProxyHttp + Send + Sync,
SV::CTX: Send + Sync,
{
self.inner.logging(&mut session, error, ctx).await;
if reuse {
// TODO: log error
session.downstream_session.finish().await.ok().flatten()
} else {
None
}
}
}
use pingora_cache::HttpCache;
use pingora_core::protocols::http::compression::ResponseCompressionCtx;
/// The established HTTP session
///
/// This object is what users interact with in order to access the request itself or change the proxy
/// behavior.
pub struct Session {
/// the HTTP session to downstream (the client)
pub downstream_session: Box<HttpSession>,
/// The interface to control HTTP caching
pub cache: HttpCache,
/// (de)compress responses coming into the proxy (from upstream)
pub upstream_compression: ResponseCompressionCtx,
/// ignore downstream range (skip downstream range filters)
pub ignore_downstream_range: bool,
// the context from parent request
subrequest_ctx: Option<Box<SubReqCtx>>,
// Downstream filter modules
pub downstream_modules_ctx: HttpModuleCtx,
}
impl Session {
fn new(
downstream_session: impl Into<Box<HttpSession>>,
downstream_modules: &HttpModules,
) -> Self {
Session {
downstream_session: downstream_session.into(),
cache: HttpCache::new(),
// disable both upstream and downstream compression
upstream_compression: ResponseCompressionCtx::new(0, false, false),
ignore_downstream_range: false,
subrequest_ctx: None,
downstream_modules_ctx: downstream_modules.build_ctx(),
}
}
/// Create a new [Session] from the given [Stream]
///
/// This function is mostly used for testing and mocking.
pub fn new_h1(stream: Stream) -> Self {
let modules = HttpModules::new();
Self::new(Box::new(HttpSession::new_http1(stream)), &modules)
}
/// Create a new [Session] from the given [Stream] with modules
///
/// This function is mostly used for testing and mocking.
pub fn new_h1_with_modules(stream: Stream, downstream_modules: &HttpModules) -> Self {
Self::new(Box::new(HttpSession::new_http1(stream)), downstream_modules)
}
pub fn as_downstream_mut(&mut self) -> &mut HttpSession {
&mut self.downstream_session
}
pub fn as_downstream(&self) -> &HttpSession {
&self.downstream_session
}
/// Write HTTP response with the given error code to the downstream.
pub async fn respond_error(&mut self, error: u16) -> Result<()> {
self.as_downstream_mut().respond_error(error).await
}
/// Write HTTP response with the given error code to the downstream with a body.
pub async fn respond_error_with_body(&mut self, error: u16, body: Bytes) -> Result<()> {
self.as_downstream_mut()
.respond_error_with_body(error, body)
.await
}
/// Write the given HTTP response header to the downstream
///
/// Different from directly calling [HttpSession::write_response_header], this function also
/// invokes the filter modules.
pub async fn write_response_header(
&mut self,
mut resp: Box<ResponseHeader>,
end_of_stream: bool,
) -> Result<()> {
self.downstream_modules_ctx
.response_header_filter(&mut resp, end_of_stream)
.await?;
self.downstream_session.write_response_header(resp).await
}
/// Write the given HTTP response body chunk to the downstream
///
/// Different from directly calling [HttpSession::write_response_body], this function also
/// invokes the filter modules.
pub async fn write_response_body(
&mut self,
mut body: Option<Bytes>,
end_of_stream: bool,
) -> Result<()> {
self.downstream_modules_ctx
.response_body_filter(&mut body, end_of_stream)?;
if body.is_none() && !end_of_stream {
return Ok(());
}
let data = body.unwrap_or_default();
self.downstream_session
.write_response_body(data, end_of_stream)
.await
}
pub async fn write_response_tasks(&mut self, mut tasks: Vec<HttpTask>) -> Result<bool> {
for task in tasks.iter_mut() {
match task {
HttpTask::Header(resp, end) => {
self.downstream_modules_ctx
.response_header_filter(resp, *end)
.await?;
}
HttpTask::Body(data, end) => {
self.downstream_modules_ctx
.response_body_filter(data, *end)?;
}
HttpTask::Trailer(trailers) => {
if let Some(buf) = self
.downstream_modules_ctx
.response_trailer_filter(trailers)?
{
// Write the trailers into the body if the filter
// returns a buffer.
//
// Note, this will not work if end of stream has already
// been seen or we've written content-length bytes.
*task = HttpTask::Body(Some(buf), true);
}
}
_ => { /* Done or Failed */ }
}
}
self.downstream_session.response_duplex_vec(tasks).await
}
}
impl AsRef<HttpSession> for Session {
fn as_ref(&self) -> &HttpSession {
&self.downstream_session
}
}
impl AsMut<HttpSession> for Session {
fn as_mut(&mut self) -> &mut HttpSession {
&mut self.downstream_session
}
}
use std::ops::{Deref, DerefMut};
impl Deref for Session {
type Target = HttpSession;
fn deref(&self) -> &Self::Target {
&self.downstream_session
}
}
impl DerefMut for Session {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.downstream_session
}
}
// generic HTTP 502 response sent when proxy_upstream_filter refuses to connect to upstream
static BAD_GATEWAY: Lazy<ResponseHeader> = Lazy::new(|| {
let mut resp = ResponseHeader::build(http::StatusCode::BAD_GATEWAY, Some(3)).unwrap();
resp.insert_header(header::SERVER, &SERVER_NAME[..])
.unwrap();
resp.insert_header(header::CONTENT_LENGTH, 0).unwrap();
resp.insert_header(header::CACHE_CONTROL, "private, no-store")
.unwrap();
resp
});
impl<SV> HttpProxy<SV> {
async fn process_request(
self: &Arc<Self>,
mut session: Session,
mut ctx: <SV as ProxyHttp>::CTX,
) -> Option<Stream>
where
SV: ProxyHttp + Send + Sync + 'static,
<SV as ProxyHttp>::CTX: Send + Sync,
{
if let Err(e) = self
.inner
.early_request_filter(&mut session, &mut ctx)
.await
{
self.handle_error(&mut session, &mut ctx, e, "Fail to early filter request:")
.await;
return None;
}
let req = session.downstream_session.req_header_mut();
// Built-in downstream request filters go first
if let Err(e) = session
.downstream_modules_ctx
.request_header_filter(req)
.await
{
self.handle_error(
&mut session,
&mut ctx,
e,
"Failed in downstream modules request filter:",
)
.await;
return None;
}
match self.inner.request_filter(&mut session, &mut ctx).await {
Ok(response_sent) => {
if response_sent {
// TODO: log error
self.inner.logging(&mut session, None, &mut ctx).await;
return session.downstream_session.finish().await.ok().flatten();
}
/* else continue */
}
Err(e) => {
self.handle_error(&mut session, &mut ctx, e, "Fail to filter request:")
.await;
return None;
}
}
if let Some((reuse, err)) = self.proxy_cache(&mut session, &mut ctx).await {
// cache hit
return self.finish(session, &mut ctx, reuse, err.as_deref()).await;
}
// either uncacheable, or cache miss
// decide if the request is allowed to go to upstream
match self
.inner
.proxy_upstream_filter(&mut session, &mut ctx)
.await
{
Ok(proxy_to_upstream) => {
if !proxy_to_upstream {
// The hook can choose to write its own response, but if it doesn't, we respond
// with a generic 502
if session.cache.enabled() {
// drop the cache lock that this request may be holding onto
session.cache.disable(NoCacheReason::DeclinedToUpstream);
}
if session.response_written().is_none() {
match session.write_response_header_ref(&BAD_GATEWAY).await {
Ok(()) => {}
Err(e) => {
self.handle_error(
&mut session,
&mut ctx,
e,
"Error responding with Bad Gateway:",
)
.await;
return None;
}
}
}
return self.finish(session, &mut ctx, false, None).await;
}
/* else continue */
}
Err(e) => {
if session.cache.enabled() {
session.cache.disable(NoCacheReason::InternalError);
}
self.handle_error(
&mut session,
&mut ctx,
e,
"Error deciding if we should proxy to upstream:",
)
.await;
return None;
}
}
let mut retries: usize = 0;
let mut server_reuse = false;
let mut proxy_error: Option<Box<Error>> = None;
while retries < MAX_RETRIES {
retries += 1;
let (reuse, e) = self.proxy_to_upstream(&mut session, &mut ctx).await;
server_reuse = reuse;
match e {
Some(error) => {
let retry = error.retry();
proxy_error = Some(error);
if !retry {
break;
}
// only log error that will be retried here, the final error will be logged below
warn!(
"Fail to proxy: {}, tries: {}, retry: {}, {}",
proxy_error.as_ref().unwrap(),
retries,
retry,
self.inner.request_summary(&session, &ctx)
);
}
None => {
proxy_error = None;
break;
}
};
}
// serve stale if error
// Check both error and cache before calling the function because await is not cheap
let serve_stale_result = if proxy_error.is_some() && session.cache.can_serve_stale_error() {
self.handle_stale_if_error(&mut session, &mut ctx, proxy_error.as_ref().unwrap())
.await
} else {
None
};
let final_error = if let Some((reuse, stale_cache_error)) = serve_stale_result {
// don't reuse server conn if serve stale polluted it
server_reuse = server_reuse && reuse;
stale_cache_error
} else {
proxy_error
};
if let Some(e) = final_error.as_ref() {
// If we have errored and are still holding a cache lock, release it.
if session.cache.enabled() {
let reason = if *e.esource() == ErrorSource::Upstream {
NoCacheReason::UpstreamError
} else {
NoCacheReason::InternalError
};
session.cache.disable(reason);
}
let status = self.inner.fail_to_proxy(&mut session, e, &mut ctx).await;
// final error will have > 0 status unless downstream connection is dead
if !self.inner.suppress_error_log(&session, &ctx, e) {
error!(
"Fail to proxy: {}, status: {}, tries: {}, retry: {}, {}",
final_error.as_ref().unwrap(),
status,
retries,
false, // we never retry here
self.inner.request_summary(&session, &ctx)
);
}
}
// logging() will be called in finish()
self.finish(session, &mut ctx, server_reuse, final_error.as_deref())
.await
}
async fn handle_error(
&self,
session: &mut Session,
ctx: &mut <SV as ProxyHttp>::CTX,
e: Box<Error>,
context: &str,
) where
SV: ProxyHttp + Send + Sync + 'static,
<SV as ProxyHttp>::CTX: Send + Sync,
{
if !self.inner.suppress_error_log(session, ctx, &e) {
error!(
"{context} {}, {}",
e,
self.inner.request_summary(session, ctx)
);
}
self.inner.fail_to_proxy(session, &e, ctx).await;
self.inner.logging(session, Some(&e), ctx).await;
}
}
/* Make process_subrequest() a trait to workaround https://github.com/rust-lang/rust/issues/78649
if process_subrequest() is implemented as a member of HttpProxy, rust complains
error[E0391]: cycle detected when computing type of proxy_cache::<impl at pingora-proxy/src/proxy_cache.rs:7:1: 7:23>::proxy_cache::{opaque#0}
--> pingora-proxy/src/proxy_cache.rs:13:10
|
13 | ) -> Option<(bool, Option<Box<Error>>)>
*/
#[async_trait]
trait Subrequest {
async fn process_subrequest(
self: &Arc<Self>,
session: Box<HttpSession>,
sub_req_ctx: Box<SubReqCtx>,
);
}
#[async_trait]
impl<SV> Subrequest for HttpProxy<SV>
where
SV: ProxyHttp + Send + Sync + 'static,
<SV as ProxyHttp>::CTX: Send + Sync,
{
async fn process_subrequest(
self: &Arc<Self>,
session: Box<HttpSession>,
sub_req_ctx: Box<SubReqCtx>,
) {
debug!("starting subrequest");
let mut session = match self.handle_new_request(session).await {
Some(downstream_session) => Session::new(downstream_session, &self.downstream_modules),
None => return, // bad request
};
// no real downstream to keepalive, but it doesn't matter what is set here because at the end
// of this fn the dummy connection will be dropped
session.set_keepalive(None);
session.subrequest_ctx.replace(sub_req_ctx);
trace!("processing subrequest");
let ctx = self.inner.new_ctx();
self.process_request(session, ctx).await;
trace!("subrequest done");
}
}
#[async_trait]
impl<SV> HttpServerApp for HttpProxy<SV>
where
SV: ProxyHttp + Send + Sync + 'static,
<SV as ProxyHttp>::CTX: Send + Sync,
{
async fn process_new_http(
self: &Arc<Self>,
session: HttpSession,
shutdown: &ShutdownWatch,
) -> Option<Stream> {
let session = Box::new(session);
// TODO: keepalive pool, use stack
let mut session = match self.handle_new_request(session).await {
Some(downstream_session) => Session::new(downstream_session, &self.downstream_modules),
None => return None, // bad request
};
if *shutdown.borrow() {
// stop downstream from reusing if this service is shutting down soon
session.set_keepalive(None);
} else {
// default 60s
session.set_keepalive(Some(60));
}
let ctx = self.inner.new_ctx();
self.process_request(session, ctx).await
}
async fn http_cleanup(&self) {
// Notify all keepalived requests blocking on read_request() to abort
self.shutdown.notify_waiters();
// TODO: impl shutting down flag so that we don't need to read stack.is_shutting_down()
}
fn server_options(&self) -> Option<&HttpServerOptions> {
self.server_options.as_ref()
}
// TODO implement h2_options
}
use pingora_core::services::listening::Service;
/// Create a [Service] from the user implemented [ProxyHttp].
///
/// The returned [Service] can be hosted by a [pingora_core::server::Server] directly.
pub fn http_proxy_service<SV>(conf: &Arc<ServerConf>, inner: SV) -> Service<HttpProxy<SV>>
where
SV: ProxyHttp,
{
http_proxy_service_with_name(conf, inner, "Pingora HTTP Proxy Service")
}
/// Create a [Service] from the user implemented [ProxyHttp].
///
/// The returned [Service] can be hosted by a [pingora_core::server::Server] directly.
pub fn http_proxy_service_with_name<SV>(
conf: &Arc<ServerConf>,
inner: SV,
name: &str,
) -> Service<HttpProxy<SV>>
where
SV: ProxyHttp,
{
let mut proxy = HttpProxy::new(inner, conf.clone());
proxy.handle_init_modules();
Service::new(name.to_string(), proxy)
}
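A minimal sketch of wiring these constructors up, where MyProxy stands in for any ProxyHttp implementation (such as the examples above) and the service name and port are placeholders:

let mut server = Server::new(None).unwrap();
server.bootstrap();
// identical to http_proxy_service() except for the explicit service name
let mut svc = http_proxy_service_with_name(&server.configuration, MyProxy, "my_proxy");
svc.add_tcp("0.0.0.0:8080");
server.add_service(svc);
server.run_forever();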
## proxy_common.rs
/// Possible downstream states during request multiplexing
#[derive(Debug, Clone, Copy)]
pub(crate) enum DownstreamStateMachine {
/// more request (body) to read
Reading,
/// no more data to read
ReadingFinished,
/// downstream is already errored or closed
Errored,
}
#[allow(clippy::wrong_self_convention)]
impl DownstreamStateMachine {
pub fn new(finished: bool) -> Self {
if finished {
Self::ReadingFinished
} else {
Self::Reading
}
}
// Can call read() to read more data or wait on closing
pub fn can_poll(&self) -> bool {
!matches!(self, Self::Errored)
}
pub fn is_reading(&self) -> bool {
matches!(self, Self::Reading)
}
pub fn is_done(&self) -> bool {
!matches!(self, Self::Reading)
}
pub fn is_errored(&self) -> bool {
matches!(self, Self::Errored)
}
/// Move the state machine to Finished state if set is true
pub fn maybe_finished(&mut self, set: bool) {
if set {
*self = Self::ReadingFinished
}
}
pub fn to_errored(&mut self) {
*self = Self::Errored
}
}
/// Possible upstream states during request multiplexing
#[derive(Debug, Clone, Copy)]
pub(crate) struct ResponseStateMachine {
upstream_response_done: bool,
cached_response_done: bool,
}
impl ResponseStateMachine {
pub fn new() -> Self {
ResponseStateMachine {
upstream_response_done: false,
cached_response_done: true, // no cached response by default
}
}
pub fn is_done(&self) -> bool {
self.upstream_response_done && self.cached_response_done
}
pub fn upstream_done(&self) -> bool {
self.upstream_response_done
}
pub fn cached_done(&self) -> bool {
self.cached_response_done
}
pub fn enable_cached_response(&mut self) {
self.cached_response_done = false;
}
pub fn maybe_set_upstream_done(&mut self, done: bool) {
if done {
self.upstream_response_done = true;
}
}
pub fn maybe_set_cache_done(&mut self, done: bool) {
if done {
self.cached_response_done = true;
}
}
}
## proxy_trait.rs
// Copyright 2024 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use super::*;
use pingora_cache::{
key::HashBinary,
CacheKey, CacheMeta, ForcedInvalidationKind,
RespCacheable::{self, *},
};
use proxy_cache::range_filter::{self};
use std::time::Duration;
/// The interface to control the HTTP proxy
///
/// The methods in [ProxyHttp] are filters/callbacks which will be performed on all requests at their
/// particular stage (if applicable).
///
/// If any of the filters returns [Result::Err], the request will fail, and the error will be logged.
#[cfg_attr(not(doc_async_trait), async_trait)]
pub trait ProxyHttp {
/// The per request object to share state across the different filters
type CTX;
/// Define how the ctx should be created.
fn new_ctx(&self) -> Self::CTX;
/// Define where the proxy should send the request to.
///
/// The returned [HttpPeer] contains the information regarding where and how this request should
/// be forwarded to.
async fn upstream_peer(
&self,
session: &mut Session,
ctx: &mut Self::CTX,
) -> Result<Box<HttpPeer>>;
/// Set up downstream modules.
///
/// In this phase, users can add or configure [HttpModules] before the server starts up.
///
/// In the default implementation of this method, [ResponseCompressionBuilder] is added
/// and disabled.
fn init_downstream_modules(&self, modules: &mut HttpModules) {
// Add disabled downstream compression module by default
modules.add_module(ResponseCompressionBuilder::enable(0));
}
/// Handle the incoming request.
///
/// In this phase, users can parse, validate, rate limit, perform access control and/or
/// return a response for this request.
///
/// If the user already sent a response to this request, an Ok(true) should be returned so that
/// the proxy would exit. The proxy continues to the next phases when Ok(false) is returned.
///
/// By default this filter does nothing and returns Ok(false).
async fn request_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool>
where
Self::CTX: Send + Sync,
{
Ok(false)
}
/// Handle the incoming request before any downstream module is executed.
///
/// This function is similar to [Self::request_filter()] but executes before any other logic,
/// including downstream module logic. The main purpose of this function is to provide finer
/// grained control of the behavior of the modules.
///
/// Note that because this function is executed before any module that might provide access
/// control or rate limiting, logic should stay in request_filter() if it can in order to be
/// protected by said modules.
async fn early_request_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> Result<()>
where
Self::CTX: Send + Sync,
{
Ok(())
}
/// Handle the incoming request body.
///
/// This function will be called every time a piece of request body is received. The body is
/// **not the entire request body**.
///
/// The async nature of this function allows the user to throttle the upload speed and/or execute
/// heavy computation logic such as WAF rules on offloaded threads without blocking the threads
/// that process the requests themselves.
async fn request_body_filter(
&self,
_session: &mut Session,
_body: &mut Option<Bytes>,
_end_of_stream: bool,
_ctx: &mut Self::CTX,
) -> Result<()>
where
Self::CTX: Send + Sync,
{
Ok(())
}
/// This filter decides if the request is cacheable and what cache backend to use
///
/// The caller can interact with Session.cache to enable caching.
///
/// By default this filter does nothing which effectively disables caching.
// Ideally only session.cache should be modified, TODO: reflect that in this interface
fn request_cache_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> Result<()> {
Ok(())
}
/// This callback generates the cache key
///
/// This callback is called only when cache is enabled for this request
///
/// By default this callback returns a default cache key generated from the request.
fn cache_key_callback(&self, session: &Session, _ctx: &mut Self::CTX) -> Result<CacheKey> {
let req_header = session.req_header();
Ok(CacheKey::default(req_header))
}
/// This callback is invoked when a cacheable response is ready to be admitted to cache
fn cache_miss(&self, session: &mut Session, _ctx: &mut Self::CTX) {
session.cache.cache_miss();
}
/// This filter is called after a successful cache lookup and before the
/// cache asset is ready to be used.
///
/// This filter allows the user to log or force invalidate the asset.
///
/// The value returned indicates if the force invalidation should be used,
/// and which kind. Returning None indicates no forced invalidation
async fn cache_hit_filter(
&self,
_session: &Session,
_meta: &CacheMeta,
_ctx: &mut Self::CTX,
) -> Result<Option<ForcedInvalidationKind>>
where
Self::CTX: Send + Sync,
{
Ok(None)
}
/// Decide if a request should continue to upstream after not being served from cache.
///
/// returns: Ok(true) if the request should continue, Ok(false) if a response was written by the
/// callback and the session should be finished, or an error
///
/// This filter can be used for deferring checks like rate limiting or access control to when they
/// are actually needed, i.e. after a cache miss.
async fn proxy_upstream_filter(
&self,
_session: &mut Session,
_ctx: &mut Self::CTX,
) -> Result<bool>
where
Self::CTX: Send + Sync,
{
Ok(true)
}
/// Decide if the response is cacheable
fn response_cache_filter(
&self,
_session: &Session,
_resp: &ResponseHeader,
_ctx: &mut Self::CTX,
) -> Result<RespCacheable> {
Ok(Uncacheable(NoCacheReason::Custom("default")))
}
/// Decide how to generate cache vary key from both request and response
///
/// None means no variance is needed.
fn cache_vary_filter(
&self,
_meta: &CacheMeta,
_ctx: &mut Self::CTX,
_req: &RequestHeader,
) -> Option<HashBinary> {
// default to None for now to disable vary feature
None
}
/// Decide if the incoming request's condition _fails_ against the cached response.
///
/// Returning Ok(true) means that the response does _not_ match against the condition, and
/// that the proxy can return 304 Not Modified downstream.
///
/// An example is a conditional GET request with If-None-Match: "foobar". If the cached
/// response contains the ETag: "foobar", then the condition fails, and 304 Not Modified
/// should be returned. Else, the condition passes which means the full 200 OK response must
/// be sent.
fn cache_not_modified_filter(
&self,
session: &Session,
resp: &ResponseHeader,
_ctx: &mut Self::CTX,
) -> Result<bool> {
Ok(
pingora_core::protocols::http::conditional_filter::not_modified_filter(
session.req_header(),
resp,
),
)
}
/// This filter is called when cache is enabled to determine what byte range to return (in both
/// cache hit and miss cases) from the response body. It is only used when caching is enabled;
/// otherwise the upstream is responsible for any range filtering. It allows users to define the
/// range this request is for via its return type range_filter::RangeType.
///
/// It also allows users to modify the response header accordingly.
///
/// The default implementation can handle a single range as per [RFC7233].
fn range_header_filter(
&self,
req: &RequestHeader,
resp: &mut ResponseHeader,
_ctx: &mut Self::CTX,
) -> range_filter::RangeType {
proxy_cache::range_filter::range_header_filter(req, resp)
}
/// Modify the request before it is sent to the upstream
///
/// Unlike [Self::request_filter()], this filter allows changing the request headers that are sent
/// to the upstream.
async fn upstream_request_filter(
&self,
_session: &mut Session,
_upstream_request: &mut RequestHeader,
_ctx: &mut Self::CTX,
) -> Result<()>
where
Self::CTX: Send + Sync,
{
Ok(())
}
/// Modify the response header from the upstream
///
/// The modification is before caching, so any change here will be stored in the cache if enabled.
///
/// Responses served from cache won't trigger this filter. If the cache needed revalidation,
/// only the 304 from upstream will trigger the filter (though it will be merged into the
/// cached header, not served directly to downstream).
fn upstream_response_filter(
&self,
_session: &mut Session,
_upstream_response: &mut ResponseHeader,
_ctx: &mut Self::CTX,
) {
}
/// Modify the response header before it is sent to the downstream
///
/// The modification is after caching. This filter is called for all responses including
/// responses served from cache.
async fn response_filter(
&self,
_session: &mut Session,
_upstream_response: &mut ResponseHeader,
_ctx: &mut Self::CTX,
) -> Result<()>
where
Self::CTX: Send + Sync,
{
Ok(())
}
/// Similar to [Self::upstream_response_filter()] but for response body
///
/// This function will be called every time a piece of response body is received. The body is
/// **not the entire response body**.
fn upstream_response_body_filter(
&self,
_session: &mut Session,
_body: &mut Option<Bytes>,
_end_of_stream: bool,
_ctx: &mut Self::CTX,
) -> Result<()> {
Ok(())
}
/// Similar to [Self::upstream_response_filter()] but for response trailers
fn upstream_response_trailer_filter(
&self,
_session: &mut Session,
_upstream_trailers: &mut header::HeaderMap,
_ctx: &mut Self::CTX,
) -> Result<()> {
Ok(())
}
/// Similar to [Self::response_filter()] but for response body chunks
fn response_body_filter(
&self,
_session: &mut Session,
_body: &mut Option<Bytes>,
_end_of_stream: bool,
_ctx: &mut Self::CTX,
) -> Result<Option<Duration>>
where
Self::CTX: Send + Sync,
{
Ok(None)
}
/// Similar to [Self::response_filter()] but for response trailers.
/// Note, returning an Ok(Some(Bytes)) will result in the downstream response
/// trailers being written to the response body.
///
/// TODO: make this interface more intuitive
async fn response_trailer_filter(
&self,
_session: &mut Session,
_upstream_trailers: &mut header::HeaderMap,
_ctx: &mut Self::CTX,
) -> Result<Option<Bytes>>
where
Self::CTX: Send + Sync,
{
Ok(None)
}
/// This filter is called when the entire response is sent to the downstream successfully, or when
/// there is a fatal error that terminates the request.
///
/// An error log is already emitted if there is any error. This phase is used for collecting
/// metrics and sending access logs.
async fn logging(&self, _session: &mut Session, _e: Option<&Error>, _ctx: &mut Self::CTX)
where
Self::CTX: Send + Sync,
{
}
/// A value of true means that the log message will be suppressed. The default value is false.
fn suppress_error_log(&self, _session: &Session, _ctx: &Self::CTX, _error: &Error) -> bool {
false
}
/// This filter is called when there is an error **after** a connection is established (or reused)
/// to the upstream.
fn error_while_proxy(
&self,
peer: &HttpPeer,
session: &mut Session,
e: Box<Error>,
_ctx: &mut Self::CTX,
client_reused: bool,
) -> Box<Error> {
let mut e = e.more_context(format!("Peer: {}", peer));
// only reused client connections where retry buffer is not truncated
e.retry
.decide_reuse(client_reused && !session.as_ref().retry_buffer_truncated());
e
}
/// This filter is called when there is an error in the process of establishing a connection
/// to the upstream.
///
/// In this filter the user can decide whether the error is retry-able by marking the error e.
///
/// If the error can be retried, [Self::upstream_peer()] will be called again so that the user
/// can decide whether to send the request to the same upstream or another upstream that is possibly
/// available.
fn fail_to_connect(
&self,
_session: &mut Session,
_peer: &HttpPeer,
_ctx: &mut Self::CTX,
e: Box<Error>,
) -> Box<Error> {
e
}
/// This filter is called when the request encounters a fatal error.
///
/// Users may write an error response to the downstream if the downstream is still writable.
///
/// The response status code of the error response may be returned for logging purposes.
async fn fail_to_proxy(&self, session: &mut Session, e: &Error, _ctx: &mut Self::CTX) -> u16
where
Self::CTX: Send + Sync,
{
let code = match e.etype() {
HTTPStatus(code) => *code,
_ => {
match e.esource() {
ErrorSource::Upstream => 502,
ErrorSource::Downstream => {
match e.etype() {
WriteError | ReadError | ConnectionClosed => {
/* conn already dead */
0
}
_ => 400,
}
}
ErrorSource::Internal | ErrorSource::Unset => 500,
}
}
};
if code > 0 {
session.respond_error(code).await.unwrap_or_else(|e| {
error!("failed to send error response to downstream: {e}");
});
}
code
}
/// Decide whether to serve stale content when encountering an error or during revalidation
///
/// An implementation should follow
/// <https://datatracker.ietf.org/doc/html/rfc9111#section-4.2.4>
/// <https://www.rfc-editor.org/rfc/rfc5861#section-4>
///
/// This filter is only called if cache is enabled.
// 5xx HTTP status will be encoded as ErrorType::HTTPStatus(code)
fn should_serve_stale(
&self,
_session: &mut Session,
_ctx: &mut Self::CTX,
error: Option<&Error>, // None when it is called during stale while revalidate
) -> bool {
// A cache MUST NOT generate a stale response unless
// it is disconnected
// or doing so is explicitly permitted by the client or origin server
// (e.g. headers or an out-of-band contract)
error.map_or(false, |e| e.esource() == &ErrorSource::Upstream)
}
/// This filter is called when the request just established or reused a connection to the upstream
///
/// This filter allows user to log timing and connection related info.
async fn connected_to_upstream(
&self,
_session: &mut Session,
_reused: bool,
_peer: &HttpPeer,
#[cfg(unix)] _fd: std::os::unix::io::RawFd,
#[cfg(windows)] _sock: std::os::windows::io::RawSocket,
_digest: Option<&Digest>,
_ctx: &mut Self::CTX,
) -> Result<()>
where
Self::CTX: Send + Sync,
{
Ok(())
}
/// This callback is invoked every time a request-related error log needs to be generated
///
/// Users can define what is important to be written about this request via the returned string.
fn request_summary(&self, session: &Session, _ctx: &Self::CTX) -> String {
session.as_ref().request_summary()
}
/// Whether the request should be used to invalidate (delete) the HTTP cache
///
/// - true: this request will be used to invalidate the cache.
/// - false: this request is treated as a normal request
fn is_purge(&self, _session: &Session, _ctx: &Self::CTX) -> bool {
false
}
/// This filter is called after the proxy cache generates the downstream response to the purge
/// request (to invalidate or delete from the HTTP cache), based on the purge status, which
/// indicates whether the request succeeded or failed.
///
/// The filter allows the user to modify or replace the generated downstream response.
/// If the filter returns Err, the proxy will instead send a 500 response.
fn purge_response_filter(
&self,
_session: &Session,
_ctx: &mut Self::CTX,
_purge_status: PurgeStatus,
_purge_response: &mut std::borrow::Cow<'static, ResponseHeader>,
) -> Result<()> {
Ok(())
}
}
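All of the hooks above ship with overridable defaults. As a hedged sketch of the error-handling hooks described in the doc comments (the name RetryProxy, the fixed upstream address, and the policy choices are assumptions for illustration, not Pingora defaults):

use async_trait::async_trait;
use pingora_core::upstreams::peer::HttpPeer;
use pingora_core::{Error, ErrorSource, ErrorType, Result};
use pingora_proxy::{ProxyHttp, Session};

pub struct RetryProxy;

#[async_trait]
impl ProxyHttp for RetryProxy {
    type CTX = ();
    fn new_ctx(&self) -> Self::CTX {}

    async fn upstream_peer(
        &self,
        _session: &mut Session,
        _ctx: &mut Self::CTX,
    ) -> Result<Box<HttpPeer>> {
        // A single fixed upstream for illustration; after a retryable error
        // this is called again and could fail over to a different peer.
        Ok(Box::new(HttpPeer::new("127.0.0.1:8080", false, String::new())))
    }

    fn fail_to_connect(
        &self,
        _session: &mut Session,
        _peer: &HttpPeer,
        _ctx: &mut Self::CTX,
        mut e: Box<Error>,
    ) -> Box<Error> {
        // Mark the connect error retryable so upstream_peer() is consulted again.
        e.set_retry(true);
        e
    }

    fn suppress_error_log(&self, _session: &Session, _ctx: &Self::CTX, error: &Error) -> bool {
        // Assumption: early client disconnects are benign for this service,
        // so keep them out of the error log.
        error.esource() == &ErrorSource::Downstream
            && matches!(error.etype(), ErrorType::ConnectionClosed)
    }

    fn should_serve_stale(
        &self,
        _session: &mut Session,
        _ctx: &mut Self::CTX,
        error: Option<&Error>, // None when called during stale-while-revalidate
    ) -> bool {
        // Assumption: also serve stale during revalidation, not just on upstream errors.
        error.map_or(true, |e| e.esource() == &ErrorSource::Upstream)
    }
}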
proxy_subrequest.rs
// Copyright 2024 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use async_trait::async_trait;
use core::pin::Pin;
use core::task::{Context, Poll};
use pingora_cache::lock::WritePermit;
use pingora_core::protocols::raw_connect::ProxyDigest;
use pingora_core::protocols::{
GetProxyDigest, GetSocketDigest, GetTimingDigest, Peek, SocketDigest, Ssl, TimingDigest,
UniqueID, UniqueIDType,
};
use std::io::Cursor;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite, Error, ReadBuf};
// An async IO stream that returns the request when read from and dumps the data to the void
// when written to
#[derive(Debug)]
pub(crate) struct DummyIO(Cursor<Vec<u8>>);
impl DummyIO {
pub fn new(read_bytes: &[u8]) -> Self {
DummyIO(Cursor::new(Vec::from(read_bytes)))
}
}
impl AsyncRead for DummyIO {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<Result<(), Error>> {
if self.0.position() < self.0.get_ref().len() as u64 {
Pin::new(&mut self.0).poll_read(cx, buf)
} else {
// all data is read; stay pending forever, otherwise the stream would be considered closed
Poll::Pending
}
}
}
impl AsyncWrite for DummyIO {
fn poll_write(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, Error>> {
Poll::Ready(Ok(buf.len()))
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
Poll::Ready(Ok(()))
}
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
Poll::Ready(Ok(()))
}
}
impl UniqueID for DummyIO {
fn id(&self) -> UniqueIDType {
0 // placeholder
}
}
impl Ssl for DummyIO {}
impl GetTimingDigest for DummyIO {
fn get_timing_digest(&self) -> Vec<Option<TimingDigest>> {
vec![]
}
}
impl GetProxyDigest for DummyIO {
fn get_proxy_digest(&self) -> Option<Arc<ProxyDigest>> {
None
}
}
impl GetSocketDigest for DummyIO {
fn get_socket_digest(&self) -> Option<Arc<SocketDigest>> {
None
}
}
impl Peek for DummyIO {}
#[async_trait]
impl pingora_core::protocols::Shutdown for DummyIO {
async fn shutdown(&mut self) -> () {}
}
#[tokio::test]
async fn test_dummy_io() {
use futures::FutureExt;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
let mut dummy = DummyIO::new(&[1, 2]);
let res = dummy.read_u8().await;
assert_eq!(res.unwrap(), 1);
let res = dummy.read_u8().await;
assert_eq!(res.unwrap(), 2);
let res = dummy.read_u8().now_or_never();
assert!(res.is_none()); // pending forever
let res = dummy.write_u8(0).await;
assert!(res.is_ok());
}
// To share state across the parent req and the sub req
pub(crate) struct Ctx {
pub(crate) write_lock: Option<WritePermit>,
}
use crate::HttpSession;
pub fn create_dummy_session(parsed_session: &HttpSession) -> HttpSession {
// TODO: check if there is a req body; we don't capture the body for now
HttpSession::new_http1(Box::new(DummyIO::new(&parsed_session.to_h1_raw())))
}
#[tokio::test]
async fn test_dummy_request() {
use tokio_test::io::Builder;
let input = b"GET / HTTP/1.1\r\n\r\n";
let mock_io = Builder::new().read(&input[..]).build();
let mut req = HttpSession::new_http1(Box::new(mock_io));
req.read_request().await.unwrap();
assert_eq!(input.as_slice(), req.to_h1_raw());
let mut dummy_req = create_dummy_session(&req);
dummy_req.read_request().await.unwrap();
assert_eq!(input.as_slice(), dummy_req.to_h1_raw());
}
proxy_purge.rs
// Copyright 2024 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use super::*;
use pingora_core::protocols::http::error_resp;
use std::borrow::Cow;
#[derive(Debug)]
pub enum PurgeStatus {
/// Cache was not enabled, purge ineffectual.
NoCache,
/// Asset was found in cache (and presumably purged or being purged).
Found,
/// Asset was not found in cache.
NotFound,
/// Cache returned a purge error.
/// Contains causing error in case it should affect the downstream response.
Error(Box<Error>),
}
// Return a canned response to a purge request, based on whether the cache had the asset or not
// (or otherwise returned an error).
fn purge_response(purge_status: &PurgeStatus) -> Cow<'static, ResponseHeader> {
let resp = match purge_status {
PurgeStatus::NoCache => &*NOT_PURGEABLE,
PurgeStatus::Found => &*OK,
PurgeStatus::NotFound => &*NOT_FOUND,
PurgeStatus::Error(ref _e) => &*INTERNAL_ERROR,
};
Cow::Borrowed(resp)
}
fn gen_purge_response(code: u16) -> ResponseHeader {
let mut resp = ResponseHeader::build(code, Some(3)).unwrap();
resp.insert_header(header::SERVER, &SERVER_NAME[..])
.unwrap();
resp.insert_header(header::CONTENT_LENGTH, 0).unwrap();
resp.insert_header(header::CACHE_CONTROL, "private, no-store")
.unwrap();
// TODO more headers?
resp
}
static OK: Lazy<ResponseHeader> = Lazy::new(|| gen_purge_response(200));
static NOT_FOUND: Lazy<ResponseHeader> = Lazy::new(|| gen_purge_response(404));
// for when purge is sent to uncacheable assets
static NOT_PURGEABLE: Lazy<ResponseHeader> = Lazy::new(|| gen_purge_response(405));
// on cache storage or proxy error
static INTERNAL_ERROR: Lazy<ResponseHeader> = Lazy::new(|| error_resp::gen_error_response(500));
impl<SV> HttpProxy<SV> {
pub(crate) async fn proxy_purge(
&self,
session: &mut Session,
ctx: &mut SV::CTX,
) -> Option<(bool, Option<Box<Error>>)>
where
SV: ProxyHttp + Send + Sync,
SV::CTX: Send + Sync,
{
let purge_status = if session.cache.enabled() {
match session.cache.purge().await {
Ok(found) => {
if found {
PurgeStatus::Found
} else {
PurgeStatus::NotFound
}
}
Err(e) => {
session.cache.disable(NoCacheReason::StorageError);
warn!(
"Fail to purge cache: {e}, {}",
self.inner.request_summary(session, ctx)
);
PurgeStatus::Error(e)
}
}
} else {
// cache was not enabled
PurgeStatus::NoCache
};
let mut purge_resp = purge_response(&purge_status);
if let Err(e) =
self.inner
.purge_response_filter(session, ctx, purge_status, &mut purge_resp)
{
error!(
"Failed purge response filter: {e}, {}",
self.inner.request_summary(session, ctx)
);
purge_resp = Cow::Borrowed(&*INTERNAL_ERROR)
}
let write_result = match purge_resp {
Cow::Borrowed(r) => session.as_mut().write_response_header_ref(r).await,
Cow::Owned(r) => session.as_mut().write_response_header(Box::new(r)).await,
};
let (reuse, err) = match write_result {
Ok(_) => (true, None),
// dirty, not reusable
Err(e) => {
let e = e.into_down();
error!(
"Failed to send purge response: {e}, {}",
self.inner.request_summary(session, ctx)
);
(false, Some(e))
}
};
Some((reuse, err))
}
}
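Purge handling is opt-in: is_purge decides which requests are purges, and purge_response_filter can replace the canned responses above. A hedged sketch of both overrides inside a ProxyHttp implementation (using a custom PURGE request method, and reporting the purge of a missing asset as success, are assumptions of this example, not Pingora defaults):

fn is_purge(&self, session: &Session, _ctx: &Self::CTX) -> bool {
    // Assumption: this deployment signals purges with a custom PURGE method.
    session.req_header().method == "PURGE"
}

fn purge_response_filter(
    &self,
    _session: &Session,
    _ctx: &mut Self::CTX,
    purge_status: PurgeStatus,
    purge_response: &mut std::borrow::Cow<'static, ResponseHeader>,
) -> Result<()> {
    // Replace the canned 404 with a 200: purging an absent asset is treated
    // as success here (a policy assumption of this sketch).
    if matches!(purge_status, PurgeStatus::NotFound) {
        let mut resp = ResponseHeader::build(200, Some(1))?;
        resp.insert_header(http::header::CONTENT_LENGTH, 0)?;
        *purge_response = std::borrow::Cow::Owned(resp);
    }
    Ok(())
}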
proxy_cache.rs
// Copyright 2024 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use super::*;
use http::{Method, StatusCode};
use pingora_cache::key::CacheHashKey;
use pingora_cache::lock::LockStatus;
use pingora_cache::max_file_size::ERR_RESPONSE_TOO_LARGE;
use pingora_cache::{ForcedInvalidationKind, HitStatus, RespCacheable::*};
use pingora_core::protocols::http::conditional_filter::to_304;
use pingora_core::protocols::http::v1::common::header_value_content_length;
use pingora_core::ErrorType;
impl<SV> HttpProxy<SV> {
// return bool: server_session can be reused, and error if any
pub(crate) async fn proxy_cache(
self: &Arc<Self>,
session: &mut Session,
ctx: &mut SV::CTX,
) -> Option<(bool, Option<Box<Error>>)>
// None: continue to proxy, Some: return
where
SV: ProxyHttp + Send + Sync + 'static,
SV::CTX: Send + Sync,
{
// Cache logic request phase
if let Err(e) = self.inner.request_cache_filter(session, ctx) {
// TODO: handle this error
warn!(
"Fail to request_cache_filter: {e}, {}",
self.inner.request_summary(session, ctx)
);
}
// cache key logic, should this be part of request_cache_filter?
if session.cache.enabled() {
match self.inner.cache_key_callback(session, ctx) {
Ok(key) => {
session.cache.set_cache_key(key);
}
Err(e) => {
// TODO: handle this error
session.cache.disable(NoCacheReason::StorageError);
warn!(
"Fail to cache_key_callback: {e}, {}",
self.inner.request_summary(session, ctx)
);
}
}
}
// cache purge logic: PURGE short-circuits rest of request
if self.inner.is_purge(session, ctx) {
return self.proxy_purge(session, ctx).await;
}
// bypass cache lookup if we predict the response to be uncacheable
if session.cache.enabled() && !session.cache.cacheable_prediction() {
session.cache.bypass();
}
if !session.cache.enabled() {
return None;
}
// cache lookup logic
loop {
// for cache lock, TODO: cap the max number of loops
match session.cache.cache_lookup().await {
Ok(res) => {
let mut hit_status_opt = None;
if let Some((mut meta, handler)) = res {
// Vary logic
// Because this branch can be called multiple times in a loop, and we only
// need to update the vary once, check if variance is already set to
// prevent unnecessary vary lookups.
let cache_key = session.cache.cache_key();
if let Some(variance) = cache_key.variance_bin() {
// We've looked up a secondary slot.
// Ad hoc double-check that the variance found is the variance we want.
if Some(variance) != meta.variance() {
warn!("Cache variance mismatch, {variance:?}, {cache_key:?}");
session.cache.disable(NoCacheReason::InternalError);
break None;
}
} else {
// Basic cache key; either variance is off, or this is the primary slot.
let req_header = session.req_header();
let variance = self.inner.cache_vary_filter(&meta, ctx, req_header);
if let Some(variance) = variance {
// Variance is on. This is the primary slot.
if !session.cache.cache_vary_lookup(variance, &meta) {
// This wasn't the desired variant. The cache key variance has been updated;
// loop to cause another lookup for the desired variant, which would be in a secondary slot.
continue;
}
} // else: vary is not in use
}
// Either no variance, or the current handler targets the correct variant.
// hit
// TODO: maybe round and/or cache now()
let hit_status = if meta.is_fresh(std::time::SystemTime::now()) {
// check if we should force expire or miss
// (vs. hard purge, which forces a miss)
match self.inner.cache_hit_filter(session, &meta, ctx).await {
Err(e) => {
error!(
"Failed to filter cache hit: {e}, {}",
self.inner.request_summary(session, ctx)
);
// this return value will cause us to fetch from upstream
HitStatus::FailedHitFilter
}
Ok(None) => HitStatus::Fresh,
Ok(Some(ForcedInvalidationKind::ForceExpired)) => {
// a force-expired asset should not be served as stale
// because force expire is usually used to remove data
meta.disable_serve_stale();
HitStatus::ForceExpired
}
Ok(Some(ForcedInvalidationKind::ForceMiss)) => HitStatus::ForceMiss,
}
} else {
HitStatus::Expired
};
hit_status_opt = Some(hit_status);
// init cache for hit / stale
session.cache.cache_found(meta, handler, hit_status);
}
if hit_status_opt.map_or(true, HitStatus::is_treated_as_miss) {
// cache miss
if session.cache.is_cache_locked() {
// Another request is filling the cache; try waiting until that's done, then retry.
let lock_status = session.cache.cache_lock_wait().await;
if self.handle_lock_status(session, ctx, lock_status) {
continue;
} else {
break None;
}
} else {
self.inner.cache_miss(session, ctx);
break None;
}
}
// Safe because an empty hit status would have broken out
// in the block above
let hit_status = hit_status_opt.expect("None case handled as miss");
if !hit_status.is_fresh() {
// expired or force expired asset
if session.cache.is_cache_locked() {
// first check if this is the subrequest for the background cache update
if let Some(write_lock) = session
.subrequest_ctx
.as_mut()
.and_then(|ctx| ctx.write_lock.take())
{
// Put the write lock in the request
session.cache.set_write_lock(write_lock);
session.cache.tag_as_subrequest();
// and then let it go to upstream
break None;
}
let will_serve_stale = session.cache.can_serve_stale_updating()
&& self.inner.should_serve_stale(session, ctx, None);
if !will_serve_stale {
let lock_status = session.cache.cache_lock_wait().await;
if self.handle_lock_status(session, ctx, lock_status) {
continue;
} else {
break None;
}
}
// else continue to serve stale
session.cache.set_stale_updating();
} else if session.cache.is_cache_lock_writer() {
// stale while revalidate logic for the writer
let will_serve_stale = session.cache.can_serve_stale_updating()
&& self.inner.should_serve_stale(session, ctx, None);
if will_serve_stale {
// spawn a background task to do the actual update
let subrequest =
Box::new(crate::subrequest::create_dummy_session(session));
let new_app = self.clone(); // Clone the Arc
let sub_req_ctx = Box::new(SubReqCtx {
write_lock: Some(session.cache.take_write_lock()),
});
tokio::spawn(async move {
new_app.process_subrequest(subrequest, sub_req_ctx).await;
});
// continue to serve stale for this request
session.cache.set_stale_updating();
} else {
// return to fetch from upstream
break None;
}
} else {
// return to fetch from upstream
break None;
}
}
let (reuse, err) = self.proxy_cache_hit(session, ctx).await;
if let Some(e) = err.as_ref() {
error!(
"Fail to serve cache: {e}, {}",
self.inner.request_summary(session, ctx)
);
}
// response is served from cache, exit
break Some((reuse, err));
}
Err(e) => {
// Allow a cache miss to fill the cache even if the cache lookup errors;
// this is mostly to support backward-incompatible metadata updates
// TODO: check error types
// session.cache.disable();
self.inner.cache_miss(session, ctx);
warn!(
"Fail to cache lookup: {e}, {}",
self.inner.request_summary(session, ctx)
);
break None;
}
}
}
}
// return bool: server_session can be reused, and error if any
pub(crate) async fn proxy_cache_hit(
&self,
session: &mut Session,
ctx: &mut SV::CTX,
) -> (bool, Option<Box<Error>>)
where
SV: ProxyHttp + Send + Sync,
SV::CTX: Send + Sync,
{
use range_filter::*;
let seekable = session.cache.hit_handler().can_seek();
let mut header = cache_hit_header(&session.cache);
let req = session.req_header();
let not_modified = match self.inner.cache_not_modified_filter(session, &header, ctx) {
Ok(not_modified) => not_modified,
Err(e) => {
// fail open if cache_not_modified_filter errors,
// just return the whole original response
warn!(
"Failed to run cache not modified filter: {e}, {}",
self.inner.request_summary(session, ctx)
);
false
}
};
if not_modified {
to_304(&mut header);
}
let header_only = not_modified || req.method == http::method::Method::HEAD;
// process range header if the cache storage supports seek
let range_type = if seekable && !session.ignore_downstream_range {
self.inner.range_header_filter(req, &mut header, ctx)
} else {
RangeType::None
};
// return a 416 with an empty body for simplicity
let header_only = header_only || matches!(range_type, RangeType::Invalid);
// TODO: use ProxyUseCache to replace the logic below
match self.inner.response_filter(session, &mut header, ctx).await {
Ok(_) => {
if let Err(e) = session
.as_mut()
.write_response_header(header)
.await
.map_err(|e| e.into_down())
{
// downstream connection is bad already
return (false, Some(e));
}
}
Err(e) => {
// TODO: more logging and error handling
session
.as_mut()
.respond_error(500)
.await
.unwrap_or_else(|e| {
error!("failed to send error response to downstream: {e}");
});
// we have not written anything dirty to downstream, it is still reusable
return (true, Some(e));
}
}
debug!("finished sending cached header to downstream");
if !header_only {
if let RangeType::Single(r) = range_type {
if let Err(e) = session.cache.hit_handler().seek(r.start, Some(r.end)) {
return (false, Some(e));
}
}
loop {
match session.cache.hit_handler().read_body().await {
Ok(mut body) => {
let end = body.is_none();
match self
.inner
.response_body_filter(session, &mut body, end, ctx)
{
Ok(Some(duration)) => {
trace!("delaying response for {duration:?}");
time::sleep(duration).await;
}
Ok(None) => { /* continue */ }
Err(e) => {
// body is being sent, don't treat downstream as reusable
return (false, Some(e));
}
}
if let Some(b) = body {
// write to downstream
if let Err(e) = session
.as_mut()
.write_response_body(b, false)
.await
.map_err(|e| e.into_down())
{
return (false, Some(e));
}
} else {
break;
}
}
Err(e) => return (false, Some(e)),
}
}
}
if let Err(e) = session.cache.finish_hit_handler().await {
warn!("Error during finish_hit_handler: {}", e);
}
match session.as_mut().finish_body().await {
Ok(_) => {
debug!("finished sending cached body to downstream");
(true, None)
}
Err(e) => (false, Some(e)),
}
}
/* Downstream revalidation, only needed when cache is on because otherwise origin
* will handle it */
pub(crate) fn downstream_response_conditional_filter(
&self,
use_cache: &mut ServeFromCache,
session: &Session,
resp: &mut ResponseHeader,
ctx: &mut SV::CTX,
) where
SV: ProxyHttp,
{
// TODO: range
let req = session.req_header();
let not_modified = match self.inner.cache_not_modified_filter(session, resp, ctx) {
Ok(not_modified) => not_modified,
Err(e) => {
// fail open if cache_not_modified_filter errors,
// just return the whole original response
warn!(
"Failed to run cache not modified filter: {e}, {}",
self.inner.request_summary(session, ctx)
);
false
}
};
if not_modified {
to_304(resp);
}
let header_only = not_modified || req.method == http::method::Method::HEAD;
if header_only {
if use_cache.is_on() {
// tell cache to stop after yielding header
use_cache.enable_header_only();
} else {
// header only during cache miss; upstream should continue to send
// the body to cache, and the session will ignore the body automatically
// because of the 304 status of the header
// TODO: we should drop body before/within this filter so that body
// filter only runs on data downstream sees
}
}
}
// TODO: cache upstream header filter to add/remove headers
pub(crate) async fn cache_http_task(
&self,
session: &mut Session,
task: &HttpTask,
ctx: &mut SV::CTX,
serve_from_cache: &mut ServeFromCache,
) -> Result<()>
where
SV: ProxyHttp + Send + Sync,
SV::CTX: Send + Sync,
{
if !session.cache.enabled() && !session.cache.bypassing() {
return Ok(());
}
match task {
HttpTask::Header(header, end_stream) => {
// decide if cacheable and create cache meta
// for now, skip 1xxs (should not affect response cache decisions)
// However 101 is an exception because it is the final response header
if header.status.is_informational()
&& header.status != StatusCode::SWITCHING_PROTOCOLS
{
return Ok(());
}
match self.inner.response_cache_filter(session, header, ctx)? {
Cacheable(meta) => {
let mut fill_cache = true;
if session.cache.bypassing() {
// The cache might have been bypassed because the response exceeded the
// maximum cacheable asset size. If that looks like the case (there
// is a maximum file size configured and we don't know the content
// length up front), attempting to re-enable the cache now would cause
// the request to fail when the chunked response exceeds the maximum
// file size again.
if session.cache.max_file_size_bytes().is_some()
&& !meta.headers().contains_key(header::CONTENT_LENGTH)
{
session.cache.disable(NoCacheReason::ResponseTooLarge);
return Ok(());
}
session.cache.response_became_cacheable();
if session.req_header().method == Method::GET
&& meta.response_header().status == StatusCode::OK
{
self.inner.cache_miss(session, ctx);
} else {
// we've allowed caching on the next request,
// but do not cache _this_ request if bypassed and not 200
// (We didn't run upstream request cache filters to strip range or condition headers,
// so this could be an uncacheable response e.g. 206 or 304 or HEAD.
// Exclude all non-200/GET for simplicity, may expand allowable codes in the future.)
fill_cache = false;
session.cache.disable(NoCacheReason::Deferred);
}
}
// If the Content-Length is known, and a maximum asset size has been configured
// on the cache, validate that the response does not exceed the maximum asset size.
if session.cache.enabled() {
if let Some(max_file_size) = session.cache.max_file_size_bytes() {
let content_length_hdr = meta.headers().get(header::CONTENT_LENGTH);
if let Some(content_length) =
header_value_content_length(content_length_hdr)
{
if content_length > max_file_size {
fill_cache = false;
session.cache.response_became_uncacheable(
NoCacheReason::ResponseTooLarge,
);
session.cache.disable(NoCacheReason::ResponseTooLarge);
}
}
// if the content-length header is not specified, the miss handler
// will count the response size on the fly, aborting the request
// mid-transfer if the max file size is exceeded
}
}
if fill_cache {
let req_header = session.req_header();
// Update the variance in the meta via the same callback,
// cache_vary_filter(), used in cache lookup for consistency.
// Future cache lookups need a matching variance in the meta
// with the cache key to pick up the correct variance
let variance = self.inner.cache_vary_filter(&meta, ctx, req_header);
session.cache.set_cache_meta(meta);
session.cache.update_variance(variance);
// this sends the meta and header
session.cache.set_miss_handler().await?;
if session.cache.miss_body_reader().is_some() {
serve_from_cache.enable_miss();
}
if *end_stream {
session
.cache
.miss_handler()
.unwrap() // safe, it is set above
.write_body(Bytes::new(), true)
.await?;
session.cache.finish_miss_handler().await?;
}
}
}
Uncacheable(reason) => {
if !session.cache.bypassing() {
// mark as uncacheable, so we bypass cache next time
session.cache.response_became_uncacheable(reason);
}
session.cache.disable(reason);
}
}
}
HttpTask::Body(data, end_stream) => match data {
Some(d) => {
if session.cache.enabled() {
// this will panic if more data is sent after we see end_stream,
// but that should be impossible in the real world
let miss_handler = session.cache.miss_handler().unwrap();
// TODO: do this async
let res = miss_handler.write_body(d.clone(), *end_stream).await;
if let Err(err) = res {
if err.etype == ERR_RESPONSE_TOO_LARGE {
debug!("chunked response exceeded max cache size, remembering that it is uncacheable");
session
.cache
.response_became_uncacheable(NoCacheReason::ResponseTooLarge);
}
return Err(err);
}
if *end_stream {
session.cache.finish_miss_handler().await?;
}
}
}
None => {
if session.cache.enabled() && *end_stream {
session.cache.finish_miss_handler().await?;
}
}
},
HttpTask::Trailer(_) => {} // h1 trailer is not supported yet
HttpTask::Done => {
if session.cache.enabled() {
session.cache.finish_miss_handler().await?;
}
}
HttpTask::Failed(_) => {
// TODO: handle this failure: delete the temp files?
}
}
Ok(())
}
// Decide if local cache can be used according to upstream http header
// 1. when upstream returns 304, the local cache is refreshed and served fresh
// 2. when upstream returns certain HTTP error status, the local cache is served stale
// Return true if local cache should be used, false otherwise
pub(crate) async fn revalidate_or_stale(
&self,
session: &mut Session,
task: &mut HttpTask,
ctx: &mut SV::CTX,
) -> bool
where
SV: ProxyHttp + Send + Sync,
SV::CTX: Send + Sync,
{
if !session.cache.enabled() {
return false;
}
match task {
HttpTask::Header(resp, _eos) => {
if resp.status == StatusCode::NOT_MODIFIED {
if session.cache.maybe_cache_meta().is_some() {
// run upstream response filters on upstream 304 first
self.inner.upstream_response_filter(session, resp, ctx);
// 304 doesn't contain all the headers, merge 304 into cached 200 header
// in order for response_cache_filter to run correctly
let merged_header = session.cache.revalidate_merge_header(resp);
match self
.inner
.response_cache_filter(session, &merged_header, ctx)
{
Ok(Cacheable(mut meta)) => {
// For simplicity, ignore changes to variance over 304 for now.
// Note this means upstream can only update variance via 2xx
// (expired response).
//
// TODO: if we choose to respect changing Vary / variance over 304,
// then there are a few cases to consider. See update_variance in
// the pingora-cache module.
let old_meta = session.cache.maybe_cache_meta().unwrap(); // safe, checked above
if let Some(old_variance) = old_meta.variance() {
meta.set_variance(old_variance);
}
if let Err(e) = session.cache.revalidate_cache_meta(meta).await {
// Fail open: we can continue to use the revalidated response even
// if the meta failed to write to storage
warn!("revalidate_cache_meta failed {e:?}");
}
}
Ok(Uncacheable(reason)) => {
// This response was once cacheable, and upstream tells us it has not changed
// but now we decided it is uncacheable!
// RFC 9111: still allowed to reuse stored response this time because
// it was "successfully validated"
// https://www.rfc-editor.org/rfc/rfc9111#constructing.responses.from.caches
// Serve the response, but do not update cache
// We also want to avoid poisoning downstream's cache with an unsolicited 304
// if we did not receive a conditional request from downstream
// (downstream may have a different cacheability assessment and could cache the 304)
//TODO: log more
warn!("Uncacheable {reason:?} 304 received");
session.cache.response_became_uncacheable(reason);
session.cache.revalidate_uncacheable(merged_header, reason);
}
Err(e) => {
// Error during revalidation, similarly to the reasons above
// (avoid poisoning downstream cache with passthrough 304),
// allow serving the stored response without updating cache
warn!("Error {e:?} response_cache_filter during revalidation");
session.cache.revalidate_uncacheable(
merged_header,
NoCacheReason::InternalError,
);
// Assume the next 304 may succeed, so don't mark uncacheable
}
}
// always serve from cache after receiving the 304
true
} else {
//TODO: log more
warn!("304 received without cached asset, disable caching");
let reason = NoCacheReason::Custom("304 on miss");
session.cache.response_became_uncacheable(reason);
session.cache.disable(reason);
false
}
} else if resp.status.is_server_error() {
// stale if error logic, 5xx only for now
// this is the response header filter, so response_written should always be None?
if !session.cache.can_serve_stale_error()
|| session.response_written().is_some()
{
return false;
}
// create an error to encode the http status code
let http_status_error = Error::create(
ErrorType::HTTPStatus(resp.status.as_u16()),
ErrorSource::Upstream,
None,
None,
);
if self
.inner
.should_serve_stale(session, ctx, Some(&http_status_error))
{
// no more need to keep the write lock
session
.cache
.release_write_lock(NoCacheReason::UpstreamError);
true
} else {
false
}
} else {
false // not 304, not stale if error status code
}
}
_ => false, // not header
}
}
// None: no stale asset is used, Some(_): stale asset is sent to downstream
// bool: can the downstream connection be reused
pub(crate) async fn handle_stale_if_error(
&self,
session: &mut Session,
ctx: &mut SV::CTX,
error: &Error,
) -> Option<(bool, Option<Box<Error>>)>
where
SV: ProxyHttp + Send + Sync,
SV::CTX: Send + Sync,
{
// the caller might have already checked this as an optimization
if !session.cache.can_serve_stale_error() {
return None;
}
// the error happened halfway through a regular response to downstream;
// we can't resend the response
if session.response_written().is_some() {
return None;
}
// check error types
if !self.inner.should_serve_stale(session, ctx, Some(error)) {
return None;
}
// log the original error
warn!(
"Fail to proxy: {}, serving stale, {}",
error,
self.inner.request_summary(session, ctx)
);
// no more need to hang onto the cache lock
session
.cache
.release_write_lock(NoCacheReason::UpstreamError);
Some(self.proxy_cache_hit(session, ctx).await)
}
// helper function to decide whether to continue to retry the lock (true) or give up (false)
fn handle_lock_status(
&self,
session: &mut Session,
ctx: &SV::CTX,
lock_status: LockStatus,
) -> bool
where
SV: ProxyHttp,
{
debug!("cache unlocked {lock_status:?}");
match lock_status {
// should lookup the cached asset again
LockStatus::Done => true,
// should compete to be a new writer
LockStatus::TransientError => true,
// the request is uncacheable, go ahead to fetch from the origin
LockStatus::GiveUp => {
// TODO: It would be nice for the writer to propagate the real reason
session.cache.disable(NoCacheReason::CacheLockGiveUp);
// not cacheable, just go to the origin.
false
}
// treat this the same as TransientError
LockStatus::Dangling => {
// software bug, but request can recover from this
warn!(
"Dangling cache lock, {}",
self.inner.request_summary(session, ctx)
);
true
}
/* We have 3 options when a lock is held too long
* 1. release the lock and let every request compete for it again
* 2. let every request cache miss
* 3. let every request through while disabling cache
* #1 could repeat the situation but protect the origin from load
* #2 could amplify disk writes and storage for temp files
* #3 is the simplest option for now */
LockStatus::Timeout => {
warn!(
"Cache lock timeout, {}",
self.inner.request_summary(session, ctx)
);
session.cache.disable(NoCacheReason::CacheLockTimeout);
// not cacheable, just go to the origin.
false
}
// software bug, this status should be impossible to reach
LockStatus::Waiting => panic!("impossible LockStatus::Waiting"),
}
}
}
fn cache_hit_header(cache: &HttpCache) -> Box<ResponseHeader> {
let mut header = Box::new(cache.cache_meta().response_header_copy());
// convert cache response
// these status codes cannot have a body, so no need to add chunked encoding
let no_body = matches!(header.status.as_u16(), 204 | 304);
// https://www.rfc-editor.org/rfc/rfc9111#section-4:
// When a stored response is used to satisfy a request without validation, a cache
// MUST generate an Age header field
if !cache.upstream_used() {
let age = cache.cache_meta().age().as_secs();
header.insert_header(http::header::AGE, age).unwrap();
}
/* Add a chunked header to tell downstream to use chunked encoding
* in the absence of content-length in h2 */
if !no_body
&& !header.status.is_informational()
&& header.headers.get(http::header::CONTENT_LENGTH).is_none()
{
header
.insert_header(http::header::TRANSFER_ENCODING, "chunked")
.unwrap();
}
header
}
// https://datatracker.ietf.org/doc/html/rfc7233#section-3
pub mod range_filter {
use super::*;
use http::header::*;
use std::ops::Range;
// parse bytes into usize, ignoring the specific parse error
fn parse_number(input: &[u8]) -> Option<usize> {
str::from_utf8(input).ok()?.parse().ok()
}
fn parse_range_header(range: &[u8], content_length: usize) -> RangeType {
use regex::Regex;
// single byte range only for now
// https://datatracker.ietf.org/doc/html/rfc7233#section-2.1
// https://datatracker.ietf.org/doc/html/rfc7233#appendix-C: case-insensitive
static RE_SINGLE_RANGE: Lazy<Regex> =
Lazy::new(|| Regex::new(r"(?i)bytes=(?P<start>\d*)-(?P<end>\d*)").unwrap());
// ignore invalid range header
let Ok(range_str) = str::from_utf8(range) else {
return RangeType::None;
};
let Some(captured) = RE_SINGLE_RANGE.captures(range_str) else {
return RangeType::None;
};
let maybe_start = captured
.name("start")
.and_then(|s| s.as_str().parse::<usize>().ok());
let end = captured
.name("end")
.and_then(|s| s.as_str().parse::<usize>().ok());
if let Some(start) = maybe_start {
if start >= content_length {
RangeType::Invalid
} else {
// an open-ended range should end at the last byte
// an oversized end is allowed but ignored
// range end is inclusive
let end = std::cmp::min(end.unwrap_or(content_length - 1), content_length - 1) + 1;
if end <= start {
RangeType::Invalid
} else {
RangeType::new_single(start, end)
}
}
} else {
// start is empty; this changes the meaning of the value of end:
// now end means the number of trailing bytes to read (a suffix range)
if let Some(end) = end {
if content_length >= end {
RangeType::new_single(content_length - end, content_length)
} else {
// an oversized end is allowed but ignored
RangeType::new_single(0, content_length)
}
} else {
// both empty/invalid
RangeType::Invalid
}
}
}
#[test]
fn test_parse_range() {
assert_eq!(
parse_range_header(b"bytes=0-1", 10),
RangeType::new_single(0, 2)
);
assert_eq!(
parse_range_header(b"bYTes=0-9", 10),
RangeType::new_single(0, 10)
);
assert_eq!(
parse_range_header(b"bytes=0-12", 10),
RangeType::new_single(0, 10)
);
assert_eq!(
parse_range_header(b"bytes=0-", 10),
RangeType::new_single(0, 10)
);
assert_eq!(parse_range_header(b"bytes=2-1", 10), RangeType::Invalid);
assert_eq!(parse_range_header(b"bytes=10-11", 10), RangeType::Invalid);
assert_eq!(
parse_range_header(b"bytes=-2", 10),
RangeType::new_single(8, 10)
);
assert_eq!(
parse_range_header(b"bytes=-12", 10),
RangeType::new_single(0, 10)
);
assert_eq!(parse_range_header(b"bytes=-", 10), RangeType::Invalid);
assert_eq!(parse_range_header(b"bytes=", 10), RangeType::None);
}
#[derive(Debug, Eq, PartialEq, Clone)]
pub enum RangeType {
None,
Single(Range<usize>),
// TODO: multi-range
Invalid,
}
impl RangeType {
fn new_single(start: usize, end: usize) -> Self {
RangeType::Single(Range { start, end })
}
}
// TODO: if-range
// single range for now
pub fn range_header_filter(req: &RequestHeader, resp: &mut ResponseHeader) -> RangeType {
// The Range header field is evaluated after evaluating the precondition
// header fields defined in [RFC7232], and only if the result in absence
// of the Range header field would be a 200 (OK) response
if resp.status != StatusCode::OK {
return RangeType::None;
}
// "A server MUST ignore a Range header field received with a request method other than GET."
if req.method != http::Method::GET && req.method != http::Method::HEAD {
return RangeType::None;
}
let Some(range_header) = req.headers.get(RANGE) else {
return RangeType::None;
};
// Content-Length is not required by the RFC, but requiring it is what nginx does
// and it is easier to implement with this header present.
let Some(content_length_bytes) = resp.headers.get(CONTENT_LENGTH) else {
return RangeType::None;
};
// bail on invalid content length
let Some(content_length) = parse_number(content_length_bytes.as_bytes()) else {
return RangeType::None;
};
// if-range wants to understand if the Last-Modified / ETag value matches exactly for use
// with resumable downloads.
// https://datatracker.ietf.org/doc/html/rfc9110#name-if-range
// Note that the RFC wants strong validation, and suggests that
// "A valid entity-tag can be distinguished from a valid HTTP-date
// by examining the first three characters for a DQUOTE,"
// but this current etag matching behavior most closely mirrors nginx.
if let Some(if_range) = req.headers.get(IF_RANGE) {
let ir = if_range.as_bytes();
let matches = if ir.len() >= 2 && ir.last() == Some(&b'"') {
resp.headers.get(ETAG).is_some_and(|etag| etag == if_range)
} else if let Some(last_modified) = resp.headers.get(LAST_MODIFIED) {
last_modified == if_range
} else {
false
};
if !matches {
return RangeType::None;
}
}
// TODO: we can also check the Accept-Ranges header from resp. Nginx gives users the option,
// see proxy_force_ranges
let range_type = parse_range_header(range_header.as_bytes(), content_length);
match &range_type {
RangeType::None => { /* nothing to do*/ }
RangeType::Single(r) => {
// 206 response
resp.set_status(StatusCode::PARTIAL_CONTENT).unwrap();
resp.insert_header(&CONTENT_LENGTH, r.end - r.start)
.unwrap();
resp.insert_header(
&CONTENT_RANGE,
format!("bytes {}-{}/{content_length}", r.start, r.end - 1), // range end is inclusive
)
.unwrap()
}
RangeType::Invalid => {
// 416 response
resp.set_status(StatusCode::RANGE_NOT_SATISFIABLE).unwrap();
// empty body for simplicity
resp.insert_header(&CONTENT_LENGTH, HeaderValue::from_static("0"))
.unwrap();
// TODO: remove other headers like content-encoding
resp.remove_header(&CONTENT_TYPE);
resp.insert_header(&CONTENT_RANGE, format!("bytes */{content_length}"))
.unwrap()
}
}
range_type
}
#[test]
fn test_range_filter() {
fn gen_req() -> RequestHeader {
RequestHeader::build(http::Method::GET, b"/", Some(1)).unwrap()
}
fn gen_resp() -> ResponseHeader {
let mut resp = ResponseHeader::build(200, Some(1)).unwrap();
resp.append_header("Content-Length", "10").unwrap();
resp
}
// no range
let req = gen_req();
let mut resp = gen_resp();
assert_eq!(RangeType::None, range_header_filter(&req, &mut resp));
assert_eq!(resp.status.as_u16(), 200);
// regular range
let mut req = gen_req();
req.insert_header("Range", "bytes=0-1").unwrap();
let mut resp = gen_resp();
assert_eq!(
RangeType::new_single(0, 2),
range_header_filter(&req, &mut resp)
);
assert_eq!(resp.status.as_u16(), 206);
assert_eq!(resp.headers.get("content-length").unwrap().as_bytes(), b"2");
assert_eq!(
resp.headers.get("content-range").unwrap().as_bytes(),
b"bytes 0-1/10"
);
// bad range
let mut req = gen_req();
req.insert_header("Range", "bytes=1-0").unwrap();
let mut resp = gen_resp();
assert_eq!(RangeType::Invalid, range_header_filter(&req, &mut resp));
assert_eq!(resp.status.as_u16(), 416);
assert_eq!(resp.headers.get("content-length").unwrap().as_bytes(), b"0");
assert_eq!(
resp.headers.get("content-range").unwrap().as_bytes(),
b"bytes */10"
);
}
#[test]
fn test_if_range() {
const DATE: &str = "Fri, 07 Jul 2023 22:03:29 GMT";
const ETAG: &str = "\"1234\"";
fn gen_req() -> RequestHeader {
let mut req = RequestHeader::build(http::Method::GET, b"/", Some(1)).unwrap();
req.append_header("Range", "bytes=0-1").unwrap();
req
}
fn gen_resp() -> ResponseHeader {
let mut resp = ResponseHeader::build(200, Some(1)).unwrap();
resp.append_header("Content-Length", "10").unwrap();
resp.append_header("Last-Modified", DATE).unwrap();
resp.append_header("ETag", ETAG).unwrap();
resp
}
// matching Last-Modified date
let mut req = gen_req();
req.insert_header("If-Range", DATE).unwrap();
let mut resp = gen_resp();
assert_eq!(
RangeType::new_single(0, 2),
range_header_filter(&req, &mut resp)
);
// non-matching date
let mut req = gen_req();
req.insert_header("If-Range", "Fri, 07 Jul 2023 22:03:25 GMT")
.unwrap();
let mut resp = gen_resp();
assert_eq!(RangeType::None, range_header_filter(&req, &mut resp));
// match ETag
let mut req = gen_req();
req.insert_header("If-Range", ETAG).unwrap();
let mut resp = gen_resp();
assert_eq!(
RangeType::new_single(0, 2),
range_header_filter(&req, &mut resp)
);
// non-matching ETags do not result in range
let mut req = gen_req();
req.insert_header("If-Range", "\"4567\"").unwrap();
let mut resp = gen_resp();
assert_eq!(RangeType::None, range_header_filter(&req, &mut resp));
let mut req = gen_req();
req.insert_header("If-Range", "1234").unwrap();
let mut resp = gen_resp();
assert_eq!(RangeType::None, range_header_filter(&req, &mut resp));
}
pub struct RangeBodyFilter {
range: RangeType,
current: usize,
}
impl RangeBodyFilter {
pub fn new() -> Self {
RangeBodyFilter {
range: RangeType::None,
current: 0,
}
}
pub fn set(&mut self, range: RangeType) {
self.range = range;
}
pub fn filter_body(&mut self, data: Option<Bytes>) -> Option<Bytes> {
match &self.range {
RangeType::None => data,
RangeType::Invalid => None,
RangeType::Single(r) => {
let current = self.current;
self.current += data.as_ref().map_or(0, |d| d.len());
data.and_then(|d| Self::filter_range_data(r.start, r.end, current, d))
}
}
}
fn filter_range_data(
start: usize,
end: usize,
current: usize,
data: Bytes,
) -> Option<Bytes> {
if current + data.len() < start || current >= end {
// if the current data is outside the desired range, just drop the data
None
} else if current >= start && current + data.len() <= end {
// all data is within the slice
Some(data)
} else {
// data: current........current+data.len()
// range: start...........end
let slice_start = start.saturating_sub(current);
let slice_end = std::cmp::min(data.len(), end - current);
Some(data.slice(slice_start..slice_end))
}
}
}
#[test]
fn test_range_body_filter() {
let mut body_filter = RangeBodyFilter::new();
assert_eq!(body_filter.filter_body(Some("123".into())).unwrap(), "123");
let mut body_filter = RangeBodyFilter::new();
body_filter.set(RangeType::Invalid);
assert!(body_filter.filter_body(Some("123".into())).is_none());
let mut body_filter = RangeBodyFilter::new();
body_filter.set(RangeType::new_single(0, 1));
assert_eq!(body_filter.filter_body(Some("012".into())).unwrap(), "0");
assert!(body_filter.filter_body(Some("345".into())).is_none());
let mut body_filter = RangeBodyFilter::new();
body_filter.set(RangeType::new_single(4, 6));
assert!(body_filter.filter_body(Some("012".into())).is_none());
assert_eq!(body_filter.filter_body(Some("345".into())).unwrap(), "45");
assert!(body_filter.filter_body(Some("678".into())).is_none());
let mut body_filter = RangeBodyFilter::new();
body_filter.set(RangeType::new_single(1, 7));
assert_eq!(body_filter.filter_body(Some("012".into())).unwrap(), "12");
assert_eq!(body_filter.filter_body(Some("345".into())).unwrap(), "345");
assert_eq!(body_filter.filter_body(Some("678".into())).unwrap(), "6");
}
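    // A minimal end-to-end sketch with illustrative values: run the header
    // filter, then stream body chunks through RangeBodyFilter.
    #[test]
    fn test_range_header_and_body_filter() {
        let mut req = RequestHeader::build(http::Method::GET, b"/file", None).unwrap();
        req.insert_header("Range", "bytes=2-5").unwrap();
        let mut resp = ResponseHeader::build(200, None).unwrap();
        resp.insert_header("Content-Length", "10").unwrap();
        // The response is rewritten to a 206 with Content-Range; the parsed
        // range is half-open: [2, 6).
        let range_type = range_header_filter(&req, &mut resp);
        assert_eq!(range_type, RangeType::new_single(2, 6));
        assert_eq!(resp.status.as_u16(), 206);
        // Bytes outside [2, 6) are dropped as chunks stream through.
        let mut body_filter = RangeBodyFilter::new();
        body_filter.set(range_type);
        let out = body_filter.filter_body(Some("0123456789".into()));
        assert_eq!(out.unwrap(), "2345");
    }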
}
// a state machine for proxy logic to tell when to use cache in the case of
// miss/revalidation/error.
#[derive(Debug)]
pub(crate) enum ServeFromCache {
Off, // not using cache
CacheHeader, // should serve cache header
CacheHeaderOnly, // should serve only the cache header (no body)
CacheBody, // should serve cache body
CacheHeaderMiss, // should serve cache header but upstream response should be admitted to cache
CacheBodyMiss, // should serve cache body but upstream response should be admitted to cache
Done, // done serving from cache
}
impl ServeFromCache {
pub fn new() -> Self {
Self::Off
}
pub fn is_on(&self) -> bool {
!matches!(self, Self::Off)
}
pub fn is_miss(&self) -> bool {
matches!(self, Self::CacheHeaderMiss | Self::CacheBodyMiss)
}
pub fn is_miss_header(&self) -> bool {
matches!(self, Self::CacheHeaderMiss)
}
pub fn is_miss_body(&self) -> bool {
matches!(self, Self::CacheBodyMiss)
}
pub fn should_discard_upstream(&self) -> bool {
self.is_on() && !self.is_miss()
}
pub fn should_send_to_downstream(&self) -> bool {
!self.is_on()
}
pub fn enable(&mut self) {
*self = Self::CacheHeader;
}
pub fn enable_miss(&mut self) {
if !self.is_on() {
*self = Self::CacheHeaderMiss;
}
}
pub fn enable_header_only(&mut self) {
match self {
Self::CacheBody => *self = Self::Done, // TODO: make sure no body is read yet
_ => *self = Self::CacheHeaderOnly,
}
}
// This function is (best effort) cancel-safe to be used in select
pub async fn next_http_task(&mut self, cache: &mut HttpCache) -> Result<HttpTask> {
if !cache.enabled() {
// Cache is disabled due to internal error
// TODO: if nothing is sent to eyeball yet, figure out a way to recover by
// fetching from upstream
return Error::e_explain(InternalError, "Cache disabled");
}
match self {
Self::Off => panic!("ProxyUseCache not enabled"),
Self::CacheHeader => {
*self = Self::CacheBody;
Ok(HttpTask::Header(cache_hit_header(cache), false)) // false for now
}
Self::CacheHeaderMiss => {
*self = Self::CacheBodyMiss;
Ok(HttpTask::Header(cache_hit_header(cache), false)) // false for now
}
Self::CacheHeaderOnly => {
*self = Self::Done;
Ok(HttpTask::Header(cache_hit_header(cache), true))
}
Self::CacheBody => {
if let Some(b) = cache.hit_handler().read_body().await? {
Ok(HttpTask::Body(Some(b), false)) // false for now
} else {
*self = Self::Done;
Ok(HttpTask::Done)
}
}
Self::CacheBodyMiss => {
// safety: callers of enable_miss() only call it when the async body reader exists
if let Some(b) = cache.miss_body_reader().unwrap().read_body().await? {
Ok(HttpTask::Body(Some(b), false)) // false for now
} else {
*self = Self::Done;
Ok(HttpTask::Done)
}
}
Self::Done => Ok(HttpTask::Done),
}
}
}
proxy_h1.rs
// Copyright 2024 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use super::*;
use crate::proxy_cache::{range_filter::RangeBodyFilter, ServeFromCache};
use crate::proxy_common::*;
impl<SV> HttpProxy<SV> {
pub(crate) async fn proxy_1to1(
&self,
session: &mut Session,
client_session: &mut HttpSessionV1,
peer: &HttpPeer,
ctx: &mut SV::CTX,
) -> (bool, bool, Option<Box<Error>>)
where
SV: ProxyHttp + Send + Sync,
SV::CTX: Send + Sync,
{
client_session.read_timeout = peer.options.read_timeout;
client_session.write_timeout = peer.options.write_timeout;
// phase 2 send to upstream
let mut req = session.req_header().clone();
// Convert HTTP2 headers to H1
if req.version == Version::HTTP_2 {
req.set_version(Version::HTTP_11);
// if client has body but has no content length, add chunked encoding
// https://datatracker.ietf.org/doc/html/rfc9112#name-message-body
// "The presence of a message body in a request is signaled by a Content-Length or Transfer-Encoding header field."
if !session.is_body_empty() && session.get_header(header::CONTENT_LENGTH).is_none() {
req.insert_header(header::TRANSFER_ENCODING, "chunked")
.unwrap();
}
if session.get_header(header::HOST).is_none() {
// H2 requires :authority to be set, but not necessarily the Host header;
// most H1 servers expect the Host header, so convert
let host = req.uri.authority().map_or("", |a| a.as_str()).to_owned();
req.insert_header(header::HOST, host).unwrap();
}
// TODO: Add keepalive header for connection reuse, but this is not required per RFC
}
if session.cache.enabled() {
if let Err(e) = pingora_cache::filters::upstream::request_filter(
&mut req,
session.cache.maybe_cache_meta(),
) {
session.cache.disable(NoCacheReason::InternalError);
warn!("cache upstream filter error {}, disabling cache", e);
}
}
match self
.inner
.upstream_request_filter(session, &mut req, ctx)
.await
{
Ok(_) => { /* continue */ }
Err(e) => {
return (false, true, Some(e));
}
}
session.upstream_compression.request_filter(&req);
debug!("Sending header to upstream {:?}", req);
match client_session.write_request_header(Box::new(req)).await {
Ok(_) => { /* Continue */ }
Err(e) => {
return (false, false, Some(e.into_up()));
}
}
let (tx_upstream, rx_upstream) = mpsc::channel::<HttpTask>(TASK_BUFFER_SIZE);
let (tx_downstream, rx_downstream) = mpsc::channel::<HttpTask>(TASK_BUFFER_SIZE);
session.as_mut().enable_retry_buffering();
// start bi-directional streaming
let ret = tokio::try_join!(
self.proxy_handle_downstream(session, tx_downstream, rx_upstream, ctx),
self.proxy_handle_upstream(client_session, tx_upstream, rx_downstream),
);
match ret {
Ok((downstream_can_reuse, _upstream)) => (downstream_can_reuse, true, None),
Err(e) => (false, false, Some(e)),
}
}
pub(crate) async fn proxy_to_h1_upstream(
&self,
session: &mut Session,
client_session: &mut HttpSessionV1,
reused: bool,
peer: &HttpPeer,
ctx: &mut SV::CTX,
) -> (bool, bool, Option<Box<Error>>)
// (reuse_server, reuse_client, error)
where
SV: ProxyHttp + Send + Sync,
SV::CTX: Send + Sync,
{
#[cfg(windows)]
let raw = client_session.id() as std::os::windows::io::RawSocket;
#[cfg(unix)]
let raw = client_session.id();
if let Err(e) = self
.inner
.connected_to_upstream(
session,
reused,
peer,
raw,
Some(client_session.digest()),
ctx,
)
.await
{
return (false, false, Some(e));
}
let (server_session_reuse, client_session_reuse, error) =
self.proxy_1to1(session, client_session, peer, ctx).await;
(server_session_reuse, client_session_reuse, error)
}
async fn proxy_handle_upstream(
&self,
client_session: &mut HttpSessionV1,
tx: mpsc::Sender<HttpTask>,
mut rx: mpsc::Receiver<HttpTask>,
) -> Result<()>
where
SV: ProxyHttp + Send + Sync,
SV::CTX: Send + Sync,
{
let mut request_done = false;
let mut response_done = false;
/* duplex mode, wait for either to complete */
while !request_done || !response_done {
tokio::select! {
res = client_session.read_response_task(), if !response_done => {
match res {
Ok(task) => {
response_done = task.is_end();
let result = tx.send(task)
.await.or_err(
InternalError,
"Failed to send upstream header to pipe");
// If the request is upgraded, the downstream pipe can early exit
// when the downstream connection is closed.
// In that case, this function should ignore that the pipe is closed,
// so that it can read the remaining events from rx, including
// the closure, and then exit.
if result.is_err() && !client_session.is_upgrade_req() {
return result;
}
},
Err(e) => {
// Push the error to downstream and then quit
// Don't care if send fails: downstream already gone
let _ = tx.send(HttpTask::Failed(e.into_up())).await;
// Downstream should consume all remaining data and handle the error
return Ok(())
}
}
},
body = rx.recv(), if !request_done => {
request_done = send_body_to1(client_session, body).await?;
// An upgraded request is terminated when either side is done
if request_done && client_session.is_upgrade_req() {
response_done = true;
}
},
else => {
// this shouldn't be reached as the while loop would already exit
break;
}
}
}
Ok(())
}
// TODO: use this function to replace bidirection_1to2()
// returns whether this server (downstream) session can be reused
async fn proxy_handle_downstream(
&self,
session: &mut Session,
tx: mpsc::Sender<HttpTask>,
mut rx: mpsc::Receiver<HttpTask>,
ctx: &mut SV::CTX,
) -> Result<bool>
where
SV: ProxyHttp + Send + Sync,
SV::CTX: Send + Sync,
{
let mut downstream_state = DownstreamStateMachine::new(session.as_mut().is_body_done());
let buffer = session.as_ref().get_retry_buffer();
// on retry, send the buffer if it exists, or an end-of-body signal if the body is empty
if buffer.is_some() || session.as_mut().is_body_empty() {
let send_permit = tx
.reserve()
.await
.or_err(InternalError, "reserving body pipe")?;
self.send_body_to_pipe(
session,
buffer,
downstream_state.is_done(),
send_permit,
ctx,
)
.await?;
}
let mut response_state = ResponseStateMachine::new();
// these two below can be wrapped into an internal ctx
// use cache when upstream revalidates (or TODO: error)
let mut serve_from_cache = proxy_cache::ServeFromCache::new();
let mut range_body_filter = proxy_cache::range_filter::RangeBodyFilter::new();
/* duplex mode without caching
* Read body from downstream while reading response from upstream
* If response is done, only read body from downstream
* If request is done, read response from upstream while idling downstream (to close quickly)
* If both are done, quit the loop
*
* With caching but without partial read support
* Similar to the above; the cache admission write happens when the data is written to downstream
*
* With caching + partial read support
* A. Read upstream response and write to cache
* B. Read data from cache and send to downstream
* If B fails (usually downstream close), continue A.
* If A fails, exit with error.
* If both are done, quit the loop
* Usually there is no request body to read for cacheable request
*/
while !downstream_state.is_done() || !response_state.is_done() {
// reserve tx capacity ahead to avoid deadlock, see below
let send_permit = tx
.try_reserve()
.or_err(InternalError, "try_reserve() body pipe for upstream");
tokio::select! {
// only try to send to pipe if there is capacity to avoid deadlock
// Otherwise deadlock could happen if both upstream and downstream are blocked
// on sending to their corresponding pipes which are both full.
body = session.downstream_session.read_body_or_idle(downstream_state.is_done()),
if downstream_state.can_poll() && send_permit.is_ok() => {
debug!("downstream event");
let body = match body {
Ok(b) => b,
Err(e) => {
if serve_from_cache.is_miss() {
// ignore downstream error so that upstream can continue to write cache
downstream_state.to_errored();
warn!(
"Downstream Error ignored during caching: {}, {}",
e,
self.inner.request_summary(session, ctx)
);
continue;
} else {
return Err(e.into_down());
}
}
};
// If the request is websocket, None body means the request is closed.
// Set the response to be done as well so that the request completes normally.
if body.is_none() && session.is_upgrade_req() {
response_state.maybe_set_upstream_done(true);
}
// TODO: consider just draining this if serve_from_cache is set
let is_body_done = session.is_body_done();
let request_done = self.send_body_to_pipe(
session,
body,
is_body_done,
send_permit.unwrap(), // safe because we checked is_ok()
ctx,
)
.await?;
downstream_state.maybe_finished(request_done);
},
_ = tx.reserve(), if downstream_state.is_reading() && send_permit.is_err() => {
// If tx is closed, the upstream has already finished its job.
downstream_state.maybe_finished(tx.is_closed());
debug!("waiting for permit {send_permit:?}, upstream closed {}", tx.is_closed());
/* No permit, wait on more capacity to avoid starving.
* Otherwise this select only blocks on rx, which might send no data
* before the entire body is uploaded.
* once more capacity arrives we just loop back
*/
},
task = rx.recv(), if !response_state.upstream_done() => {
debug!("upstream event: {:?}", task);
if let Some(t) = task {
if serve_from_cache.should_discard_upstream() {
// just drain, do we need to do anything else?
continue;
}
// pull as many tasks as we can
let mut tasks = Vec::with_capacity(TASK_BUFFER_SIZE);
tasks.push(t);
while let Some(maybe_task) = rx.recv().now_or_never() {
debug!("upstream event now: {:?}", maybe_task);
if let Some(t) = maybe_task {
tasks.push(t);
} else {
break; // upstream closed
}
}
/* run filters before sending to downstream */
let mut filtered_tasks = Vec::with_capacity(TASK_BUFFER_SIZE);
for mut t in tasks {
if self.revalidate_or_stale(session, &mut t, ctx).await {
serve_from_cache.enable();
response_state.enable_cached_response();
// skip downstream filtering entirely as the 304 will not be sent
break;
}
session.upstream_compression.response_filter(&mut t);
let task = self.h1_response_filter(session, t, ctx,
&mut serve_from_cache,
&mut range_body_filter, false).await?;
if serve_from_cache.is_miss_header() {
response_state.enable_cached_response();
}
// check error and abort
// otherwise the error is surfaced via write_response_tasks()
if !serve_from_cache.should_send_to_downstream() {
if let HttpTask::Failed(e) = task {
return Err(e);
}
}
filtered_tasks.push(task);
}
if !serve_from_cache.should_send_to_downstream() {
// TODO: need to derive response_done from filtered_tasks in case downstream failed already
continue;
}
// send to downstream
let response_done = session.write_response_tasks(filtered_tasks).await?;
response_state.maybe_set_upstream_done(response_done);
// unsuccessful upgrade response may force the request done
downstream_state.maybe_finished(session.is_body_done());
} else {
debug!("empty upstream event");
response_state.maybe_set_upstream_done(true);
}
},
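// Cache arm: stream the buffered cache tasks (hit or revalidated response) to downstream.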
task = serve_from_cache.next_http_task(&mut session.cache),
if !response_state.cached_done() && !downstream_state.is_errored() && serve_from_cache.is_on() => {
let task = self.h1_response_filter(session, task?, ctx,
&mut serve_from_cache,
&mut range_body_filter, true).await?;
debug!("serve_from_cache task {task:?}");
match session.write_response_tasks(vec![task]).await {
Ok(b) => response_state.maybe_set_cache_done(b),
Err(e) => if serve_from_cache.is_miss() {
// give up writing to downstream but wait for upstream cache write to finish
downstream_state.to_errored();
response_state.maybe_set_cache_done(true);
warn!(
"Downstream Error ignored during caching: {}, {}",
e,
self.inner.request_summary(session, ctx)
);
continue;
} else {
return Err(e);
}
}
if response_state.cached_done() {
if let Err(e) = session.cache.finish_hit_handler().await {
warn!("Error during finish_hit_handler: {}", e);
}
}
}
else => {
break;
}
}
}
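// The duplex loop has ended: finish the downstream body and decide whether the
// downstream session can be reused.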
let mut reuse_downstream = !downstream_state.is_errored();
if reuse_downstream {
match session.as_mut().finish_body().await {
Ok(_) => {
debug!("finished sending body to downstream");
}
Err(e) => {
error!("Error finish sending body to downstream: {}", e);
reuse_downstream = false;
}
}
}
Ok(reuse_downstream)
}
async fn h1_response_filter(
&self,
session: &mut Session,
mut task: HttpTask,
ctx: &mut SV::CTX,
serve_from_cache: &mut ServeFromCache,
range_body_filter: &mut RangeBodyFilter,
from_cache: bool, // is the task from cache already
) -> Result<HttpTask>
where
SV: ProxyHttp + Send + Sync,
SV::CTX: Send + Sync,
{
// skip caching if already served from cache
if !from_cache {
self.upstream_filter(session, &mut task, ctx)?;
// cache the original response before any downstream transformation
// requests that bypassed cache still need to run filters to see if the response has become cacheable
if session.cache.enabled() || session.cache.bypassing() {
if let Err(e) = self
.cache_http_task(session, &task, ctx, serve_from_cache)
.await
{
session.cache.disable(NoCacheReason::StorageError);
if serve_from_cache.is_miss_body() {
// if the response body is being streamed into cache during a miss and the write fails,
// we have to give up the entire request
return Err(e);
} else {
// otherwise, continue processing the response
warn!(
"Fail to cache response: {}, {}",
e,
self.inner.request_summary(session, ctx)
);
}
}
}
if !serve_from_cache.should_send_to_downstream() {
return Ok(task);
}
} // else: cached/local response, no need to trigger upstream filters and caching
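// Per-task filtering: headers get conditional/range and user filters plus a
// chunked-encoding fix-up; bodies go through the range and body filters;
// trailers and errors pass through.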
match task {
HttpTask::Header(mut header, end) => {
/* Downstream revalidation/range, only needed when cache is on because otherwise origin
* will handle it */
// TODO: if cache is disabled during response phase, we should still do the filter
if session.cache.enabled() {
self.downstream_response_conditional_filter(
serve_from_cache,
session,
&mut header,
ctx,
);
if !session.ignore_downstream_range {
let range_type =
self.inner
.range_header_filter(session.req_header(), &mut header, ctx);
range_body_filter.set(range_type);
}
}
/* Convert HTTP 1.0 style response to chunked encoding so that we don't
* have to close the downstream connection */
// these status codes / methods cannot have a body, so no need to add chunked encoding
let no_body = session.req_header().method == http::method::Method::HEAD
|| matches!(header.status.as_u16(), 204 | 304);
if !no_body
&& !header.status.is_informational()
&& header
.headers
.get(http::header::TRANSFER_ENCODING)
.is_none()
&& header.headers.get(http::header::CONTENT_LENGTH).is_none()
&& !end
{
header.insert_header(http::header::TRANSFER_ENCODING, "chunked")?;
}
match self.inner.response_filter(session, &mut header, ctx).await {
Ok(_) => Ok(HttpTask::Header(header, end)),
Err(e) => Err(e),
}
}
HttpTask::Body(data, end) => {
let mut data = range_body_filter.filter_body(data);
if let Some(duration) = self
.inner
.response_body_filter(session, &mut data, end, ctx)?
{
trace!("delaying response for {:?}", duration);
time::sleep(duration).await;
}
Ok(HttpTask::Body(data, end))
}
HttpTask::Trailer(h) => Ok(HttpTask::Trailer(h)), // TODO: support trailers for h1
HttpTask::Done => Ok(task),
HttpTask::Failed(_) => Ok(task), // Do nothing just pass the error down
}
}
// TODO: use this function to replace send_body_to2
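// Forward one chunk of the downstream request body through the bounded pipe to the
// upstream task, after running the request body filters.
// Returns true once the downstream body is done.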
async fn send_body_to_pipe(
&self,
session: &mut Session,
mut data: Option<Bytes>,
end_of_body: bool,
tx: mpsc::Permit<'_, HttpTask>,
ctx: &mut SV::CTX,
) -> Result<bool>
where
SV: ProxyHttp + Send + Sync,
SV::CTX: Send + Sync,
{
// None: end of body
// this var signals whether downstream finished sending the body; it shouldn't be
// affected by the request_body_filter
let end_of_body = end_of_body || data.is_none();
session
.downstream_modules_ctx
.request_body_filter(&mut data, end_of_body)
.await?;
self.inner
.request_body_filter(session, &mut data, end_of_body, ctx)
.await?;
// the flag to signal to upstream
let upstream_end_of_body = end_of_body || data.is_none();
/* It is normal to get 0 bytes because of multi-chunk parsing, or because request_body_filter
* decided not to output anything yet.
* Don't write 0 bytes to the network since they would be
* treated as the terminating chunk */
if !upstream_end_of_body && data.as_ref().map_or(false, |d| d.is_empty()) {
return Ok(false);
}
debug!(
"Read {} bytes body from downstream",
data.as_ref().map_or(-1, |d| d.len() as isize)
);
tx.send(HttpTask::Body(data, upstream_end_of_body));
Ok(end_of_body)
}
}
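// Write one HttpTask received from the body pipe to the h1 upstream session.
// Returns true once the upstream request body has been finished.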
pub(crate) async fn send_body_to1(
client_session: &mut HttpSessionV1,
recv_task: Option<HttpTask>,
) -> Result<bool> {
let body_done;
if let Some(task) = recv_task {
match task {
HttpTask::Body(data, end) => {
body_done = end;
if let Some(d) = data {
let m = client_session.write_body(&d).await;
match m {
Ok(m) => match m {
Some(n) => {
debug!("Write {} bytes body to upstream", n);
}
None => {
warn!("Upstream body is already finished. Nothing to write");
}
},
Err(e) => {
return e.into_up().into_err();
}
}
}
}
_ => {
// should never happen, sender only sends body
warn!("Unexpected task sent to upstream");
body_done = true;
}
}
} else {
// sender dropped
body_done = true;
}
if body_done {
match client_session.finish_body().await {
Ok(_) => {
debug!("finish sending body to upstream");
Ok(true)
}
Err(e) => e.into_up().into_err(),
}
} else {
Ok(false)
}
}
// proxy_h2.rs
// Copyright 2024 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use super::*;
use crate::proxy_cache::{range_filter::RangeBodyFilter, ServeFromCache};
use crate::proxy_common::*;
use http::{header::CONTENT_LENGTH, Method, StatusCode};
use pingora_core::protocols::http::v2::client::{write_body, Http2Session};
// add scheme and authority as required by h2 lib
fn update_h2_scheme_authority(
header: &mut http::request::Parts,
raw_host: &[u8],
tls: bool,
) -> Result<()> {
let authority = if let Ok(s) = std::str::from_utf8(raw_host) {
if s.starts_with('[') {
// don't mess with ipv6 host
s
} else if let Some(colon) = s.find(':') {
if s.len() == colon + 1 {
// colon is the last char, ignore
s
} else if let Some(another_colon) = s[colon + 1..].find(':') {
// try to get rid of extra port numbers
&s[..colon + 1 + another_colon]
} else {
s
}
} else {
s
}
} else {
return Error::e_explain(
InvalidHTTPHeader,
format!("invalid authority from host {:?}", raw_host),
);
};
let scheme = if tls { "https" } else { "http" };
let uri = http::uri::Builder::new()
.scheme(scheme)
.authority(authority)
.path_and_query(header.uri.path_and_query().as_ref().unwrap().as_str())
.build();
match uri {
Ok(uri) => {
header.uri = uri;
Ok(())
}
Err(_) => Error::e_explain(
InvalidHTTPHeader,
format!("invalid authority from host {}", authority),
),
}
}
impl<SV> HttpProxy<SV> {
pub(crate) async fn proxy_1to2(
&self,
session: &mut Session,
client_session: &mut Http2Session,
peer: &HttpPeer,
ctx: &mut SV::CTX,
) -> (bool, Option<Box<Error>>)
// (reuse_server, error)
where
SV: ProxyHttp + Send + Sync,
SV::CTX: Send + Sync,
{
let mut req = session.req_header().clone();
if req.version != Version::HTTP_2 {
/* remove H1 specific headers */
// https://github.com/hyperium/h2/blob/d3b9f1e36aadc1a7a6804e2f8e86d3fe4a244b4f/src/proto/streams/send.rs#L72
req.remove_header(&http::header::TRANSFER_ENCODING);
req.remove_header(&http::header::CONNECTION);
req.remove_header(&http::header::UPGRADE);
req.remove_header("keep-alive");
req.remove_header("proxy-connection");
}
/* turn it into h2 */
req.set_version(Version::HTTP_2);
if session.cache.enabled() {
if let Err(e) = pingora_cache::filters::upstream::request_filter(
&mut req,
session.cache.maybe_cache_meta(),
) {
session.cache.disable(NoCacheReason::InternalError);
warn!("cache upstream filter error {}, disabling cache", e);
}
}
match self
.inner
.upstream_request_filter(session, &mut req, ctx)
.await
{
Ok(_) => { /* continue */ }
Err(e) => {
return (false, Some(e));
}
}
// Remove the H1 Host header and save it in order to add it to :authority.
// We do this because certain H2 servers expect requests not to have a Host header.
// The Host is removed after the upstream filters above for 2 reasons
// 1. there is no API to change the :authority header
// 2. otherwise the filter code would need to be aware of host vs :authority across http versions
let host = req.remove_header(&http::header::HOST);
session.upstream_compression.request_filter(&req);
let body_empty = session.as_mut().is_body_empty();
// whether we support sending END_STREAM on HEADERS if body is empty
let send_end_stream = req.send_end_stream().expect("req must be h2");
let mut req: http::request::Parts = req.into();
// H2 requires authority to be set, so copy that from H1 host if that is set
if let Some(host) = host {
if let Err(e) = update_h2_scheme_authority(&mut req, host.as_bytes(), peer.is_tls()) {
return (false, Some(e));
}
}
debug!("Request to h2: {req:?}");
// send END_STREAM on HEADERS
let send_header_eos = send_end_stream && body_empty;
debug!("send END_STREAM on HEADERS: {send_end_stream}");
let req = Box::new(RequestHeader::from(req));
if let Err(e) = client_session.write_request_header(req, send_header_eos) {
return (false, Some(e.into_up()));
}
if !send_end_stream && body_empty {
// send END_STREAM on empty DATA frame
match client_session.write_request_body(Bytes::new(), true) {
Ok(()) => debug!("sent empty DATA frame to h2"),
Err(e) => {
return (false, Some(e.into_up()));
}
}
}
client_session.read_timeout = peer.options.read_timeout;
// take the body writer out of the client for easy duplex
let mut client_body = client_session
.take_request_body_writer()
.expect("already send request header");
let (tx, rx) = mpsc::channel::<HttpTask>(TASK_BUFFER_SIZE);
session.as_mut().enable_retry_buffering();
/* read downstream body and upstream response at the same time */
let ret = tokio::try_join!(
self.bidirection_1to2(session, &mut client_body, rx, ctx),
pipe_2to1_response(client_session, tx)
);
match ret {
Ok((downstream_can_reuse, _upstream)) => (downstream_can_reuse, None),
Err(e) => (false, Some(e)),
}
}
pub(crate) async fn proxy_to_h2_upstream(
&self,
session: &mut Session,
client_session: &mut Http2Session,
reused: bool,
peer: &HttpPeer,
ctx: &mut SV::CTX,
) -> (bool, Option<Box<Error>>)
where
SV: ProxyHttp + Send + Sync,
SV::CTX: Send + Sync,
{
#[cfg(windows)]
let raw = client_session.fd() as std::os::windows::io::RawSocket;
#[cfg(unix)]
let raw = client_session.fd();
if let Err(e) = self
.inner
.connected_to_upstream(session, reused, peer, raw, client_session.digest(), ctx)
.await
{
return (false, Some(e));
}
let (server_session_reuse, error) =
self.proxy_1to2(session, client_session, peer, ctx).await;
(server_session_reuse, error)
}
// returns whether server (downstream) session can be reused
async fn bidirection_1to2(
&self,
session: &mut Session,
client_body: &mut h2::SendStream<bytes::Bytes>,
mut rx: mpsc::Receiver<HttpTask>,
ctx: &mut SV::CTX,
) -> Result<bool>
where
SV: ProxyHttp + Send + Sync,
SV::CTX: Send + Sync,
{
let mut downstream_state = DownstreamStateMachine::new(session.as_mut().is_body_done());
// retry: send the buffered request body if it exists
if let Some(buffer) = session.as_mut().get_retry_buffer() {
self.send_body_to2(
session,
Some(buffer),
downstream_state.is_done(),
client_body,
ctx,
)
.await?;
}
let mut response_state = ResponseStateMachine::new();
// these two below can be wrapped into an internal ctx
// use cache when upstream revalidates (or TODO: error)
let mut serve_from_cache = ServeFromCache::new();
let mut range_body_filter = proxy_cache::range_filter::RangeBodyFilter::new();
/* duplex mode
* see the corresponding function for h1 for more comments
*/
while !downstream_state.is_done() || !response_state.is_done() {
// Similar logic in h1 needs to reserve capacity first to avoid deadlock.
// We don't need to do the same here because the h2 client_body pipe is unbounded (never blocks).
tokio::select! {
// NOTE: cannot avoid this copy since h2 owns the buf
body = session.downstream_session.read_body_or_idle(downstream_state.is_done()), if downstream_state.can_poll() => {
debug!("downstream event");
let body = match body {
Ok(b) => b,
Err(e) => {
if serve_from_cache.is_miss() {
// ignore downstream error so that upstream can continue to write cache
downstream_state.to_errored();
warn!(
"Downstream Error ignored during caching: {}, {}",
e,
self.inner.request_summary(session, ctx)
);
continue;
} else {
return Err(e.into_down());
}
}
};
let is_body_done = session.is_body_done();
let request_done =
self.send_body_to2(session, body, is_body_done, client_body, ctx)
.await?;
downstream_state.maybe_finished(request_done);
},
task = rx.recv(), if !response_state.upstream_done() => {
if let Some(t) = task {
debug!("upstream event: {:?}", t);
if serve_from_cache.should_discard_upstream() {
// just drain, do we need to do anything else?
continue;
}
// pull as many tasks as we can
let mut tasks = Vec::with_capacity(TASK_BUFFER_SIZE);
tasks.push(t);
while let Some(maybe_task) = rx.recv().now_or_never() {
if let Some(t) = maybe_task {
tasks.push(t);
} else {
break; // upstream closed
}
}
/* run filters before sending to downstream */
let mut filtered_tasks = Vec::with_capacity(TASK_BUFFER_SIZE);
for mut t in tasks {
if self.revalidate_or_stale(session, &mut t, ctx).await {
serve_from_cache.enable();
response_state.enable_cached_response();
// skip downstream filtering entirely as the 304 will not be sent
break;
}
session.upstream_compression.response_filter(&mut t);
// check error and abort
// otherwise the error is surfaced via write_response_tasks()
if !serve_from_cache.should_send_to_downstream() {
if let HttpTask::Failed(e) = t {
return Err(e);
}
}
filtered_tasks.push(
self.h2_response_filter(session, t, ctx,
&mut serve_from_cache,
&mut range_body_filter, false).await?);
if serve_from_cache.is_miss_header() {
response_state.enable_cached_response();
}
}
if !serve_from_cache.should_send_to_downstream() {
// TODO: need to derive response_done from filtered_tasks in case downstream failed already
continue;
}
let response_done = session.write_response_tasks(filtered_tasks).await?;
response_state.maybe_set_upstream_done(response_done);
} else {
debug!("empty upstream event");
response_state.maybe_set_upstream_done(true);
}
}
task = serve_from_cache.next_http_task(&mut session.cache),
if !response_state.cached_done() && !downstream_state.is_errored() && serve_from_cache.is_on() => {
let task = self.h2_response_filter(session, task?, ctx,
&mut serve_from_cache,
&mut range_body_filter, true).await?;
match session.write_response_tasks(vec![task]).await {
Ok(b) => response_state.maybe_set_cache_done(b),
Err(e) => if serve_from_cache.is_miss() {
// give up writing to downstream but wait for upstream cache write to finish
downstream_state.to_errored();
response_state.maybe_set_cache_done(true);
warn!(
"Downstream Error ignored during caching: {}, {}",
e,
self.inner.request_summary(session, ctx)
);
continue;
} else {
return Err(e);
}
}
if response_state.cached_done() {
if let Err(e) = session.cache.finish_hit_handler().await {
warn!("Error during finish_hit_handler: {}", e);
}
}
}
else => {
break;
}
}
}
let mut reuse_downstream = !downstream_state.is_errored();
if reuse_downstream {
match session.as_mut().finish_body().await {
Ok(_) => {
debug!("finished sending body to downstream");
}
Err(e) => {
error!("Error finish sending body to downstream: {}", e);
reuse_downstream = false;
}
}
}
Ok(reuse_downstream)
}
async fn h2_response_filter(
&self,
session: &mut Session,
mut task: HttpTask,
ctx: &mut SV::CTX,
serve_from_cache: &mut ServeFromCache,
range_body_filter: &mut RangeBodyFilter,
from_cache: bool, // is the task from cache already
) -> Result<HttpTask>
where
SV: ProxyHttp + Send + Sync,
SV::CTX: Send + Sync,
{
if !from_cache {
self.upstream_filter(session, &mut task, ctx)?;
// cache the original response before any downstream transformation
// requests that bypassed cache still need to run filters to see if the response has become cacheable
if session.cache.enabled() || session.cache.bypassing() {
if let Err(e) = self
.cache_http_task(session, &task, ctx, serve_from_cache)
.await
{
session.cache.disable(NoCacheReason::StorageError);
if serve_from_cache.is_miss_body() {
// if the response body is being streamed into cache during a miss and the write fails,
// we have to give up the entire request
return Err(e);
} else {
// otherwise, continue processing the response
warn!(
"Fail to cache response: {}, {}",
e,
self.inner.request_summary(session, ctx)
);
}
}
}
// skip the downstream filtering if these tasks are just for cache admission
if !serve_from_cache.should_send_to_downstream() {
return Ok(task);
}
} // else: cached/local response, no need to trigger upstream filters and caching
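// Per-task filtering: headers are filtered and downgraded to HTTP/1.1 for downstream;
// bodies go through the range and body filters; trailers may be folded into the body.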
match task {
HttpTask::Header(mut header, eos) => {
let req = session.req_header();
/* Downstream revalidation, only needed when cache is on because otherwise origin
* will handle it */
// TODO: if cache is disabled during response phase, we should still do the filter
if session.cache.enabled() {
self.downstream_response_conditional_filter(
serve_from_cache,
session,
&mut header,
ctx,
);
if !session.ignore_downstream_range {
let range_type = self.inner.range_header_filter(req, &mut header, ctx);
range_body_filter.set(range_type);
}
}
self.inner
.response_filter(session, &mut header, ctx)
.await?;
/* Downgrade the version so that write_response_header won't panic */
header.set_version(Version::HTTP_11);
// these status codes / methods cannot have a body, so no need to add chunked encoding
let no_body = session.req_header().method == "HEAD"
|| matches!(header.status.as_u16(), 204 | 304);
/* Add a chunked header to tell downstream to use chunked encoding
* in the absence of a content-length in h2 */
if !no_body
&& !header.status.is_informational()
&& header.headers.get(http::header::CONTENT_LENGTH).is_none()
{
header.insert_header(http::header::TRANSFER_ENCODING, "chunked")?;
}
Ok(HttpTask::Header(header, eos))
}
HttpTask::Body(data, eos) => {
let mut data = range_body_filter.filter_body(data);
if let Some(duration) = self
.inner
.response_body_filter(session, &mut data, eos, ctx)?
{
trace!("delaying response for {duration:?}");
time::sleep(duration).await;
}
Ok(HttpTask::Body(data, eos))
}
HttpTask::Trailer(mut trailers) => {
let trailer_buffer = match trailers.as_mut() {
Some(trailers) => {
debug!("Parsing response trailers..");
match self
.inner
.response_trailer_filter(session, trailers, ctx)
.await
{
Ok(buf) => buf,
Err(e) => {
error!(
"Encountered error while filtering upstream trailers {:?}",
e
);
None
}
}
}
_ => None,
};
// if we have a trailer buffer, write it to the downstream response body
if let Some(buffer) = trailer_buffer {
// write_body will not write additional bytes after reaching the content-length
// for gRPC H2 -> H1 this is not a problem but may be a problem for non gRPC code
// https://http2.github.io/http2-spec/#malformed
Ok(HttpTask::Body(Some(buffer), true))
} else {
Ok(HttpTask::Trailer(trailers))
}
}
HttpTask::Done => Ok(task),
HttpTask::Failed(_) => Ok(task), // Do nothing just pass the error down
}
}
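// Send one chunk of the downstream request body to the h2 upstream stream, after
// running the request body filters. A None body sends a standalone END_STREAM frame.
// Returns true once the downstream body is done.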
async fn send_body_to2(
&self,
session: &mut Session,
mut data: Option<Bytes>,
end_of_body: bool,
client_body: &mut h2::SendStream<bytes::Bytes>,
ctx: &mut SV::CTX,
) -> Result<bool>
where
SV: ProxyHttp + Send + Sync,
SV::CTX: Send + Sync,
{
session
.downstream_modules_ctx
.request_body_filter(&mut data, end_of_body)
.await?;
self.inner
.request_body_filter(session, &mut data, end_of_body, ctx)
.await?;
/* It is normal to get 0 bytes because of multi-chunk parsing or request_body_filter.
* Although there is no harm in writing empty bytes to h2, unlike h1, we ignore them
* for consistency */
if !end_of_body && data.as_ref().map_or(false, |d| d.is_empty()) {
return Ok(false);
}
if let Some(data) = data {
debug!("Write {} bytes body to h2 upstream", data.len());
write_body(client_body, data, end_of_body).map_err(|e| e.into_up())?;
} else {
debug!("Read downstream body done");
/* send a standalone END_STREAM flag */
write_body(client_body, Bytes::new(), true).map_err(|e| e.into_up())?;
}
Ok(end_of_body)
}
}
/* Read response header, body and trailer from h2 upstream and send them to tx */
pub(crate) async fn pipe_2to1_response(
client: &mut Http2Session,
tx: mpsc::Sender<HttpTask>,
) -> Result<()> {
client
.read_response_header()
.await
.map_err(|e| e.into_up())?; // should we send the error as an HttpTask?
let resp_header = Box::new(client.response_header().expect("just read").clone());
match client.check_response_end_or_error() {
Ok(eos) => {
// XXX: the h2 crate won't check for content-length underflow
// if a header frame with END_STREAM is sent without data frames
// As stated by RFC, "204 or 304 responses contain no content,
// as does the response to a HEAD request"
// https://datatracker.ietf.org/doc/html/rfc9113#section-8.1.1
let req_header = client.request_header().expect("must have sent req");
if eos
&& req_header.method != Method::HEAD
&& resp_header.status != StatusCode::NO_CONTENT
&& resp_header.status != StatusCode::NOT_MODIFIED
// RFC technically allows for leading zeroes
// https://datatracker.ietf.org/doc/html/rfc9110#name-content-length
&& resp_header
.headers
.get(CONTENT_LENGTH)
.is_some_and(|cl| cl.as_bytes().iter().any(|b| *b != b'0'))
{
let _ = tx
.send(HttpTask::Failed(
Error::explain(H2Error, "non-zero content-length on EOS headers frame")
.into_up(),
))
.await;
return Ok(());
}
tx.send(HttpTask::Header(resp_header, eos))
.await
.or_err(InternalError, "sending h2 headers to pipe")?;
}
Err(e) => {
// If upstream errored, then push error to downstream and then quit
// Don't care if send fails (which means downstream already gone)
// we were still able to retrieve the headers, so try sending
let _ = tx.send(HttpTask::Header(resp_header, false)).await;
let _ = tx.send(HttpTask::Failed(e.into_up())).await;
return Ok(());
}
}
while let Some(chunk) = client
.read_response_body()
.await
.map_err(|e| e.into_up())
.transpose()
{
let data = match chunk {
Ok(d) => d,
Err(e) => {
// Push the error to downstream and then quit
let _ = tx.send(HttpTask::Failed(e.into_up())).await;
// Downstream should consume all remaining data and handle the error
return Ok(());
}
};
match client.check_response_end_or_error() {
Ok(eos) => {
if data.is_empty() && !eos {
/* It is normal to get 0 bytes because of multi-chunk parsing.
* Don't write 0 bytes to downstream since they would be
* misread as the terminating chunk */
continue;
}
tx.send(HttpTask::Body(Some(data), eos))
.await
.or_err(InternalError, "sending h2 body to pipe")?;
}
Err(e) => {
// Similar to above, push the error to downstream and then quit
let _ = tx.send(HttpTask::Failed(e.into_up())).await;
return Ok(());
}
}
}
// attempt to get trailers
let trailers = match client.read_trailers().await {
Ok(t) => t,
Err(e) => {
// Similar to above, push the error to downstream and then quit
let _ = tx.send(HttpTask::Failed(e.into_up())).await;
return Ok(());
}
};
let trailers = trailers.map(Box::new);
if trailers.is_some() {
tx.send(HttpTask::Trailer(trailers))
.await
.or_err(InternalError, "sending h2 trailer to pipe")?;
}
tx.send(HttpTask::Done)
.await
.unwrap_or_else(|_| debug!("h2 to h1 channel closed!"));
Ok(())
}
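// Exercise authority and scheme rewriting for common Host header shapes.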
#[test]
fn test_update_authority() {
let mut parts = http::request::Builder::new()
.body(())
.unwrap()
.into_parts()
.0;
update_h2_scheme_authority(&mut parts, b"example.com", true).unwrap();
assert_eq!("example.com", parts.uri.authority().unwrap());
update_h2_scheme_authority(&mut parts, b"example.com:456", true).unwrap();
assert_eq!("example.com:456", parts.uri.authority().unwrap());
update_h2_scheme_authority(&mut parts, b"example.com:", true).unwrap();
assert_eq!("example.com:", parts.uri.authority().unwrap());
update_h2_scheme_authority(&mut parts, b"example.com:123:345", true).unwrap();
assert_eq!("example.com:123", parts.uri.authority().unwrap());
update_h2_scheme_authority(&mut parts, b"[::1]", true).unwrap();
assert_eq!("[::1]", parts.uri.authority().unwrap());
// verify scheme
update_h2_scheme_authority(&mut parts, b"example.com", true).unwrap();
assert_eq!("https://example.com", parts.uri);
update_h2_scheme_authority(&mut parts, b"example.com", false).unwrap();
assert_eq!("http://example.com", parts.uri);
}
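// A minimal additional sketch, not part of the original source: exercising the
// invalid-UTF-8 error path of update_h2_scheme_authority. Only is_err() is asserted,
// since the exact error type surfaced is an assumption here.
#[test]
fn test_update_authority_invalid_host() {
let mut parts = http::request::Builder::new()
.body(())
.unwrap()
.into_parts()
.0;
// 0xff 0xfe is not valid UTF-8, so the host cannot be turned into an authority
assert!(update_h2_scheme_authority(&mut parts, b"\xff\xfe", true).is_err());
}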