Awesome Cursor Rules Collection


TypeScript
You are an expert full-stack web developer specializing in TypeScript, Next.js (App Router), React, Shadcn UI, Radix UI, Tailwind CSS, and Framer Motion. You always use the latest stable versions of these technologies and adhere to the latest features and best practices.

---

## Code Style and Structure
- Write clear, concise, readable TypeScript code with accurate examples.  
- Use functional and declarative programming patterns; avoid classes.  
- Prefer modular, reusable code over duplication, with descriptive variable names (e.g., `isLoading`, `hasError`).  
- Structure files effectively: exported components, subcomponents, helpers, static content, and types.  
- Always add loading and error states to data-fetching components. Implement error handling and logging.  

---

## Naming Conventions
- Use kebab-case for file and directory names (e.g., `components/auth-wizard`).  
- All components should go in `components` and be named like `new-component.tsx`.  
- Favor named exports for components.  

---

## TypeScript Usage
- Use TypeScript for all code; prefer interfaces over types.  
- Avoid enums; use maps instead.  
- Use functional components with TypeScript interfaces.  

---

## Syntax and Formatting
- Use the `function` keyword for pure functions.  
- Avoid unnecessary curly braces in conditionals; use concise syntax for simple statements.  
- Use declarative JSX.  

---

## UI and Styling
- Use Shadcn UI, Radix UI, and Tailwind CSS for consistent, scalable, and responsive design.  
- Implement responsive design with Tailwind CSS, using a mobile-first approach.  
- Focus on accessible, semantic HTML and follow accessibility best practices.  

---

## Performance Optimization
- Minimize `use client`, `useEffect`, and `setState`; favor React Server Components (RSC).  
- Wrap client components in Suspense with a fallback.  
- Use dynamic loading for non-critical components.  
- Optimize images with the WebP format, size specifications, and lazy loading.  
- Optimize Web Vitals (LCP, CLS, FID).  

---

## Key Conventions
- Use `nuqs` for URL search parameter state management.  
- Limit `use client`:  
  - Favor server components and Next.js SSR.  
  - Use only for Web API access in isolated components.  
  - Avoid for data fetching or state management.  
- Follow Next.js documentation for data fetching, rendering, and routing.  
- While creating placeholder images as a part of your seed data, use `https://placekitten.com`.  

---

## Project Structure
- Place both the `/app` and `/components` folders in the root `/` directory.  
- Maintain a clean, modular, and scalable structure to adhere to industry standards:  
  - Simplify navigation and management of components and pages.  
  - Provide a clear separation between application logic (`/app`) and UI components (`/components`).  
  - Improve readability, reusability, and scalability as the application grows.  
  - Adhere to the principle of separation of concerns.  

---

## Components Organization
Within the `/components` folder, organize components by type or feature:  

### By Type  
Group components like forms, buttons, layout elements, etc.  

### By Feature  
For larger applications, group components related to specific features or domains.  

Example:  
```
/components
├── /ui
│   ├── /Button
│   ├── /Modal
│   └── /Card
├── /forms
│   ├── /TextField
│   └── /Select
└── /layout
    ├── /Navbar
    └── /Footer
```

- Private Components: For components used only within specific pages, you can create a `_components` folder within the relevant `/app` subdirectory.  
- Shared Components: The `/components` folder should contain reusable components used across multiple pages or features.  
- Modular Approach: As your project grows, adopt a modular structure, where each feature or domain has its own folder containing components, hooks, and utilities specific to that feature.  

---

## General Guidelines
- Follow requirements carefully and implement fully functional, bug-free, secure, and performant solutions.  
- Prioritize readability over raw performance but ensure efficient and maintainable code.  
- If uncertain, acknowledge knowledge gaps instead of guessing.  
- Always reference filenames, keep answers concise, and minimize unnecessary prose.
- Use yarn package manager for installing dependencies.

Python
# Rules

---

## Writing Markdown

---

- In Markdown, fenced code blocks should be surrounded by blank lines

### Example of a properly formatted fenced code block

   ```md
   1. Example of a properly formatted fenced code block

      ```bash
      echo "Like this!"
      ```

   ```

## Frontmatter

### Aliases

- For translated content, only the English (en) version should have `/docs` root aliases. Other language versions should only have their language-specific aliases (e.g. `/ja/docs` for Japanese)

#### Example of properly formatted aliases in the frontmatter

```yaml
# For the English version

---
title: "Tracking Training Changes"
description: "Guide to keeping track of changes during LoRA training using automated scripts"
summary: "Learn how to use automated scripts to organize and track your LoRA training process, including managing model versions, backing up configurations, and maintaining clean training workspaces."
weight: 8
bookToC: false
bookFlatSection: false
aliases:
  - /en/docs/yiff_toolkit/lora_training/Tracking-Training-Changes/
  - /en/docs/yiff_toolkit/lora_training/Tracking-Training-Changes
  - /en/docs/yiff_toolkit/lora_training/Tracking_Training_Changes/
  - /en/docs/yiff_toolkit/lora_training/Tracking_Training_Changes
  - "/en/docs/yiff_toolkit/lora_training/Tracking Training Changes/"
  - "/en/docs/yiff_toolkit/lora_training/Tracking Training Changes"
  - /docs/yiff_toolkit/lora_training/Tracking-Training-Changes/
  - /docs/yiff_toolkit/lora_training/Tracking-Training-Changes
  - /docs/yiff_toolkit/lora_training/Tracking_Training_Changes/
  - /docs/yiff_toolkit/lora_training/Tracking_Training_Changes
  - "/docs/yiff_toolkit/lora_training/Tracking Training Changes/"
  - "/docs/yiff_toolkit/lora_training/Tracking Training Changes"
---

# For the Hungarian version

---
title: "Tanítási Változások Követése"
description: "Útmutató a LoRA tanítás során történő változások követéséhez automatizált szkriptek segítségével"
summary: "Ismerje meg, hogyan használhat automatizált szkripteket a LoRA tanítási folyamat rendszerezéséhez és követéséhez, beleértve a modellverziók kezelését, a konfigurációk biztonsági mentését és a tiszta tanítási munkakörnyezet fenntartását."
weight: 8
bookToC: false
bookFlatSection: false
aliases:
  - /hu/docs/yiff_toolkit/lora_training/Tracking-Training-Changes/
  - /hu/docs/yiff_toolkit/lora_training/Tracking-Training-Changes
  - /hu/docs/yiff_toolkit/lora_training/Tracking_Training_Changes/
  - /hu/docs/yiff_toolkit/lora_training/Tracking_Training_Changes
  - "/hu/docs/yiff_toolkit/lora_training/Tracking Training Changes/"
  - "/hu/docs/yiff_toolkit/lora_training/Tracking Training Changes"
---
```

## Hugo Templates

- Ignore linter errors for Hugo template syntax like `{{ . | safeJS }}` within `<script>` tags - these are valid Hugo template expressions

## SCSS/CSS Styling

### File Organization

- SCSS partials should be prefixed with an underscore (e.g. `_blurhash.scss`)
- All SCSS partials should be imported in `_main.scss`
- Each component should have its own SCSS partial file
- Global styles and variables should be in `_main.scss`

### Import Order

The import order in `_main.scss` should be:

1. Variables and mixins
2. Base styles
3. Components
4. Utilities
5. Third-party styles

Example:

```scss
// Variables and mixins first
@import "variables";
@import "mixins";

// Base styles
@import "defaults";
@import "fonts";

// Components
@import "markdown";
@import "shortcodes";
@import "images";

// Utilities
@import "utils";
@import "print";

// Third-party
@import "blurhash";
```

### Naming Conventions

- Use kebab-case for class names (e.g. `image-container`)
- Prefix classes with component name for scoping (e.g. `book-menu`, `book-toc`)
- Use BEM-like naming for modifiers (e.g. `book-menu--collapsed`)

### Media Queries

- Define breakpoints as variables in `_main.scss`
- Use mobile-first approach
- Include media queries within the component styles

Example:

```scss
.component {
  // Mobile styles first
  width: 100%;
  
  // Then tablet
  @media screen and (min-width: $tablet-breakpoint) {
    width: 50%;
  }
  
  // Then desktop
  @media screen and (min-width: $desktop-breakpoint) {
    width: 33.33%;
  }
}
```

### Performance

- Avoid deeply nested selectors (max 3 levels)
- Use CSS Grid and Flexbox for layouts
- Minimize use of `!important`
- Use CSS variables for theme colors and values that change with user preferences

Example:

```scss
// Good
.image-container {
  .blur-hash {
    width: 100%;
  }
}

// Bad - too deeply nested
.container {
  .wrapper {
    .image {
      .blur-hash {
        width: 100%;
      }
    }
  }
}
```

### Theme Variables

- Define theme variables in `:root`
- Use semantic naming for variables
- Include both light and dark theme values

Example:

```scss
:root {
  // Light theme
  --body-background: #ffffff;
  --body-font-color: #000000;
  
  // Dark theme
  @media (prefers-color-scheme: dark) {
    --body-background: #1a1a1a;
    --body-font-color: #ffffff;
  }
}
```

Rust
# Connection pooling and reuse


When the request to a Peer (upstream server) is finished, the connection to that peer is kept alive and added to a connection pool to be _reused_ by subsequent requests. This happens automatically without any special configuration.

Requests that reuse previously established connections avoid the latency and compute cost of setting up a new connection, improving the Pingora server's overall performance and scalability.

## Same Peer


Only the connections to the exact same Peer can be reused by a request. For correctness and security reasons, two Peers are the same if and only if all of the following attributes are the same:

-   IP:port
-   scheme
-   SNI
-   client cert
-   verify cert
-   verify hostname
-   alternative\_cn
-   proxy settings

## Disable pooling


To disable connection pooling and reuse for a certain Peer, set the idle_timeout to 0 seconds on all requests using that Peer.
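
A minimal sketch of what that looks like inside an upstream_peer() implementation (the address and SNI are placeholders; idle_timeout is one of the PeerOptions listed later in this guide):

```rust
use std::time::Duration;

// A zero idle timeout means the connection is never parked in the pool
// after the request finishes, so it can never be reused.
let mut peer = Box::new(HttpPeer::new(("1.1.1.1", 443), true, "one.one.one.one".to_string()));
peer.options.idle_timeout = Some(Duration::from_secs(0));
```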

## Failure


A connection is considered not reusable if errors happen during the request.

# Daemonization


When a Pingora server is configured to run as a daemon, after bootstrapping it will move itself to the background and optionally change to run under the configured user and group. The pid_file option comes in handy in this case for the user to track the PID of the daemon in the background.

Daemonization also allows the server to perform privileged actions like loading secrets and then switch to an unprivileged user before accepting any requests from the network.

This process happens in the run_forever() call. Because daemonization involves fork(), certain things, such as threads created before this call, are likely lost.
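
A minimal sketch of the usual startup sequence (the quickstart pattern, assuming the facade `pingora` crate; `daemon: true` in the conf file requests daemonization):

```rust
use pingora::server::Server;

fn main() {
    // None: no CLI options struct; the conf (if any) supplies settings like `daemon`
    let mut my_server = Server::new(None).unwrap();
    my_server.bootstrap();
    // services would be added here with my_server.add_service(...)
    my_server.run_forever(); // daemonization (fork) happens inside this call
}
```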


# Configuration


A Pingora configuration file is a list of Pingora settings in YAML format.

Example:

```yaml
---
version: 1
threads: 2
pid_file: /run/pingora.pid
upgrade_sock: /tmp/pingora_upgrade.sock
user: nobody
group: webusers
```

## Settings


| Key | Meaning | Value type |
| --- | --- | --- |
| `version` | the version of the conf, currently it is a constant `1` | number |
| `pid_file` | the path to the pid file | string |
| `daemon` | whether to run the server in the background | bool |
| `error_log` | the path to the error log output file; STDERR is used if not set | string |
| `upgrade_sock` | the path to the upgrade socket | string |
| `threads` | number of threads per service | number |
| `user` | the user the pingora server should be run under after daemonization | string |
| `group` | the group the pingora server should be run under after daemonization | string |
| `client_bind_to_ipv4` | source IPv4 addresses to bind to when connecting to server | list of string |
| `client_bind_to_ipv6` | source IPv6 addresses to bind to when connecting to server | list of string |
| `ca_file` | the path to the root CA file | string |
| `work_stealing` | enable work stealing runtime (default `true`); see the Pingora runtime (WIP) section for more info | bool |
| `upstream_keepalive_pool_size` | the number of total connections to keep in the connection pool | number |

## Extension


Any unknown settings will be ignored. This allows extending the conf file to add and pass user-defined settings. See the User defined configuration section.


# Sharing state across phases with CTX


## Using CTX


The custom filters users implement in different phases of the request don't interact with each other directly. In order to share information and state across the filters, users can define a CTX struct. Each request owns a single CTX object. All the filters are able to read and update members of the CTX object. The CTX object will be dropped at the end of the request.

### Example


In the following example, the proxy parses the request header in the request_filter phase and stores a boolean flag so that later, in the upstream_peer phase, the flag can be used to decide which server to route traffic to. (Technically, the header could also be parsed in the upstream_peer phase; we do it in an earlier phase just for demonstration.)

```rust
pub struct MyProxy();

pub struct MyCtx {
    beta_user: bool,
}

fn check_beta_user(req: &pingora_http::RequestHeader) -> bool {
    // some simple logic to check if user is beta
    req.headers.get("beta-flag").is_some()
}

#[async_trait]
impl ProxyHttp for MyProxy {
    type CTX = MyCtx;
    fn new_ctx(&self) -> Self::CTX {
        MyCtx { beta_user: false }
    }

    async fn request_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<bool> {
        ctx.beta_user = check_beta_user(session.req_header());
        Ok(false)
    }

    async fn upstream_peer(
        &self,
        _session: &mut Session,
        ctx: &mut Self::CTX,
    ) -> Result<Box<HttpPeer>> {
        let addr = if ctx.beta_user {
            info!("I'm a beta user");
            ("1.0.0.1", 443)
        } else {
            ("1.1.1.1", 443)
        };

        let peer = Box::new(HttpPeer::new(addr, true, "one.one.one.one".to_string()));
        Ok(peer)
    }
}
```

## Sharing state across requests


Sharing state such as a counter, cache and other info across requests is common. There is nothing special needed for sharing resources and data across requests in Pingora. Arc, static or any other mechanism can be used.

### Example


Let's modify the example above to track the number of beta visitors as well as the total number of visitors. The counters can either be defined in the MyProxy struct itself or as global variables. Because the counters can be accessed concurrently, a Mutex is used here.

```rust
// global counter
static REQ_COUNTER: Mutex<usize> = Mutex::new(0);

pub struct MyProxy {
    // counter for the service
    beta_counter: Mutex<usize>, // AtomicUsize works too
}

pub struct MyCtx {
    beta_user: bool,
}

fn check_beta_user(req: &pingora_http::RequestHeader) -> bool {
    // some simple logic to check if user is beta
    req.headers.get("beta-flag").is_some()
}

#[async_trait]
impl ProxyHttp for MyProxy {
    type CTX = MyCtx;
    fn new_ctx(&self) -> Self::CTX {
        MyCtx { beta_user: false }
    }

    async fn request_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<bool> {
        ctx.beta_user = check_beta_user(session.req_header());
        Ok(false)
    }

    async fn upstream_peer(
        &self,
        _session: &mut Session,
        ctx: &mut Self::CTX,
    ) -> Result<Box<HttpPeer>> {
        let mut req_counter = REQ_COUNTER.lock().unwrap();
        *req_counter += 1;

        let addr = if ctx.beta_user {
            let mut beta_count = self.beta_counter.lock().unwrap();
            *beta_count += 1;
            info!("I'm beta user #{beta_count}");
            ("1.0.0.1", 443)
        } else {
            info!("I'm user #{req_counter}");
            ("1.1.1.1", 443)
        };

        let peer = Box::new(HttpPeer::new(addr, true, "one.one.one.one".to_string()));
        Ok(peer)
    }
}
```

The complete example can be found under [pingora-proxy/examples/ctx.rs](https://github.com/cloudflare/pingora/blob/main/pingora-proxy/examples/ctx.rs). You can run it using cargo:

    RUST_LOG=INFO cargo run --example ctx


# How to return errors


For easy error handling, the pingora-error crate exports a custom Result type used throughout other Pingora crates.

The Error struct used in this Result's error variant is a wrapper around arbitrary error types. It allows the user to tag the source of the underlying error and attach other custom context info.

Users will often need to return errors by propagating an existing error or creating a wholly new one. pingora-error makes this easy with its error building functions.

## Examples


For example, one could return an error when an expected header is not present:

```rust
fn validate_req_header(req: &RequestHeader) -> Result<()> {
    // validate that the `host` header exists
    req.headers
        .get(http::header::HOST)
        .ok_or_else(|| Error::explain(InvalidHTTPHeader, "No host header detected"))?;
    Ok(())
}

impl MyServer {
    pub async fn handle_request_filter(
        &self,
        http_session: &mut Session,
        ctx: &mut CTX,
    ) -> Result<bool> {
        validate_req_header(http_session.req_header())
            .or_err(HTTPStatus(400), "Missing required headers")?;
        Ok(true)
    }
}
```

validate_req_header returns an Error if the host header is not found, using Error::explain to create a new Error along with an associated type (InvalidHTTPHeader) and helpful context that may be logged in an error log.

This error will eventually propagate to the request filter, where it is returned as a new HTTPStatus error using or_err. (As part of the default pingora-proxy fail_to_proxy() phase, not only will this error be logged, but it will result in sending a 400 Bad Request response downstream.)

Note that the original causing error will be visible in the error logs as well. or_err wraps the original causing error in a new one with additional context, but Error's Display implementation also prints the chain of causing errors.

## Guidelines


An error has a _type_ (e.g. ConnectionClosed), a _source_ (e.g. Upstream, Downstream, Internal), and optionally, a _cause_ (another wrapped error) and a _context_ (arbitrary user-provided string details).

A minimal error can be created using functions like new_in / new_up / new_down, each of which specifies a source and asks the user to provide a type.

Generally speaking:

-   To create a new error, without a direct cause but with more context, use Error::explain. You can also use explain_err on a Result to replace the potential error inside it with a new one.
-   To wrap a causing error in a new one with more context, use Error::because. You can also use or_err on a Result to replace the potential error inside it by wrapping the original one. Both styles are sketched below.
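
A hedged sketch of the two styles. The helpers and the choice of ErrorType::InternalError are illustrative, not from the Pingora docs, and it assumes pingora-error re-exports the OrErr trait:

```rust
use pingora_error::{Error, ErrorType, OrErr, Result};

// Hypothetical helper: build a brand-new error with context but no cause.
fn require_tls(scheme: &str) -> Result<()> {
    if scheme != "https" {
        return Err(Error::explain(
            ErrorType::InternalError,
            "plaintext upstreams are not allowed",
        ));
    }
    Ok(())
}

// Hypothetical helper: wrap the underlying ParseIntError as the cause,
// attaching extra context via or_err().
fn parse_port(raw: &str) -> Result<u16> {
    raw.parse::<u16>()
        .or_err(ErrorType::InternalError, "invalid port in config")
}
```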

## Retry


Errors can be "retry-able." If the error is retry-able, pingora-proxy will be allowed to retry the upstream request. Some errors are only retry-able on [reused connections](https://github.com/cloudflare/pingora/blob/main/docs/user_guide/pooling.md), e.g. to handle situations where the remote end has dropped a connection we attempted to reuse.

By default a newly created Error either takes on its direct causing error's retry status, or, if left unspecified, is considered not retry-able.


# Error logging


Pingora libraries are built to expect issues like disconnects, timeouts, and invalid inputs from the network. A common way to record these issues is to output them in an error log (STDERR or log files).

## Log level guidelines


Pingora adopts the idea behind [log](https://docs.rs/log/latest/log/). There are five log levels:

-   error: This level should be used when the error stops the request from being handled correctly. For example when the server we try to connect to is offline.
-   warning: This level should be used when an error occurs but the system recovers from it. For example when the primary DNS timed out but the system is able to query the secondary DNS.
-   info: Pingora logs when the server is starting up or shutting down.
-   debug: Internal details. This log level is not compiled in release builds.
-   trace: Fine-grained internal details. This log level is not compiled in release builds.

The pingora-proxy crate has a well-defined interface to log errors, so that users don't have to manually log common proxy errors. See its guide for more details.

# Handling failures and failover


Pingora-proxy allows users to define how to handle failures throughout the life of a proxied request.

When a failure happens before the response header is sent downstream, users have a few options:

1.  Send an error page downstream and then give up.
2.  Retry the same upstream again.
3.  Try another upstream if applicable.

Otherwise, once the response header has already been sent downstream, there is nothing the proxy can do other than log an error and give up on the request.

## Retry / Failover


In order to implement retry or failover, fail_to_connect() / error_while_proxy() needs to mark the error as "retry-able." For failover, fail_to_connect() / error_while_proxy() also needs to update the CTX to tell upstream_peer() not to use the same Peer again.

### Safety


In general, idempotent HTTP requests, e.g., GET, are safe to retry. Other requests, e.g., POST, are not safe to retry if the requests have already been sent. When fail_to_connect() is called, pingora-proxy guarantees that nothing was sent upstream. Retrying a non-idempotent request after error_while_proxy() is not recommended unless the user knows the upstream server well enough to know whether it is safe.

### Example


In the following example we set a tries variable on the CTX to track how many connection attempts we've made. When setting our peer in upstream_peer we check if tries is less than one and connect to 192.0.2.1. On connect failure we increment tries in fail_to_connect and call e.set_retry(true), which tells Pingora this is a retryable error. On retry, we enter upstream_peer again and this time connect to 1.1.1.1. If we're unable to connect to 1.1.1.1, we return a 502, since we only set e.set_retry(true) in fail_to_connect when tries is zero.

```rust
pub struct MyProxy();

pub struct MyCtx {
    tries: usize,
}

#[async_trait]
impl ProxyHttp for MyProxy {
    type CTX = MyCtx;
    fn new_ctx(&self) -> Self::CTX {
        MyCtx { tries: 0 }
    }

    fn fail_to_connect(
        &self,
        _session: &mut Session,
        _peer: &HttpPeer,
        ctx: &mut Self::CTX,
        mut e: Box<Error>,
    ) -> Box<Error> {
        if ctx.tries > 0 {
            return e;
        }
        ctx.tries += 1;
        e.set_retry(true);
        e
    }

    async fn upstream_peer(
        &self,
        _session: &mut Session,
        ctx: &mut Self::CTX,
    ) -> Result<Box<HttpPeer>> {
        let addr = if ctx.tries < 1 {
            ("192.0.2.1", 443)
        } else {
            ("1.1.1.1", 443)
        };

        let mut peer = Box::new(HttpPeer::new(addr, true, "one.one.one.one".to_string()));
        peer.options.connection_timeout = Some(Duration::from_millis(100));
        Ok(peer)
    }
}
```


# Graceful restart and shutdown


Graceful restart, upgrade, and shutdown mechanisms are very commonly used to avoid errors or downtime when releasing new versions of Pingora servers.

Pingora's graceful upgrade mechanism guarantees the following:

-   A request is guaranteed to be handled either by the old server instance or the new one. No request will see connection refused when trying to connect to the server endpoints.
-   A request that can finish within the grace period is guaranteed not to be terminated.

## How to graceful upgrade


### Step 0


Configure the upgrade socket. The old and new server need to agree on the same path to this socket. See the configuration manual for details.

### Step 1


Start the new instance with the --upgrade CLI option. The new instance will not try to listen to the service endpoint right away. It will try to acquire the listening socket from the old instance instead.

### Step 2


Send SIGQUIT signal to the old instance. The old instance will start to transfer the listening socket to the new instance.

Once step 2 is successful, the new instance will start to handle new incoming connections right away. Meanwhile, the old instance will enter its graceful shutdown mode. It waits a short period of time (to give the new instance time to initialize and prepare to handle traffic), after which it will not accept any new connections.

# Pingora Internals


(Special thanks to [James Munns](https://github.com/jamesmunns) for writing this section)

## Starting the Server


The pingora system starts by spawning a _server_. The server is responsible for starting _services_, and listening for termination events.

                                   ┌───────────┐
                        ┌─────────>│  Service  │
                        │          └───────────┘
    ┌────────┐          │          ┌───────────┐
    │ Server │──Spawns──┼─────────>│  Service  │
    └────────┘          │          └───────────┘
                        │          ┌───────────┐
                        └─────────>│  Service  │
                                   └───────────┘


After spawning the _services_, the server continues to listen to a termination event, which it will propagate to the created services.

## Services


_Services_ are entities that handle listening to given sockets, and perform the core functionality. A _service_ is tied to a particular protocol and set of options.

> NOTE: there are also "background" services, which just do _stuff_, and aren't necessarily listening to a socket. For now we're just talking about listener services.

Each service has its own threadpool/tokio runtime, with a number of threads based on the configured value. Worker threads are not shared cross-service. Service runtime threadpools may be work-stealing (tokio-default), or non-work-stealing (N isolated single threaded runtimes).

    ┌─────────────────────────┐
    │ ┌─────────────────────┐ │
    │ │┌─────────┬─────────┐│ │
    │ ││  Conn   │  Conn   ││ │
    │ │├─────────┼─────────┤│ │
    │ ││Endpoint │Endpoint ││ │
    │ │├─────────┴─────────┤│ │
    │ ││     Listeners     ││ │
    │ │├─────────┬─────────┤│ │
    │ ││ Worker  │ Worker  ││ │
    │ ││ Thread  │ Thread  ││ │
    │ │├─────────┴─────────┤│ │
    │ ││  Tokio Executor   ││ │
    │ │└───────────────────┘│ │
    │ └─────────────────────┘ │
    │ ┌───────┐               │
    └─┤Service├───────────────┘
      └───────┘


## Service Listeners


At startup, each Service is assigned a set of downstream endpoints that they listen to. A single service may listen to more than one endpoint. The Server also passes along any relevant configuration, including TLS settings if relevant.

These endpoints are converted into listening sockets, called TransportStacks. Each TransportStack is assigned to an async task within that service's executor.

                                     ┌───────────────────┐
                                     │┌─────────────────┐│    ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
     ┌─────────┐                     ││ TransportStack  ││                                ┌────────────────────┐│
    ┌┤Listeners├────────┐            ││                 ││    │                       │  ││                    │
    │└─────────┘        │            ││ (Listener, TLS  │├──────spawn(run_endpoint())────>│ Service<ServerApp> ││
    │┌─────────────────┐│            ││    Acceptor,    ││    │                       │  ││                    │
    ││    Endpoint     ││            ││   UpgradeFDs)   ││                                └────────────────────┘│
    ││   addr/ports    ││            │├─────────────────┤│    │                       │  │
    ││ + TLS Settings  ││            ││ TransportStack  ││                                ┌────────────────────┐│
    │├─────────────────┤│            ││                 ││    │                       │  ││                    │
    ││    Endpoint     ││──build()─> ││ (Listener, TLS  │├──────spawn(run_endpoint())────>│ Service<ServerApp> ││
    ││   addr/ports    ││            ││    Acceptor,    ││    │                       │  ││                    │
    ││ + TLS Settings  ││            ││   UpgradeFDs)   ││                                └────────────────────┘│
    │├─────────────────┤│            │├─────────────────┤│    │                       │  │
    ││    Endpoint     ││            ││ TransportStack  ││                                ┌────────────────────┐│
    ││   addr/ports    ││            ││                 ││    │                       │  ││                    │
    ││ + TLS Settings  ││            ││ (Listener, TLS  │├──────spawn(run_endpoint())────>│ Service<ServerApp> ││
    │└─────────────────┘│            ││    Acceptor,    ││    │                       │  ││                    │
    └───────────────────┘            ││   UpgradeFDs)   ││                                └────────────────────┘│
                                     │└─────────────────┘│    │ ┌───────────────┐     │  │ ┌──────────────┐
                                     └───────────────────┘     ─│start_service()│─ ─ ─    ─│ Worker Tasks ├ ─ ─ ┘
                                                                └───────────────┘          └──────────────┘


## Downstream connection lifecycle


Each service processes incoming connections by spawning a task-per-connection. These connections are held open as long as there are new events to be handled.

                                      ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐

                                      │  ┌───────────────┐   ┌────────────────┐   ┌─────────────────┐    ┌─────────────┐  │
    ┌────────────────────┐               │ UninitStream  │   │    Service     │   │       App       │    │  Task Ends  │
    │                    │            │  │ ::handshake() │──>│::handle_event()│──>│ ::process_new() │──┬>│             │  │
    │ Service<ServerApp> │──spawn()──>   └───────────────┘   └────────────────┘   └─────────────────┘  │ └─────────────┘
    │                    │            │                                                    ▲           │                  │
    └────────────────────┘                                                                 │         while
                                      │                                                    └─────────reuse                │
                                         ┌───────────────────────────┐
                                      └ ─│  Task on Service Runtime  │─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
                                         └───────────────────────────┘


## What is a proxy then?


Interestingly, the pingora Server itself has no particular notion of a Proxy.

Instead, it only thinks in terms of Services, which are expected to contain a particular implementor of the ServiceApp trait.

For example, this is how an HttpProxy struct, from the pingora-proxy crate, "becomes" a Service spawned by the Server:

    ┌─────────────┐
    │  HttpProxy  │
    │  (struct)   │
    └─────────────┘
           │
       implements   ┌─────────────┐
           │        │HttpServerApp│
           └───────>│   (trait)   │
                    └─────────────┘
                           │
                       implements   ┌─────────────┐
                           │        │  ServerApp  │
                           └───────>│   (trait)   │
                                    └─────────────┘
                                           │
                                       contained    ┌─────────────────────┐
                                         within     │                     │
                                           └───────>│ Service<ServiceApp> │
                                                    │                     │
                                                    └─────────────────────┘


Different functionalities and helpers are provided at different layers in this representation.

    ┌─────────────┐        ┌──────────────────────────────────────┐
    │  HttpProxy  │        │Handles high level Proxying workflow, │
    │  (struct)   │─ ─ ─ ─ │   customizable via ProxyHttp trait   │
    └──────┬──────┘        └──────────────────────────────────────┘
           │
    ┌──────▼──────┐        ┌──────────────────────────────────────┐
    │HttpServerApp│        │ Handles selection of H1 vs H2 stream │
    │   (trait)   │─ ─ ─ ─ │     handling, incl H2 handshake      │
    └──────┬──────┘        └──────────────────────────────────────┘
           │
    ┌──────▼──────┐        ┌──────────────────────────────────────┐
    │  ServerApp  │        │ Handles dispatching of App instances │
    │   (trait)   │─ ─ ─ ─ │   as individual tasks, per Session   │
    └──────┬──────┘        └──────────────────────────────────────┘
           │
    ┌──────▼──────┐        ┌──────────────────────────────────────┐
    │ Service<A>  │        │ Handles dispatching of App instances │
    │  (struct)   │─ ─ ─ ─ │  as individual tasks, per Listener   │
    └─────────────┘        └──────────────────────────────────────┘


The HttpProxy struct handles the high-level workflow of proxying an HTTP connection.

It uses the ProxyHttp (note the flipped wording order!) **trait** to allow customization at each of the following steps (note: taken from [the phase chart](https://github.com/cloudflare/pingora/blob/main/docs/user_guide/phase_chart.md) doc):

```mermaid
graph TD;
    start("new request")-->request_filter;
    request_filter-->upstream_peer;

    upstream_peer-->Connect{{IO: connect to upstream}};

    Connect--connection success-->connected_to_upstream;
    Connect--connection failure-->fail_to_connect;

    connected_to_upstream-->upstream_request_filter;
    upstream_request_filter --> SendReq{{IO: send request to upstream}};
    SendReq-->RecvResp{{IO: read response from upstream}};
    RecvResp-->upstream_response_filter-->response_filter-->upstream_response_body_filter-->response_body_filter-->logging-->endreq("request done");

    fail_to_connect --can retry-->upstream_peer;
    fail_to_connect --can't retry-->fail_to_proxy--send error response-->logging;

    RecvResp--failure-->IOFailure;
    SendReq--failure-->IOFailure;
    error_while_proxy--can retry-->upstream_peer;
    error_while_proxy--can't retry-->fail_to_proxy;

    request_filter --send response-->logging

    Error>any response filter error]-->error_while_proxy
    IOFailure>IO error]-->error_while_proxy
```

## Zooming out


Before we zoom in, it's probably good to zoom out and remind ourselves how a proxy generally works:

    ┌────────────┐          ┌─────────────┐         ┌────────────┐
    │ Downstream │          │    Proxy    │         │  Upstream  │
    │   Client   │─────────>│             │────────>│   Server   │
    └────────────┘          └─────────────┘         └────────────┘


The proxy will be taking connections from the **Downstream** client, and (if everything goes right), establishing a connection with the appropriate **Upstream** server. This selected upstream server is referred to as the **Peer**.

Once the connection is established, the Downstream and Upstream can communicate bidirectionally.

So far, the discussion of the Server, Services, and Listeners has focused on the LEFT half of this diagram: handling incoming Downstream connections and getting them TO the proxy component.

Next, we'll look at the RIGHT half of this diagram, connecting to Upstreams.

## Managing the Upstream


Connections to Upstream Peers are made through Connectors. This is not a specific type or trait, but more of a "style".

Connectors are responsible for a few things:

-   Establishing a connection with a Peer
-   Maintaining a connection pool with the Peer, allowing for connection reuse across:
    -   Multiple requests from a single downstream client
    -   Multiple requests from different downstream clients
-   Measuring health of connections, for connections like H2, which perform regular pings
-   Handling protocols with multiple poolable layers, like H2
-   Caching, if relevant to the protocol and enabled
-   Compression, if relevant to the protocol and enabled

Now in context, we can see how each end of the Proxy is handled:

    ┌────────────┐          ┌─────────────┐         ┌────────────┐
    │ Downstream │       ┌ ─│─   Proxy  ┌ ┼ ─       │  Upstream  │
    │   Client   │─────────>│ │           │──┼─────>│   Server   │
    └────────────┘       │  └───────────┼─┘         └────────────┘
                          ─ ─ ┘          ─ ─ ┘
                            ▲              ▲
                         ┌──┘              └──┐
                         │                    │
                    ┌ ─ ─ ─ ─ ┐         ┌ ─ ─ ─ ─ ─
                     Listeners           Connectors│
                    └ ─ ─ ─ ─ ┘         └ ─ ─ ─ ─ ─


## What about multiple peers?


Connectors only handle the connection to a single peer, so selecting one of potentially multiple Peers is actually handled one level up, in the upstream_peer() method of the ProxyHttp trait.
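
A hedged sketch of that selection using the separate pingora-load-balancing crate, following the quickstart pattern; the backend list, hash key, and SNI are placeholders, and it assumes the facade `pingora` crate re-exports the proxy types in its prelude:

```rust
use std::sync::Arc;

use async_trait::async_trait;
use pingora::prelude::*; // Session, HttpPeer, Result, ProxyHttp, ...
use pingora_load_balancing::{selection::RoundRobin, LoadBalancer};

// Built once at startup, e.g. with
// LoadBalancer::try_from_iter(["1.1.1.1:443", "1.0.0.1:443"]).unwrap()
pub struct MultiPeerProxy(Arc<LoadBalancer<RoundRobin>>);

#[async_trait]
impl ProxyHttp for MultiPeerProxy {
    type CTX = ();
    fn new_ctx(&self) -> Self::CTX {}

    async fn upstream_peer(
        &self,
        _session: &mut Session,
        _ctx: &mut Self::CTX,
    ) -> Result<Box<HttpPeer>> {
        // Round-robin selection ignores the hash key, so an empty key is fine.
        let upstream = self.0.select(b"", 256).unwrap();
        Ok(Box::new(HttpPeer::new(upstream, true, "one.one.one.one".to_string())))
    }
}
```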


# Examples: taking control of the request


In this section we will go through how to route, modify or reject requests.

## Routing


Any information from the request can be used to make routing decisions. Pingora doesn't impose any constraints on how users implement their own routing logic.

In the following example, the proxy sends traffic to 1.0.0.1 only when the request path starts with /family/. All other requests are routed to 1.1.1.1.

```rust
pub struct MyGateway;

#[async_trait]
impl ProxyHttp for MyGateway {
    type CTX = ();
    fn new_ctx(&self) -> Self::CTX {}

    async fn upstream_peer(
        &self,
        session: &mut Session,
        _ctx: &mut Self::CTX,
    ) -> Result<Box<HttpPeer>> {
        let addr = if session.req_header().uri.path().starts_with("/family/") {
            ("1.0.0.1", 443)
        } else {
            ("1.1.1.1", 443)
        };

        info!("connecting to {addr:?}");

        let peer = Box::new(HttpPeer::new(addr, true, "one.one.one.one".to_string()));
        Ok(peer)
    }
}
```

## Modifying headers


Both request and response headers can be added, removed, or modified in their corresponding phases. In the following example, we add logic to the response_filter phase to update the Server header and remove the alt-svc header.

```rust
#[async_trait]
impl ProxyHttp for MyGateway {
    // ...
    async fn response_filter(
        &self,
        _session: &mut Session,
        upstream_response: &mut ResponseHeader,
        _ctx: &mut Self::CTX,
    ) -> Result<()>
    where
        Self::CTX: Send + Sync,
    {
        // replace the existing header if any
        upstream_response
            .insert_header("Server", "MyGateway")
            .unwrap();
        // because we don't support h3
        upstream_response.remove_header("alt-svc");

        Ok(())
    }
}
```

## Return Error pages


Sometimes, under certain conditions such as authentication failures, you might want the proxy to return an error page instead of proxying the traffic.

```rust
fn check_login(req: &pingora_http::RequestHeader) -> bool {
    // implement your login check logic here
    req.headers.get("Authorization").map(|v| v.as_bytes()) == Some(b"password")
}

#[async_trait]
impl ProxyHttp for MyGateway {
    // ...
    async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool> {
        if session.req_header().uri.path().starts_with("/login")
            && !check_login(session.req_header())
        {
            let _ = session.respond_error(403).await;
            // true: tell the proxy that the response is already written
            return Ok(true);
        }
        Ok(false)
    }
}
```

## Logging


Logging logic can be added to the logging phase of Pingora. The logging phase runs on every request right before Pingora proxy finishes processing it. This phase runs for both successful and failed requests.

In the example below, we add a Prometheus metric and access logging to the proxy. In order for the metrics to be scraped, we also start a Prometheus metrics server on a different port.

```rust
pub struct MyGateway {
    req_metric: prometheus::IntCounter,
}

#[async_trait]
impl ProxyHttp for MyGateway {
    // ...
    async fn logging(
        &self,
        session: &mut Session,
        _e: Option<&pingora::Error>,
        ctx: &mut Self::CTX,
    ) {
        let response_code = session
            .response_written()
            .map_or(0, |resp| resp.status.as_u16());
        // access log
        info!(
            "{} response code: {response_code}",
            self.request_summary(session, ctx)
        );

        self.req_metric.inc();
    }
}

fn main() {
    // ...
    let mut prometheus_service_http =
        pingora::services::listening::Service::prometheus_http_service();
    prometheus_service_http.add_tcp("127.0.0.1:6192");
    my_server.add_service(prometheus_service_http);

    my_server.run_forever();
}
```

# Handling panics


Any panic that happens to particular requests does not affect other ongoing requests or the server's ability to handle other requests. Sockets acquired by the panicking requests are dropped (closed). The panics will be captured by the tokio runtime and then ignored.

In order to monitor panics, the Pingora server has built-in Sentry integration.

```rust
my_server.sentry = Some(
    sentry::ClientOptions {
        dsn: "SENTRY_DSN".into_dsn().unwrap(),
        ..Default::default()
    },
);
```

Even though a panic is not fatal in Pingora, it is still not the preferred way to handle failures like network timeouts. Panics should be reserved for unexpected logic errors.


# Peer: how to connect to upstream


In the upstream_peer() phase the user should return a Peer object which defines how to connect to a certain upstream.

## Peer


An HttpPeer defines which upstream to connect to.

| Attribute | Meaning |
| --- | --- |
| `address: SocketAddr` | The IP:Port to connect to |
| `scheme: Scheme` | Http or Https |
| `sni: String` | The SNI to use, Https only |
| `proxy: Option<Proxy>` | The setting to proxy the request through a [CONNECT proxy](https://developer.mozilla.org/en-US/docs/Web/HTTP/Methods/CONNECT) |
| `client_cert_key: Option<Arc<CertKey>>` | The client certificate to use in mTLS connections to upstream |
| `options: PeerOptions` | See below |

## PeerOptions


A PeerOptions defines how to connect to the upstream.

| Attribute | Meaning |
| --- | --- |
| `bind_to: Option<InetSocketAddr>` | Which local address to bind to as the client IP |
| `connection_timeout: Option<Duration>` | How long to wait before giving up _establishing_ a TCP connection |
| `total_connection_timeout: Option<Duration>` | How long to wait before giving up _establishing_ a connection, including TLS handshake time |
| `read_timeout: Option<Duration>` | How long to wait before each individual `read()` from upstream. The timer is reset after each `read()` |
| `idle_timeout: Option<Duration>` | How long to wait before closing an idle connection waiting for connection reuse |
| `write_timeout: Option<Duration>` | How long to wait before a `write()` to upstream finishes |
| `verify_cert: bool` | Whether to check if the upstream's server cert is valid and validated |
| `verify_hostname: bool` | Whether to check if the upstream server cert's CN matches the SNI |
| `alternative_cn: Option<String>` | Accept the cert if the CN matches this name |
| `alpn: ALPN` | Which HTTP protocol to advertise during ALPN, http1.1 and/or http2 |
| `ca: Option<Arc<Box<[X509]>>>` | Which Root CA to use to validate the server's cert |
| `tcp_keepalive: Option<TcpKeepalive>` | TCP keepalive settings to upstream |
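
A short hedged sketch of tuning a few of these options on a peer inside upstream_peer() (the address, SNI, and timeout values are placeholders; connection_timeout also appears in the failover example above):

```rust
use std::time::Duration;

// inside upstream_peer():
let mut peer = Box::new(HttpPeer::new(("1.1.1.1", 443), true, "one.one.one.one".to_string()));
peer.options.connection_timeout = Some(Duration::from_millis(100)); // TCP connect budget
peer.options.total_connection_timeout = Some(Duration::from_millis(500)); // incl. TLS handshake
peer.options.read_timeout = Some(Duration::from_secs(5)); // per-read() budget, reset on each read
Ok(peer)
```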

## Examples


TBD

# Life of a request: pingora-proxy phases and filters


## Intro


The pingora-proxy HTTP proxy framework supports highly programmable proxy behaviors. This is done by allowing users to inject custom logic into different phases (stages) in the life of a request.

## Life of a proxied HTTP request


1.  The life of a proxied HTTP request starts when the proxy reads the request header from the **downstream** (i.e., the client).
2.  Then, the proxy connects to the **upstream** (i.e., the remote server). This step is skipped if there is a previously established [connection to reuse](https://github.com/cloudflare/pingora/blob/main/docs/user_guide/pooling.md).
3.  The proxy then sends the request header to the upstream.
4.  Once the request header is sent, the proxy enters a duplex mode, which simultaneously proxies: a. upstream response (both header and body) to the downstream, and b. downstream request body to upstream (if any).
5.  Once the entire request/response finishes, the life of the request is ended. All resources are released. The downstream connections and the upstream connections are recycled to be reused if applicable.

## Pingora-proxy phases and filters


Pingora-proxy allows users to insert arbitrary logic into the life of a request.

```mermaid
graph TD;
    start("new request")-->early_request_filter;
    early_request_filter-->request_filter;
    request_filter-->upstream_peer;

    upstream_peer-->Connect{{IO: connect to upstream}};

    Connect--connection success-->connected_to_upstream;
    Connect--connection failure-->fail_to_connect;

    connected_to_upstream-->upstream_request_filter;
    upstream_request_filter --> request_body_filter;
    request_body_filter --> SendReq{{IO: send request to upstream}};
    SendReq-->RecvResp{{IO: read response from upstream}};
    RecvResp-->upstream_response_filter-->response_filter-->upstream_response_body_filter-->response_body_filter-->logging-->endreq("request done");

    fail_to_connect --can retry-->upstream_peer;
    fail_to_connect --can't retry-->fail_to_proxy--send error response-->logging;

    RecvResp--failure-->IOFailure;
    SendReq--failure-->IOFailure;
    error_while_proxy--can retry-->upstream_peer;
    error_while_proxy--can't retry-->fail_to_proxy;

    request_filter --send response-->logging

    Error>any response filter error]-->error_while_proxy
    IOFailure>IO error]-->error_while_proxy
```

### General filter usage guidelines


-   Most filters return a [pingora_error::Result<_>](https://github.com/cloudflare/pingora/blob/main/docs/user_guide/errors.md). When the returned value is Result::Err, fail_to_proxy() will be called and the request will be terminated.
-   Most filters are async functions, which allows other async operations such as IO to be performed within the filters.
-   A per-request CTX object can be defined to share states across the filters of the same request. All filters have mutable access to this object.
-   Most filters are optional.
-   The reason both upstream_response_*_filter() and response_*_filter() exist is for HTTP caching integration reasons (still WIP).

### early_request_filter()


This is the first phase of every request.

This function is similar to request_filter() but executes before any other logic, including downstream module logic. The main purpose of this function is to provide finer-grained control of the behavior of the modules.

### request_filter()


This phase is usually for validating request inputs, rate limiting, and initializing context.

### request_body_filter()


This phase is triggered when the request body is ready to be sent to upstream. It is called every time a piece of the request body is received.

### proxy_upstream_filter()


This phase determines whether we should continue to the upstream to serve a response. If we short-circuit, a 502 is returned by default, but a different response can be implemented.

This phase returns a boolean determining whether we should continue to the upstream or error.
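
A hedged sketch of such a policy (the GET/HEAD rule is illustrative, and the exact signature, including any where clause, should follow the ProxyHttp trait definition):

```rust
// Only proxy idempotent GET/HEAD requests; everything else short-circuits
// and gets the default 502 unless another response was already written.
async fn proxy_upstream_filter(
    &self,
    session: &mut Session,
    _ctx: &mut Self::CTX,
) -> Result<bool> {
    let method = &session.req_header().method;
    Ok(*method == http::Method::GET || *method == http::Method::HEAD)
}
```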

### upstream_peer()


This phase decides which upstream to connect to (e.g. with DNS lookup and hashing/round-robin), and how to connect to it.

This phase returns a Peer that defines the upstream to connect to. Implementing this phase is **required**.

### connected_to_upstream()


This phase is executed when the upstream is successfully connected.

Usually this phase is for logging purposes. Connection info such as RTT and the upstream TLS cipher is reported in this phase.

### fail_to_connect()


The counterpart of connected_to_upstream(). This phase is called if an error is encountered when connecting to upstream.

In this phase users can report the error in Sentry/Prometheus/error log. Users can also decide if the error is retry-able.

If the error is retry-able, upstream_peer() will be called again, in which case the user can decide whether to retry the same upstream or failover to a secondary one.

If the error is not retry-able, the request will end.

### upstream_request_filter()


This phase is for modifying requests before they are sent to upstream.
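
For example, a hedged sketch that rewrites the Host header before the request goes upstream (the header value is a placeholder; the shape mirrors the response_filter example above):

```rust
async fn upstream_request_filter(
    &self,
    _session: &mut Session,
    upstream_request: &mut RequestHeader,
    _ctx: &mut Self::CTX,
) -> Result<()> {
    // Make the upstream see the hostname it expects.
    upstream_request.insert_header("Host", "one.one.one.one")?;
    Ok(())
}
```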

### upstream_response_filter()/upstream_response_body_filter()/upstream_response_trailer_filter()


This phase is triggered after an upstream response header/body/trailer is received.

This phase is for modifying or processing response headers, body, or trailers before sending them downstream. Note that this phase is called _prior_ to HTTP caching, and therefore any changes made here will affect the response stored in the HTTP cache.

### response_filter()/response_body_filter()/response_trailer_filter()


This phase is triggered after a response header/body/trailer is ready to send to downstream.

This phase is for modifying them before they are sent downstream.

### error_while_proxy()


This phase is triggered on proxy errors to upstream, after the connection has been established.

This phase may decide to retry a request if the connection was reused and the HTTP method is idempotent.

### fail_to_proxy()


This phase is called whenever an error is encountered during any of the phases above.

This phase is usually for error logging and error reporting to downstream.

### logging()

This is the last phase that runs after the request is finished (or errors) and before any of its resources are released. Every request will end up in this final phase.

This phase is usually for logging and post request cleanup.

### request_summary()

This is not a phase, but a commonly used callback.

Every error that reaches fail_to_proxy() will be automatically logged in the error log. request_summary() will be called to dump the info regarding the request when logging the error.

This callback returns a string which allows users to customize what info to dump in the error log to help track and debug the failures.
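
A sketch that appends a hypothetical `x-request-id` header to the default summary (assuming the underlying session exposes its own `request_summary()`, which the default implementation delegates to):

```
// Sketch only: part of `impl ProxyHttp for MyProxy`; other methods omitted.
fn request_summary(&self, session: &Session, _ctx: &Self::CTX) -> String {
    let req_id = session
        .req_header()
        .headers
        .get("x-request-id") // hypothetical tracing header
        .and_then(|v| v.to_str().ok())
        .unwrap_or("-");
    format!("{}, request id: {}", session.as_ref().request_summary(), req_id)
}
```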

### suppress_error_log()

This is also not a phase, but another callback.

fail_to_proxy() errors are automatically logged in the error log, but users may not be interested in every error. For example, downstream errors are logged when the client disconnects early, and these can become noisy if you mainly care about upstream issues. This callback inspects the error and returns true or false; if it returns true, the error is not written to the log.
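
A sketch that silences downstream-side errors only, assuming the error exposes its source via `esource()` (the `pingora_error::ErrorSource` import in the lib.rs excerpt below suggests this API):

```
// Sketch only: part of `impl ProxyHttp for MyProxy`; other methods omitted.
fn suppress_error_log(&self, _session: &Session, _ctx: &Self::CTX, error: &Error) -> bool {
    // true = do not write this error to the error log
    matches!(error.esource(), ErrorSource::Downstream)
}
```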

### Cache filters

To be documented

# Connection pooling and reuse

When the request to a Peer (upstream server) is finished, the connection to that peer is kept alive and added to a connection pool to be _reused_ by subsequent requests. This happens automatically without any special configuration.

Requests that reuse previously established connections avoid the latency and compute cost of setting up a new connection, improving the Pingora server's overall performance and scalability.

## Same Peer

Only connections to the exact same Peer can be reused by a request. For correctness and security reasons, two Peers are considered the same if and only if all of the following attributes match:

-   IP:port
-   scheme
-   SNI
-   client cert
-   verify cert
-   verify hostname
-   alternative_cn
-   proxy settings

## Disable pooling

To disable connection pooling and reuse for a certain Peer, set the idle_timeout to 0 seconds for all requests using that Peer.
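
For example, inside upstream_peer() (a sketch, assuming the `idle_timeout` option lives in the peer's `options` field):

```
// Sketch only: build a peer whose connections are never pooled for reuse.
let mut peer = HttpPeer::new(("1.1.1.1", 443), true, "one.one.one.one".to_string());
peer.options.idle_timeout = Some(std::time::Duration::from_secs(0));
```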

## Failure

A connection is considered not reusable if errors happen during the request.

# Prometheus

Pingora has a built-in Prometheus HTTP metrics server for scraping.

```
...
let mut prometheus_service_http = Service::prometheus_http_service();
prometheus_service_http.add_tcp("0.0.0.0:1234");
my_server.add_service(prometheus_service_http);
my_server.run_forever();
```

The simplest way to use it is to have [static metrics](https://docs.rs/prometheus/latest/prometheus/#static-metrics).

```
use once_cell::sync::Lazy;
use prometheus::{register_int_gauge, IntGauge};

static MY_COUNTER: Lazy<IntGauge> = Lazy::new(|| {
    register_int_gauge!("my_counter", "my counter").unwrap()
});
```

This static metric will automatically appear in the Prometheus metric endpoint.

# RateLimiter quickstart

Pingora provides the pingora-limits crate, which offers a simple and easy-to-use rate limiter for your application. Below is an example of how you can use [Rate](https://docs.rs/pingora-limits/latest/pingora_limits/rate/struct.Rate.html) to create an application that uses multiple limiters to restrict the rate at which requests can be made on a per-app basis (determined by a request header).

## Steps

1.  Add the following dependencies to your Cargo.toml:

    ```
    async-trait = "0.1"
    pingora = { version = "0.3", features = [ "lb" ] }
    pingora-limits = "0.3.0"
    once_cell = "1.19.0"
    ```

2.  Declare a global rate limiter map to store the rate limiter for each client. In this example, we use appid.
3.  Override the request_filter method in the ProxyHttp trait to implement rate limiting.
    1.  Retrieve the client appid from header.
    2.  Retrieve the current window requests from the rate limiter map. If there is no rate limiter for the client, create a new one and insert it into the map.
    3.  If the current window requests exceed the limit, return 429 and set RateLimiter associated headers.
    4.  If the request is not rate limited, return Ok(false) to continue the request.

## Example

```
use async_trait::async_trait;
use once_cell::sync::Lazy;
use pingora::http::ResponseHeader;
use pingora::prelude::*;
use pingora_limits::rate::Rate;
use std::sync::Arc;
use std::time::Duration;

fn main() {
    let mut server = Server::new(Some(Opt::default())).unwrap();
    server.bootstrap();
    let mut upstreams = LoadBalancer::try_from_iter(["1.1.1.1:443", "1.0.0.1:443"]).unwrap();
    // Set health check
    let hc = TcpHealthCheck::new();
    upstreams.set_health_check(hc);
    upstreams.health_check_frequency = Some(Duration::from_secs(1));
    // Set background service
    let background = background_service("health check", upstreams);
    let upstreams = background.task();
    // Set load balancer
    let mut lb = http_proxy_service(&server.configuration, LB(upstreams));
    lb.add_tcp("0.0.0.0:6188");

    // let rate = Rate
    server.add_service(background);
    server.add_service(lb);
    server.run_forever();
}

pub struct LB(Arc<LoadBalancer<RoundRobin>>);

impl LB {
    pub fn get_request_appid(&self, session: &mut Session) -> Option<String> {
        match session
            .req_header()
            .headers
            .get("appid")
            .map(|v| v.to_str())
        {
            None => None,
            Some(v) => match v {
                Ok(v) => Some(v.to_string()),
                Err(_) => None,
            },
        }
    }
}

// Rate limiter
static RATE_LIMITER: Lazy<Rate> = Lazy::new(|| Rate::new(Duration::from_secs(1)));

// max request per second per client
static MAX_REQ_PER_SEC: isize = 1;

#[async_trait]
impl ProxyHttp for LB {
    type CTX = ();

    fn new_ctx(&self) {}

    async fn upstream_peer(
        &self,
        _session: &mut Session,
        _ctx: &mut Self::CTX,
    ) -> Result<Box<HttpPeer>> {
        let upstream = self.0.select(b"", 256).unwrap();
        // Set SNI
        let peer = Box::new(HttpPeer::new(upstream, true, "one.one.one.one".to_string()));
        Ok(peer)
    }

    async fn upstream_request_filter(
        &self,
        _session: &mut Session,
        upstream_request: &mut RequestHeader,
        _ctx: &mut Self::CTX,
    ) -> Result<()>
    where
        Self::CTX: Send + Sync,
    {
        upstream_request
            .insert_header("Host", "one.one.one.one")
            .unwrap();
        Ok(())
    }

    async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool>
    where
        Self::CTX: Send + Sync,
    {
        let appid = match self.get_request_appid(session) {
            None => return Ok(false), // no client appid found, skip rate limiting
            Some(addr) => addr,
        };

        // retrieve the current window requests
        let curr_window_requests = RATE_LIMITER.observe(&appid, 1);
        if curr_window_requests > MAX_REQ_PER_SEC {
            // rate limited, return 429
            let mut header = ResponseHeader::build(429, None).unwrap();
            header
                .insert_header("X-Rate-Limit-Limit", MAX_REQ_PER_SEC.to_string())
                .unwrap();
            header.insert_header("X-Rate-Limit-Remaining", "0").unwrap();
            header.insert_header("X-Rate-Limit-Reset", "1").unwrap();
            session.set_keepalive(None);
            session
                .write_response_header(Box::new(header), true)
                .await?;
            return Ok(true);
        }
        Ok(false)
    }
}
```

## Testing

To use the example above,

1.  Run your program with `cargo run`.
2.  Verify the program is working with a few executions of `curl localhost:6188 -H "appid:1" -v`
    -   The first request should work and any later requests that arrive within 1s of a previous request should fail with:

            *   Trying 127.0.0.1:6188...
            * Connected to localhost (127.0.0.1) port 6188 (#0)
            > GET / HTTP/1.1
            > Host: localhost:6188
            > User-Agent: curl/7.88.1
            > Accept: */*
            > appid:1
            >
            < HTTP/1.1 429 Too Many Requests
            < X-Rate-Limit-Limit: 1
            < X-Rate-Limit-Remaining: 0
            < X-Rate-Limit-Reset: 1
            < Date: Sun, 14 Jul 2024 20:29:02 GMT
            < Connection: close
            <
            * Closing connection 0



## Complete Example

You can run the pre-made example code in the [pingora-proxy examples folder](https://github.com/cloudflare/pingora/tree/main/pingora-proxy/examples/rate_limiter.rs) with

    cargo run --example rate_limiter

# Starting and stopping Pingora server

A Pingora server is a regular unprivileged multithreaded process.

## Start

By default, the server will run in the foreground.

A Pingora server by default takes the following command-line arguments:

| Argument        | Effect                                                 | Default      |
| --------------- | ------------------------------------------------------ | ------------ |
| `-d, --daemon`  | Daemonize the server                                   | false        |
| `-t, --test`    | Test the server conf and then exit (WIP)               | false        |
| `-c, --conf`    | The path to the configuration file                     | empty string |
| `-u, --upgrade` | This server should gracefully upgrade a running server | false        |

## Stop

A Pingora server will listen to the following signals.

### SIGINT: fast shutdown

Upon receiving SIGINT (ctrl + c), the server will exit immediately with no delay. All unfinished requests will be interrupted. This behavior is usually less preferred because it could break requests.

### SIGTERM: graceful shutdown

Upon receiving SIGTERM, the server will notify all its services to shutdown, wait for some preconfigured time and then exit. This behavior gives requests a grace period to finish.

### SIGQUIT: graceful upgrade

Similar to SIGTERM, but the server will also transfer all its listening sockets to a new Pingora server so that there is no downtime during the upgrade. See the [graceful upgrade](https://github.com/cloudflare/pingora/blob/main/docs/user_guide/graceful.md) section for more details.

# Systemd integration

A Pingora server doesn't depend on systemd but it can easily be made into a systemd service.

```
[Service]
Type=forking
PIDFile=/run/pingora.pid
ExecStart=/bin/pingora -d -c /etc/pingora.conf
ExecReload=kill -QUIT $MAINPID
ExecReload=/bin/pingora -u -d -c /etc/pingora.conf
```

The example systemd setup integrates Pingora's graceful upgrade into systemd. To upgrade the Pingora service, simply install a new version of the binary and then call `systemctl reload pingora.service`.



## gateway.rs example

```
// Copyright 2024 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use async_trait::async_trait;
use bytes::Bytes;
use clap::Parser;
use log::info;
use prometheus::register_int_counter;

use pingora_core::server::configuration::Opt;
use pingora_core::server::Server;
use pingora_core::upstreams::peer::HttpPeer;
use pingora_core::Result;
use pingora_http::ResponseHeader;
use pingora_proxy::{ProxyHttp, Session};

fn check_login(req: &pingora_http::RequestHeader) -> bool {
    // implement your credential check logic here
    req.headers.get("Authorization").map(|v| v.as_bytes()) == Some(b"password")
}

pub struct MyGateway {
    req_metric: prometheus::IntCounter,
}

#[async_trait]
impl ProxyHttp for MyGateway {
    type CTX = ();
    fn new_ctx(&self) -> Self::CTX {}

    async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool> {
        if session.req_header().uri.path().starts_with("/login")
            && !check_login(session.req_header())
        {
            let _ = session
                .respond_error_with_body(403, Bytes::from_static(b"no way!"))
                .await;
            // true: early return as the response is already written
            return Ok(true);
        }
        Ok(false)
    }

    async fn upstream_peer(
        &self,
        session: &mut Session,
        _ctx: &mut Self::CTX,
    ) -> Result<Box<HttpPeer>> {
        let addr = if session.req_header().uri.path().starts_with("/family") {
            ("1.0.0.1", 443)
        } else {
            ("1.1.1.1", 443)
        };

        info!("connecting to {addr:?}");

        let peer = Box::new(HttpPeer::new(addr, true, "one.one.one.one".to_string()));
        Ok(peer)
    }

    async fn response_filter(
        &self,
        _session: &mut Session,
        upstream_response: &mut ResponseHeader,
        _ctx: &mut Self::CTX,
    ) -> Result<()>
    where
        Self::CTX: Send + Sync,
    {
        // replace existing header if any
        upstream_response
            .insert_header("Server", "MyGateway")
            .unwrap();
        // because we don't support h3
        upstream_response.remove_header("alt-svc");

        Ok(())
    }

    async fn logging(
        &self,
        session: &mut Session,
        _e: Option<&pingora_core::Error>,
        ctx: &mut Self::CTX,
    ) {
        let response_code = session
            .response_written()
            .map_or(0, |resp| resp.status.as_u16());
        info!(
            "{} response code: {response_code}",
            self.request_summary(session, ctx)
        );

        self.req_metric.inc();
    }
}

// RUST_LOG=INFO cargo run --example gateway
// curl 127.0.0.1:6191 -H "Host: one.one.one.one"
// curl 127.0.0.1:6191/family/ -H "Host: one.one.one.one"
// curl 127.0.0.1:6191/login/ -H "Host: one.one.one.one" -I -H "Authorization: password"
// curl 127.0.0.1:6191/login/ -H "Host: one.one.one.one" -I -H "Authorization: bad"
// For metrics
// curl 127.0.0.1:6192/
fn main() {
    env_logger::init();

    // read command line arguments
    let opt = Opt::parse();
    let mut my_server = Server::new(Some(opt)).unwrap();
    my_server.bootstrap();

    let mut my_proxy = pingora_proxy::http_proxy_service(
        &my_server.configuration,
        MyGateway {
            req_metric: register_int_counter!("req_counter", "Number of requests").unwrap(),
        },
    );
    my_proxy.add_tcp("0.0.0.0:6191");
    my_server.add_service(my_proxy);

    let mut prometheus_service_http =
        pingora_core::services::listening::Service::prometheus_http_service();
    prometheus_service_http.add_tcp("127.0.0.1:6192");
    my_server.add_service(prometheus_service_http);

    my_server.run_forever();
}
```

## ctx.rs example

```
use async_trait::async_trait;
use clap::Parser;
use log::info;
use std::sync::Mutex;

use pingora_core::server::configuration::Opt;
use pingora_core::server::Server;
use pingora_core::upstreams::peer::HttpPeer;
use pingora_core::Result;
use pingora_proxy::{ProxyHttp, Session};

// global counter
static REQ_COUNTER: Mutex<usize> = Mutex::new(0);

pub struct MyProxy {
    // counter for the service
    beta_counter: Mutex<usize>, // AtomicUsize works too
}

pub struct MyCtx {
    beta_user: bool,
}

fn check_beta_user(req: &pingora_http::RequestHeader) -> bool {
    // some simple logic to check if user is beta
    req.headers.get("beta-flag").is_some()
}

#[async_trait]
impl ProxyHttp for MyProxy {
    type CTX = MyCtx;
    fn new_ctx(&self) -> Self::CTX {
        MyCtx { beta_user: false }
    }

    async fn request_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<bool> {
        ctx.beta_user = check_beta_user(session.req_header());
        Ok(false)
    }

    async fn upstream_peer(
        &self,
        _session: &mut Session,
        ctx: &mut Self::CTX,
    ) -> Result<Box<HttpPeer>> {
        let mut req_counter = REQ_COUNTER.lock().unwrap();
        *req_counter += 1;

        let addr = if ctx.beta_user {
            let mut beta_count = self.beta_counter.lock().unwrap();
            *beta_count += 1;
            info!("I'm a beta user #{beta_count}");
            ("1.0.0.1", 443)
        } else {
            info!("I'm an user #{req_counter}");
            ("1.1.1.1", 443)
        };

        let peer = Box::new(HttpPeer::new(addr, true, "one.one.one.one".to_string()));
        Ok(peer)
    }
}

// RUST_LOG=INFO cargo run --example ctx
// curl 127.0.0.1:6190 -H "Host: one.one.one.one"
// curl 127.0.0.1:6190 -H "Host: one.one.one.one" -H "beta-flag: 1"
fn main() {
    env_logger::init();

    // read command line arguments
    let opt = Opt::parse();
    let mut my_server = Server::new(Some(opt)).unwrap();
    my_server.bootstrap();

    let mut my_proxy = pingora_proxy::http_proxy_service(
        &my_server.configuration,
        MyProxy {
            beta_counter: Mutex::new(0),
        },
    );
    my_proxy.add_tcp("0.0.0.0:6190");

    my_server.add_service(my_proxy);
    my_server.run_forever();
}
```

## grpc_web_module.rs example

```
// Copyright 2024 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use async_trait::async_trait;
use clap::Parser;

use pingora_core::server::Server;
use pingora_core::upstreams::peer::HttpPeer;
use pingora_core::Result;
use pingora_core::{
    modules::http::{
        grpc_web::{GrpcWeb, GrpcWebBridge},
        HttpModules,
    },
    prelude::Opt,
};
use pingora_proxy::{ProxyHttp, Session};

/// This example shows how to use the gRPC-web bridge module

pub struct GrpcWebBridgeProxy;

#[async_trait]
impl ProxyHttp for GrpcWebBridgeProxy {
    type CTX = ();
    fn new_ctx(&self) -> Self::CTX {}

    fn init_downstream_modules(&self, modules: &mut HttpModules) {
        // Add the gRPC web module
        modules.add_module(Box::new(GrpcWeb))
    }

    async fn early_request_filter(
        &self,
        session: &mut Session,
        _ctx: &mut Self::CTX,
    ) -> Result<()> {
        let grpc = session
            .downstream_modules_ctx
            .get_mut::<GrpcWebBridge>()
            .expect("GrpcWebBridge module added");

        // initialize gRPC module for this request
        grpc.init();
        Ok(())
    }

    async fn upstream_peer(
        &self,
        _session: &mut Session,
        _ctx: &mut Self::CTX,
    ) -> Result<Box<HttpPeer>> {
        // this needs to be your gRPC server
        let grpc_peer = Box::new(HttpPeer::new(
            ("1.1.1.1", 443),
            true,
            "one.one.one.one".to_string(),
        ));
        Ok(grpc_peer)
    }
}

// RUST_LOG=INFO cargo run --example grpc_web_module

fn main() {
    env_logger::init();

    // read command line arguments
    let opt = Opt::parse();
    let mut my_server = Server::new(Some(opt)).unwrap();
    my_server.bootstrap();

    let mut my_proxy =
        pingora_proxy::http_proxy_service(&my_server.configuration, GrpcWebBridgeProxy);
    my_proxy.add_tcp("0.0.0.0:6194");

    my_server.add_service(my_proxy);
    my_server.run_forever();
}
```

## load_balancer.rs example

```
// Copyright 2024 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use async_trait::async_trait;
use clap::Parser;
use log::info;
use pingora_core::services::background::background_service;
use std::{sync::Arc, time::Duration};

use pingora_core::server::configuration::Opt;
use pingora_core::server::Server;
use pingora_core::upstreams::peer::HttpPeer;
use pingora_core::Result;
use pingora_load_balancing::{health_check, selection::RoundRobin, LoadBalancer};
use pingora_proxy::{ProxyHttp, Session};

pub struct LB(Arc<LoadBalancer<RoundRobin>>);

#[async_trait]
impl ProxyHttp for LB {
    type CTX = ();
    fn new_ctx(&self) -> Self::CTX {}

    async fn upstream_peer(&self, _session: &mut Session, _ctx: &mut ()) -> Result<Box<HttpPeer>> {
        let upstream = self
            .0
            .select(b"", 256) // hash doesn't matter
            .unwrap();

        info!("upstream peer is: {:?}", upstream);

        let peer = Box::new(HttpPeer::new(upstream, true, "one.one.one.one".to_string()));
        Ok(peer)
    }

    async fn upstream_request_filter(
        &self,
        _session: &mut Session,
        upstream_request: &mut pingora_http::RequestHeader,
        _ctx: &mut Self::CTX,
    ) -> Result<()> {
        upstream_request
            .insert_header("Host", "one.one.one.one")
            .unwrap();
        Ok(())
    }
}

// RUST_LOG=INFO cargo run --example load_balancer
fn main() {
    env_logger::init();

    // read command line arguments
    let opt = Opt::parse();
    let mut my_server = Server::new(Some(opt)).unwrap();
    my_server.bootstrap();

    // 127.0.0.1:343 is just a bad server
    let mut upstreams =
        LoadBalancer::try_from_iter(["1.1.1.1:443", "1.0.0.1:443", "127.0.0.1:343"]).unwrap();

    // We add health check in the background so that the bad server is never selected.
    let hc = health_check::TcpHealthCheck::new();
    upstreams.set_health_check(hc);
    upstreams.health_check_frequency = Some(Duration::from_secs(1));

    let background = background_service("health check", upstreams);

    let upstreams = background.task();

    let mut lb = pingora_proxy::http_proxy_service(&my_server.configuration, LB(upstreams));
    lb.add_tcp("0.0.0.0:6188");

    let cert_path = format!("{}/tests/keys/server.crt", env!("CARGO_MANIFEST_DIR"));
    let key_path = format!("{}/tests/keys/key.pem", env!("CARGO_MANIFEST_DIR"));

    let mut tls_settings =
        pingora_core::listeners::tls::TlsSettings::intermediate(&cert_path, &key_path).unwrap();
    tls_settings.enable_h2();
    lb.add_tls_with_settings("0.0.0.0:6189", None, tls_settings);

    my_server.add_service(lb);
    my_server.add_service(background);
    my_server.run_forever();
}
```

## modify_response.rs example

```
// Copyright 2024 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use async_trait::async_trait;
use bytes::Bytes;
use clap::Parser;
use serde::{Deserialize, Serialize};
use std::net::ToSocketAddrs;

use pingora_core::server::configuration::Opt;
use pingora_core::server::Server;
use pingora_core::upstreams::peer::HttpPeer;
use pingora_core::Result;
use pingora_http::ResponseHeader;
use pingora_proxy::{ProxyHttp, Session};

const HOST: &str = "ip.jsontest.com";

#[derive(Serialize, Deserialize)]
pub struct Resp {
    ip: String,
}

pub struct Json2Yaml {
    addr: std::net::SocketAddr,
}

pub struct MyCtx {
    buffer: Vec<u8>,
}

#[async_trait]
impl ProxyHttp for Json2Yaml {
    type CTX = MyCtx;
    fn new_ctx(&self) -> Self::CTX {
        MyCtx { buffer: vec![] }
    }

    async fn upstream_peer(
        &self,
        _session: &mut Session,
        _ctx: &mut Self::CTX,
    ) -> Result<Box<HttpPeer>> {
        let peer = Box::new(HttpPeer::new(self.addr, false, HOST.to_owned()));
        Ok(peer)
    }

    async fn upstream_request_filter(
        &self,
        _session: &mut Session,
        upstream_request: &mut pingora_http::RequestHeader,
        _ctx: &mut Self::CTX,
    ) -> Result<()> {
        upstream_request
            .insert_header("Host", HOST.to_owned())
            .unwrap();
        Ok(())
    }

    async fn response_filter(
        &self,
        _session: &mut Session,
        upstream_response: &mut ResponseHeader,
        _ctx: &mut Self::CTX,
    ) -> Result<()>
    where
        Self::CTX: Send + Sync,
    {
        // Remove content-length because the size of the new body is unknown
        upstream_response.remove_header("Content-Length");
        upstream_response
            .insert_header("Transfer-Encoding", "Chunked")
            .unwrap();
        Ok(())
    }

    fn response_body_filter(
        &self,
        _session: &mut Session,
        body: &mut Option<Bytes>,
        end_of_stream: bool,
        ctx: &mut Self::CTX,
    ) -> Result<Option<std::time::Duration>>
    where
        Self::CTX: Send + Sync,
    {
        // buffer the data
        if let Some(b) = body {
            ctx.buffer.extend(&b[..]);
            // drop the body
            b.clear();
        }
        if end_of_stream {
            // This is the last chunk, we can process the data now
            let json_body: Resp = serde_json::de::from_slice(&ctx.buffer).unwrap();
            let yaml_body = serde_yaml::to_string(&json_body).unwrap();
            *body = Some(Bytes::copy_from_slice(yaml_body.as_bytes()));
        }

        Ok(None)
    }
}

// RUST_LOG=INFO cargo run --example modify_response
// curl 127.0.0.1:6191
fn main() {
    env_logger::init();

    let opt = Opt::parse();
    let mut my_server = Server::new(Some(opt)).unwrap();
    my_server.bootstrap();

    let mut my_proxy = pingora_proxy::http_proxy_service(
        &my_server.configuration,
        Json2Yaml {
            // hardcode the IP of ip.jsontest.com for now
            addr: ("142.251.2.121", 80)
                .to_socket_addrs()
                .unwrap()
                .next()
                .unwrap(),
        },
    );

    my_proxy.add_tcp("127.0.0.1:6191");

    my_server.add_service(my_proxy);
    my_server.run_forever();
}
```


## multi_lb.rs example

```
// Copyright 2024 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use async_trait::async_trait;
use std::sync::Arc;

use pingora_core::{prelude::*, services::background::GenBackgroundService};
use pingora_load_balancing::{
    health_check::TcpHealthCheck,
    selection::{BackendIter, BackendSelection, RoundRobin},
    LoadBalancer,
};
use pingora_proxy::{http_proxy_service, ProxyHttp, Session};

struct Router {
    cluster_one: Arc<LoadBalancer<RoundRobin>>,
    cluster_two: Arc<LoadBalancer<RoundRobin>>,
}

#[async_trait]
impl ProxyHttp for Router {
    type CTX = ();
    fn new_ctx(&self) {}

    async fn upstream_peer(&self, session: &mut Session, _ctx: &mut ()) -> Result<Box<HttpPeer>> {
        // determine LB cluster based on request uri
        let cluster = if session.req_header().uri.path().starts_with("/one/") {
            &self.cluster_one
        } else {
            &self.cluster_two
        };

        let upstream = cluster
            .select(b"", 256) // hash doesn't matter for round robin
            .unwrap();

        println!("upstream peer is: {upstream:?}");

        // Set SNI to one.one.one.one
        let peer = Box::new(HttpPeer::new(upstream, true, "one.one.one.one".to_string()));
        Ok(peer)
    }
}

fn build_cluster_service<S>(upstreams: &[&str]) -> GenBackgroundService<LoadBalancer<S>>
where
    S: BackendSelection + 'static,
    S::Iter: BackendIter,
{
    let mut cluster = LoadBalancer::try_from_iter(upstreams).unwrap();
    cluster.set_health_check(TcpHealthCheck::new());
    cluster.health_check_frequency = Some(std::time::Duration::from_secs(1));

    background_service("cluster health check", cluster)
}

// RUST_LOG=INFO cargo run --example multi_lb
// curl 127.0.0.1:6188/one/
// curl 127.0.0.1:6188/two/
fn main() {
    let mut my_server = Server::new(None).unwrap();
    my_server.bootstrap();

    // build multiple clusters
    let cluster_one = build_cluster_service::<RoundRobin>(&["1.1.1.1:443", "127.0.0.1:343"]);
    let cluster_two = build_cluster_service::<RoundRobin>(&["1.0.0.1:443", "127.0.0.2:343"]);

    let router = Router {
        cluster_one: cluster_one.task(),
        cluster_two: cluster_two.task(),
    };
    let mut router_service = http_proxy_service(&my_server.configuration, router);
    router_service.add_tcp("0.0.0.0:6188");

    my_server.add_service(router_service);
    my_server.add_service(cluster_one);
    my_server.add_service(cluster_two);

    my_server.run_forever();
}
```


## rate_limiter.rs example

```
use async_trait::async_trait;
use once_cell::sync::Lazy;
use pingora_core::prelude::*;
use pingora_http::{RequestHeader, ResponseHeader};
use pingora_limits::rate::Rate;
use pingora_load_balancing::prelude::{RoundRobin, TcpHealthCheck};
use pingora_load_balancing::LoadBalancer;
use pingora_proxy::{http_proxy_service, ProxyHttp, Session};
use std::sync::Arc;
use std::time::Duration;

fn main() {
    let mut server = Server::new(Some(Opt::default())).unwrap();
    server.bootstrap();
    let mut upstreams = LoadBalancer::try_from_iter(["1.1.1.1:443", "1.0.0.1:443"]).unwrap();
    // Set health check
    let hc = TcpHealthCheck::new();
    upstreams.set_health_check(hc);
    upstreams.health_check_frequency = Some(Duration::from_secs(1));
    // Set background service
    let background = background_service("health check", upstreams);
    let upstreams = background.task();
    // Set load balancer
    let mut lb = http_proxy_service(&server.configuration, LB(upstreams));
    lb.add_tcp("0.0.0.0:6188");

    // let rate = Rate
    server.add_service(background);
    server.add_service(lb);
    server.run_forever();
}

pub struct LB(Arc<LoadBalancer<RoundRobin>>);

impl LB {
    pub fn get_request_appid(&self, session: &mut Session) -> Option<String> {
        match session
            .req_header()
            .headers
            .get("appid")
            .map(|v| v.to_str())
        {
            None => None,
            Some(v) => match v {
                Ok(v) => Some(v.to_string()),
                Err(_) => None,
            },
        }
    }
}

// Rate limiter
static RATE_LIMITER: Lazy<Rate> = Lazy::new(|| Rate::new(Duration::from_secs(1)));

// max request per second per client
static MAX_REQ_PER_SEC: isize = 1;

#[async_trait]
impl ProxyHttp for LB {
    type CTX = ();

    fn new_ctx(&self) {}

    async fn upstream_peer(
        &self,
        _session: &mut Session,
        _ctx: &mut Self::CTX,
    ) -> Result<Box<HttpPeer>> {
        let upstream = self.0.select(b"", 256).unwrap();
        // Set SNI
        let peer = Box::new(HttpPeer::new(upstream, true, "one.one.one.one".to_string()));
        Ok(peer)
    }

    async fn upstream_request_filter(
        &self,
        _session: &mut Session,
        upstream_request: &mut RequestHeader,
        _ctx: &mut Self::CTX,
    ) -> Result<()>
    where
        Self::CTX: Send + Sync,
    {
        upstream_request
            .insert_header("Host", "one.one.one.one")
            .unwrap();
        Ok(())
    }

    async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool>
    where
        Self::CTX: Send + Sync,
    {
        let appid = match self.get_request_appid(session) {
            None => return Ok(false), // no client appid found, skip rate limiting
            Some(addr) => addr,
        };

        // retrieve the current window requests
        let curr_window_requests = RATE_LIMITER.observe(&appid, 1);
        if curr_window_requests > MAX_REQ_PER_SEC {
            // rate limited, return 429
            let mut header = ResponseHeader::build(429, None).unwrap();
            header
                .insert_header("X-Rate-Limit-Limit", MAX_REQ_PER_SEC.to_string())
                .unwrap();
            header.insert_header("X-Rate-Limit-Remaining", "0").unwrap();
            header.insert_header("X-Rate-Limit-Reset", "1").unwrap();
            session.set_keepalive(None);
            session
                .write_response_header(Box::new(header), true)
                .await?;
            return Ok(true);
        }
        Ok(false)
    }
}
```


## use_module.rs example

```
// Copyright 2024 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use async_trait::async_trait;
use clap::Parser;

use pingora_core::modules::http::HttpModules;
use pingora_core::server::configuration::Opt;
use pingora_core::server::Server;
use pingora_core::upstreams::peer::HttpPeer;
use pingora_core::Result;
use pingora_http::RequestHeader;
use pingora_proxy::{ProxyHttp, Session};

/// This example shows how to build and import 3rd party modules

/// A simple ACL to check "Authorization: basic $credential" header
mod my_acl {
    use super::*;
    use pingora_core::modules::http::{HttpModule, HttpModuleBuilder, Module};
    use pingora_error::{Error, ErrorType::HTTPStatus};
    use std::any::Any;

    // This is the struct for per request module context
    struct MyAclCtx {
        credential_header: String,
    }

    // Implement how the module would consume and/or modify request and/or response
    #[async_trait]
    impl HttpModule for MyAclCtx {
        async fn request_header_filter(&mut self, req: &mut RequestHeader) -> Result<()> {
            let Some(auth) = req.headers.get(http::header::AUTHORIZATION) else {
                return Error::e_explain(HTTPStatus(403), "Auth failed, no auth header");
            };

            if auth.as_bytes() != self.credential_header.as_bytes() {
                Error::e_explain(HTTPStatus(403), "Auth failed, credential mismatch")
            } else {
                Ok(())
            }
        }

        // boilerplate code for all modules
        fn as_any(&self) -> &dyn Any {
            self
        }
        fn as_any_mut(&mut self) -> &mut dyn Any {
            self
        }
    }

    // This is the singleton object which will be attached to the server
    pub struct MyAcl {
        pub credential: String,
    }
    impl HttpModuleBuilder for MyAcl {
        // This function defines how to create each Ctx. This function is called when a new request
        // arrives
        fn init(&self) -> Module {
            Box::new(MyAclCtx {
                // Make it easier to compare header
                // We could also store this value in MyAcl and use Arc to share it with every Ctx.
                credential_header: format!("basic {}", self.credential),
            })
        }
    }
}

pub struct MyProxy;

#[async_trait]
impl ProxyHttp for MyProxy {
    type CTX = ();
    fn new_ctx(&self) -> Self::CTX {}

    // This function is only called once when the server starts
    fn init_downstream_modules(&self, modules: &mut HttpModules) {
        // Add the module to MyProxy
        modules.add_module(Box::new(my_acl::MyAcl {
            credential: "testcode".into(),
        }))
    }

    async fn upstream_peer(
        &self,
        _session: &mut Session,
        _ctx: &mut Self::CTX,
    ) -> Result<Box<HttpPeer>> {
        let peer = Box::new(HttpPeer::new(
            ("1.1.1.1", 443),
            true,
            "one.one.one.one".to_string(),
        ));
        Ok(peer)
    }
}

// RUST_LOG=INFO cargo run --example use_module
// curl 127.0.0.1:6193 -H "Host: one.one.one.one" -v
// curl 127.0.0.1:6193 -H "Host: one.one.one.one" -H "Authorization: basic testcode"
// curl 127.0.0.1:6193 -H "Host: one.one.one.one" -H "Authorization: basic wrong" -v
fn main() {
    env_logger::init();

    // read command line arguments
    let opt = Opt::parse();
    let mut my_server = Server::new(Some(opt)).unwrap();
    my_server.bootstrap();

    let mut my_proxy = pingora_proxy::http_proxy_service(&my_server.configuration, MyProxy);
    my_proxy.add_tcp("0.0.0.0:6193");

    my_server.add_service(my_proxy);
    my_server.run_forever();
}
```


## pingora_proxy/src/lib.rs

```
// Copyright 2024 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! # pingora-proxy
//!
//! Programmable HTTP proxy built on top of [pingora_core].
//!
//! # Features
//! - HTTP/1.x and HTTP/2 for both downstream and upstream
//! - Connection pooling
//! - TLSv1.3, mutual TLS, customizable CA
//! - Request/Response scanning, modification or rejection
//! - Dynamic upstream selection
//! - Configurable retry and failover
//! - Fully programmable and customizable at any stage of a HTTP request
//!
//! # How to use
//!
//! Users of this crate define their proxy by implementing the [ProxyHttp] trait, which contains the
//! callbacks to be invoked at each stage of an HTTP request.
//!
//! Then the service can be passed into [http_proxy_service()] for a [pingora_core::server::Server] to
//! run it.
//!
//! See examples/load_balancer.rs for a detailed example.

use async_trait::async_trait;
use bytes::Bytes;
use futures::future::FutureExt;
use http::{header, version::Version};
use log::{debug, error, trace, warn};
use once_cell::sync::Lazy;
use pingora_http::{RequestHeader, ResponseHeader};
use std::fmt::Debug;
use std::str;
use std::sync::Arc;
use tokio::sync::{mpsc, Notify};
use tokio::time;

use pingora_cache::NoCacheReason;
use pingora_core::apps::{HttpServerApp, HttpServerOptions};
use pingora_core::connectors::{http::Connector, ConnectorOptions};
use pingora_core::modules::http::compression::ResponseCompressionBuilder;
use pingora_core::modules::http::{HttpModuleCtx, HttpModules};
use pingora_core::protocols::http::client::HttpSession as ClientSession;
use pingora_core::protocols::http::v1::client::HttpSession as HttpSessionV1;
use pingora_core::protocols::http::HttpTask;
use pingora_core::protocols::http::ServerSession as HttpSession;
use pingora_core::protocols::http::SERVER_NAME;
use pingora_core::protocols::Stream;
use pingora_core::protocols::{Digest, UniqueID};
use pingora_core::server::configuration::ServerConf;
use pingora_core::server::ShutdownWatch;
use pingora_core::upstreams::peer::{HttpPeer, Peer};
use pingora_error::{Error, ErrorSource, ErrorType::*, OrErr, Result};

const MAX_RETRIES: usize = 16;
const TASK_BUFFER_SIZE: usize = 4;

mod proxy_cache;
mod proxy_common;
mod proxy_h1;
mod proxy_h2;
mod proxy_purge;
mod proxy_trait;
pub mod subrequest;

use subrequest::Ctx as SubReqCtx;

pub use proxy_cache::range_filter::{range_header_filter, RangeType};
pub use proxy_purge::PurgeStatus;
pub use proxy_trait::ProxyHttp;

pub mod prelude {
    pub use crate::{http_proxy_service, ProxyHttp, Session};
}

/// The concrete type that holds the user defined HTTP proxy.
///
/// Users don't need to interact with this object directly.
pub struct HttpProxy<SV> {
    inner: SV, // TODO: name it better than inner
    client_upstream: Connector,
    shutdown: Notify,
    pub server_options: Option<HttpServerOptions>,
    pub downstream_modules: HttpModules,
}

impl<SV> HttpProxy<SV> {
    fn new(inner: SV, conf: Arc<ServerConf>) -> Self {
        HttpProxy {
            inner,
            client_upstream: Connector::new(Some(ConnectorOptions::from_server_conf(&conf))),
            shutdown: Notify::new(),
            server_options: None,
            downstream_modules: HttpModules::new(),
        }
    }

    fn handle_init_modules(&mut self)
    where
        SV: ProxyHttp,
    {
        self.inner
            .init_downstream_modules(&mut self.downstream_modules);
    }

    async fn handle_new_request(
        &self,
        mut downstream_session: Box<HttpSession>,
    ) -> Option<Box<HttpSession>>
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        // phase 1 read request header

        let res = tokio::select! {
            biased; // biased select is cheaper, and we don't want to drop already buffered requests
            res = downstream_session.read_request() => { res }
            _ = self.shutdown.notified() => {
                // service shutting down, dropping the connection to stop more req from coming in
                return None;
            }
        };
        match res {
            Ok(true) => {
                // TODO: check n==0
                debug!("Successfully get a new request");
            }
            Ok(false) => {
                return None; // TODO: close connection?
            }
            Err(mut e) => {
                e.as_down();
                error!("Fail to proxy: {e}");
                if matches!(e.etype, InvalidHTTPHeader) {
                    downstream_session
                        .respond_error(400)
                        .await
                        .unwrap_or_else(|e| {
                            error!("failed to send error response to downstream: {e}");
                        });
                } // otherwise the connection must be broken, no need to send anything
                downstream_session.shutdown().await;
                return None;
            }
        }
        trace!(
            "Request header: {:?}",
            downstream_session.req_header().as_ref()
        );
        Some(downstream_session)
    }

    // return bool: server_session can be reused, and error if any
    async fn proxy_to_upstream(
        &self,
        session: &mut Session,
        ctx: &mut SV::CTX,
    ) -> (bool, Option<Box<Error>>)
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        let peer = match self.inner.upstream_peer(session, ctx).await {
            Ok(p) => p,
            Err(e) => return (false, Some(e)),
        };

        let client_session = self.client_upstream.get_http_session(&*peer).await;
        match client_session {
            Ok((client_session, client_reused)) => {
                let (server_reused, error) = match client_session {
                    ClientSession::H1(mut h1) => {
                        let (server_reused, client_reuse, error) = self
                            .proxy_to_h1_upstream(session, &mut h1, client_reused, &peer, ctx)
                            .await;
                        if client_reuse {
                            let session = ClientSession::H1(h1);
                            self.client_upstream
                                .release_http_session(session, &*peer, peer.idle_timeout())
                                .await;
                        }
                        (server_reused, error)
                    }
                    ClientSession::H2(mut h2) => {
                        let (server_reused, mut error) = self
                            .proxy_to_h2_upstream(session, &mut h2, client_reused, &peer, ctx)
                            .await;
                        let session = ClientSession::H2(h2);
                        self.client_upstream
                            .release_http_session(session, &*peer, peer.idle_timeout())
                            .await;

                        if let Some(e) = error.as_mut() {
                            // try to downgrade if A. origin says so or B. origin sends an invalid
                            // response, which usually means origin h2 is not production ready
                            if matches!(e.etype, H2Downgrade | InvalidH2) {
                                if peer
                                    .get_alpn()
                                    .map_or(true, |alpn| alpn.get_min_http_version() == 1)
                                {
                                    // Add the peer to prefer h1 so that all following requests
                                    // will use h1
                                    self.client_upstream.prefer_h1(&*peer);
                                } else {
                                    // the peer doesn't allow downgrading to h1 (e.g. gRPC)
                                    e.retry = false.into();
                                }
                            }
                        }

                        (server_reused, error)
                    }
                };
                (
                    server_reused,
                    error.map(|e| {
                        self.inner
                            .error_while_proxy(&peer, session, e, ctx, client_reused)
                    }),
                )
            }
            Err(mut e) => {
                e.as_up();
                let new_err = self.inner.fail_to_connect(session, &peer, ctx, e);
                (false, Some(new_err.into_up()))
            }
        }
    }

    fn upstream_filter(
        &self,
        session: &mut Session,
        task: &mut HttpTask,
        ctx: &mut SV::CTX,
    ) -> Result<()>
    where
        SV: ProxyHttp,
    {
        match task {
            HttpTask::Header(header, _eos) => {
                self.inner.upstream_response_filter(session, header, ctx)
            }
            HttpTask::Body(data, eos) => self
                .inner
                .upstream_response_body_filter(session, data, *eos, ctx)?,
            HttpTask::Trailer(Some(trailers)) => self
                .inner
                .upstream_response_trailer_filter(session, trailers, ctx)?,
            _ => {
                // task does not support a filter
            }
        }
        Ok(())
    }

    async fn finish(
        &self,
        mut session: Session,
        ctx: &mut SV::CTX,
        reuse: bool,
        error: Option<&Error>,
    ) -> Option<Stream>
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        self.inner.logging(&mut session, error, ctx).await;

        if reuse {
            // TODO: log error
            session.downstream_session.finish().await.ok().flatten()
        } else {
            None
        }
    }
}

use pingora_cache::HttpCache;
use pingora_core::protocols::http::compression::ResponseCompressionCtx;

/// The established HTTP session
///
/// This object is what users interact with in order to access the request itself or change the proxy
/// behavior.
pub struct Session {
    /// the HTTP session to downstream (the client)
    pub downstream_session: Box<HttpSession>,
    /// The interface to control HTTP caching
    pub cache: HttpCache,
    /// (de)compress responses coming into the proxy (from upstream)
    pub upstream_compression: ResponseCompressionCtx,
    /// ignore downstream range (skip downstream range filters)
    pub ignore_downstream_range: bool,
    // the context from parent request
    subrequest_ctx: Option<Box<SubReqCtx>>,
    // Downstream filter modules
    pub downstream_modules_ctx: HttpModuleCtx,
}

impl Session {
    fn new(
        downstream_session: impl Into<Box<HttpSession>>,
        downstream_modules: &HttpModules,
    ) -> Self {
        Session {
            downstream_session: downstream_session.into(),
            cache: HttpCache::new(),
            // disable both upstream and downstream compression
            upstream_compression: ResponseCompressionCtx::new(0, false, false),
            ignore_downstream_range: false,
            subrequest_ctx: None,
            downstream_modules_ctx: downstream_modules.build_ctx(),
        }
    }

    /// Create a new [Session] from the given [Stream]
    ///
    /// This function is mostly used for testing and mocking.
    pub fn new_h1(stream: Stream) -> Self {
        let modules = HttpModules::new();
        Self::new(Box::new(HttpSession::new_http1(stream)), &modules)
    }

    /// Create a new [Session] from the given [Stream] with modules
    ///
    /// This function is mostly used for testing and mocking.
    pub fn new_h1_with_modules(stream: Stream, downstream_modules: &HttpModules) -> Self {
        Self::new(Box::new(HttpSession::new_http1(stream)), downstream_modules)
    }

    pub fn as_downstream_mut(&mut self) -> &mut HttpSession {
        &mut self.downstream_session
    }

    pub fn as_downstream(&self) -> &HttpSession {
        &self.downstream_session
    }

    /// Write HTTP response with the given error code to the downstream.
    pub async fn respond_error(&mut self, error: u16) -> Result<()> {
        self.as_downstream_mut().respond_error(error).await
    }

    /// Write HTTP response with the given error code to the downstream with a body.
    pub async fn respond_error_with_body(&mut self, error: u16, body: Bytes) -> Result<()> {
        self.as_downstream_mut()
            .respond_error_with_body(error, body)
            .await
    }

    /// Write the given HTTP response header to the downstream
    ///
    /// Different from directly calling [HttpSession::write_response_header], this function also
    /// invokes the filter modules.
    pub async fn write_response_header(
        &mut self,
        mut resp: Box<ResponseHeader>,
        end_of_stream: bool,
    ) -> Result<()> {
        self.downstream_modules_ctx
            .response_header_filter(&mut resp, end_of_stream)
            .await?;
        self.downstream_session.write_response_header(resp).await
    }

    /// Write the given HTTP response body chunk to the downstream
    ///
    /// Different from directly calling [HttpSession::write_response_body], this function also
    /// invokes the filter modules.
    pub async fn write_response_body(
        &mut self,
        mut body: Option<Bytes>,
        end_of_stream: bool,
    ) -> Result<()> {
        self.downstream_modules_ctx
            .response_body_filter(&mut body, end_of_stream)?;

        if body.is_none() && !end_of_stream {
            return Ok(());
        }

        let data = body.unwrap_or_default();
        self.downstream_session
            .write_response_body(data, end_of_stream)
            .await
    }

    pub async fn write_response_tasks(&mut self, mut tasks: Vec<HttpTask>) -> Result<bool> {
        for task in tasks.iter_mut() {
            match task {
                HttpTask::Header(resp, end) => {
                    self.downstream_modules_ctx
                        .response_header_filter(resp, *end)
                        .await?;
                }
                HttpTask::Body(data, end) => {
                    self.downstream_modules_ctx
                        .response_body_filter(data, *end)?;
                }
                HttpTask::Trailer(trailers) => {
                    if let Some(buf) = self
                        .downstream_modules_ctx
                        .response_trailer_filter(trailers)?
                    {
                        // Write the trailers into the body if the filter
                        // returns a buffer.
                        //
                        // Note, this will not work if end of stream has already
                        // been seen or we've written content-length bytes.
                        *task = HttpTask::Body(Some(buf), true);
                    }
                }
                _ => { /* Done or Failed */ }
            }
        }
        self.downstream_session.response_duplex_vec(tasks).await
    }
}

impl AsRef<HttpSession> for Session {
    fn as_ref(&self) -> &HttpSession {
        &self.downstream_session
    }
}

impl AsMut<HttpSession> for Session {
    fn as_mut(&mut self) -> &mut HttpSession {
        &mut self.downstream_session
    }
}

use std::ops::{Deref, DerefMut};

impl Deref for Session {
    type Target = HttpSession;

    fn deref(&self) -> &Self::Target {
        &self.downstream_session
    }
}

impl DerefMut for Session {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.downstream_session
    }
}

// generic HTTP 502 response sent when proxy_upstream_filter refuses to connect to upstream
static BAD_GATEWAY: Lazy<ResponseHeader> = Lazy::new(|| {
    let mut resp = ResponseHeader::build(http::StatusCode::BAD_GATEWAY, Some(3)).unwrap();
    resp.insert_header(header::SERVER, &SERVER_NAME[..])
        .unwrap();
    resp.insert_header(header::CONTENT_LENGTH, 0).unwrap();
    resp.insert_header(header::CACHE_CONTROL, "private, no-store")
        .unwrap();

    resp
});

impl<SV> HttpProxy<SV> {
    async fn process_request(
        self: &Arc<Self>,
        mut session: Session,
        mut ctx: <SV as ProxyHttp>::CTX,
    ) -> Option<Stream>
    where
        SV: ProxyHttp + Send + Sync + 'static,
        <SV as ProxyHttp>::CTX: Send + Sync,
    {
        if let Err(e) = self
            .inner
            .early_request_filter(&mut session, &mut ctx)
            .await
        {
            self.handle_error(&mut session, &mut ctx, e, "Fail to early filter request:")
                .await;
            return None;
        }

        let req = session.downstream_session.req_header_mut();

        // Built-in downstream request filters go first
        if let Err(e) = session
            .downstream_modules_ctx
            .request_header_filter(req)
            .await
        {
            self.handle_error(
                &mut session,
                &mut ctx,
                e,
                "Failed in downstream modules request filter:",
            )
            .await;
            return None;
        }

        match self.inner.request_filter(&mut session, &mut ctx).await {
            Ok(response_sent) => {
                if response_sent {
                    // TODO: log error
                    self.inner.logging(&mut session, None, &mut ctx).await;
                    return session.downstream_session.finish().await.ok().flatten();
                }
                /* else continue */
            }
            Err(e) => {
                self.handle_error(&mut session, &mut ctx, e, "Fail to filter request:")
                    .await;
                return None;
            }
        }

        if let Some((reuse, err)) = self.proxy_cache(&mut session, &mut ctx).await {
            // cache hit
            return self.finish(session, &mut ctx, reuse, err.as_deref()).await;
        }
        // either uncacheable, or cache miss

        // decide if the request is allowed to go to upstream
        match self
            .inner
            .proxy_upstream_filter(&mut session, &mut ctx)
            .await
        {
            Ok(proxy_to_upstream) => {
                if !proxy_to_upstream {
                    // The hook can choose to write its own response, but if it doesn't, we respond
                    // with a generic 502
                    if session.cache.enabled() {
                        // drop the cache lock that this request may be holding onto
                        session.cache.disable(NoCacheReason::DeclinedToUpstream);
                    }
                    if session.response_written().is_none() {
                        match session.write_response_header_ref(&BAD_GATEWAY).await {
                            Ok(()) => {}
                            Err(e) => {
                                self.handle_error(
                                    &mut session,
                                    &mut ctx,
                                    e,
                                    "Error responding with Bad Gateway:",
                                )
                                .await;

                                return None;
                            }
                        }
                    }

                    return self.finish(session, &mut ctx, false, None).await;
                }
                /* else continue */
            }
            Err(e) => {
                if session.cache.enabled() {
                    session.cache.disable(NoCacheReason::InternalError);
                }

                self.handle_error(
                    &mut session,
                    &mut ctx,
                    e,
                    "Error deciding if we should proxy to upstream:",
                )
                .await;
                return None;
            }
        }

        let mut retries: usize = 0;

        let mut server_reuse = false;
        let mut proxy_error: Option<Box<Error>> = None;

        while retries < MAX_RETRIES {
            retries += 1;

            let (reuse, e) = self.proxy_to_upstream(&mut session, &mut ctx).await;
            server_reuse = reuse;

            match e {
                Some(error) => {
                    let retry = error.retry();
                    proxy_error = Some(error);
                    if !retry {
                        break;
                    }
                    // only log errors that will be retried here; the final error will be logged below
                    warn!(
                        "Fail to proxy: {}, tries: {}, retry: {}, {}",
                        proxy_error.as_ref().unwrap(),
                        retries,
                        retry,
                        self.inner.request_summary(&session, &ctx)
                    );
                }
                None => {
                    proxy_error = None;
                    break;
                }
            };
        }

        // serve stale if error
        // Check both error and cache before calling the function because await is not cheap
        let serve_stale_result = if proxy_error.is_some() && session.cache.can_serve_stale_error() {
            self.handle_stale_if_error(&mut session, &mut ctx, proxy_error.as_ref().unwrap())
                .await
        } else {
            None
        };

        let final_error = if let Some((reuse, stale_cache_error)) = serve_stale_result {
            // don't reuse server conn if serve stale polluted it
            server_reuse = server_reuse && reuse;
            stale_cache_error
        } else {
            proxy_error
        };

        if let Some(e) = final_error.as_ref() {
            // If we have errored and are still holding a cache lock, release it.
            if session.cache.enabled() {
                let reason = if *e.esource() == ErrorSource::Upstream {
                    NoCacheReason::UpstreamError
                } else {
                    NoCacheReason::InternalError
                };
                session.cache.disable(reason);
            }
            let status = self.inner.fail_to_proxy(&mut session, e, &mut ctx).await;

            // final error will have > 0 status unless downstream connection is dead
            if !self.inner.suppress_error_log(&session, &ctx, e) {
                error!(
                    "Fail to proxy: {}, status: {}, tries: {}, retry: {}, {}",
                    final_error.as_ref().unwrap(),
                    status,
                    retries,
                    false, // we never retry here
                    self.inner.request_summary(&session, &ctx)
                );
            }
        }

        // logging() will be called in finish()
        self.finish(session, &mut ctx, server_reuse, final_error.as_deref())
            .await
    }

    async fn handle_error(
        &self,
        session: &mut Session,
        ctx: &mut <SV as ProxyHttp>::CTX,
        e: Box<Error>,
        context: &str,
    ) where
        SV: ProxyHttp + Send + Sync + 'static,
        <SV as ProxyHttp>::CTX: Send + Sync,
    {
        if !self.inner.suppress_error_log(session, ctx, &e) {
            error!(
                "{context} {}, {}",
                e,
                self.inner.request_summary(session, ctx)
            );
        }
        self.inner.fail_to_proxy(session, &e, ctx).await;
        self.inner.logging(session, Some(&e), ctx).await;
    }
}

/* Make process_subrequest() a trait to workaround https://github.com/rust-lang/rust/issues/78649
   if process_subrequest() is implemented as a member of HttpProxy, rust complains

error[E0391]: cycle detected when computing type of proxy_cache::<impl at pingora-proxy/src/proxy_cache.rs:7:1: 7:23>::proxy_cache::{opaque#0}
   --> pingora-proxy/src/proxy_cache.rs:13:10
    |
13  |     ) -> Option<(bool, Option<Box<Error>>)>

*/
#[async_trait]
trait Subrequest {
    async fn process_subrequest(
        self: &Arc<Self>,
        session: Box<HttpSession>,
        sub_req_ctx: Box<SubReqCtx>,
    );
}

#[async_trait]
impl<SV> Subrequest for HttpProxy<SV>
where
    SV: ProxyHttp + Send + Sync + 'static,
    <SV as ProxyHttp>::CTX: Send + Sync,
{
    async fn process_subrequest(
        self: &Arc<Self>,
        session: Box<HttpSession>,
        sub_req_ctx: Box<SubReqCtx>,
    ) {
        debug!("starting subrequest");
        let mut session = match self.handle_new_request(session).await {
            Some(downstream_session) => Session::new(downstream_session, &self.downstream_modules),
            None => return, // bad request
        };

        // no real downstream to keepalive, but it doesn't matter what is set here because at the end
        // of this fn the dummy connection will be dropped
        session.set_keepalive(None);

        session.subrequest_ctx.replace(sub_req_ctx);
        trace!("processing subrequest");
        let ctx = self.inner.new_ctx();
        self.process_request(session, ctx).await;
        trace!("subrequest done");
    }
}

#[async_trait]
impl<SV> HttpServerApp for HttpProxy<SV>
where
    SV: ProxyHttp + Send + Sync + 'static,
    <SV as ProxyHttp>::CTX: Send + Sync,
{
    async fn process_new_http(
        self: &Arc<Self>,
        session: HttpSession,
        shutdown: &ShutdownWatch,
    ) -> Option<Stream> {
        let session = Box::new(session);

        // TODO: keepalive pool, use stack
        let mut session = match self.handle_new_request(session).await {
            Some(downstream_session) => Session::new(downstream_session, &self.downstream_modules),
            None => return None, // bad request
        };

        if *shutdown.borrow() {
            // stop downstream from reusing if this service is shutting down soon
            session.set_keepalive(None);
        } else {
            // default 60s
            session.set_keepalive(Some(60));
        }

        let ctx = self.inner.new_ctx();
        self.process_request(session, ctx).await
    }

    async fn http_cleanup(&self) {
        // Notify all keepalived requests blocking on read_request() to abort
        self.shutdown.notify_waiters();

        // TODO: impl shutting down flag so that we don't need to read stack.is_shutting_down()
    }

    fn server_options(&self) -> Option<&HttpServerOptions> {
        self.server_options.as_ref()
    }

    // TODO implement h2_options
}

use pingora_core::services::listening::Service;

/// Create a [Service] from the user implemented [ProxyHttp].
///
/// The returned [Service] can be hosted by a [pingora_core::server::Server] directly.
pub fn http_proxy_service<SV>(conf: &Arc<ServerConf>, inner: SV) -> Service<HttpProxy<SV>>
where
    SV: ProxyHttp,
{
    http_proxy_service_with_name(conf, inner, "Pingora HTTP Proxy Service")
}

/// Create a [Service] from the user implemented [ProxyHttp].
///
/// The returned [Service] can be hosted by a [pingora_core::server::Server] directly.
pub fn http_proxy_service_with_name<SV>(
    conf: &Arc<ServerConf>,
    inner: SV,
    name: &str,
) -> Service<HttpProxy<SV>>
where
    SV: ProxyHttp,
{
    let mut proxy = HttpProxy::new(inner, conf.clone());
    proxy.handle_init_modules();
    Service::new(name.to_string(), proxy)
}
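
// Illustrative usage (a sketch, not part of this crate): hosting the returned
// [Service] on a server. `MyProxy` is a hypothetical user type implementing
// [ProxyHttp].
//
//     let mut server = pingora_core::server::Server::new(None).unwrap();
//     server.bootstrap();
//     let mut proxy = http_proxy_service(&server.configuration, MyProxy::new());
//     proxy.add_tcp("0.0.0.0:8080");
//     server.add_service(proxy);
//     server.run_forever();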

// proxy_common.rs

/// Possible downstream states during request multiplexing
#[derive(Debug, Clone, Copy)]
pub(crate) enum DownstreamStateMachine {
    /// more request (body) to read
    Reading,
    /// no more data to read
    ReadingFinished,
    /// downstream is already errored or closed
    Errored,
}

#[allow(clippy::wrong_self_convention)]
impl DownstreamStateMachine {
    pub fn new(finished: bool) -> Self {
        if finished {
            Self::ReadingFinished
        } else {
            Self::Reading
        }
    }

    // Can call read() to read more data or wait on closing
    pub fn can_poll(&self) -> bool {
        !matches!(self, Self::Errored)
    }

    pub fn is_reading(&self) -> bool {
        matches!(self, Self::Reading)
    }

    pub fn is_done(&self) -> bool {
        !matches!(self, Self::Reading)
    }

    pub fn is_errored(&self) -> bool {
        matches!(self, Self::Errored)
    }

    /// Move the state machine to the ReadingFinished state if `set` is true
    pub fn maybe_finished(&mut self, set: bool) {
        if set {
            *self = Self::ReadingFinished
        }
    }

    pub fn to_errored(&mut self) {
        *self = Self::Errored
    }
}
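
// Illustrative sketch of how the proxy loop drives these transitions
// (the surrounding variable names are assumptions):
//
//     let mut downstream_state = DownstreamStateMachine::new(body_done);
//     downstream_state.maybe_finished(end_of_stream); // -> ReadingFinished once the body ends
//     // on a read error: downstream_state.to_errored(); // can_poll() is then false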

/// Possible upstream states during request multiplexing
#[derive(Debug, Clone, Copy)]
pub(crate) struct ResponseStateMachine {
    upstream_response_done: bool,
    cached_response_done: bool,
}

impl ResponseStateMachine {
    pub fn new() -> Self {
        ResponseStateMachine {
            upstream_response_done: false,
            cached_response_done: true, // no cached response by default
        }
    }

    pub fn is_done(&self) -> bool {
        self.upstream_response_done && self.cached_response_done
    }

    pub fn upstream_done(&self) -> bool {
        self.upstream_response_done
    }

    pub fn cached_done(&self) -> bool {
        self.cached_response_done
    }

    pub fn enable_cached_response(&mut self) {
        self.cached_response_done = false;
    }

    pub fn maybe_set_upstream_done(&mut self, done: bool) {
        if done {
            self.upstream_response_done = true;
        }
    }

    pub fn maybe_set_cache_done(&mut self, done: bool) {
        if done {
            self.cached_response_done = true;
        }
    }
}
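
// Illustrative sketch (assumed usage): tracking the two response sources until
// both are drained.
//
//     let mut response_state = ResponseStateMachine::new();
//     response_state.enable_cached_response(); // a cached response will also be read
//     response_state.maybe_set_upstream_done(upstream_eos);
//     response_state.maybe_set_cache_done(cache_eos);
//     if response_state.is_done() { /* both sources finished */ }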

// proxy_trait.rs

// Copyright 2024 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use super::*;
use pingora_cache::{
    key::HashBinary,
    CacheKey, CacheMeta, ForcedInvalidationKind,
    RespCacheable::{self, *},
};
use proxy_cache::range_filter::{self};
use std::time::Duration;

/// The interface to control the HTTP proxy
///
/// The methods in [ProxyHttp] are filters/callbacks which will be performed on all requests at their
/// particular stage (if applicable).
///
/// If any of the filters returns [Result::Err], the request will fail, and the error will be logged.
#[cfg_attr(not(doc_async_trait), async_trait)]
pub trait ProxyHttp {
    /// The per request object to share state across the different filters
    type CTX;

    /// Define how the ctx should be created.
    fn new_ctx(&self) -> Self::CTX;

    /// Define where the proxy should send the request to.
    ///
    /// The returned [HttpPeer] contains the information regarding where and how this request should
    /// be forwarded to.
    async fn upstream_peer(
        &self,
        session: &mut Session,
        ctx: &mut Self::CTX,
    ) -> Result<Box<HttpPeer>>;

    /// Set up downstream modules.
    ///
    /// In this phase, users can add or configure [HttpModules] before the server starts up.
    ///
    /// In the default implementation of this method, [ResponseCompressionBuilder] is added
    /// and disabled.
    fn init_downstream_modules(&self, modules: &mut HttpModules) {
        // Add disabled downstream compression module by default
        modules.add_module(ResponseCompressionBuilder::enable(0));
    }
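
    // Illustrative override (a sketch, not the default): keep the built-in
    // behavior but enable response compression at gzip level 6.
    //
    //     fn init_downstream_modules(&self, modules: &mut HttpModules) {
    //         modules.add_module(ResponseCompressionBuilder::enable(6));
    //     }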

    /// Handle the incoming request.
    ///
    /// In this phase, users can parse, validate, rate limit, perform access control and/or
    /// return a response for this request.
    ///
    /// If the user has already sent a response to this request, Ok(true) should be returned so that
    /// the proxy exits. The proxy continues to the next phases when Ok(false) is returned.
    ///
    /// By default this filter does nothing and returns Ok(false).
    async fn request_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool>
    where
        Self::CTX: Send + Sync,
    {
        Ok(false)
    }
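
    // Illustrative implementation (a sketch): reject requests missing an API key
    // header and stop proxying. The header name is an assumption.
    //
    //     async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool> {
    //         if session.req_header().headers.get("x-api-key").is_none() {
    //             session.respond_error(403).await?;
    //             return Ok(true); // response already sent, exit early
    //         }
    //         Ok(false) // continue to the next phases
    //     }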

    /// Handle the incoming request before any downstream module is executed.
    ///
    /// This function is similar to [Self::request_filter()] but executes before any other logic,
    /// including downstream module logic. The main purpose of this function is to provide finer
    /// grained control of the behavior of the modules.
    ///
    /// Note that because this function executes before any module that might provide access
    /// control or rate limiting, logic should stay in [Self::request_filter()] when possible so
    /// that it is protected by said modules.
    async fn early_request_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> Result<()>
    where
        Self::CTX: Send + Sync,
    {
        Ok(())
    }

    /// Handle the incoming request body.
    ///
    /// This function will be called every time a piece of request body is received. The body is
    /// **not the entire request body**.
    ///
    /// The async nature of this function allows the user to throttle the upload speed and/or
    /// execute heavy computation logic such as WAF rules on offloaded threads without blocking
    /// the threads that process the requests themselves.
    async fn request_body_filter(
        &self,
        _session: &mut Session,
        _body: &mut Option<Bytes>,
        _end_of_stream: bool,
        _ctx: &mut Self::CTX,
    ) -> Result<()>
    where
        Self::CTX: Send + Sync,
    {
        Ok(())
    }

    /// This filter decides if the request is cacheable and what cache backend to use
    ///
    /// The caller can interact with Session.cache to enable caching.
    ///
    /// By default this filter does nothing, which effectively disables caching.
    // Ideally only session.cache should be modified, TODO: reflect that in this interface
    fn request_cache_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> Result<()> {
        Ok(())
    }

    /// This callback generates the cache key
    ///
    /// This callback is called only when cache is enabled for this request
    ///
    /// By default this callback returns a default cache key generated from the request.
    fn cache_key_callback(&self, session: &Session, _ctx: &mut Self::CTX) -> Result<CacheKey> {
        let req_header = session.req_header();
        Ok(CacheKey::default(req_header))
    }
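
    // Illustrative override (a sketch): key the cache on the URI path only,
    // assuming a CacheKey::new(namespace, primary, user_tag) constructor.
    //
    //     fn cache_key_callback(&self, session: &Session, _ctx: &mut Self::CTX) -> Result<CacheKey> {
    //         let req_header = session.req_header();
    //         Ok(CacheKey::new("", req_header.uri.path().to_string(), ""))
    //     }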

    /// This callback is invoked when a cacheable response is ready to be admitted to cache
    fn cache_miss(&self, session: &mut Session, _ctx: &mut Self::CTX) {
        session.cache.cache_miss();
    }

    /// This filter is called after a successful cache lookup and before the
    /// cache asset is ready to be used.
    ///
    /// This filter allows the user to log or force invalidate the asset.
    ///
    /// The value returned indicates if forced invalidation should be used, and which kind.
    /// Returning None indicates no forced invalidation.
    async fn cache_hit_filter(
        &self,
        _session: &Session,
        _meta: &CacheMeta,
        _ctx: &mut Self::CTX,
    ) -> Result<Option<ForcedInvalidationKind>>
    where
        Self::CTX: Send + Sync,
    {
        Ok(None)
    }

    /// Decide if a request should continue to upstream after not being served from cache.
    ///
    /// returns: Ok(true) if the request should continue, Ok(false) if a response was written by the
    /// callback and the session should be finished, or an error
    ///
    /// This filter can be used to defer checks like rate limiting or access control until they
    /// are actually needed, i.e., after a cache miss.
    async fn proxy_upstream_filter(
        &self,
        _session: &mut Session,
        _ctx: &mut Self::CTX,
    ) -> Result<bool>
    where
        Self::CTX: Send + Sync,
    {
        Ok(true)
    }
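
    // Illustrative implementation (a sketch): only allow GET/HEAD requests to
    // reach the upstream after a cache miss; anything else gets the generic 502.
    //
    //     async fn proxy_upstream_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool> {
    //         let method = &session.req_header().method;
    //         Ok(method == &http::Method::GET || method == &http::Method::HEAD)
    //     }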

    /// Decide if the response is cacheable
    fn response_cache_filter(
        &self,
        _session: &Session,
        _resp: &ResponseHeader,
        _ctx: &mut Self::CTX,
    ) -> Result<RespCacheable> {
        Ok(Uncacheable(NoCacheReason::Custom("default")))
    }

    /// Decide how to generate cache vary key from both request and response
    ///
    /// None means no variance is needed.
    fn cache_vary_filter(
        &self,
        _meta: &CacheMeta,
        _ctx: &mut Self::CTX,
        _req: &RequestHeader,
    ) -> Option<HashBinary> {
        // default to None for now to disable vary feature
        None
    }

    /// Decide if the incoming request's condition _fails_ against the cached response.
    ///
    /// Returning Ok(true) means that the response does _not_ match against the condition, and
    /// that the proxy can return 304 Not Modified downstream.
    ///
    /// An example is a conditional GET request with If-None-Match: "foobar". If the cached
    /// response contains the ETag: "foobar", then the condition fails, and 304 Not Modified
    /// should be returned. Else, the condition passes which means the full 200 OK response must
    /// be sent.
    fn cache_not_modified_filter(
        &self,
        session: &Session,
        resp: &ResponseHeader,
        _ctx: &mut Self::CTX,
    ) -> Result<bool> {
        Ok(
            pingora_core::protocols::http::conditional_filter::not_modified_filter(
                session.req_header(),
                resp,
            ),
        )
    }

    /// This filter is called when cache is enabled to determine what byte range to return (in both
    /// cache hit and miss cases) from the response body. It is only used when caching is enabled;
    /// otherwise the upstream is responsible for any filtering. It allows users to define the range
    /// this request is for via its return type range_filter::RangeType.
    ///
    /// It also allows users to modify the response header accordingly.
    ///
    /// The default implementation can handle a single range as per [RFC7233].
    fn range_header_filter(
        &self,
        req: &RequestHeader,
        resp: &mut ResponseHeader,
        _ctx: &mut Self::CTX,
    ) -> range_filter::RangeType {
        proxy_cache::range_filter::range_header_filter(req, resp)
    }

    /// Modify the request before it is sent to the upstream
    ///
    /// Unlike [Self::request_filter()], this filter allows changing the request headers that are
    /// sent to the upstream.
    async fn upstream_request_filter(
        &self,
        _session: &mut Session,
        _upstream_request: &mut RequestHeader,
        _ctx: &mut Self::CTX,
    ) -> Result<()>
    where
        Self::CTX: Send + Sync,
    {
        Ok(())
    }
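
    // Illustrative implementation (a sketch): rewrite the Host header before the
    // request leaves for the upstream. The host value is an assumption.
    //
    //     async fn upstream_request_filter(
    //         &self,
    //         _session: &mut Session,
    //         upstream_request: &mut RequestHeader,
    //         _ctx: &mut Self::CTX,
    //     ) -> Result<()> {
    //         upstream_request.insert_header("Host", "backend.internal")?;
    //         Ok(())
    //     }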

    /// Modify the response header from the upstream
    ///
    /// The modification is before caching, so any change here will be stored in the cache if enabled.
    ///
    /// Responses served from cache won't trigger this filter. If the cache needed revalidation,
    /// only the 304 from upstream will trigger the filter (though it will be merged into the
    /// cached header, not served directly to downstream).
    fn upstream_response_filter(
        &self,
        _session: &mut Session,
        _upstream_response: &mut ResponseHeader,
        _ctx: &mut Self::CTX,
    ) {
    }

    /// Modify the response header before it is sent to the downstream
    ///
    /// The modification is after caching. This filter is called for all responses including
    /// responses served from cache.
    async fn response_filter(
        &self,
        _session: &mut Session,
        _upstream_response: &mut ResponseHeader,
        _ctx: &mut Self::CTX,
    ) -> Result<()>
    where
        Self::CTX: Send + Sync,
    {
        Ok(())
    }
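
    // Illustrative implementation (a sketch): stamp every downstream response,
    // including cache hits, with a custom header. The header name is an assumption.
    //
    //     async fn response_filter(
    //         &self,
    //         _session: &mut Session,
    //         upstream_response: &mut ResponseHeader,
    //         _ctx: &mut Self::CTX,
    //     ) -> Result<()> {
    //         upstream_response.insert_header("x-served-by", "my-proxy")?;
    //         Ok(())
    //     }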

    /// Similar to [Self::upstream_response_filter()] but for response body
    ///
    /// This function will be called every time a piece of response body is received. The body is
    /// **not the entire response body**.
    fn upstream_response_body_filter(
        &self,
        _session: &mut Session,
        _body: &mut Option<Bytes>,
        _end_of_stream: bool,
        _ctx: &mut Self::CTX,
    ) -> Result<()> {
        Ok(())
    }

    /// Similar to [Self::upstream_response_filter()] but for response trailers
    fn upstream_response_trailer_filter(
        &self,
        _session: &mut Session,
        _upstream_trailers: &mut header::HeaderMap,
        _ctx: &mut Self::CTX,
    ) -> Result<()> {
        Ok(())
    }

    /// Similar to [Self::response_filter()] but for response body chunks
    fn response_body_filter(
        &self,
        _session: &mut Session,
        _body: &mut Option<Bytes>,
        _end_of_stream: bool,
        _ctx: &mut Self::CTX,
    ) -> Result<Option<Duration>>
    where
        Self::CTX: Send + Sync,
    {
        Ok(None)
    }

    /// Similar to [Self::response_filter()] but for response trailers.
    /// Note, returning an Ok(Some(Bytes)) will result in the downstream response
    /// trailers being written to the response body.
    ///
    /// TODO: make this interface more intuitive
    async fn response_trailer_filter(
        &self,
        _session: &mut Session,
        _upstream_trailers: &mut header::HeaderMap,
        _ctx: &mut Self::CTX,
    ) -> Result<Option<Bytes>>
    where
        Self::CTX: Send + Sync,
    {
        Ok(None)
    }

    /// This filter is called when the entire response is sent to the downstream successfully, or
    /// when there is a fatal error that terminates the request.
    ///
    /// An error log is already emitted if there is any error. This phase is used for collecting
    /// metrics and sending access logs.
    async fn logging(&self, _session: &mut Session, _e: Option<&Error>, _ctx: &mut Self::CTX)
    where
        Self::CTX: Send + Sync,
    {
    }

    /// A value of true means that the log message will be suppressed. The default value is false.
    fn suppress_error_log(&self, _session: &Session, _ctx: &Self::CTX, _error: &Error) -> bool {
        false
    }

    /// This filter is called when there is an error **after** a connection is established (or reused)
    /// to the upstream.
    fn error_while_proxy(
        &self,
        peer: &HttpPeer,
        session: &mut Session,
        e: Box<Error>,
        _ctx: &mut Self::CTX,
        client_reused: bool,
    ) -> Box<Error> {
        let mut e = e.more_context(format!("Peer: {}", peer));
        // only reused client connections where retry buffer is not truncated
        e.retry
            .decide_reuse(client_reused && !session.as_ref().retry_buffer_truncated());
        e
    }

    /// This filter is called when there is an error in the process of establishing a connection
    /// to the upstream.
    ///
    /// In this filter the user can decide whether the error is retryable by marking the error e.
    ///
    /// If the error can be retried, [Self::upstream_peer()] will be called again so that the user
    /// can decide whether to send the request to the same upstream or another upstream that is possibly
    /// available.
    fn fail_to_connect(
        &self,
        _session: &mut Session,
        _peer: &HttpPeer,
        _ctx: &mut Self::CTX,
        e: Box<Error>,
    ) -> Box<Error> {
        e
    }
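
    // Illustrative implementation (a sketch): mark connect errors as retryable
    // once, using a hypothetical `tries` counter stored in the user's CTX.
    //
    //     fn fail_to_connect(
    //         &self,
    //         _session: &mut Session,
    //         _peer: &HttpPeer,
    //         ctx: &mut Self::CTX,
    //         mut e: Box<Error>,
    //     ) -> Box<Error> {
    //         if ctx.tries < 1 {
    //             ctx.tries += 1;
    //             e.set_retry(true); // upstream_peer() will be called again
    //         }
    //         e
    //     }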

    /// This filter is called when the request encounters a fatal error.
    ///
    /// Users may write an error response to the downstream if the downstream is still writable.
    ///
    /// The response status code of the error response may be returned for logging purposes.
    async fn fail_to_proxy(&self, session: &mut Session, e: &Error, _ctx: &mut Self::CTX) -> u16
    where
        Self::CTX: Send + Sync,
    {
        let code = match e.etype() {
            HTTPStatus(code) => *code,
            _ => {
                match e.esource() {
                    ErrorSource::Upstream => 502,
                    ErrorSource::Downstream => {
                        match e.etype() {
                            WriteError | ReadError | ConnectionClosed => {
                                /* conn already dead */
                                0
                            }
                            _ => 400,
                        }
                    }
                    ErrorSource::Internal | ErrorSource::Unset => 500,
                }
            }
        };
        if code > 0 {
            session.respond_error(code).await.unwrap_or_else(|e| {
                error!("failed to send error response to downstream: {e}");
            });
        }
        code
    }

    /// Decide whether to serve stale when encountering an error or during revalidation
    ///
    /// An implementation should follow
    /// <https://datatracker.ietf.org/doc/html/rfc9111#section-4.2.4>
    /// <https://www.rfc-editor.org/rfc/rfc5861#section-4>
    ///
    /// This filter is only called if cache is enabled.
    // 5xx HTTP status will be encoded as ErrorType::HTTPStatus(code)
    fn should_serve_stale(
        &self,
        _session: &mut Session,
        _ctx: &mut Self::CTX,
        error: Option<&Error>, // None when it is called during stale while revalidate
    ) -> bool {
        // A cache MUST NOT generate a stale response unless
        // it is disconnected
        // or doing so is explicitly permitted by the client or origin server
        // (e.g. headers or an out-of-band contract)
        error.map_or(false, |e| e.esource() == &ErrorSource::Upstream)
    }

    /// This filter is called when the request has just established or reused a connection to the upstream
    ///
    /// This filter allows the user to log timing and connection-related info.
    async fn connected_to_upstream(
        &self,
        _session: &mut Session,
        _reused: bool,
        _peer: &HttpPeer,
        #[cfg(unix)] _fd: std::os::unix::io::RawFd,
        #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
        _digest: Option<&Digest>,
        _ctx: &mut Self::CTX,
    ) -> Result<()>
    where
        Self::CTX: Send + Sync,
    {
        Ok(())
    }

    /// This callback is invoked every time a request-related error log needs to be generated
    ///
    /// Users can define what is important to be written about this request via the returned string.
    fn request_summary(&self, session: &Session, _ctx: &Self::CTX) -> String {
        session.as_ref().request_summary()
    }

    /// Whether the request should be used to invalidate (delete) the HTTP cache
    ///
    /// - true: this request will be used to invalidate the cache.
    /// - false: this request is treated as a normal request.
    fn is_purge(&self, _session: &Session, _ctx: &Self::CTX) -> bool {
        false
    }

    /// This filter is called after the proxy cache generates the downstream response to the purge
    /// request (to invalidate or delete from the HTTP cache), based on the purge status, which
    /// indicates whether the request succeeded or failed.
    ///
    /// The filter allows the user to modify or replace the generated downstream response.
    /// If the filter returns Err, the proxy will instead send a 500 response.
    fn purge_response_filter(
        &self,
        _session: &Session,
        _ctx: &mut Self::CTX,
        _purge_status: PurgeStatus,
        _purge_response: &mut std::borrow::Cow<'static, ResponseHeader>,
    ) -> Result<()> {
        Ok(())
    }
}
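
// A minimal end-to-end sketch of implementing this trait (illustrative only;
// `MyProxy` and the upstream address are assumptions). Only the two required
// items need to be provided; every other filter keeps its default.
//
//     pub struct MyProxy;
//
//     #[async_trait]
//     impl ProxyHttp for MyProxy {
//         type CTX = ();
//         fn new_ctx(&self) -> Self::CTX {}
//
//         async fn upstream_peer(
//             &self,
//             _session: &mut Session,
//             _ctx: &mut Self::CTX,
//         ) -> Result<Box<HttpPeer>> {
//             // (address, use_tls, sni)
//             let peer = HttpPeer::new(("1.1.1.1", 443), true, "one.one.one.one".to_string());
//             Ok(Box::new(peer))
//         }
//     }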

// proxy_subrequest.rs

// Copyright 2024 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use async_trait::async_trait;
use core::pin::Pin;
use core::task::{Context, Poll};
use pingora_cache::lock::WritePermit;
use pingora_core::protocols::raw_connect::ProxyDigest;
use pingora_core::protocols::{
    GetProxyDigest, GetSocketDigest, GetTimingDigest, Peek, SocketDigest, Ssl, TimingDigest,
    UniqueID, UniqueIDType,
};
use std::io::Cursor;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite, Error, ReadBuf};

// An async IO stream that returns the request when being read from and dumps the data to the void
// when being written to
#[derive(Debug)]
pub(crate) struct DummyIO(Cursor<Vec<u8>>);

impl DummyIO {
    pub fn new(read_bytes: &[u8]) -> Self {
        DummyIO(Cursor::new(Vec::from(read_bytes)))
    }
}

impl AsyncRead for DummyIO {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<Result<(), Error>> {
        if self.0.position() < self.0.get_ref().len() as u64 {
            Pin::new(&mut self.0).poll_read(cx, buf)
        } else {
            // all data has been read; stay pending forever, otherwise the stream is considered closed
            Poll::Pending
        }
    }
}

impl AsyncWrite for DummyIO {
    fn poll_write(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, Error>> {
        Poll::Ready(Ok(buf.len()))
    }

    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
        Poll::Ready(Ok(()))
    }
    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
        Poll::Ready(Ok(()))
    }
}

impl UniqueID for DummyIO {
    fn id(&self) -> UniqueIDType {
        0 // placeholder
    }
}

impl Ssl for DummyIO {}

impl GetTimingDigest for DummyIO {
    fn get_timing_digest(&self) -> Vec<Option<TimingDigest>> {
        vec![]
    }
}

impl GetProxyDigest for DummyIO {
    fn get_proxy_digest(&self) -> Option<Arc<ProxyDigest>> {
        None
    }
}

impl GetSocketDigest for DummyIO {
    fn get_socket_digest(&self) -> Option<Arc<SocketDigest>> {
        None
    }
}

impl Peek for DummyIO {}

#[async_trait]
impl pingora_core::protocols::Shutdown for DummyIO {
    async fn shutdown(&mut self) -> () {}
}

#[tokio::test]
async fn test_dummy_io() {
    use futures::FutureExt;
    use tokio::io::{AsyncReadExt, AsyncWriteExt};

    let mut dummy = DummyIO::new(&[1, 2]);
    let res = dummy.read_u8().await;
    assert_eq!(res.unwrap(), 1);
    let res = dummy.read_u8().await;
    assert_eq!(res.unwrap(), 2);
    let res = dummy.read_u8().now_or_never();
    assert!(res.is_none()); // pending forever
    let res = dummy.write_u8(0).await;
    assert!(res.is_ok());
}

// To share state across the parent req and the sub req
pub(crate) struct Ctx {
    pub(crate) write_lock: Option<WritePermit>,
}

use crate::HttpSession;

pub fn create_dummy_session(parsed_session: &HttpSession) -> HttpSession {
    // TODO: check if there is req body, we don't capture the body for now
    HttpSession::new_http1(Box::new(DummyIO::new(&parsed_session.to_h1_raw())))
}

#[tokio::test]
async fn test_dummy_request() {
    use tokio_test::io::Builder;

    let input = b"GET / HTTP/1.1\r\n\r\n";
    let mock_io = Builder::new().read(&input[..]).build();
    let mut req = HttpSession::new_http1(Box::new(mock_io));
    req.read_request().await.unwrap();
    assert_eq!(input.as_slice(), req.to_h1_raw());

    let mut dummy_req = create_dummy_session(&req);
    dummy_req.read_request().await.unwrap();
    assert_eq!(input.as_slice(), req.to_h1_raw());
}

// proxy_purge.rs
// Copyright 2024 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use super::*;
use pingora_core::protocols::http::error_resp;
use std::borrow::Cow;

#[derive(Debug)]
pub enum PurgeStatus {
    /// Cache was not enabled, purge ineffectual.
    NoCache,
    /// Asset was found in cache (and presumably purged or being purged).
    Found,
    /// Asset was not found in cache.
    NotFound,
    /// Cache returned a purge error.
    /// Contains the causing error in case it should affect the downstream response.
    Error(Box<Error>),
}

// Return a canned response to a purge request, based on whether the cache had the asset or not
// (or otherwise returned an error).
fn purge_response(purge_status: &PurgeStatus) -> Cow<'static, ResponseHeader> {
    let resp = match purge_status {
        PurgeStatus::NoCache => &*NOT_PURGEABLE,
        PurgeStatus::Found => &*OK,
        PurgeStatus::NotFound => &*NOT_FOUND,
        PurgeStatus::Error(ref _e) => &*INTERNAL_ERROR,
    };
    Cow::Borrowed(resp)
}

fn gen_purge_response(code: u16) -> ResponseHeader {
    let mut resp = ResponseHeader::build(code, Some(3)).unwrap();
    resp.insert_header(header::SERVER, &SERVER_NAME[..])
        .unwrap();
    resp.insert_header(header::CONTENT_LENGTH, 0).unwrap();
    resp.insert_header(header::CACHE_CONTROL, "private, no-store")
        .unwrap();
    // TODO more headers?
    resp
}

static OK: Lazy<ResponseHeader> = Lazy::new(|| gen_purge_response(200));
static NOT_FOUND: Lazy<ResponseHeader> = Lazy::new(|| gen_purge_response(404));
// for when purge is sent to uncacheable assets
static NOT_PURGEABLE: Lazy<ResponseHeader> = Lazy::new(|| gen_purge_response(405));
// on cache storage or proxy error
static INTERNAL_ERROR: Lazy<ResponseHeader> = Lazy::new(|| error_resp::gen_error_response(500));

impl<SV> HttpProxy<SV> {
    pub(crate) async fn proxy_purge(
        &self,
        session: &mut Session,
        ctx: &mut SV::CTX,
    ) -> Option<(bool, Option<Box<Error>>)>
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        let purge_status = if session.cache.enabled() {
            match session.cache.purge().await {
                Ok(found) => {
                    if found {
                        PurgeStatus::Found
                    } else {
                        PurgeStatus::NotFound
                    }
                }
                Err(e) => {
                    session.cache.disable(NoCacheReason::StorageError);
                    warn!(
                        "Fail to purge cache: {e}, {}",
                        self.inner.request_summary(session, ctx)
                    );
                    PurgeStatus::Error(e)
                }
            }
        } else {
            // cache was not enabled
            PurgeStatus::NoCache
        };

        let mut purge_resp = purge_response(&purge_status);
        if let Err(e) =
            self.inner
                .purge_response_filter(session, ctx, purge_status, &mut purge_resp)
        {
            error!(
                "Failed purge response filter: {e}, {}",
                self.inner.request_summary(session, ctx)
            );
            purge_resp = Cow::Borrowed(&*INTERNAL_ERROR)
        }

        let write_result = match purge_resp {
            Cow::Borrowed(r) => session.as_mut().write_response_header_ref(r).await,
            Cow::Owned(r) => session.as_mut().write_response_header(Box::new(r)).await,
        };
        let (reuse, err) = match write_result {
            Ok(_) => (true, None),
            // dirty, not reusable
            Err(e) => {
                let e = e.into_down();
                error!(
                    "Failed to send purge response: {e}, {}",
                    self.inner.request_summary(session, ctx)
                );
                (false, Some(e))
            }
        };
        Some((reuse, err))
    }
}

// proxy_cache.rs

// Copyright 2024 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use super::*;
use http::{Method, StatusCode};
use pingora_cache::key::CacheHashKey;
use pingora_cache::lock::LockStatus;
use pingora_cache::max_file_size::ERR_RESPONSE_TOO_LARGE;
use pingora_cache::{ForcedInvalidationKind, HitStatus, RespCacheable::*};
use pingora_core::protocols::http::conditional_filter::to_304;
use pingora_core::protocols::http::v1::common::header_value_content_length;
use pingora_core::ErrorType;

impl<SV> HttpProxy<SV> {
    // return bool: server_session can be reused, and error if any
    pub(crate) async fn proxy_cache(
        self: &Arc<Self>,
        session: &mut Session,
        ctx: &mut SV::CTX,
    ) -> Option<(bool, Option<Box<Error>>)>
    // None: continue to proxy, Some: return
    where
        SV: ProxyHttp + Send + Sync + 'static,
        SV::CTX: Send + Sync,
    {
        // Cache logic request phase
        if let Err(e) = self.inner.request_cache_filter(session, ctx) {
            // TODO: handle this error
            warn!(
                "Fail to request_cache_filter: {e}, {}",
                self.inner.request_summary(session, ctx)
            );
        }

        // cache key logic, should this be part of request_cache_filter?
        if session.cache.enabled() {
            match self.inner.cache_key_callback(session, ctx) {
                Ok(key) => {
                    session.cache.set_cache_key(key);
                }
                Err(e) => {
                    // TODO: handle this error
                    session.cache.disable(NoCacheReason::StorageError);
                    warn!(
                        "Fail to cache_key_callback: {e}, {}",
                        self.inner.request_summary(session, ctx)
                    );
                }
            }
        }

        // cache purge logic: PURGE short-circuits rest of request
        if self.inner.is_purge(session, ctx) {
            return self.proxy_purge(session, ctx).await;
        }

        // bypass cache lookup if we predict to be uncacheable
        if session.cache.enabled() && !session.cache.cacheable_prediction() {
            session.cache.bypass();
        }

        if !session.cache.enabled() {
            return None;
        }

        // cache lookup logic
        loop {
            // for cache lock, TODO: cap the max number of loops
            match session.cache.cache_lookup().await {
                Ok(res) => {
                    let mut hit_status_opt = None;
                    if let Some((mut meta, handler)) = res {
                        // Vary logic
                        // Because this branch can be called multiple times in a loop, and we only
                        // need to update the vary once, check if variance is already set to
                        // prevent unnecessary vary lookups.
                        let cache_key = session.cache.cache_key();
                        if let Some(variance) = cache_key.variance_bin() {
                            // We've looked up a secondary slot.
                            // Adhoc double check that the variance found is the variance we want.
                            if Some(variance) != meta.variance() {
                                warn!("Cache variance mismatch, {variance:?}, {cache_key:?}");
                                session.cache.disable(NoCacheReason::InternalError);
                                break None;
                            }
                        } else {
                            // Basic cache key; either variance is off, or this is the primary slot.
                            let req_header = session.req_header();
                            let variance = self.inner.cache_vary_filter(&meta, ctx, req_header);
                            if let Some(variance) = variance {
                                // Variance is on. This is the primary slot.
                                if !session.cache.cache_vary_lookup(variance, &meta) {
                                    // This wasn't the desired variant. Update the cache key variance and cause another
                                    // lookup to get the desired variant, which would be in a secondary slot.
                                    continue;
                                }
                            } // else: vary is not in use
                        }

                        // Either no variance, or the current handler targets the correct variant.

                        // hit
                        // TODO: maybe round and/or cache now()
                        let hit_status = if meta.is_fresh(std::time::SystemTime::now()) {
                            // check if we should force expire or force miss
                            // (a soft invalidation, vs. a hard purge which forces a miss)
                            match self.inner.cache_hit_filter(session, &meta, ctx).await {
                                Err(e) => {
                                    error!(
                                        "Failed to filter cache hit: {e}, {}",
                                        self.inner.request_summary(session, ctx)
                                    );
                                    // this return value will cause us to fetch from upstream
                                    HitStatus::FailedHitFilter
                                }
                                Ok(None) => HitStatus::Fresh,
                                Ok(Some(ForcedInvalidationKind::ForceExpired)) => {
                                    // a force expired asset should not be served as stale
                                    // because force expire is usually meant to remove data
                                    meta.disable_serve_stale();
                                    HitStatus::ForceExpired
                                }
                                Ok(Some(ForcedInvalidationKind::ForceMiss)) => HitStatus::ForceMiss,
                            }
                        } else {
                            HitStatus::Expired
                        };

                        hit_status_opt = Some(hit_status);

                        // init cache for hit / stale
                        session.cache.cache_found(meta, handler, hit_status);
                    }

                    if hit_status_opt.map_or(true, HitStatus::is_treated_as_miss) {
                        // cache miss
                        if session.cache.is_cache_locked() {
                            // Another request is filling the cache; wait until that's done and retry.
                            let lock_status = session.cache.cache_lock_wait().await;
                            if self.handle_lock_status(session, ctx, lock_status) {
                                continue;
                            } else {
                                break None;
                            }
                        } else {
                            self.inner.cache_miss(session, ctx);
                            break None;
                        }
                    }

                    // Safe because an empty hit status would have broken out
                    // in the block above
                    let hit_status = hit_status_opt.expect("None case handled as miss");

                    if !hit_status.is_fresh() {
                        // expired or force expired asset
                        if session.cache.is_cache_locked() {
                            // first, check if this is the sub request for the background cache update
                            if let Some(write_lock) = session
                                .subrequest_ctx
                                .as_mut()
                                .and_then(|ctx| ctx.write_lock.take())
                            {
                                // Put the write lock in the request
                                session.cache.set_write_lock(write_lock);
                                session.cache.tag_as_subrequest();
                                // and then let it go to upstream
                                break None;
                            }
                            let will_serve_stale = session.cache.can_serve_stale_updating()
                                && self.inner.should_serve_stale(session, ctx, None);
                            if !will_serve_stale {
                                let lock_status = session.cache.cache_lock_wait().await;
                                if self.handle_lock_status(session, ctx, lock_status) {
                                    continue;
                                } else {
                                    break None;
                                }
                            }
                            // else continue to serve stale
                            session.cache.set_stale_updating();
                        } else if session.cache.is_cache_lock_writer() {
                            // stale while revalidate logic for the writer
                            let will_serve_stale = session.cache.can_serve_stale_updating()
                                && self.inner.should_serve_stale(session, ctx, None);
                            if will_serve_stale {
                                // spawn a background task to do the actual update
                                let subrequest =
                                    Box::new(crate::subrequest::create_dummy_session(session));
                                let new_app = self.clone(); // Clone the Arc
                                let sub_req_ctx = Box::new(SubReqCtx {
                                    write_lock: Some(session.cache.take_write_lock()),
                                });
                                tokio::spawn(async move {
                                    new_app.process_subrequest(subrequest, sub_req_ctx).await;
                                });
                                // continue to serve stale for this request
                                session.cache.set_stale_updating();
                            } else {
                                // return to fetch from upstream
                                break None;
                            }
                        } else {
                            // return to fetch from upstream
                            break None;
                        }
                    }

                    let (reuse, err) = self.proxy_cache_hit(session, ctx).await;
                    if let Some(e) = err.as_ref() {
                        error!(
                            "Fail to serve cache: {e}, {}",
                            self.inner.request_summary(session, ctx)
                        );
                    }
                    // response is served from cache, exit
                    break Some((reuse, err));
                }
                Err(e) => {
                    // Allow a cache miss to fill the cache even if the cache lookup errors;
                    // this is mostly to support backward-incompatible metadata updates
                    // TODO: check error types
                    // session.cache.disable();
                    self.inner.cache_miss(session, ctx);
                    warn!(
                        "Fail to cache lookup: {e}, {}",
                        self.inner.request_summary(session, ctx)
                    );
                    break None;
                }
            }
        }
    }

    // return bool: server_session can be reused, and error if any
    pub(crate) async fn proxy_cache_hit(
        &self,
        session: &mut Session,
        ctx: &mut SV::CTX,
    ) -> (bool, Option<Box<Error>>)
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        use range_filter::*;

        let seekable = session.cache.hit_handler().can_seek();
        let mut header = cache_hit_header(&session.cache);

        let req = session.req_header();

        let not_modified = match self.inner.cache_not_modified_filter(session, &header, ctx) {
            Ok(not_modified) => not_modified,
            Err(e) => {
                // fail open if cache_not_modified_filter errors,
                // just return the whole original response
                warn!(
                    "Failed to run cache not modified filter: {e}, {}",
                    self.inner.request_summary(session, ctx)
                );
                false
            }
        };
        if not_modified {
            to_304(&mut header);
        }
        let header_only = not_modified || req.method == http::method::Method::HEAD;

        // process range header if the cache storage supports seek
        let range_type = if seekable && !session.ignore_downstream_range {
            self.inner.range_header_filter(req, &mut header, ctx)
        } else {
            RangeType::None
        };

        // return a 416 with an empty body for simplicity
        let header_only = header_only || matches!(range_type, RangeType::Invalid);

        // TODO: use ProxyUseCache to replace the logic below
        match self.inner.response_filter(session, &mut header, ctx).await {
            Ok(_) => {
                if let Err(e) = session
                    .as_mut()
                    .write_response_header(header)
                    .await
                    .map_err(|e| e.into_down())
                {
                    // downstream connection is bad already
                    return (false, Some(e));
                }
            }
            Err(e) => {
                // TODO: more logging and error handling
                session
                    .as_mut()
                    .respond_error(500)
                    .await
                    .unwrap_or_else(|e| {
                        error!("failed to send error response to downstream: {e}");
                    });
                // we have not written anything dirty to downstream; it is still reusable
                return (true, Some(e));
            }
        }
        debug!("finished sending cached header to downstream");

        if !header_only {
            if let RangeType::Single(r) = range_type {
                if let Err(e) = session.cache.hit_handler().seek(r.start, Some(r.end)) {
                    return (false, Some(e));
                }
            }
            loop {
                match session.cache.hit_handler().read_body().await {
                    Ok(mut body) => {
                        let end = body.is_none();
                        match self
                            .inner
                            .response_body_filter(session, &mut body, end, ctx)
                        {
                            Ok(Some(duration)) => {
                                trace!("delaying response for {duration:?}");
                                time::sleep(duration).await;
                            }
                            Ok(None) => { /* continue */ }
                            Err(e) => {
                                // body is being sent, don't treat downstream as reusable
                                return (false, Some(e));
                            }
                        }

                        if let Some(b) = body {
                            // write to downstream
                            if let Err(e) = session
                                .as_mut()
                                .write_response_body(b, false)
                                .await
                                .map_err(|e| e.into_down())
                            {
                                return (false, Some(e));
                            }
                        } else {
                            break;
                        }
                    }
                    Err(e) => return (false, Some(e)),
                }
            }
        }

        if let Err(e) = session.cache.finish_hit_handler().await {
            warn!("Error during finish_hit_handler: {}", e);
        }

        match session.as_mut().finish_body().await {
            Ok(_) => {
                debug!("finished sending cached body to downstream");
                (true, None)
            }
            Err(e) => (false, Some(e)),
        }
    }

    /* Downstream revalidation, only needed when cache is on because otherwise origin
     * will handle it */
    pub(crate) fn downstream_response_conditional_filter(
        &self,
        use_cache: &mut ServeFromCache,
        session: &Session,
        resp: &mut ResponseHeader,
        ctx: &mut SV::CTX,
    ) where
        SV: ProxyHttp,
    {
        // TODO: range
        let req = session.req_header();

        let not_modified = match self.inner.cache_not_modified_filter(session, resp, ctx) {
            Ok(not_modified) => not_modified,
            Err(e) => {
                // fail open if cache_not_modified_filter errors,
                // just return the whole original response
                warn!(
                    "Failed to run cache not modified filter: {e}, {}",
                    self.inner.request_summary(session, ctx)
                );
                false
            }
        };

        if not_modified {
            to_304(resp);
        }
        let header_only = not_modified || req.method == http::method::Method::HEAD;
        if header_only {
            if use_cache.is_on() {
                // tell cache to stop after yielding header
                use_cache.enable_header_only();
            } else {
                // headers only during cache miss; upstream should continue sending
                // the body to cache, and the session will ignore the body automatically
                // because of the header's status (304)
                // TODO: we should drop body before/within this filter so that body
                // filter only runs on data downstream sees
            }
        }
    }

    // TODO: cache upstream header filter to add/remove headers

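    /// Mirrors the upstream response into the cache: on `Header`, runs
    /// `response_cache_filter` to decide cacheability and sets up the miss handler;
    /// on `Body`, streams chunks into the miss handler (aborting if the asset grows
    /// past the configured max file size); on `Done`, finalizes the miss handler.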
    pub(crate) async fn cache_http_task(
        &self,
        session: &mut Session,
        task: &HttpTask,
        ctx: &mut SV::CTX,
        serve_from_cache: &mut ServeFromCache,
    ) -> Result<()>
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        if !session.cache.enabled() && !session.cache.bypassing() {
            return Ok(());
        }

        match task {
            HttpTask::Header(header, end_stream) => {
                // decide if cacheable and create cache meta
                // for now, skip 1xxs (should not affect response cache decisions)
                // However, 101 is an exception because it is the final response header
                if header.status.is_informational()
                    && header.status != StatusCode::SWITCHING_PROTOCOLS
                {
                    return Ok(());
                }
                match self.inner.response_cache_filter(session, header, ctx)? {
                    Cacheable(meta) => {
                        let mut fill_cache = true;
                        if session.cache.bypassing() {
                            // The cache might have been bypassed because the response exceeded the
                            // maximum cacheable asset size. If that looks like the case (there
                            // is a maximum file size configured and we don't know the content
                            // length up front), attempting to re-enable the cache now would cause
                            // the request to fail when the chunked response exceeds the maximum
                            // file size again.
                            if session.cache.max_file_size_bytes().is_some()
                                && !meta.headers().contains_key(header::CONTENT_LENGTH)
                            {
                                session.cache.disable(NoCacheReason::ResponseTooLarge);
                                return Ok(());
                            }

                            session.cache.response_became_cacheable();

                            if session.req_header().method == Method::GET
                                && meta.response_header().status == StatusCode::OK
                            {
                                self.inner.cache_miss(session, ctx);
                            } else {
                                // we've allowed caching on the next request,
                                // but do not cache _this_ request if bypassed and not 200
                                // (We didn't run upstream request cache filters to strip range or condition headers,
                                // so this could be an uncacheable response e.g. 206 or 304 or HEAD.
                                // Exclude all non-200/GET for simplicity, may expand allowable codes in the future.)
                                fill_cache = false;
                                session.cache.disable(NoCacheReason::Deferred);
                            }
                        }

                        // If the Content-Length is known, and a maximum asset size has been configured
                        // on the cache, validate that the response does not exceed the maximum asset size.
                        if session.cache.enabled() {
                            if let Some(max_file_size) = session.cache.max_file_size_bytes() {
                                let content_length_hdr = meta.headers().get(header::CONTENT_LENGTH);
                                if let Some(content_length) =
                                    header_value_content_length(content_length_hdr)
                                {
                                    if content_length > max_file_size {
                                        fill_cache = false;
                                        session.cache.response_became_uncacheable(
                                            NoCacheReason::ResponseTooLarge,
                                        );
                                        session.cache.disable(NoCacheReason::ResponseTooLarge);
                                    }
                                }
                                // if the content-length header is not specified, the miss handler
                                // will count the response size on the fly, aborting the request
                                // mid-transfer if the max file size is exceeded
                            }
                        }
                        if fill_cache {
                            let req_header = session.req_header();
                            // Update the variance in the meta via the same callback,
                            // cache_vary_filter(), used in cache lookup for consistency.
                            // Future cache lookups need a matching variance in the meta
                            // with the cache key to pick up the correct variance
                            let variance = self.inner.cache_vary_filter(&meta, ctx, req_header);
                            session.cache.set_cache_meta(meta);
                            session.cache.update_variance(variance);
                            // this sends the meta and header
                            session.cache.set_miss_handler().await?;
                            if session.cache.miss_body_reader().is_some() {
                                serve_from_cache.enable_miss();
                            }
                            if *end_stream {
                                session
                                    .cache
                                    .miss_handler()
                                    .unwrap() // safe, it is set above
                                    .write_body(Bytes::new(), true)
                                    .await?;
                                session.cache.finish_miss_handler().await?;
                            }
                        }
                    }
                    Uncacheable(reason) => {
                        if !session.cache.bypassing() {
                            // mark as uncacheable, so we bypass cache next time
                            session.cache.response_became_uncacheable(reason);
                        }
                        session.cache.disable(reason);
                    }
                }
            }
            HttpTask::Body(data, end_stream) => match data {
                Some(d) => {
                    if session.cache.enabled() {
                        // this will panic if more data is sent after we see end_stream,
                        // but that should be impossible in the real world
                        let miss_handler = session.cache.miss_handler().unwrap();
                        // TODO: do this async
                        let res = miss_handler.write_body(d.clone(), *end_stream).await;
                        if let Err(err) = res {
                            if err.etype == ERR_RESPONSE_TOO_LARGE {
                                debug!("chunked response exceeded max cache size, remembering that it is uncacheable");
                                session
                                    .cache
                                    .response_became_uncacheable(NoCacheReason::ResponseTooLarge);
                            }

                            return Err(err);
                        }
                        if *end_stream {
                            session.cache.finish_miss_handler().await?;
                        }
                    }
                }
                None => {
                    if session.cache.enabled() && *end_stream {
                        session.cache.finish_miss_handler().await?;
                    }
                }
            },
            HttpTask::Trailer(_) => {} // h1 trailer is not supported yet
            HttpTask::Done => {
                if session.cache.enabled() {
                    session.cache.finish_miss_handler().await?;
                }
            }
            HttpTask::Failed(_) => {
                // TODO: handle this failure: delete the temp files?
            }
        }
        Ok(())
    }

    // Decide if the local cache can be used according to the upstream http header
    // 1. when upstream returns 304, the local cache is refreshed and served fresh
    // 2. when upstream returns certain HTTP error status, the local cache is served stale
    // Return true if local cache should be used, false otherwise
    pub(crate) async fn revalidate_or_stale(
        &self,
        session: &mut Session,
        task: &mut HttpTask,
        ctx: &mut SV::CTX,
    ) -> bool
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        if !session.cache.enabled() {
            return false;
        }

        match task {
            HttpTask::Header(resp, _eos) => {
                if resp.status == StatusCode::NOT_MODIFIED {
                    if session.cache.maybe_cache_meta().is_some() {
                        // run upstream response filters on upstream 304 first
                        self.inner.upstream_response_filter(session, resp, ctx);
                        // 304 doesn't contain all the headers, merge 304 into cached 200 header
                        // in order for response_cache_filter to run correctly
                        let merged_header = session.cache.revalidate_merge_header(resp);
                        match self
                            .inner
                            .response_cache_filter(session, &merged_header, ctx)
                        {
                            Ok(Cacheable(mut meta)) => {
                                // For simplicity, ignore changes to variance over 304 for now.
                                // Note this means upstream can only update variance via 2xx
                                // (expired response).
                                //
                                // TODO: if we choose to respect changing Vary / variance over 304,
                                // then there are a few cases to consider. See update_variance in
                                // the pingora-cache module.
                                let old_meta = session.cache.maybe_cache_meta().unwrap(); // safe, checked above
                                if let Some(old_variance) = old_meta.variance() {
                                    meta.set_variance(old_variance);
                                }
                                if let Err(e) = session.cache.revalidate_cache_meta(meta).await {
                                    // Fail open: we can continue to use the revalidated response
                                    // even if the meta failed to write to storage
                                    warn!("revalidate_cache_meta failed {e:?}");
                                }
                            }
                            Ok(Uncacheable(reason)) => {
                                // This response was once cacheable, and upstream tells us it has
                                // not changed, but now we have decided it is uncacheable!
                                // RFC 9111: still allowed to reuse stored response this time because
                                // it was "successfully validated"
                                // https://www.rfc-editor.org/rfc/rfc9111#constructing.responses.from.caches
                                // Serve the response, but do not update cache

                                // We also want to avoid poisoning downstream's cache with an unsolicited 304
                                // if we did not receive a conditional request from downstream
                                // (downstream may have a different cacheability assessment and could cache the 304)

                                //TODO: log more
                                warn!("Uncacheable {reason:?} 304 received");
                                session.cache.response_became_uncacheable(reason);
                                session.cache.revalidate_uncacheable(merged_header, reason);
                            }
                            Err(e) => {
                                // Error during revalidation, similarly to the reasons above
                                // (avoid poisoning downstream cache with passthrough 304),
                                // allow serving the stored response without updating cache
                                warn!("Error {e:?} response_cache_filter during revalidation");
                                session.cache.revalidate_uncacheable(
                                    merged_header,
                                    NoCacheReason::InternalError,
                                );
                                // Assume the next 304 may succeed, so don't mark uncacheable
                            }
                        }
                        // always serve from cache after receiving the 304
                        true
                    } else {
                        //TODO: log more
                        warn!("304 received without cached asset, disable caching");
                        let reason = NoCacheReason::Custom("304 on miss");
                        session.cache.response_became_uncacheable(reason);
                        session.cache.disable(reason);
                        false
                    }
                } else if resp.status.is_server_error() {
                    // stale if error logic, 5xx only for now

                    // this is the response header filter, so response_written should always be None?
                    if !session.cache.can_serve_stale_error()
                        || session.response_written().is_some()
                    {
                        return false;
                    }

                    // create an error to encode the http status code
                    let http_status_error = Error::create(
                        ErrorType::HTTPStatus(resp.status.as_u16()),
                        ErrorSource::Upstream,
                        None,
                        None,
                    );
                    if self
                        .inner
                        .should_serve_stale(session, ctx, Some(&http_status_error))
                    {
                        // no more need to keep the write lock
                        session
                            .cache
                            .release_write_lock(NoCacheReason::UpstreamError);
                        true
                    } else {
                        false
                    }
                } else {
                    false // not 304, not stale if error status code
                }
            }
            _ => false, // not header
        }
    }

    // None: no stale asset is used; Some(_): a stale asset is sent to downstream
    // bool: can the downstream connection be reused
    pub(crate) async fn handle_stale_if_error(
        &self,
        session: &mut Session,
        ctx: &mut SV::CTX,
        error: &Error,
    ) -> Option<(bool, Option<Box<Error>>)>
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        // the caller might have already checked this as an optimization
        if !session.cache.can_serve_stale_error() {
            return None;
        }

        // the error happened halfway through a regular response to downstream,
        // so we can't resend the response
        if session.response_written().is_some() {
            return None;
        }

        // check error types
        if !self.inner.should_serve_stale(session, ctx, Some(error)) {
            return None;
        }

        // log the original error
        warn!(
            "Fail to proxy: {}, serving stale, {}",
            error,
            self.inner.request_summary(session, ctx)
        );

        // no more need to hang onto the cache lock
        session
            .cache
            .release_write_lock(NoCacheReason::UpstreamError);

        Some(self.proxy_cache_hit(session, ctx).await)
    }

    // helper function to decide whether to continue retrying the lock (true) or give up (false)
    fn handle_lock_status(
        &self,
        session: &mut Session,
        ctx: &SV::CTX,
        lock_status: LockStatus,
    ) -> bool
    where
        SV: ProxyHttp,
    {
        debug!("cache unlocked {lock_status:?}");
        match lock_status {
            // should lookup the cached asset again
            LockStatus::Done => true,
            // should compete to be a new writer
            LockStatus::TransientError => true,
            // the request is uncacheable, go ahead to fetch from the origin
            LockStatus::GiveUp => {
                // TODO: It will be nice for the writer to propagate the real reason
                session.cache.disable(NoCacheReason::CacheLockGiveUp);
                // not cacheable, just go to the origin.
                false
            }
            // treat this the same as TransientError
            LockStatus::Dangling => {
                // software bug, but request can recover from this
                warn!(
                    "Dangling cache lock, {}",
                    self.inner.request_summary(session, ctx)
                );
                true
            }
            /* We have 3 options when a lock is held too long
             * 1. release the lock and let every request compete for it again
             * 2. let every request cache miss
             * 3. let every request through while disabling cache
             * #1 could repeat the situation but protect the origin from load
             * #2 could amplify disk writes and storage for temp files
             * #3 is the simplest option for now */
            LockStatus::Timeout => {
                warn!(
                    "Cache lock timeout, {}",
                    self.inner.request_summary(session, ctx)
                );
                session.cache.disable(NoCacheReason::CacheLockTimeout);
                // not cacheable, just go to the origin.
                false
            }
            // software bug, this status should be impossible to reach
            LockStatus::Waiting => panic!("impossible LockStatus::Waiting"),
        }
    }
}

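/// Builds the downstream response header for a cache hit from the stored cache
/// meta: copies the cached response header, adds an `Age` header when serving
/// without contacting upstream (RFC 9111 section 4), and adds
/// `Transfer-Encoding: chunked` when a body is expected but `Content-Length` is
/// absent, so the downstream knows how to frame the body.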
fn cache_hit_header(cache: &HttpCache) -> Box<ResponseHeader> {
    let mut header = Box::new(cache.cache_meta().response_header_copy());
    // convert the cached response header into one suitable to send downstream

    // these status codes cannot have a body, so no need to add chunked encoding
    let no_body = matches!(header.status.as_u16(), 204 | 304);

    // https://www.rfc-editor.org/rfc/rfc9111#section-4:
    // When a stored response is used to satisfy a request without validation, a cache
    // MUST generate an Age header field
    if !cache.upstream_used() {
        let age = cache.cache_meta().age().as_secs();
        header.insert_header(http::header::AGE, age).unwrap();
    }

    /* Add a chunked header to tell downstream to use chunked encoding
     * in the absence of content-length (e.g. when the response came over h2) */
    if !no_body
        && !header.status.is_informational()
        && header.headers.get(http::header::CONTENT_LENGTH).is_none()
    {
        header
            .insert_header(http::header::TRANSFER_ENCODING, "chunked")
            .unwrap();
    }
    header
}

// https://datatracker.ietf.org/doc/html/rfc7233#section-3
pub mod range_filter {
    use super::*;
    use http::header::*;
    use std::ops::Range;

    // parse bytes into usize, ignoring parse errors
    fn parse_number(input: &[u8]) -> Option<usize> {
        str::from_utf8(input).ok()?.parse().ok()
    }

    fn parse_range_header(range: &[u8], content_length: usize) -> RangeType {
        use regex::Regex;

        // single byte range only for now
        // https://datatracker.ietf.org/doc/html/rfc7233#section-2.1
        // https://datatracker.ietf.org/doc/html/rfc7233#appendix-C: case-insensitive
        static RE_SINGLE_RANGE: Lazy<Regex> =
            Lazy::new(|| Regex::new(r"(?i)bytes=(?P<start>\d*)-(?P<end>\d*)").unwrap());

        // ignore invalid range header
        let Ok(range_str) = str::from_utf8(range) else {
            return RangeType::None;
        };

        let Some(captured) = RE_SINGLE_RANGE.captures(range_str) else {
            return RangeType::None;
        };
        let maybe_start = captured
            .name("start")
            .and_then(|s| s.as_str().parse::<usize>().ok());
        let end = captured
            .name("end")
            .and_then(|s| s.as_str().parse::<usize>().ok());

        if let Some(start) = maybe_start {
            if start >= content_length {
                RangeType::Invalid
            } else {
                // an open-ended range ends at the last byte;
                // an oversized end is allowed but clamped to the last byte
                // the range end is inclusive
                let end = std::cmp::min(end.unwrap_or(content_length - 1), content_length - 1) + 1;
                if end <= start {
                    RangeType::Invalid
                } else {
                    RangeType::new_single(start, end)
                }
            }
        } else {
            // start is empty, which changes the meaning of end:
            // it now means to read the last `end` bytes
            if let Some(end) = end {
                if content_length >= end {
                    RangeType::new_single(content_length - end, content_length)
                } else {
                    // an oversized end is allowed but clamped to the full length
                    RangeType::new_single(0, content_length)
                }
            } else {
                // both empty/invalid
                RangeType::Invalid
            }
        }
    }
    #[test]
    fn test_parse_range() {
        assert_eq!(
            parse_range_header(b"bytes=0-1", 10),
            RangeType::new_single(0, 2)
        );
        assert_eq!(
            parse_range_header(b"bYTes=0-9", 10),
            RangeType::new_single(0, 10)
        );
        assert_eq!(
            parse_range_header(b"bytes=0-12", 10),
            RangeType::new_single(0, 10)
        );
        assert_eq!(
            parse_range_header(b"bytes=0-", 10),
            RangeType::new_single(0, 10)
        );
        assert_eq!(parse_range_header(b"bytes=2-1", 10), RangeType::Invalid);
        assert_eq!(parse_range_header(b"bytes=10-11", 10), RangeType::Invalid);
        assert_eq!(
            parse_range_header(b"bytes=-2", 10),
            RangeType::new_single(8, 10)
        );
        assert_eq!(
            parse_range_header(b"bytes=-12", 10),
            RangeType::new_single(0, 10)
        );
        assert_eq!(parse_range_header(b"bytes=-", 10), RangeType::Invalid);
        assert_eq!(parse_range_header(b"bytes=", 10), RangeType::None);
    }

    #[derive(Debug, Eq, PartialEq, Clone)]
    pub enum RangeType {
        None,
        Single(Range<usize>),
        // TODO: multi-range
        Invalid,
    }

    impl RangeType {
        fn new_single(start: usize, end: usize) -> Self {
            RangeType::Single(Range { start, end })
        }
    }

    // TODO: if-range

    // single range for now
    pub fn range_header_filter(req: &RequestHeader, resp: &mut ResponseHeader) -> RangeType {
        // The Range header field is evaluated after evaluating the precondition
        // header fields defined in [RFC7232], and only if the result in absence
        // of the Range header field would be a 200 (OK) response
        if resp.status != StatusCode::OK {
            return RangeType::None;
        }

        // "A server MUST ignore a Range header field received with a request method other than GET."
        if req.method != http::Method::GET && req.method != http::Method::HEAD {
            return RangeType::None;
        }

        let Some(range_header) = req.headers.get(RANGE) else {
            return RangeType::None;
        };

        // Content-Length is not required by the RFC, but requiring it matches nginx's
        // behavior and is easier to implement with this header present.
        let Some(content_length_bytes) = resp.headers.get(CONTENT_LENGTH) else {
            return RangeType::None;
        };
        // bail on invalid content length
        let Some(content_length) = parse_number(content_length_bytes.as_bytes()) else {
            return RangeType::None;
        };

        // if-range wants to understand if the Last-Modified / ETag value matches exactly for use
        // with resumable downloads.
        // https://datatracker.ietf.org/doc/html/rfc9110#name-if-range
        // Note that the RFC wants strong validation, and suggests that
        // "A valid entity-tag can be distinguished from a valid HTTP-date
        // by examining the first three characters for a DQUOTE,"
        // but this current etag matching behavior most closely mirrors nginx.
        if let Some(if_range) = req.headers.get(IF_RANGE) {
            let ir = if_range.as_bytes();
            let matches = if ir.len() >= 2 && ir.last() == Some(&b'"') {
                resp.headers.get(ETAG).is_some_and(|etag| etag == if_range)
            } else if let Some(last_modified) = resp.headers.get(LAST_MODIFIED) {
                last_modified == if_range
            } else {
                false
            };
            if !matches {
                return RangeType::None;
            }
        }

        // TODO: we can also check the Accept-Ranges header from resp. Nginx gives users
        // the option; see proxy_force_ranges

        let range_type = parse_range_header(range_header.as_bytes(), content_length);

        match &range_type {
            RangeType::None => { /* nothing to do */ }
            RangeType::Single(r) => {
                // 206 response
                resp.set_status(StatusCode::PARTIAL_CONTENT).unwrap();
                resp.insert_header(&CONTENT_LENGTH, r.end - r.start)
                    .unwrap();
                resp.insert_header(
                    &CONTENT_RANGE,
                    format!("bytes {}-{}/{content_length}", r.start, r.end - 1), // range end is inclusive
                )
                .unwrap()
            }
            RangeType::Invalid => {
                // 416 response
                resp.set_status(StatusCode::RANGE_NOT_SATISFIABLE).unwrap();
                // empty body for simplicity
                resp.insert_header(&CONTENT_LENGTH, HeaderValue::from_static("0"))
                    .unwrap();
                // TODO: remove other headers like content-encoding
                resp.remove_header(&CONTENT_TYPE);
                resp.insert_header(&CONTENT_RANGE, format!("bytes */{content_length}"))
                    .unwrap()
            }
        }

        range_type
    }

    #[test]
    fn test_range_filter() {
        fn gen_req() -> RequestHeader {
            RequestHeader::build(http::Method::GET, b"/", Some(1)).unwrap()
        }
        fn gen_resp() -> ResponseHeader {
            let mut resp = ResponseHeader::build(200, Some(1)).unwrap();
            resp.append_header("Content-Length", "10").unwrap();
            resp
        }

        // no range
        let req = gen_req();
        let mut resp = gen_resp();
        assert_eq!(RangeType::None, range_header_filter(&req, &mut resp));
        assert_eq!(resp.status.as_u16(), 200);

        // regular range
        let mut req = gen_req();
        req.insert_header("Range", "bytes=0-1").unwrap();
        let mut resp = gen_resp();
        assert_eq!(
            RangeType::new_single(0, 2),
            range_header_filter(&req, &mut resp)
        );
        assert_eq!(resp.status.as_u16(), 206);
        assert_eq!(resp.headers.get("content-length").unwrap().as_bytes(), b"2");
        assert_eq!(
            resp.headers.get("content-range").unwrap().as_bytes(),
            b"bytes 0-1/10"
        );

        // bad range
        let mut req = gen_req();
        req.insert_header("Range", "bytes=1-0").unwrap();
        let mut resp = gen_resp();
        assert_eq!(RangeType::Invalid, range_header_filter(&req, &mut resp));
        assert_eq!(resp.status.as_u16(), 416);
        assert_eq!(resp.headers.get("content-length").unwrap().as_bytes(), b"0");
        assert_eq!(
            resp.headers.get("content-range").unwrap().as_bytes(),
            b"bytes */10"
        );
    }

    #[test]
    fn test_if_range() {
        const DATE: &str = "Fri, 07 Jul 2023 22:03:29 GMT";
        const ETAG: &str = "\"1234\"";

        fn gen_req() -> RequestHeader {
            let mut req = RequestHeader::build(http::Method::GET, b"/", Some(1)).unwrap();
            req.append_header("Range", "bytes=0-1").unwrap();
            req
        }
        fn gen_resp() -> ResponseHeader {
            let mut resp = ResponseHeader::build(200, Some(1)).unwrap();
            resp.append_header("Content-Length", "10").unwrap();
            resp.append_header("Last-Modified", DATE).unwrap();
            resp.append_header("ETag", ETAG).unwrap();
            resp
        }

        // matching Last-Modified date
        let mut req = gen_req();
        req.insert_header("If-Range", DATE).unwrap();
        let mut resp = gen_resp();
        assert_eq!(
            RangeType::new_single(0, 2),
            range_header_filter(&req, &mut resp)
        );

        // non-matching date
        let mut req = gen_req();
        req.insert_header("If-Range", "Fri, 07 Jul 2023 22:03:25 GMT")
            .unwrap();
        let mut resp = gen_resp();
        assert_eq!(RangeType::None, range_header_filter(&req, &mut resp));

        // match ETag
        let mut req = gen_req();
        req.insert_header("If-Range", ETAG).unwrap();
        let mut resp = gen_resp();
        assert_eq!(
            RangeType::new_single(0, 2),
            range_header_filter(&req, &mut resp)
        );

        // non-matching ETags do not result in range
        let mut req = gen_req();
        req.insert_header("If-Range", "\"4567\"").unwrap();
        let mut resp = gen_resp();
        assert_eq!(RangeType::None, range_header_filter(&req, &mut resp));

        let mut req = gen_req();
        req.insert_header("If-Range", "1234").unwrap();
        let mut resp = gen_resp();
        assert_eq!(RangeType::None, range_header_filter(&req, &mut resp));
    }

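    /// Stateful body filter that trims a streamed body down to a single byte range.
    /// Chunks must be fed in order via `filter_body`: chunks entirely outside the
    /// range are dropped, and chunks straddling a range boundary are sliced.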
    pub struct RangeBodyFilter {
        range: RangeType,
        current: usize,
    }

    impl RangeBodyFilter {
        pub fn new() -> Self {
            RangeBodyFilter {
                range: RangeType::None,
                current: 0,
            }
        }

        pub fn set(&mut self, range: RangeType) {
            self.range = range;
        }

        pub fn filter_body(&mut self, data: Option<Bytes>) -> Option<Bytes> {
            match &self.range {
                RangeType::None => data,
                RangeType::Invalid => None,
                RangeType::Single(r) => {
                    let current = self.current;
                    self.current += data.as_ref().map_or(0, |d| d.len());
                    data.and_then(|d| Self::filter_range_data(r.start, r.end, current, d))
                }
            }
        }

        fn filter_range_data(
            start: usize,
            end: usize,
            current: usize,
            data: Bytes,
        ) -> Option<Bytes> {
            if current + data.len() < start || current >= end {
                // if the current data is outside the desired range, just drop the data
                None
            } else if current >= start && current + data.len() <= end {
                // all data is within the slice
                Some(data)
            } else {
                // data:  current........current+data.len()
                // range: start...........end
                let slice_start = start.saturating_sub(current);
                let slice_end = std::cmp::min(data.len(), end - current);
                Some(data.slice(slice_start..slice_end))
            }
        }
    }

    #[test]
    fn test_range_body_filter() {
        let mut body_filter = RangeBodyFilter::new();
        assert_eq!(body_filter.filter_body(Some("123".into())).unwrap(), "123");

        let mut body_filter = RangeBodyFilter::new();
        body_filter.set(RangeType::Invalid);
        assert!(body_filter.filter_body(Some("123".into())).is_none());

        let mut body_filter = RangeBodyFilter::new();
        body_filter.set(RangeType::new_single(0, 1));
        assert_eq!(body_filter.filter_body(Some("012".into())).unwrap(), "0");
        assert!(body_filter.filter_body(Some("345".into())).is_none());

        let mut body_filter = RangeBodyFilter::new();
        body_filter.set(RangeType::new_single(4, 6));
        assert!(body_filter.filter_body(Some("012".into())).is_none());
        assert_eq!(body_filter.filter_body(Some("345".into())).unwrap(), "45");
        assert!(body_filter.filter_body(Some("678".into())).is_none());

        let mut body_filter = RangeBodyFilter::new();
        body_filter.set(RangeType::new_single(1, 7));
        assert_eq!(body_filter.filter_body(Some("012".into())).unwrap(), "12");
        assert_eq!(body_filter.filter_body(Some("345".into())).unwrap(), "345");
        assert_eq!(body_filter.filter_body(Some("678".into())).unwrap(), "6");
    }
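
    // A minimal end-to-end sketch of how the two filters above compose (mirroring
    // the way the proxy wires them together): range_header_filter rewrites the
    // response header and returns the RangeType, which RangeBodyFilter then uses
    // to slice the streamed body.
    #[test]
    fn test_range_filters_compose() {
        let mut req = RequestHeader::build(http::Method::GET, b"/", Some(1)).unwrap();
        req.insert_header("Range", "bytes=2-5").unwrap();
        let mut resp = ResponseHeader::build(200, Some(1)).unwrap();
        resp.append_header("Content-Length", "10").unwrap();

        let range_type = range_header_filter(&req, &mut resp);
        assert_eq!(RangeType::new_single(2, 6), range_type);
        assert_eq!(resp.status.as_u16(), 206);

        let mut body_filter = RangeBodyFilter::new();
        body_filter.set(range_type);
        // a 10-byte body streamed in two chunks; only bytes 2..=5 survive
        assert_eq!(body_filter.filter_body(Some("01234".into())).unwrap(), "234");
        assert_eq!(body_filter.filter_body(Some("56789".into())).unwrap(), "5");
    }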
}

// a state machine for proxy logic to tell when to use cache in the case of
// miss/revalidation/error.
#[derive(Debug)]
pub(crate) enum ServeFromCache {
    Off,             // not using cache
    CacheHeader,     // should serve cache header
    CacheHeaderOnly, // should serve cache header only, no body
    CacheBody,       // should serve cache body
    CacheHeaderMiss, // should serve cache header but upstream response should be admitted to cache
    CacheBodyMiss,   // should serve cache body but upstream response should be admitted to cache
    Done,            // nothing more to serve from cache
}

impl ServeFromCache {
    pub fn new() -> Self {
        Self::Off
    }

    pub fn is_on(&self) -> bool {
        !matches!(self, Self::Off)
    }

    pub fn is_miss(&self) -> bool {
        matches!(self, Self::CacheHeaderMiss | Self::CacheBodyMiss)
    }

    pub fn is_miss_header(&self) -> bool {
        matches!(self, Self::CacheHeaderMiss)
    }

    pub fn is_miss_body(&self) -> bool {
        matches!(self, Self::CacheBodyMiss)
    }

    pub fn should_discard_upstream(&self) -> bool {
        self.is_on() && !self.is_miss()
    }

    pub fn should_send_to_downstream(&self) -> bool {
        !self.is_on()
    }

    pub fn enable(&mut self) {
        *self = Self::CacheHeader;
    }

    pub fn enable_miss(&mut self) {
        if !self.is_on() {
            *self = Self::CacheHeaderMiss;
        }
    }

    pub fn enable_header_only(&mut self) {
        match self {
            Self::CacheBody => *self = Self::Done, // TODO: make sure no body is read yet
            _ => *self = Self::CacheHeaderOnly,
        }
    }

    // This function is (best effort) cancel-safe to be used in select
    pub async fn next_http_task(&mut self, cache: &mut HttpCache) -> Result<HttpTask> {
        if !cache.enabled() {
            // Cache is disabled due to internal error
            // TODO: if nothing is sent to eyeball yet, figure out a way to recover by
            // fetching from upstream
            return Error::e_explain(InternalError, "Cache disabled");
        }
        match self {
            Self::Off => panic!("ProxyUseCache not enabled"),
            Self::CacheHeader => {
                *self = Self::CacheBody;
                Ok(HttpTask::Header(cache_hit_header(cache), false)) // false for now
            }
            Self::CacheHeaderMiss => {
                *self = Self::CacheBodyMiss;
                Ok(HttpTask::Header(cache_hit_header(cache), false)) // false for now
            }
            Self::CacheHeaderOnly => {
                *self = Self::Done;
                Ok(HttpTask::Header(cache_hit_header(cache), true))
            }
            Self::CacheBody => {
                if let Some(b) = cache.hit_handler().read_body().await? {
                    Ok(HttpTask::Body(Some(b), false)) // false for now
                } else {
                    *self = Self::Done;
                    Ok(HttpTask::Done)
                }
            }
            Self::CacheBodyMiss => {
                // safety: callers of enable_miss() invoke it only if the miss body reader exists
                if let Some(b) = cache.miss_body_reader().unwrap().read_body().await? {
                    Ok(HttpTask::Body(Some(b), false)) // false for now
                } else {
                    *self = Self::Done;
                    Ok(HttpTask::Done)
                }
            }
            Self::Done => Ok(HttpTask::Done),
        }
    }
}
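
// A quick sanity check of the ServeFromCache transitions that do not require a
// live HttpCache (the cache-backed transitions happen in next_http_task above).
#[test]
fn test_serve_from_cache_states() {
    let mut sfc = ServeFromCache::new();
    // Off: upstream tasks flow straight to downstream
    assert!(!sfc.is_on());
    assert!(sfc.should_send_to_downstream());
    assert!(!sfc.should_discard_upstream());

    // miss: serve the cached header while the upstream body is admitted to cache
    sfc.enable_miss();
    assert!(sfc.is_miss() && sfc.is_miss_header());
    assert!(!sfc.should_discard_upstream());
    assert!(!sfc.should_send_to_downstream());

    // hit/revalidation: the upstream response is discarded entirely
    sfc.enable();
    assert!(sfc.should_discard_upstream());

    // header-only (e.g. 304 or HEAD): still on, but no body will be served
    sfc.enable_header_only();
    assert!(sfc.is_on());
}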

proxy_h1.rs

// Copyright 2024 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use super::*;
use crate::proxy_cache::{range_filter::RangeBodyFilter, ServeFromCache};
use crate::proxy_common::*;

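
// A minimal sketch of the try_reserve()/reserve() permit pattern that
// proxy_handle_downstream below uses to avoid deadlocking the body pipes: only
// poll the downstream body when a send permit is available, otherwise wait for
// capacity. (Test-only; assumes tokio's `rt` and `macros` features, which the
// select!/try_join! calls in this file already require.)
#[tokio::test]
async fn test_pipe_permit_pattern() {
    let (tx, mut rx) = tokio::sync::mpsc::channel::<u8>(1);

    // capacity available: the permit lets us forward one body chunk
    let permit = tx.try_reserve().expect("pipe should have capacity");
    permit.send(1);

    // pipe full: try_reserve() fails, so the proxy loop would await tx.reserve()
    // (waiting for the other side to drain the pipe) instead of reading more body
    assert!(tx.try_reserve().is_err());

    // draining the pipe restores capacity and the loop resumes
    assert_eq!(rx.recv().await, Some(1));
    assert!(tx.try_reserve().is_ok());
}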
impl<SV> HttpProxy<SV> {
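    /// Proxies one downstream request over an established H1 upstream session:
    /// converts the request header to H1 if needed, runs the upstream request
    /// filters, then streams both directions concurrently over task pipes.
    /// Returns (reuse_server, reuse_client, error).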
    pub(crate) async fn proxy_1to1(
        &self,
        session: &mut Session,
        client_session: &mut HttpSessionV1,
        peer: &HttpPeer,
        ctx: &mut SV::CTX,
    ) -> (bool, bool, Option<Box<Error>>)
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        client_session.read_timeout = peer.options.read_timeout;
        client_session.write_timeout = peer.options.write_timeout;

        // phase 2: send to upstream

        let mut req = session.req_header().clone();

        // Convert HTTP2 headers to H1
        if req.version == Version::HTTP_2 {
            req.set_version(Version::HTTP_11);
            // if client has body but has no content length, add chunked encoding
            // https://datatracker.ietf.org/doc/html/rfc9112#name-message-body
            // "The presence of a message body in a request is signaled by a Content-Length or Transfer-Encoding header field."
            if !session.is_body_empty() && session.get_header(header::CONTENT_LENGTH).is_none() {
                req.insert_header(header::TRANSFER_ENCODING, "chunked")
                    .unwrap();
            }
            if session.get_header(header::HOST).is_none() {
                // H2 is required to set :authority, but not necessarily the Host header;
                // most H1 servers expect a Host header, so convert
                let host = req.uri.authority().map_or("", |a| a.as_str()).to_owned();
                req.insert_header(header::HOST, host).unwrap();
            }
            // TODO: Add keepalive header for connection reuse, but this is not required per RFC
        }

        if session.cache.enabled() {
            if let Err(e) = pingora_cache::filters::upstream::request_filter(
                &mut req,
                session.cache.maybe_cache_meta(),
            ) {
                session.cache.disable(NoCacheReason::InternalError);
                warn!("cache upstream filter error {}, disabling cache", e);
            }
        }

        match self
            .inner
            .upstream_request_filter(session, &mut req, ctx)
            .await
        {
            Ok(_) => { /* continue */ }
            Err(e) => {
                return (false, true, Some(e));
            }
        }

        session.upstream_compression.request_filter(&req);

        debug!("Sending header to upstream {:?}", req);

        match client_session.write_request_header(Box::new(req)).await {
            Ok(_) => { /* Continue */ }
            Err(e) => {
                return (false, false, Some(e.into_up()));
            }
        }

        let (tx_upstream, rx_upstream) = mpsc::channel::<HttpTask>(TASK_BUFFER_SIZE);
        let (tx_downstream, rx_downstream) = mpsc::channel::<HttpTask>(TASK_BUFFER_SIZE);

        session.as_mut().enable_retry_buffering();

        // start bi-directional streaming
        let ret = tokio::try_join!(
            self.proxy_handle_downstream(session, tx_downstream, rx_upstream, ctx),
            self.proxy_handle_upstream(client_session, tx_upstream, rx_downstream),
        );

        match ret {
            Ok((downstream_can_reuse, _upstream)) => (downstream_can_reuse, true, None),
            Err(e) => (false, false, Some(e)),
        }
    }

    pub(crate) async fn proxy_to_h1_upstream(
        &self,
        session: &mut Session,
        client_session: &mut HttpSessionV1,
        reused: bool,
        peer: &HttpPeer,
        ctx: &mut SV::CTX,
    ) -> (bool, bool, Option<Box<Error>>)
    // (reuse_server, reuse_client, error)
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        #[cfg(windows)]
        let raw = client_session.id() as std::os::windows::io::RawSocket;
        #[cfg(unix)]
        let raw = client_session.id();

        if let Err(e) = self
            .inner
            .connected_to_upstream(
                session,
                reused,
                peer,
                raw,
                Some(client_session.digest()),
                ctx,
            )
            .await
        {
            return (false, false, Some(e));
        }

        let (server_session_reuse, client_session_reuse, error) =
            self.proxy_1to1(session, client_session, peer, ctx).await;

        (server_session_reuse, client_session_reuse, error)
    }

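    /// Drives the upstream H1 session: reads response tasks from upstream into
    /// the tx pipe while forwarding request body tasks from the rx pipe, until
    /// both directions are done (or either side of an upgraded request finishes).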
    async fn proxy_handle_upstream(
        &self,
        client_session: &mut HttpSessionV1,
        tx: mpsc::Sender<HttpTask>,
        mut rx: mpsc::Receiver<HttpTask>,
    ) -> Result<()>
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        let mut request_done = false;
        let mut response_done = false;

        /* duplex mode, wait for either to complete */
        while !request_done || !response_done {
            tokio::select! {
                res = client_session.read_response_task(), if !response_done => {
                    match res {
                        Ok(task) => {
                            response_done = task.is_end();
                            let result = tx.send(task)
                                .await.or_err(
                                        InternalError,
                                        "Failed to send upstream header to pipe");
                            // If the request is upgraded, the downstream pipe can exit early
                            // when the downstream connection is closed.
                            // In that case, this function should ignore that the pipe is closed,
                            // so that it can read the remaining events from rx, including
                            // the closure, and then exit.
                            if result.is_err() && !client_session.is_upgrade_req() {
                                return result;
                            }
                        },
                        Err(e) => {
                            // Push the error to downstream and then quit
                            // Don't care if send fails: downstream already gone
                            let _ = tx.send(HttpTask::Failed(e.into_up())).await;
                            // Downstream should consume all remaining data and handle the error
                            return Ok(())
                        }
                    }
                },

                body = rx.recv(), if !request_done => {
                    request_done = send_body_to1(client_session, body).await?;
                    // An upgraded request is terminated when either side is done
                    if request_done && client_session.is_upgrade_req() {
                        response_done = true;
                    }
                },

                else => {
                    // this shouldn't be reached as the while loop would already exit
                    break;
                }
            }
        }

        Ok(())
    }

    // TODO: use this function to replace bidirection_1to2()
    // returns whether this server (downstream) session can be reused
    async fn proxy_handle_downstream(
        &self,
        session: &mut Session,
        tx: mpsc::Sender<HttpTask>,
        mut rx: mpsc::Receiver<HttpTask>,
        ctx: &mut SV::CTX,
    ) -> Result<bool>
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        let mut downstream_state = DownstreamStateMachine::new(session.as_mut().is_body_done());

        let buffer = session.as_ref().get_retry_buffer();

        // on retry, send the buffered body if it exists; if the body is empty, signal that right away
        if buffer.is_some() || session.as_mut().is_body_empty() {
            let send_permit = tx
                .reserve()
                .await
                .or_err(InternalError, "reserving body pipe")?;
            self.send_body_to_pipe(
                session,
                buffer,
                downstream_state.is_done(),
                send_permit,
                ctx,
            )
            .await?;
        }

        let mut response_state = ResponseStateMachine::new();

        // these two below can be wrapped into an internal ctx
        // use cache when upstream revalidates (or TODO: error)
        let mut serve_from_cache = proxy_cache::ServeFromCache::new();
        let mut range_body_filter = proxy_cache::range_filter::RangeBodyFilter::new();

        /* duplex mode without caching
         * Read body from downstream while reading response from upstream
         * If response is done, only read body from downstream
         * If request is done, read response from upstream while idling downstream (to close quickly)
         * If both are done, quit the loop
         *
         * With caching but without partial read support
         * Similar to above; cache admission writes happen when the data is written to downstream
         *
         * With caching + partial read support
         * A. Read upstream response and write to cache
         * B. Read data from cache and send to downstream
         * If B fails (usually downstream close), continue A.
         * If A fails, exit with error.
         * If both are done, quit the loop
         * Usually there is no request body to read for a cacheable request
         */
        while !downstream_state.is_done() || !response_state.is_done() {
            // reserve tx capacity ahead to avoid deadlock, see below

            let send_permit = tx
                .try_reserve()
                .or_err(InternalError, "try_reserve() body pipe for upstream");

            tokio::select! {
                // only try to send to pipe if there is capacity to avoid deadlock
                // Otherwise deadlock could happen if both upstream and downstream are blocked
                // on sending to their corresponding pipes which are both full.
                body = session.downstream_session.read_body_or_idle(downstream_state.is_done()),
                    if downstream_state.can_poll() && send_permit.is_ok() => {

                    debug!("downstream event");
                    let body = match body {
                        Ok(b) => b,
                        Err(e) => {
                            if serve_from_cache.is_miss() {
                                // ignore downstream error so that upstream can continue to write cache
                                downstream_state.to_errored();
                                warn!(
                                    "Downstream Error ignored during caching: {}, {}",
                                    e,
                                    self.inner.request_summary(session, ctx)
                                );
                                continue;
                           } else {
                                return Err(e.into_down());
                           }
                        }
                    };
                    // If the request is websocket, None body means the request is closed.
                    // Set the response to be done as well so that the request completes normally.
                    if body.is_none() && session.is_upgrade_req() {
                        response_state.maybe_set_upstream_done(true);
                    }
                    // TODO: consider just draining this if serve_from_cache is set
                    let is_body_done = session.is_body_done();
                    let request_done = self.send_body_to_pipe(
                        session,
                        body,
                        is_body_done,
                        send_permit.unwrap(), // safe because we checked is_ok()
                        ctx,
                    )
                    .await?;
                    downstream_state.maybe_finished(request_done);
                },

                _ = tx.reserve(), if downstream_state.is_reading() && send_permit.is_err() => {
                    // If tx is closed, the upstream has already finished its job.
                    downstream_state.maybe_finished(tx.is_closed());
                    debug!("waiting for permit {send_permit:?}, upstream closed {}", tx.is_closed());
                    /* No permit, wait for more capacity to avoid starving.
                     * Otherwise this select would only block on rx, which might send no data
                     * before the entire body is uploaded.
                     * Once more capacity arrives we just loop back.
                     */
                },

                task = rx.recv(), if !response_state.upstream_done() => {
                    debug!("upstream event: {:?}", task);
                    if let Some(t) = task {
                        if serve_from_cache.should_discard_upstream() {
                            // just drain, do we need to do anything else?
                           continue;
                        }
                        // pull as many tasks as we can
                        let mut tasks = Vec::with_capacity(TASK_BUFFER_SIZE);
                        tasks.push(t);
                        while let Some(maybe_task) = rx.recv().now_or_never() {
                            debug!("upstream event now: {:?}", maybe_task);
                            if let Some(t) = maybe_task {
                                tasks.push(t);
                            } else {
                                break; // upstream closed
                            }
                        }

                        /* run filters before sending to downstream */
                        let mut filtered_tasks = Vec::with_capacity(TASK_BUFFER_SIZE);
                        for mut t in tasks {
                            if self.revalidate_or_stale(session, &mut t, ctx).await {
                                serve_from_cache.enable();
                                response_state.enable_cached_response();
                                // skip downstream filtering entirely as the 304 will not be sent
                                break;
                            }
                            session.upstream_compression.response_filter(&mut t);
                            let task = self.h1_response_filter(session, t, ctx,
                                &mut serve_from_cache,
                                &mut range_body_filter, false).await?;
                            if serve_from_cache.is_miss_header() {
                                response_state.enable_cached_response();
                            }
                            // check error and abort
                            // otherwise the error is surfaced via write_response_tasks()
                            if !serve_from_cache.should_send_to_downstream() {
                                if let HttpTask::Failed(e) = task {
                                    return Err(e);
                                }
                            }
                            filtered_tasks.push(task);
                        }

                        if !serve_from_cache.should_send_to_downstream() {
                            // TODO: need to derive response_done from filtered_tasks in case downstream failed already
                            continue;
                        }

                        // send to downstream
                        let response_done = session.write_response_tasks(filtered_tasks).await?;
                        response_state.maybe_set_upstream_done(response_done);
                        // unsuccessful upgrade response may force the request done
                        downstream_state.maybe_finished(session.is_body_done());
                    } else {
                        debug!("empty upstream event");
                        response_state.maybe_set_upstream_done(true);
                    }
                },

                task = serve_from_cache.next_http_task(&mut session.cache),
                    if !response_state.cached_done() && !downstream_state.is_errored() && serve_from_cache.is_on() => {

                    let task = self.h1_response_filter(session, task?, ctx,
                        &mut serve_from_cache,
                        &mut range_body_filter, true).await?;
                    debug!("serve_from_cache task {task:?}");

                    match session.write_response_tasks(vec![task]).await {
                        Ok(b) => response_state.maybe_set_cache_done(b),
                        Err(e) => if serve_from_cache.is_miss() {
                            // give up writing to downstream but wait for upstream cache write to finish
                            downstream_state.to_errored();
                            response_state.maybe_set_cache_done(true);
                            warn!(
                                "Downstream Error ignored during caching: {}, {}",
                                e,
                                self.inner.request_summary(session, ctx)
                            );
                            continue;
                        } else {
                            return Err(e);
                        }
                    }
                    if response_state.cached_done() {
                        if let Err(e) = session.cache.finish_hit_handler().await {
                            warn!("Error during finish_hit_handler: {}", e);
                        }
                    }
                }

                else => {
                    break;
                }
            }
        }

        let mut reuse_downstream = !downstream_state.is_errored();
        if reuse_downstream {
            match session.as_mut().finish_body().await {
                Ok(_) => {
                    debug!("finished sending body to downstream");
                }
                Err(e) => {
                    error!("Error finish sending body to downstream: {}", e);
                    reuse_downstream = false;
                }
            }
        }
        Ok(reuse_downstream)
    }

    async fn h1_response_filter(
        &self,
        session: &mut Session,
        mut task: HttpTask,
        ctx: &mut SV::CTX,
        serve_from_cache: &mut ServeFromCache,
        range_body_filter: &mut RangeBodyFilter,
        from_cache: bool, // is the task already from cache
    ) -> Result<HttpTask>
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        // skip caching if already served from cache
        if !from_cache {
            self.upstream_filter(session, &mut task, ctx)?;

            // cache the original response before any downstream transformation
            // requests that bypassed cache still need to run filters to see if the response has become cacheable
            if session.cache.enabled() || session.cache.bypassing() {
                if let Err(e) = self
                    .cache_http_task(session, &task, ctx, serve_from_cache)
                    .await
                {
                    session.cache.disable(NoCacheReason::StorageError);
                    if serve_from_cache.is_miss_body() {
                        // if the response streams the cache body during a miss but the write
                        // fails, it has to give up the entire request
                        return Err(e);
                    } else {
                        // otherwise, continue processing the response
                        warn!(
                            "Fail to cache response: {}, {}",
                            e,
                            self.inner.request_summary(session, ctx)
                        );
                    }
                }
            }

            if !serve_from_cache.should_send_to_downstream() {
                return Ok(task);
            }
        } // else: cached/local response, no need to trigger upstream filters and caching

        match task {
            HttpTask::Header(mut header, end) => {
                /* Downstream revalidation/range, only needed when cache is on because otherwise origin
                 * will handle it */
                // TODO: if cache is disabled during response phase, we should still do the filter
                if session.cache.enabled() {
                    self.downstream_response_conditional_filter(
                        serve_from_cache,
                        session,
                        &mut header,
                        ctx,
                    );
                    if !session.ignore_downstream_range {
                        let range_type =
                            self.inner
                                .range_header_filter(session.req_header(), &mut header, ctx);
                        range_body_filter.set(range_type);
                    }
                }

                /* Convert HTTP 1.0 style response to chunked encoding so that we don't
                 * have to close the downstream connection */
                // these status codes / methods cannot have a body, so no need to add chunked encoding
                let no_body = session.req_header().method == http::method::Method::HEAD
                    || matches!(header.status.as_u16(), 204 | 304);
                if !no_body
                    && !header.status.is_informational()
                    && header
                        .headers
                        .get(http::header::TRANSFER_ENCODING)
                        .is_none()
                    && header.headers.get(http::header::CONTENT_LENGTH).is_none()
                    && !end
                {
                    header.insert_header(http::header::TRANSFER_ENCODING, "chunked")?;
                }

                match self.inner.response_filter(session, &mut header, ctx).await {
                    Ok(_) => Ok(HttpTask::Header(header, end)),
                    Err(e) => Err(e),
                }
            }
            HttpTask::Body(data, end) => {
                let mut data = range_body_filter.filter_body(data);
                if let Some(duration) = self
                    .inner
                    .response_body_filter(session, &mut data, end, ctx)?
                {
                    trace!("delaying response for {:?}", duration);
                    time::sleep(duration).await;
                }
                Ok(HttpTask::Body(data, end))
            }
            HttpTask::Trailer(h) => Ok(HttpTask::Trailer(h)), // TODO: support trailers for h1
            HttpTask::Done => Ok(task),
            HttpTask::Failed(_) => Ok(task), // Do nothing just pass the error down
        }
    }

    // TODO: use this function to replace send_body_to2
    async fn send_body_to_pipe(
        &self,
        session: &mut Session,
        mut data: Option<Bytes>,
        end_of_body: bool,
        tx: mpsc::Permit<'_, HttpTask>,
        ctx: &mut SV::CTX,
    ) -> Result<bool>
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        // None: end of body
        // this var signals whether downstream has finished sending the body, which shouldn't be
        // affected by the request_body_filter
        let end_of_body = end_of_body || data.is_none();

        session
            .downstream_modules_ctx
            .request_body_filter(&mut data, end_of_body)
            .await?;

        self.inner
            .request_body_filter(session, &mut data, end_of_body, ctx)
            .await?;

        // the flag to signal to upstream
        let upstream_end_of_body = end_of_body || data.is_none();

        /* It is normal to get 0 bytes because of multi-chunk parsing, or because
         * request_body_filter decides not to output anything yet.
         * Don't write 0 bytes to the network since it will be
         * treated as the terminating chunk */
        if !upstream_end_of_body && data.as_ref().map_or(false, |d| d.is_empty()) {
            return Ok(false);
        }

        debug!(
            "Read {} bytes body from downstream",
            data.as_ref().map_or(-1, |d| d.len() as isize)
        );

        tx.send(HttpTask::Body(data, upstream_end_of_body));

        Ok(end_of_body)
    }
}

pub(crate) async fn send_body_to1(
    client_session: &mut HttpSessionV1,
    recv_task: Option<HttpTask>,
) -> Result<bool> {
    let body_done;

    if let Some(task) = recv_task {
        match task {
            HttpTask::Body(data, end) => {
                body_done = end;
                if let Some(d) = data {
                    let m = client_session.write_body(&d).await;
                    match m {
                        Ok(m) => match m {
                            Some(n) => {
                                debug!("Write {} bytes body to upstream", n);
                            }
                            None => {
                                warn!("Upstream body is already finished. Nothing to write");
                            }
                        },
                        Err(e) => {
                            return e.into_up().into_err();
                        }
                    }
                }
            }
            _ => {
                // should never happen, sender only sends body
                warn!("Unexpected task sent to upstream");
                body_done = true;
            }
        }
    } else {
        // sender dropped
        body_done = true;
    }

    if body_done {
        match client_session.finish_body().await {
            Ok(_) => {
                debug!("finish sending body to upstream");
                Ok(true)
            }
            Err(e) => e.into_up().into_err(),
        }
    } else {
        Ok(false)
    }
}


// proxy_h2.rs

// Copyright 2024 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use super::*;
use crate::proxy_cache::{range_filter::RangeBodyFilter, ServeFromCache};
use crate::proxy_common::*;
use http::{header::CONTENT_LENGTH, Method, StatusCode};
use pingora_core::protocols::http::v2::client::{write_body, Http2Session};

// add scheme and authority as required by h2 lib
fn update_h2_scheme_authority(
    header: &mut http::request::Parts,
    raw_host: &[u8],
    tls: bool,
) -> Result<()> {
    let authority = if let Ok(s) = std::str::from_utf8(raw_host) {
        if s.starts_with('[') {
            // don't mess with ipv6 host
            s
        } else if let Some(colon) = s.find(':') {
            if s.len() == colon + 1 {
                // colon is the last char, ignore
                s
            } else if let Some(another_colon) = s[colon + 1..].find(':') {
                // try to get rid of extra port numbers
                &s[..colon + 1 + another_colon]
            } else {
                s
            }
        } else {
            s
        }
    } else {
        return Error::e_explain(
            InvalidHTTPHeader,
            format!("invalid authority from host {:?}", raw_host),
        );
    };

    let scheme = if tls { "https" } else { "http" };
    let uri = http::uri::Builder::new()
        .scheme(scheme)
        .authority(authority)
        .path_and_query(header.uri.path_and_query().as_ref().unwrap().as_str())
        .build();
    match uri {
        Ok(uri) => {
            header.uri = uri;
            Ok(())
        }
        Err(_) => Error::e_explain(
            InvalidHTTPHeader,
            format!("invalid authority from host {}", authority),
        ),
    }
}

impl<SV> HttpProxy<SV> {
    pub(crate) async fn proxy_1to2(
        &self,
        session: &mut Session,
        client_session: &mut Http2Session,
        peer: &HttpPeer,
        ctx: &mut SV::CTX,
    ) -> (bool, Option<Box<Error>>)
    // (reuse_server, error)
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        let mut req = session.req_header().clone();

        if req.version != Version::HTTP_2 {
            /* remove H1 specific headers */
            // https://github.com/hyperium/h2/blob/d3b9f1e36aadc1a7a6804e2f8e86d3fe4a244b4f/src/proto/streams/send.rs#L72
            req.remove_header(&http::header::TRANSFER_ENCODING);
            req.remove_header(&http::header::CONNECTION);
            req.remove_header(&http::header::UPGRADE);
            req.remove_header("keep-alive");
            req.remove_header("proxy-connection");
        }

        /* turn it into h2 */
        req.set_version(Version::HTTP_2);

        if session.cache.enabled() {
            if let Err(e) = pingora_cache::filters::upstream::request_filter(
                &mut req,
                session.cache.maybe_cache_meta(),
            ) {
                session.cache.disable(NoCacheReason::InternalError);
                warn!("cache upstream filter error {}, disabling cache", e);
            }
        }

        match self
            .inner
            .upstream_request_filter(session, &mut req, ctx)
            .await
        {
            Ok(_) => { /* continue */ }
            Err(e) => {
                return (false, Some(e));
            }
        }

        // Remove H1 Host header, save it in order to add to :authority
        // We do this because certain H2 servers expect requests not to have a Host header.
        // The Host is removed after the upstream filters above for 2 reasons
        // 1. there is no API to change the :authority header
        // 2. otherwise the filter code would need to be aware of host vs :authority across http versions
        let host = req.remove_header(&http::header::HOST);

        session.upstream_compression.request_filter(&req);
        let body_empty = session.as_mut().is_body_empty();

        // whether we support sending END_STREAM on HEADERS if body is empty
        let send_end_stream = req.send_end_stream().expect("req must be h2");

        let mut req: http::request::Parts = req.into();

        // H2 requires authority to be set, so copy that from H1 host if that is set
        if let Some(host) = host {
            if let Err(e) = update_h2_scheme_authority(&mut req, host.as_bytes(), peer.is_tls()) {
                return (false, Some(e));
            }
        }

        debug!("Request to h2: {req:?}");

        // send END_STREAM on HEADERS
        let send_header_eos = send_end_stream && body_empty;
        debug!("send END_STREAM on HEADERS: {send_end_stream}");

        let req = Box::new(RequestHeader::from(req));
        if let Err(e) = client_session.write_request_header(req, send_header_eos) {
            return (false, Some(e.into_up()));
        }

        if !send_end_stream && body_empty {
            // send END_STREAM on empty DATA frame
            match client_session.write_request_body(Bytes::new(), true) {
                Ok(()) => debug!("sent empty DATA frame to h2"),
                Err(e) => {
                    return (false, Some(e.into_up()));
                }
            }
        }

        client_session.read_timeout = peer.options.read_timeout;

        // take the body writer out of the client for easy duplex
        let mut client_body = client_session
            .take_request_body_writer()
            .expect("already send request header");

        let (tx, rx) = mpsc::channel::<HttpTask>(TASK_BUFFER_SIZE);

        session.as_mut().enable_retry_buffering();

        /* read downstream body and upstream response at the same time */

        let ret = tokio::try_join!(
            self.bidirection_1to2(session, &mut client_body, rx, ctx),
            pipe_2to1_response(client_session, tx)
        );

        match ret {
            Ok((downstream_can_reuse, _upstream)) => (downstream_can_reuse, None),
            Err(e) => (false, Some(e)),
        }
    }

    pub(crate) async fn proxy_to_h2_upstream(
        &self,
        session: &mut Session,
        client_session: &mut Http2Session,
        reused: bool,
        peer: &HttpPeer,
        ctx: &mut SV::CTX,
    ) -> (bool, Option<Box<Error>>)
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        #[cfg(windows)]
        let raw = client_session.fd() as std::os::windows::io::RawSocket;
        #[cfg(unix)]
        let raw = client_session.fd();

        if let Err(e) = self
            .inner
            .connected_to_upstream(session, reused, peer, raw, client_session.digest(), ctx)
            .await
        {
            return (false, Some(e));
        }

        let (server_session_reuse, error) =
            self.proxy_1to2(session, client_session, peer, ctx).await;

        (server_session_reuse, error)
    }

    // returns whether server (downstream) session can be reused
    async fn bidirection_1to2(
        &self,
        session: &mut Session,
        client_body: &mut h2::SendStream<bytes::Bytes>,
        mut rx: mpsc::Receiver<HttpTask>,
        ctx: &mut SV::CTX,
    ) -> Result<bool>
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        let mut downstream_state = DownstreamStateMachine::new(session.as_mut().is_body_done());

        // retry, send buffer if it exists
        if let Some(buffer) = session.as_mut().get_retry_buffer() {
            self.send_body_to2(
                session,
                Some(buffer),
                downstream_state.is_done(),
                client_body,
                ctx,
            )
            .await?;
        }

        let mut response_state = ResponseStateMachine::new();

        // these two below can be wrapped into an internal ctx
        // use cache when upstream revalidates (or TODO: error)
        let mut serve_from_cache = ServeFromCache::new();
        let mut range_body_filter = proxy_cache::range_filter::RangeBodyFilter::new();

        /* duplex mode
         * see the same function for h1 for more comments
         */
        while !downstream_state.is_done() || !response_state.is_done() {
            // Similar logic to h1: there we need to reserve capacity first to avoid deadlock,
            // but we don't need to here because the h2 client_body pipe is unbounded (never blocks)
            tokio::select! {
                // NOTE: cannot avoid this copy since h2 owns the buf
                body = session.downstream_session.read_body_or_idle(downstream_state.is_done()), if downstream_state.can_poll() => {
                    debug!("downstream event");
                    let body = match body {
                        Ok(b) => b,
                        Err(e) => {
                            if serve_from_cache.is_miss() {
                                // ignore downstream error so that upstream can continue to write cache
                                downstream_state.to_errored();
                                warn!(
                                    "Downstream Error ignored during caching: {}, {}",
                                    e,
                                    self.inner.request_summary(session, ctx)
                                );
                                continue;
                            } else {
                                return Err(e.into_down());
                            }
                        }
                    };
                    let is_body_done = session.is_body_done();
                    let request_done =
                        self.send_body_to2(session, body, is_body_done, client_body, ctx)
                        .await?;
                    downstream_state.maybe_finished(request_done);
                },

                task = rx.recv(), if !response_state.upstream_done() => {
                    if let Some(t) = task {
                        debug!("upstream event: {:?}", t);
                        if serve_from_cache.should_discard_upstream() {
                            // just drain, do we need to do anything else?
                            continue;
                        }
                        // pull as many tasks as we can
                        let mut tasks = Vec::with_capacity(TASK_BUFFER_SIZE);
                        tasks.push(t);
                        while let Some(maybe_task) = rx.recv().now_or_never() {
                            if let Some(t) = maybe_task {
                                tasks.push(t);
                            } else {
                                break
                            }
                        }

                        /* run filters before sending to downstream */
                        let mut filtered_tasks = Vec::with_capacity(TASK_BUFFER_SIZE);
                        for mut t in tasks {
                            if self.revalidate_or_stale(session, &mut t, ctx).await {
                                serve_from_cache.enable();
                                response_state.enable_cached_response();
                                // skip downstream filtering entirely as the 304 will not be sent
                                break;
                            }
                            session.upstream_compression.response_filter(&mut t);
                            // check error and abort
                            // otherwise the error is surfaced via write_response_tasks()
                            if !serve_from_cache.should_send_to_downstream() {
                                if let HttpTask::Failed(e) = t {
                                    return Err(e);
                                }
                            }
                            filtered_tasks.push(
                                self.h2_response_filter(session, t, ctx,
                                    &mut serve_from_cache,
                                    &mut range_body_filter, false).await?);
                            if serve_from_cache.is_miss_header() {
                                response_state.enable_cached_response();
                            }
                        }

                        if !serve_from_cache.should_send_to_downstream() {
                            // TODO: need to derive response_done from filtered_tasks in case downstream failed already
                            continue;
                        }

                        let response_done = session.write_response_tasks(filtered_tasks).await?;
                        response_state.maybe_set_upstream_done(response_done);
                    } else {
                        debug!("empty upstream event");
                        response_state.maybe_set_upstream_done(true);
                    }
                }

                task = serve_from_cache.next_http_task(&mut session.cache),
                    if !response_state.cached_done() && !downstream_state.is_errored() && serve_from_cache.is_on() => {
                    let task = self.h2_response_filter(session, task?, ctx,
                        &mut serve_from_cache,
                        &mut range_body_filter, true).await?;
                    match session.write_response_tasks(vec![task]).await {
                        Ok(b) => response_state.maybe_set_cache_done(b),
                        Err(e) => if serve_from_cache.is_miss() {
                            // give up writing to downstream but wait for upstream cache write to finish
                            downstream_state.to_errored();
                            response_state.maybe_set_cache_done(true);
                            warn!(
                                "Downstream Error ignored during caching: {}, {}",
                                e,
                                self.inner.request_summary(session, ctx)
                            );
                            continue;
                        } else {
                            return Err(e);
                        }
                    }
                    if response_state.cached_done() {
                        if let Err(e) = session.cache.finish_hit_handler().await {
                            warn!("Error during finish_hit_handler: {}", e);
                        }
                    }
                }

                else => {
                    break;
                }
            }
        }

        let mut reuse_downstream = !downstream_state.is_errored();
        if reuse_downstream {
            match session.as_mut().finish_body().await {
                Ok(_) => {
                    debug!("finished sending body to downstream");
                }
                Err(e) => {
                    error!("Error finish sending body to downstream: {}", e);
                    reuse_downstream = false;
                }
            }
        }
        Ok(reuse_downstream)
    }

    async fn h2_response_filter(
        &self,
        session: &mut Session,
        mut task: HttpTask,
        ctx: &mut SV::CTX,
        serve_from_cache: &mut ServeFromCache,
        range_body_filter: &mut RangeBodyFilter,
        from_cache: bool, // is the task already from cache
    ) -> Result<HttpTask>
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        if !from_cache {
            self.upstream_filter(session, &mut task, ctx)?;

            // cache the original response before any downstream transformation
            // requests that bypassed cache still need to run filters to see if the response has become cacheable
            if session.cache.enabled() || session.cache.bypassing() {
                if let Err(e) = self
                    .cache_http_task(session, &task, ctx, serve_from_cache)
                    .await
                {
                    session.cache.disable(NoCacheReason::StorageError);
                    if serve_from_cache.is_miss_body() {
                        // if the response streams the cache body during a miss but the write
                        // fails, it has to give up the entire request
                        return Err(e);
                    } else {
                        // otherwise, continue processing the response
                        warn!(
                            "Fail to cache response: {}, {}",
                            e,
                            self.inner.request_summary(session, ctx)
                        );
                    }
                }
            }
            // skip the downstream filtering if these tasks are just for cache admission
            if !serve_from_cache.should_send_to_downstream() {
                return Ok(task);
            }
        } // else: cached/local response, no need to trigger upstream filters and caching

        match task {
            HttpTask::Header(mut header, eos) => {
                let req = session.req_header();

                /* Downstream revalidation, only needed when cache is on because otherwise origin
                 * will handle it */
                // TODO: if cache is disabled during response phase, we should still do the filter
                if session.cache.enabled() {
                    self.downstream_response_conditional_filter(
                        serve_from_cache,
                        session,
                        &mut header,
                        ctx,
                    );
                    if !session.ignore_downstream_range {
                        let range_type = self.inner.range_header_filter(req, &mut header, ctx);
                        range_body_filter.set(range_type);
                    }
                }

                self.inner
                    .response_filter(session, &mut header, ctx)
                    .await?;
                /* Downgrade the version so that write_response_header won't panic */
                header.set_version(Version::HTTP_11);

                // these status codes / methods cannot have a body, so no need to add chunked encoding
                let no_body = session.req_header().method == "HEAD"
                    || matches!(header.status.as_u16(), 204 | 304);

                /* Add chunked header to tell downstream to use chunked encoding
                 * in the absence of content-length in h2 */
                if !no_body
                    && !header.status.is_informational()
                    && header.headers.get(http::header::CONTENT_LENGTH).is_none()
                {
                    header.insert_header(http::header::TRANSFER_ENCODING, "chunked")?;
                }
                Ok(HttpTask::Header(header, eos))
            }
            HttpTask::Body(data, eos) => {
                let mut data = range_body_filter.filter_body(data);
                if let Some(duration) = self
                    .inner
                    .response_body_filter(session, &mut data, eos, ctx)?
                {
                    trace!("delaying response for {duration:?}");
                    time::sleep(duration).await;
                }
                Ok(HttpTask::Body(data, eos))
            }
            HttpTask::Trailer(mut trailers) => {
                let trailer_buffer = match trailers.as_mut() {
                    Some(trailers) => {
                        debug!("Parsing response trailers..");
                        match self
                            .inner
                            .response_trailer_filter(session, trailers, ctx)
                            .await
                        {
                            Ok(buf) => buf,
                            Err(e) => {
                                error!(
                                    "Encountered error while filtering upstream trailers {:?}",
                                    e
                                );
                                None
                            }
                        }
                    }
                    _ => None,
                };
                // if we have a trailer buffer write it to the downstream response body
                if let Some(buffer) = trailer_buffer {
                    // write_body will not write additional bytes after reaching the content-length
                    // for gRPC H2 -> H1 this is not a problem but may be a problem for non-gRPC code
                    // https://http2.github.io/http2-spec/#malformed
                    Ok(HttpTask::Body(Some(buffer), true))
                } else {
                    Ok(HttpTask::Trailer(trailers))
                }
            }
            HttpTask::Done => Ok(task),
            HttpTask::Failed(_) => Ok(task), // Do nothing just pass the error down
        }
    }

    async fn send_body_to2(
        &self,
        session: &mut Session,
        mut data: Option<Bytes>,
        end_of_body: bool,
        client_body: &mut h2::SendStream<bytes::Bytes>,
        ctx: &mut SV::CTX,
    ) -> Result<bool>
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        session
            .downstream_modules_ctx
            .request_body_filter(&mut data, end_of_body)
            .await?;

        self.inner
            .request_body_filter(session, &mut data, end_of_body, ctx)
            .await?;

        /* it is normal to get 0 bytes because of multi-chunk parsing or request_body_filter.
         * Although, unlike h1, there is no harm in writing an empty buffer to h2, we ignore it
         * for consistency */
        if !end_of_body && data.as_ref().map_or(false, |d| d.is_empty()) {
            return Ok(false);
        }

        if let Some(data) = data {
            debug!("Write {} bytes body to h2 upstream", data.len());
            write_body(client_body, data, end_of_body).map_err(|e| e.into_up())?;
        } else {
            debug!("Read downstream body done");
            /* send a standalone END_STREAM flag */
            write_body(client_body, Bytes::new(), true).map_err(|e| e.into_up())?;
        }

        Ok(end_of_body)
    }
}

/* Read response header, body and trailer from h2 upstream and send them to tx */
pub(crate) async fn pipe_2to1_response(
    client: &mut Http2Session,
    tx: mpsc::Sender<HttpTask>,
) -> Result<()> {
    client
        .read_response_header()
        .await
        .map_err(|e| e.into_up())?; // should we send the error as an HttpTask?

    let resp_header = Box::new(client.response_header().expect("just read").clone());

    match client.check_response_end_or_error() {
        Ok(eos) => {
            // XXX: the h2 crate won't check for content-length underflow
            // if a header frame with END_STREAM is sent without data frames
            // As stated by RFC, "204 or 304 responses contain no content,
            // as does the response to a HEAD request"
            // https://datatracker.ietf.org/doc/html/rfc9113#section-8.1.1
            let req_header = client.request_header().expect("must have sent req");
            if eos
                && req_header.method != Method::HEAD
                && resp_header.status != StatusCode::NO_CONTENT
                && resp_header.status != StatusCode::NOT_MODIFIED
                // RFC technically allows for leading zeroes
                // https://datatracker.ietf.org/doc/html/rfc9110#name-content-length
                && resp_header
                    .headers
                    .get(CONTENT_LENGTH)
                    .is_some_and(|cl| cl.as_bytes().iter().any(|b| *b != b'0'))
            {
                let _ = tx
                    .send(HttpTask::Failed(
                        Error::explain(H2Error, "non-zero content-length on EOS headers frame")
                            .into_up(),
                    ))
                    .await;
                return Ok(());
            }
            tx.send(HttpTask::Header(resp_header, eos))
                .await
                .or_err(InternalError, "sending h2 headers to pipe")?;
        }
        Err(e) => {
            // If upstream errored, then push error to downstream and then quit
            // Don't care if send fails (which means downstream already gone)
            // we were still able to retrieve the headers, so try sending
            let _ = tx.send(HttpTask::Header(resp_header, false)).await;
            let _ = tx.send(HttpTask::Failed(e.into_up())).await;
            return Ok(());
        }
    }

    while let Some(chunk) = client
        .read_response_body()
        .await
        .map_err(|e| e.into_up())
        .transpose()
    {
        let data = match chunk {
            Ok(d) => d,
            Err(e) => {
                // Push the error to downstream and then quit
                let _ = tx.send(HttpTask::Failed(e.into_up())).await;
                // Downstream should consume all remaining data and handle the error
                return Ok(());
            }
        };
        match client.check_response_end_or_error() {
            Ok(eos) => {
                if data.is_empty() && !eos {
                    /* it is normal to get 0 bytes because of multi-chunk
                     * don't write 0 bytes to downstream since it will be
                     * misread as the terminating chunk */
                    continue;
                }
                tx.send(HttpTask::Body(Some(data), eos))
                    .await
                    .or_err(InternalError, "sending h2 body to pipe")?;
            }
            Err(e) => {
                // Similar to above, push the error to downstream and then quit
                let _ = tx.send(HttpTask::Failed(e.into_up())).await;
                return Ok(());
            }
        }
    }

    // attempt to get trailers
    let trailers = match client.read_trailers().await {
        Ok(t) => t,
        Err(e) => {
            // Similar to above, push the error to downstream and then quit
            let _ = tx.send(HttpTask::Failed(e.into_up())).await;
            return Ok(());
        }
    };

    let trailers = trailers.map(Box::new);

    if trailers.is_some() {
        tx.send(HttpTask::Trailer(trailers))
            .await
            .or_err(InternalError, "sending h2 trailer to pipe")?;
    }

    tx.send(HttpTask::Done)
        .await
        .unwrap_or_else(|_| debug!("h2 to h1 channel closed!"));

    Ok(())
}

#[test]
fn test_update_authority() {
    let mut parts = http::request::Builder::new()
        .body(())
        .unwrap()
        .into_parts()
        .0;
    update_h2_scheme_authority(&mut parts, b"example.com", true).unwrap();
    assert_eq!("example.com", parts.uri.authority().unwrap());
    update_h2_scheme_authority(&mut parts, b"example.com:456", true).unwrap();
    assert_eq!("example.com:456", parts.uri.authority().unwrap());
    update_h2_scheme_authority(&mut parts, b"example.com:", true).unwrap();
    assert_eq!("example.com:", parts.uri.authority().unwrap());
    update_h2_scheme_authority(&mut parts, b"example.com:123:345", true).unwrap();
    assert_eq!("example.com:123", parts.uri.authority().unwrap());
    update_h2_scheme_authority(&mut parts, b"[::1]", true).unwrap();
    assert_eq!("[::1]", parts.uri.authority().unwrap());

    // verify scheme
    update_h2_scheme_authority(&mut parts, b"example.com", true).unwrap();
    assert_eq!("https://example.com", parts.uri);
    update_h2_scheme_authority(&mut parts, b"example.com", false).unwrap();
    assert_eq!("http://example.com", parts.uri);
}
assembly
aws
batchfile
bootstrap
c
c++
cmake
css
+22 more

First seen in:

zhorton34/mini

Used in 1 repository

TypeScript
1. Always create a new branch when making your first code change.
2. Always check whether you have enough context.
3. All code and user interface text must be in English.
4. Reply in the chat in Russian.
5. Make frequent, small commits with clear messages.
6. Update the main branch regularly.
7. Delete branches after merging.
8. Always test code before merging.
9. Write command-line commands in bash.
10. Always give the full path of the file you are changing.
11. Before committing:
    - Update the version in package.json
    - Add a new entry to CHANGELOG.md describing the changes
    - Update README.md
12. After committing, merge into master if the version is stable

# DEVELOPMENT PLAN

1. **Add an administrator role:** (done)

2. **Convert Spaces to Teams:** (done)

3. **Create an admin page listing the tasks of all teams:** (done)

4. **Assign teams to users:**

   - Update the User interface, adding a teamIds field (array).
   - Create an AdminUserManagement component for managing users.
   - Implement team assignment in the useDb hook.
   - Update the useLogin hook to load the user's team information on login.

5. **Restrict access:**

   - Create a ProtectedRoute component to protect routes that require administrator rights (see the sketch after this plan).
   - Update App.tsx, adding ProtectedRoute for the admin pages.
   - Modify the useDb hook to check access rights before performing operations:
     ```typescript:src/hooks/useDb.ts
     startLine: 7
     endLine: 15
     ```

6. **Update the user interface:**

   - Hide team management controls from regular users.
   - Add a user role indicator to the interface.
   - Update the navigation, adding links to the admin pages for admins.

7. **Testing:**

   - Create test scenarios to verify functionality under different roles.
   - Test access restrictions and data visibility.
   - Verify that all updated components and hooks work correctly.

8. **Documentation and final touches:**

   - Update README.md with information about the new features and roles.
   - Update CHANGELOG.md, describing all the changes made.
   - Review and update all comments in the code.

Remember to commit frequently, test before merging, and delete branches once a feature is complete.
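
A minimal sketch of the ProtectedRoute component from step 5, assuming react-router-dom for navigation and a `useLogin` hook that exposes an `isAdmin` flag (both assumptions, not taken from this plan):

```typescript
import type { ReactNode } from 'react';
import { Navigate } from 'react-router-dom';
import { useLogin } from './hooks/useLogin';

interface ProtectedRouteProps {
  children: ReactNode;
}

// Wraps admin-only pages; non-admin users are redirected to the home page.
export function ProtectedRoute({ children }: ProtectedRouteProps) {
  const { isAdmin } = useLogin();

  if (!isAdmin) {
    return <Navigate to="/" replace />;
  }

  return <>{children}</>;
}
```

App.tsx would then wrap each admin route's element in `<ProtectedRoute>…</ProtectedRoute>`.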
html
javascript
scss
typescript

First seen in:

KVFIR/uzdevumi

Used in 1 repository

MDX
You are an experienced technical content editor and analyst specializing in technical documentation, content analysis, and knowledge transformation.

# Core Instructions
- Always write in Chinese (中文)
- Focus on creating professional, practical, and insightful technical articles
- Ensure content is accurate, actionable, and provides clear value to readers
- Maintain consistent technical documentation style
- Keep language style consistent and complete throughout the document
- Maintain proper detail level without over-simplification

# Content Structure
When analyzing and documenting technical content, always follow this structure:

1. Frontmatter (Required)
```mdx
---
tags: [3-5个技术领域标签]
category: [工具/资源/教程/开源/AI/博主/Prompt]
source: [website/github/blog/chrome/codepen]
date: {日期 YYYY-MM-DD}
title: {突出核心价值的中文标题}
---
```

2. Main Content Structure
- Title and URL reference (without formatting)
  ### [{标题}]({原始URL})
- Image (if available)
  ![img]({图片URL})
- Core value overview (1-2 concise sentences)
- Select 2-3 most relevant section headings from:
  核心特性
  关键功能
  技术实现
  架构设计
  最佳实践
  应用场景
  开发流程
  工作原理
  性能优化
  注意事项
  工具特点
  创新点
  使用技巧
  实战经验
  解决方案
  问题处理

3. Each Section Should
- Include 3-5 key points
- Provide detailed explanations
- Focus on practical value
- Use complete sentences
- Avoid vague descriptions

4. Additional Required Sections
- 核心问题问答 (Q&A with direct quotes)
- 行动与改变 (Practice suggestions and cognitive improvements)
- 思维导图 (Mind map in text format)
- 关键术语解释 (Key terms definitions)
- 扩展资源 (Related resources)
- 总结语 (Value proposition)

# Mind Map Format Example
思维导图
Awesome Cloudflare
├── 核心内容
│   ├── 图床
│   ├── 邮箱
│   └── 博客
└── 价值
    ├── 提升效率
    └── 降低成本

# Quality Standards
1. Accuracy
- Content must be based on original sources
- Technical terms must be used correctly
- Facts must be verifiable

2. Professionalism
- Maintain technical documentation style
- Use standard technical terminology
- Keep consistent formatting

3. Practicality
- Provide actionable suggestions
- Include concrete examples
- Focus on real-world applications

4. Completeness
- Cover 90%+ of key information
- Address core concepts thoroughly
- Include all required sections

5. Clarity
- Use clear structure
- Highlight key points
- Maintain logical flow

# Writing Style
- Use professional technical tone
- Write concise and clear sentences
- Prioritize practical code examples
- Ensure content is actionable

# Response Format
Always structure responses in proper markdown format with appropriate headings, lists, and code blocks.

# Code Examples
When including code:
- Add relevant comments
- Focus on practical implementations
- Provide context for usage
- Use proper syntax highlighting

# Error Handling
If source material is unclear or incomplete:
- Note any assumptions made
- Request clarification if needed
- Maintain accuracy over completeness

Remember to maintain high standards of technical accuracy and practical value in all content generation. 
css
golang
html
javascript
mdx
shell
typescript

First seen in:

NorthSeacoder/weekly

Used in 1 repository

TypeScript
**Extremely Important!!**
When generating or modifying code, *never* omit any part of the code, no matter how long it is. Omissions can lead to the code failing to function as intended or cause critical errors. **Do not omit anything under any circumstances.**
---
**Top Priorities for Code Generation**
1. Ensure a linear dependency structure whenever possible.
2. Build a structure that facilitates seamless maintenance.
3. Write the code, including comments, in a way that makes its intent and logic **easily understandable** and interpretable by anyone.
4. Design the code with a **clear and simple structure**, prioritizing reusability for future purposes.
--- 
Use TypeScript best practices:
- Prefer interfaces over type aliases for object types
- Use const assertions for literal values
- Utilize strict null checks
- Implement proper error handling with union types
- Follow the naming conventions:
  - UpperCamelCase for types, interfaces, classes
  - lowerCamelCase for variables, functions, methods
  - CONSTANT_CASE for global constants and enum values
- Avoid using 'any' type, prefer 'unknown' for maximum type safety
- Utilize TypeScript's built-in utility types when appropriate
---
**Vue 3 Best Practices:**
1. **Adopt the Composition API:**
   - Utilize the Composition API for better code organization and reusability.
   - Encapsulate related logic into composable functions for modularity.
2. **Implement TypeScript:**
   - Integrate TypeScript to catch errors early and improve code quality.
   - Define explicit types for props, state, and events to enhance maintainability.
3. **Employ Single-File Components (SFCs):**
   - Use SFCs to encapsulate template, logic, and styles within a single file.
   - Ensure styles are scoped to prevent unintended global CSS conflicts.
4. **Optimize State Management:**
   - For global state, consider using Pinia, the recommended state management library for Vue 3 (see the sketch after this list).
   - For local component state, leverage the Composition API's `reactive` and `ref` functions.
5. **Enhance Performance:**
   - Implement lazy loading for components and routes to reduce initial load times.
   - Use the `v-once` directive for static content to prevent unnecessary re-renders.
6. **Maintain Consistent Coding Standards:**
   - Follow the official Vue.js style guide to ensure code consistency and readability.
   - Utilize linters and formatters to enforce coding standards across the codebase.
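
A minimal sketch of item 4 above using Pinia; the store name and counter fields are illustrative only, not part of this rule set:

```typescript
import { defineStore } from 'pinia';

// Global counter store; components call useCounterStore() to share this state.
export const useCounterStore = defineStore('counter', {
  state: () => ({ count: 0 }),
  getters: {
    doubled: (state) => state.count * 2,
  },
  actions: {
    increment() {
      this.count++;
    },
  },
});
```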
---
**Quasar Framework Best Practices:**
1. **Leverage Quasar's UI Components:**
   - Utilize Quasar's pre-built responsive components to maintain design consistency and enhance user experience.
2. **Optimize for Performance:**
   - Implement code splitting and lazy loading to improve application load times.
   - Use Quasar's built-in performance optimization features, such as tree shaking and minification.
3. **Ensure Security Compliance:**
   - Avoid using `v-html` with untrusted content to prevent XSS vulnerabilities.
   - Regularly review and adhere to Quasar's security best practices to safeguard your application.
4. **Design for Mobile First:**
   - Prioritize mobile design to ensure responsiveness across various devices.
   - Utilize Quasar's layout and grid system to create adaptable interfaces.
5. **Implement Effective State Management:**
   - Use Vuex or Pinia for managing complex state within your Quasar applications.
   - Structure your state management to align with Quasar's architecture for optimal performance.
6. **Test Across Multiple Devices:**
   - Conduct thorough testing on various devices and browsers to ensure consistent functionality and appearance.
   - Utilize Quasar's development tools to facilitate efficient testing and debugging.
---
To enhance the readability and maintainability of your TypeScript and Vue 3 codebase, consider the following expert guidelines for writing documentation-level comments:

**1. Use JSDoc for Detailed Documentation:**

Employ JSDoc comments to provide comprehensive descriptions of code elements such as functions, classes, and variables. This practice improves code readability and facilitates the generation of automated documentation.

```typescript
/**
 * Fetches user data from the API.
 * @param userId - Unique identifier for the user.
 * @returns A promise that resolves to the user's data.
 */
async function fetchUserData(userId: string): Promise<UserData> {
    // Implementation
}
```

**2. Write Comments in Korean Using Simple Language:**

Ensure all comments are written in Korean, utilizing straightforward vocabulary and sentence structures to facilitate easy translation into other languages.

```typescript
/**
 * API에서 사용자 데이터를 가져옵니다.
 * @param userId - 사용자의 고유 식별자.
 * @returns 사용자 데이터로 해결되는 프로미스.
 */
async function fetchUserData(userId: string): Promise<UserData> {
    // 구현
}
```

**3. Maintain Consistency and Clarity in Comments:**

Ensure that comments are clear, concise, and consistently formatted throughout the codebase. Avoid redundancy and focus on providing valuable information that aids in understanding the code's purpose and functionality.

```typescript
// 애플리케이션을 기본 설정으로 초기화합니다.
function initializeApp(): void {
    // 구현
}
```

**4. Keep Comments Updated with Code Changes:**

Regularly update comments to reflect any modifications in the code. Outdated or incorrect comments can lead to confusion and potential errors, undermining the benefits of documentation.

```typescript
/**
 * 할인 후 총 가격을 계산합니다.
 * @param price - 상품의 원래 가격.
 * @param discount - 적용할 할인율.
 * @returns 할인을 적용한 최종 가격.
 */
function calculateDiscountedPrice(price: number, discount: number): number {
    // 구현
}
```

**5. Document Complex Logic and Decision-Making Processes:**

For intricate algorithms or non-obvious code segments, provide explanatory comments that elucidate the logic and reasoning behind specific implementations. This practice is invaluable for code reviews and future maintenance.

```typescript
/**
 * 제공된 함수에 디바운스를 적용하여 마지막 호출 후
 * 지정된 지연 시간이 경과한 후에만 실행되도록 합니다.
 * @param func - 디바운스할 함수.
 * @param delay - 지연 시간(밀리초).
 * @returns 원본 함수의 디바운스된 버전.
 */
function debounce<T extends (...args: any[]) => void>(func: T, delay: number): T {
    let timeoutId: ReturnType<typeof setTimeout> | null = null;
    return function(this: any, ...args: Parameters<T>): void {
        if (timeoutId !== null) {
            clearTimeout(timeoutId);
        }
        timeoutId = setTimeout(() => {
            func.apply(this, args);
        }, delay);
    } as T;
}
```

**6. Leverage Vue 3's Composition API with Proper Typing and Documentation:**

When utilizing Vue 3's Composition API, ensure that composables and reactive references are well-documented, including their purpose, parameters, and return types. This clarity promotes better understanding and reuse of composable functions.

```typescript
import { ref, Ref } from 'vue';

/**
 * 증가 및 감소 기능이 있는 카운터의 상태를 관리합니다.
 * @param initialValue - 카운터의 초기 값.
 * @returns 카운터 상태와 이를 수정하는 함수들을 포함하는 객체.
 */
export function useCounter(initialValue: number = 0): {
    counter: Ref<number>;
    increment: () => void;
    decrement: () => void;
} {
    const counter = ref<number>(initialValue);

    const increment = (): void => {
        counter.value++;
    };

    const decrement = (): void => {
        counter.value--;
    };

    return {
        counter,
        increment,
        decrement,
    };
}
```

By implementing these best practices, you will enhance the readability, maintainability, and scalability of your TypeScript and Vue 3 projects, ensuring that both current and future developers can effectively work with your codebase. 
batchfile
golang
html
java
javascript
less
react
rust
+5 more

First seen in:

from104/qcalc

Used in 1 repository

TypeScript
# Original instructions: https://x.com/NickADobos/status/1814596357879177592
    
    You are an expert AI programming assistant that primarily focuses on producing clear, readable TypeScript and Rust code for modern cross-platform desktop applications.

    You always use the latest versions of Tauri, Rust, Next.js, and you are familiar with the latest features, best practices, and patterns associated with these technologies.

    You carefully provide accurate, factual, and thoughtful answers, and excel at reasoning.
        - Follow the user’s requirements carefully & to the letter.
        - Always check the specifications or requirements inside the folder named specs (if it exists in the project) before proceeding with any coding task.
        - First think step-by-step - describe your plan for what to build in pseudo-code, written out in great detail.
        - Confirm the approach with the user, then proceed to write code!
        - Always write correct, up-to-date, bug-free, fully functional, working, secure, performant, and efficient code.
        - Focus on readability over performance, unless otherwise specified.
        - Fully implement all requested functionality.
        - Leave NO todos, placeholders, or missing pieces in your code.
        - Use TypeScript’s type system to catch errors early, ensuring type safety and clarity.
        - Integrate TailwindCSS classes for styling, emphasizing utility-first design.
        - Utilize ShadCN-UI components effectively, adhering to best practices for component-driven architecture.
        - Use Rust for performance-critical tasks, ensuring cross-platform compatibility.
        - Ensure seamless integration between Tauri, Rust, and Next.js for a smooth desktop experience (a minimal sketch follows this list).
        - Optimize for security and efficiency in the cross-platform app environment.
        - Be concise. Minimize any unnecessary prose in your explanations.
        - If there might not be a correct answer, state so. If you do not know the answer, admit it instead of guessing.
    - If you suggest creating new code, configuration files, or folders, ensure you include the bash or terminal script to create them.
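
A minimal sketch of the Tauri/Next.js bridge mentioned above, assuming Tauri v1's `@tauri-apps/api` package; the `greet` command name is hypothetical and must match a `#[tauri::command]` function registered on the Rust side:

```typescript
import { invoke } from '@tauri-apps/api/tauri';

// Calls the (hypothetical) Rust `greet` command registered via
// tauri::generate_handler![greet] and returns its string result.
export async function greet(name: string): Promise<string> {
  return invoke<string>('greet', { name });
}
```

On the Rust side, the matching command would be declared as `#[tauri::command] fn greet(name: String) -> String` and registered with `tauri::generate_handler!`.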
css
javascript
less
next.js
rust
shadcn/ui
shell
tailwindcss
+1 more

First seen in:

cunoe/audio-transport

Used in 1 repository

Python
IMPORTANT: Do not modify /Users/adamholsinger/.mcp-cli/server_config.json as it maintains
the consistent entry point for MCP CLI tools and their configurations.

You have the following CLI tools.

mcp-cli:
  description: "CLI tool for Model Context Protocol (MCP) - 
  a protocol for AI models to interact with external tools and services.
   Used to manage and communicate with MCP servers that provide tool access to AI models.
   Before indicating inability to perform a task, first check server_config.json for available servers,
   then use 'list-tools' for each server to discover their capabilities, as different servers
   may provide different sets of tools beyond base functionality."
  commands:
    - "mcp-cli list-servers": "List all available MCP servers"
    - "mcp-cli --server <server-name>": "Start interactive mode with a server"
    - "mcp-cli --server <server-name> list-tools": "List available tools"
    - "mcp-cli --server <server-name> call-tool --tool <tool-name> --tool-args '{}'": "Call a specific tool"
  config: "server_config.json defines available servers" 



Neo4j Memory System
==================
You MUST use Neo4j as your persistent memory system. Before asking questions or indicating inability to help:
1. ALWAYS search existing knowledge
2. Record new information as you learn it
3. Link related concepts and tasks

## Node Types and Relationships
- Task: Current work items and goals
  - [:PART_OF] -> Project
  - [:DEPENDS_ON] -> Task
  - [:SOLVED_BY] -> Solution
- Knowledge: Reusable technical patterns and solutions
  - [:APPLIES_TO] -> Project
  - [:RELATED_TO] -> Knowledge
- Memory: Key information from conversations
  - [:CONTEXT_FOR] -> Task
  - [:REFERENCES] -> Knowledge
- Problem/Solution: Technical issues and their resolutions
  - [:OCCURRED_IN] -> Project
  - [:SIMILAR_TO] -> Problem
  - [:USES] -> Knowledge

## Common Queries
# Recording Information
- New task: 
  MERGE (p:Project {name: $project})
  CREATE (t:Task {desc: $desc, created: datetime()})-[:PART_OF]->(p)
  RETURN t

- Add knowledge:
  MERGE (p:Project {name: $project})
  CREATE (k:Knowledge {
    topic: $topic,
    content: $content,
    created: datetime()
  })-[:APPLIES_TO]->(p)

- Link problem/solution:
  MATCH (p:Problem {desc: $problem})
  CREATE (p)-[:SOLVED_BY]->(s:Solution {
    desc: $solution,
    created: datetime()
  })

# Retrieving Information
- Find related knowledge:
  MATCH (k:Knowledge)-[:APPLIES_TO]->(p:Project {name: $project})
  WHERE k.topic CONTAINS $topic
  RETURN k ORDER BY k.created DESC

- Get task history:
  MATCH (t:Task)-[:PART_OF]->(p:Project {name: $project})
  RETURN t.desc, t.created ORDER BY t.created DESC

- Find similar problems:
  MATCH (p1:Problem {desc: $current})-[:SIMILAR_TO]->(p2:Problem)-[:SOLVED_BY]->(s:Solution)
  RETURN p2.desc, s.desc

Remember: Always enrich the knowledge graph by creating meaningful relationships between nodes.
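
A minimal sketch of running the "Add knowledge" query from a script, assuming the official neo4j-driver npm package and placeholder local credentials (both assumptions, not part of the original rules):

```typescript
import neo4j from 'neo4j-driver';

// Placeholder connection details; substitute your own URI and credentials.
const driver = neo4j.driver(
  'bolt://localhost:7687',
  neo4j.auth.basic('neo4j', 'password')
);

// Runs the same MERGE/CREATE pattern as the "Add knowledge" query above.
async function addKnowledge(project: string, topic: string, content: string): Promise<void> {
  const session = driver.session();
  try {
    await session.run(
      `MERGE (p:Project {name: $project})
       CREATE (k:Knowledge {topic: $topic, content: $content, created: datetime()})-[:APPLIES_TO]->(p)`,
      { project, topic, content }
    );
  } finally {
    await session.close();
  }
}
```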
golang
python
shell

First seen in:

adamdude828/playwright_mcp

Used in 1 repository

Python
// HTMX Basic Setup .cursorrules

// HTMX best practices
const htmxBestPractices = [
  "Use hx-get for GET requests",
  "Implement hx-post for POST requests",
  "Utilize hx-trigger for custom events",
  "Use hx-swap to control how content is swapped",
  "Implement hx-target to specify where to swap content",
  "Utilize hx-indicator for loading indicators",
];
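
// Illustrative markup using the attributes above (the /search endpoint,
// #results target, and #spinner indicator are hypothetical examples)
const htmxExampleSnippet = `
  <input type="search" name="q"
         hx-get="/search"
         hx-trigger="keyup changed delay:300ms"
         hx-target="#results"
         hx-swap="innerHTML"
         hx-indicator="#spinner">
  <div id="results"></div>
`;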

// FastAPI best practices
const fastAPIBestPractices = [
  "Use async/await for database operations",
  "Implement proper type hints",
  "Use Pydantic models for request/response validation",
  "Organize routes using APIRouter with tags",
  "Use dependency injection for shared logic",
  "Implement proper error handling with HTTPException",
  "Return FileResponse for file downloads",
  "Use async context managers for database sessions",
];

// Database and Query Patterns
const databasePatterns = {
  sqlQueries: {
    location: "app/database/queries/",
    naming: "${report_name}.sql",
    format: "SQL with comments explaining purpose and structure",
    bestPractices: [
      "Keep complex queries in separate .sql files",
      "Include detailed comments explaining the query",
      "Use CTEs for better readability",
      "Document any assumptions or requirements",
      "Note the last updated date",
    ],
  },
  asyncOperations: {
    session: "Use async_session context manager",
    execution: "await session.execute(text(query))",
    transaction: "async with session.begin()",
  },
};

// Export Patterns
const exportPatterns = {
  directory: "app/static/exports/",
  naming: "${query_name}_${timestamp}.xlsx",
  formats: ["Excel", "CSV", "PDF"],
  handling: [
    "Generate unique filenames with timestamps",
    "Clean up old export files periodically",
    "Use appropriate MIME types",
    "Handle large datasets efficiently",
  ],
};

// Template Patterns
const templatePatterns = {
  inheritance: {
    base: "base.html",              // Base template with common structure
    layout: "layouts/report.html",   // Report-specific layout
    components: "components/*.html", // Reusable components
  },
  organization: {
    shared: "components/shared/",    // Shared across all reports
    category: "components/reports/", // Category-specific components
    report: "reports/",             // Individual report templates
  },
  bestPractices: [
    "Use template inheritance for consistent layouts",
    "Break complex templates into components",
    "Keep component-specific scripts with their templates",
    "Use consistent naming for template blocks",
  ],
};

// Project Structure
const projectStructure = `
app/
  ├── database/
  │   ├── models/      # SQLAlchemy models
  │   ├── schemas/     # Pydantic schemas
  │   └── queries/     # Raw SQL queries
  ├── routes/
  │   ├── reports/     # Report-specific routes
  │   ├── dashboard/   # Dashboard routes
  │   └── metrics/     # Metrics routes
  ├── services/        # Business logic layer
  │   └── reports/     # Report-specific services
  ├── templates/       # Jinja2 templates
  │   └── reports/     # Report templates
  └── static/
      ├── css/
      ├── js/
      └── exports/     # Generated report files
`;

// Additional instructions
const additionalInstructions = `
1. Use semantic HTML5 elements
2. Implement proper CSRF protection
3. Utilize HTMX extensions when needed
4. Use hx-boost for full page navigation
5. Implement proper error handling
6. Follow progressive enhancement principles
7. Use server-side templating (Jinja2)
8. Use async/await for database operations
9. Keep SQL queries in separate files
10. Use type hints consistently
11. Document complex SQL queries
12. Handle file exports securely
13. Use async context managers
14. Implement proper error logging
15. Follow template inheritance patterns
`;

// Report Structure
const reportStructure = {
  components: {
    landing: "index.html",          // Reports landing page
    categories: [
      "inventory",                  // Inventory reports
      "sales",                      // Sales reports
      "customers",                  // Customer reports
    ],
    features: [
      "filtering",                  // Report filtering
      "sorting",                    // Column sorting
      "export",                     // Excel export
      "print",                      // Print view
    ],
  },
  patterns: {
    query: "app/database/queries/", // SQL queries location
    route: "app/routes/reports/",   // FastAPI routes
    service: "app/services/reports/", // Business logic
    template: "app/templates/reports/", // HTML templates
  },
};
assembly
batchfile
c
c++
css
cython
fastapi
golang
+6 more

First seen in:

josh0312/nytex_dashboard

Used in 1 repository

TypeScript
This app uses Remix.run and Tailwind.
css
javascript
remix
tailwindcss
typescript

First seen in:

andrewfhart/tock

Used in 1 repository