Metadata-Version: 2.4
Name: asgi-correlation-id
Version: 5.0.0
Summary: Middleware correlating project logs to individual requests
Keywords: asgi,fastapi,starlette,async,correlation,correlation-id,request-id,x-request-id,tracing,logging,middleware,sentry
Author: Sondre Lillebø Gundersen
Author-email: Sondre Lillebø Gundersen <sondrelg@live.no>
License-Expression: MIT
Classifier: Development Status :: 5 - Production/Stable
Classifier: Environment :: Web Environment
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Programming Language :: Python :: 3.14
Classifier: Topic :: Internet :: WWW/HTTP
Classifier: Topic :: Internet :: WWW/HTTP :: Dynamic Content
Classifier: Topic :: Software Development
Classifier: Topic :: Software Development :: Libraries
Classifier: Topic :: Software Development :: Libraries :: Application Frameworks
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Requires-Dist: starlette>=0.18
Requires-Dist: sentry-sdk>=2.12.0 ; extra == 'sentry'
Maintainer: Jonas Krüger Svensson
Maintainer-email: Jonas Krüger Svensson <jonas-ks@hotmail.com>
Requires-Python: >=3.10
Project-URL: Homepage, https://github.com/snok/asgi-correlation-id
Project-URL: Repository, https://github.com/snok/asgi-correlation-id
Provides-Extra: sentry
Description-Content-Type: text/markdown

[![pypi](https://img.shields.io/pypi/v/asgi-correlation-id)](https://pypi.org/project/asgi-correlation-id/)
[![test](https://github.com/snok/asgi-correlation-id/actions/workflows/test.yml/badge.svg)](https://github.com/snok/asgi-correlation-id/actions/workflows/test.yml)
[![codecov](https://codecov.io/gh/snok/asgi-correlation-id/branch/main/graph/badge.svg?token=1aXlWPm2gb)](https://codecov.io/gh/snok/asgi-correlation-id)

# ASGI Correlation ID middleware

Middleware for reading or generating correlation IDs for each incoming request. Correlation IDs can then be added to your
logs, making it simple to retrieve all logs generated from a single HTTP request.

When the middleware detects a correlation ID HTTP header in an incoming request, the ID is stored. If no header is
found, a correlation ID is generated for the request instead.

The middleware checks for the `X-Request-ID` header by default, but can be configured to read any header.
`X-Correlation-ID` is another common choice.
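
The read-or-generate behavior described above can be sketched in a few lines. This is a simplified illustration, not the middleware's actual code: `resolve_correlation_id` and `looks_like_uuid` are hypothetical helpers, and a plain dict stands in for the request headers.

```python
from uuid import UUID, uuid4


def looks_like_uuid(value: str) -> bool:
    """Return True if the value parses as a UUID (stand-in for a validator)."""
    try:
        UUID(value)
    except ValueError:
        return False
    return True


def resolve_correlation_id(headers: dict[str, str], header_name: str = 'X-Request-ID') -> str:
    """Reuse a valid incoming ID, otherwise generate a fresh one."""
    # HTTP header names are case-insensitive, so compare in lowercase
    lowered = {key.lower(): value for key, value in headers.items()}
    incoming = lowered.get(header_name.lower())
    if incoming and looks_like_uuid(incoming):
        return incoming
    # No header, or a value that failed validation: fall back to a new ID
    return uuid4().hex
```

A request that already carries a valid ID keeps it, while a missing or malformed header yields a new 32-character hex string.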

## Example

Once logging is configured, your output will go from this:

```
INFO    ... project.views  This is an info log
WARNING ... project.models This is a warning log
INFO    ... project.views  This is an info log
INFO    ... project.views  This is an info log
WARNING ... project.models This is a warning log
WARNING ... project.models This is a warning log
```

to this:

```
INFO    ... [773fa6885] project.views  This is an info log
WARNING ... [773fa6885] project.models This is a warning log
INFO    ... [0d1c3919e] project.views  This is an info log
INFO    ... [99d44111e] project.views  This is an info log
WARNING ... [0d1c3919e] project.models This is a warning log
WARNING ... [99d44111e] project.models This is a warning log
```

Now we're actually able to see which logs are related.

# Installation

```
pip install asgi-correlation-id
```

# Setup

To set up the package, you need to add the middleware and configure logging.

## Adding the middleware

The middleware can be added like this:

```python
from fastapi import FastAPI

from asgi_correlation_id import CorrelationIdMiddleware

app = FastAPI()
app.add_middleware(CorrelationIdMiddleware)
```

or any other way your framework allows.

For [Starlette](https://github.com/encode/starlette) apps, just substitute `FastAPI` with `Starlette` in all examples.

## Configure logging

This section assumes you have already started configuring logging in your project. If this is not the case, check out
the section on [setting up logging from scratch](#setting-up-logging-from-scratch) instead.

To log the correlation ID, add the log filter the package provides.

If your current log config looks like this:

```python
LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'web': {
            'class': 'logging.Formatter',
            'datefmt': '%H:%M:%S',
            'format': '%(levelname)s ... %(name)s %(message)s',
        },
    },
    'handlers': {
        'web': {
            'class': 'logging.StreamHandler',
            'formatter': 'web',
        },
    },
    'loggers': {
        'my_project': {
            'handlers': ['web'],
            'level': 'DEBUG',
            'propagate': True,
        },
    },
}
```

You only need to add the filter, attach it to the handler, and include `%(correlation_id)s` in the format string, like this:

```diff
LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
+   'filters': {
+       'correlation_id': {
+           '()': 'asgi_correlation_id.CorrelationIdFilter',
+           'uuid_length': 32,
+           'default_value': '-',
+       },
+   },
    'formatters': {
        'web': {
            'class': 'logging.Formatter',
            'datefmt': '%H:%M:%S',
+           'format': '%(levelname)s ... [%(correlation_id)s] %(name)s %(message)s',
        },
    },
    'handlers': {
        'web': {
            'class': 'logging.StreamHandler',
+           'filters': ['correlation_id'],
            'formatter': 'web',
        },
    },
    'loggers': {
        'my_project': {
            'handlers': ['web'],
            'level': 'DEBUG',
            'propagate': True,
        },
    },
}
```

If you're using a JSON log formatter, add `correlation-id: %(correlation_id)s` to your list of properties.
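
As a rough illustration of the JSON case, here is a minimal standard-library sketch. It does not use any particular JSON logging package: `JsonFormatter` and `StandInCorrelationIdFilter` are hypothetical names, and the local `correlation_id` context variable is a stand-in for the one exported by this package so that the snippet runs on its own.

```python
import json
import logging
from contextvars import ContextVar
from typing import Optional

# Stand-in for asgi_correlation_id.correlation_id, so the sketch is self-contained
correlation_id: ContextVar[Optional[str]] = ContextVar('correlation_id', default=None)


class StandInCorrelationIdFilter(logging.Filter):
    """Attach the current correlation ID to every record (illustrative only)."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.correlation_id = correlation_id.get() or '-'
        return True


class JsonFormatter(logging.Formatter):
    """Render each record as a single JSON object."""

    def format(self, record: logging.LogRecord) -> str:
        return json.dumps(
            {
                'level': record.levelname,
                'logger': record.name,
                'message': record.getMessage(),
                # The filter must run first, otherwise the attribute is missing
                'correlation-id': getattr(record, 'correlation_id', '-'),
            }
        )
```

In a real project you would attach `asgi_correlation_id.CorrelationIdFilter` to the handler, as in the config above, and only swap in the formatter.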

## Middleware configuration

The middleware can be configured in a few ways, but there are no required arguments.

```python
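from uuid import uuid4

from asgi_correlation_id import CorrelationIdMiddleware
from asgi_correlation_id.middleware import is_valid_uuid4

app.add_middleware(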
    CorrelationIdMiddleware,
    header_name='X-Request-ID',
    update_request_header=True,
    generator=lambda: uuid4().hex,
    validator=is_valid_uuid4,
    transformer=lambda a: a,
)
```

Configurable middleware arguments include:

**header_name**

- Type: `str`
- Default: `X-Request-ID`
- Description: The header name decides which HTTP header value to read correlation IDs from. `X-Request-ID` and
  `X-Correlation-ID` are common choices.

**update_request_header**

- Type: `bool`
- Default: `True`
- Description: Whether to update the incoming request's header value with the generated correlation ID. This supports
  use cases that rely on the presence of the request header (like various tracing middlewares).

**generator**

- Type: `Callable[[], str]`
- Default: `lambda: uuid4().hex`
- Description: The generator function is responsible for generating new correlation IDs when no ID is received from an
  incoming request's headers. We use UUIDs by default, but if you prefer, you could use libraries
  like [nanoid](https://github.com/puyuan/py-nanoid) or your own custom function.

**validator**

- Type: `Callable[[str], bool]`
- Default: `is_valid_uuid4`
- Description: The validator function is used when reading incoming HTTP header values. By default, we accept any
  32-character hex string that can be parsed as a UUID — this includes standard UUIDv4s, nginx-generated request IDs
  (`$request_id`), and OpenTelemetry trace IDs. Non-hex or malformed values are rejected and a new ID is generated
  instead. If you prefer to allow any header value, set this to `None`. For stricter UUIDv4 validation, pass your own
  validator.

**transformer**

- Type: `Callable[[str], str]`
- Default: `lambda a: a`
- Description: Most users won't need a transformer, and by default we do nothing.
  The argument was added for cases where users might want to alter incoming or generated ID values in some way.
  See the middleware code for more context.
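
To make the three callables more concrete, here is a sketch of custom replacements that match the documented signatures. The 16-character ID format and the function names are assumptions made up for this example, not something the package prescribes.

```python
import re
import secrets

# Hypothetical ID format for illustration: exactly 16 hex characters
_ID_PATTERN = re.compile(r'[0-9a-f]{16}', re.IGNORECASE)


def generate_short_id() -> str:
    """Generator: takes no arguments and returns a new ID string."""
    return secrets.token_hex(8)  # 8 random bytes -> 16 hex characters


def is_short_id(value: str) -> bool:
    """Validator: return True only for values in the expected format."""
    return _ID_PATTERN.fullmatch(value) is not None


def normalize(value: str) -> str:
    """Transformer: receives an ID string and returns the value to store."""
    return value.strip().lower()


# These would then be passed to the middleware, for example:
# app.add_middleware(
#     CorrelationIdMiddleware,
#     generator=generate_short_id,
#     validator=is_short_id,
#     transformer=normalize,
# )
```

Keeping the validator permissive about case and letting the transformer lowercase the value means clients can send either form while logs stay consistent.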

## CORS

If you are using cross-origin resource sharing ([CORS](https://developer.mozilla.org/en-US/docs/Web/HTTP/CORS)), e.g.
you are making requests to an API from frontend JavaScript code served from a different origin, you have to ensure
two things:

- permit the correlation ID header in the incoming requests' HTTP headers so the value can be reused by the middleware,
- add the correlation ID header to the allowlist in responses' HTTP headers so it can be accessed by the browser.

This is best accomplished by using a dedicated middleware for your framework of choice. Here are some examples.

### Starlette

Docs: https://www.starlette.io/middleware/#corsmiddleware

```python
from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.middleware.cors import CORSMiddleware


middleware = [
    Middleware(
        CORSMiddleware,
        allow_origins=['*'],
        allow_methods=['*'],
        allow_headers=['X-Requested-With', 'X-Request-ID'],
        expose_headers=['X-Request-ID']
    )
]

app = Starlette(..., middleware=middleware)
```

### FastAPI

Docs: https://fastapi.tiangolo.com/tutorial/cors/

```python
from app.main import app
from fastapi.middleware.cors import CORSMiddleware


app.add_middleware(
    CORSMiddleware,
    allow_origins=['*'],
    allow_methods=['*'],
    allow_headers=['X-Requested-With', 'X-Request-ID'],
    expose_headers=['X-Request-ID']
)
```

For more details on the topic, refer to the [CORS protocol](https://fetch.spec.whatwg.org/#http-cors-protocol).

## Exception handling

By default, the `X-Request-ID` response header will be included in all responses from the server, *except* in the case
of unhandled server errors. If you wish to include request IDs in the case of a `500` error you can add a custom
exception handler.

Here are some simple examples to help you get started. See each framework's documentation for more info.

### Starlette

Docs: https://www.starlette.io/exceptions/

```python
from starlette.requests import Request
from starlette.responses import PlainTextResponse
from starlette.applications import Starlette

from asgi_correlation_id import correlation_id


async def custom_exception_handler(request: Request, exc: Exception) -> PlainTextResponse:
    return PlainTextResponse(
        "Internal Server Error",
        status_code=500,
        headers={'X-Request-ID': correlation_id.get() or ""}
    )


app = Starlette(
    ...,
    exception_handlers={500: custom_exception_handler}
)
```

### FastAPI

Docs: https://fastapi.tiangolo.com/tutorial/handling-errors/

```python
from app.main import app
from fastapi import HTTPException, Request
from fastapi.exception_handlers import http_exception_handler
from fastapi.responses import JSONResponse

from asgi_correlation_id import correlation_id


@app.exception_handler(Exception)
async def unhandled_exception_handler(request: Request, exc: Exception) -> JSONResponse:
    return await http_exception_handler(
        request,
        HTTPException(
            500,
            'Internal server error',
            headers={'X-Request-ID': correlation_id.get() or ""}
        ))
```

If you are using CORS, you also have to include the `Access-Control-Allow-Origin` and `Access-Control-Expose-Headers`
headers in the error response. For more details, see the [CORS section](#cors) above.
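
One way to keep those headers together is a small helper that builds the full header set for an error response. This is only a sketch: `error_response_headers` is a hypothetical function, and the allowed origin is assumed to be known to your application.

```python
from typing import Optional


def error_response_headers(
    cid: Optional[str],
    allowed_origin: str,
    header_name: str = 'X-Request-ID',
) -> dict[str, str]:
    """Build headers for an error response served to a cross-origin client."""
    return {
        # Same fallback as the handlers above: empty string when no ID is set
        header_name: cid or '',
        # Lets the browser accept the response from another origin
        'Access-Control-Allow-Origin': allowed_origin,
        # Lets frontend code read the correlation ID header
        'Access-Control-Expose-Headers': header_name,
    }
```

The result can be passed as the `headers` argument in either exception handler above, with `correlation_id.get()` as the first argument.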

# Setting up logging from scratch

If your project does not have logging configured, this section will explain how to get started. For more details,
take a look at
[this blogpost](https://medium.com/@sondrelg_12432/setting-up-request-id-logging-for-your-fastapi-application-4dc190aac0ea).

The Python [docs](https://docs.python.org/3/library/logging.config.html) describe several configuration functions
you can use for simpler setup. This example uses `dictConfig`, because that's what, e.g., Django users should
find most familiar. The different configuration methods are interchangeable, so if you want to use another method,
browse the Python docs and change the configuration method as you please.

The benefit of `dictConfig` is that it lets you specify your entire logging configuration in a single data structure,
and it lets you add conditional logic to it. The following example shows how to set up both console and JSON logging:

```python
from logging.config import dictConfig

from app.core.config import settings


def configure_logging() -> None:
    dictConfig(
        {
            'version': 1,
            'disable_existing_loggers': False,
            'filters': {  # correlation ID filter must be added here to make the %(correlation_id)s formatter work
                'correlation_id': {
                    '()': 'asgi_correlation_id.CorrelationIdFilter',
                    'uuid_length': 8 if not settings.ENVIRONMENT == 'local' else 32,
                    'default_value': '-',
                },
            },
            'formatters': {
                'console': {
                    'class': 'logging.Formatter',
                    'datefmt': '%H:%M:%S',
                    # formatter decides how our console logs look, and what info is included.
                    # adding %(correlation_id)s to this format is what makes correlation IDs appear in our logs
                    'format': '%(levelname)s:\t\b%(asctime)s %(name)s:%(lineno)d [%(correlation_id)s] %(message)s',
                },
            },
            'handlers': {
                'console': {
                    'class': 'logging.StreamHandler',
                    # Filter must be declared in the handler, otherwise it won't be included
                    'filters': ['correlation_id'],
                    'formatter': 'console',
                },
            },
            # Loggers can be specified to set the log-level to log, and which handlers to use
            'loggers': {
                # project logger
                'app': {'handlers': ['console'], 'level': 'DEBUG', 'propagate': True},
                # third-party package loggers
                'databases': {'handlers': ['console'], 'level': 'WARNING'},
                'httpx': {'handlers': ['console'], 'level': 'INFO'},
                'asgi_correlation_id': {'handlers': ['console'], 'level': 'WARNING'},
            },
        }
    )
```

With the logging configuration defined within a function like this, all you have to do is make sure to run the function
on startup, and logging should work for you. You can do this any way you'd like, but passing it in the `on_startup`
list when creating the app, e.g. `FastAPI(on_startup=[configure_logging])`, is a good starting point.

# Integration with structlog

[structlog](https://www.structlog.org/) is a Python library that enables structured logging.

It is trivial to configure with `asgi_correlation_id`:

```python
import logging
from typing import Any

import structlog
from asgi_correlation_id import correlation_id


def add_correlation(
    logger: logging.Logger, method_name: str, event_dict: dict[str, Any]
) -> dict[str, Any]:
    """Add request id to log message."""
    if request_id := correlation_id.get():
        event_dict["request_id"] = request_id
    return event_dict


structlog.configure(
    processors=[
        add_correlation,
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M.%S"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer(),
    ],
    wrapper_class=structlog.stdlib.BoundLogger,
    logger_factory=structlog.stdlib.LoggerFactory(),
    cache_logger_on_first_use=True,
)
```

# Integration with [SAQ](https://github.com/tobymao/saq)

If you're using [saq](https://github.com/tobymao/saq/), you
can easily transfer request IDs from the web server to your
workers by using the event hooks provided by the library:

```python
from uuid import uuid4

from asgi_correlation_id import correlation_id
from saq import Job, Queue


CID_TRANSFER_KEY = 'correlation_id'


async def before_enqueue(job: Job) -> None:
    """
    Transfer the correlation ID from the current context to the worker.

    This might be called from a web server or a worker process.
    """
    job.meta[CID_TRANSFER_KEY] = correlation_id.get() or uuid4().hex


async def before_process(ctx: dict) -> None:
    """
    Load correlation ID from the enqueueing process to this one.
    """
    correlation_id.set(ctx['job'].meta.get(CID_TRANSFER_KEY, uuid4().hex))


async def after_process(ctx: dict) -> None:
    """
    Reset correlation ID for this process.
    """
    correlation_id.set(None)

queue = Queue(...)
queue.register_before_enqueue(before_enqueue)

priority_settings = {
    ...,
    'queue': queue,
    'before_process': before_process,
    'after_process': after_process,
}
```

# Integration with [hypercorn](https://github.com/pgjones/hypercorn)

To add a correlation ID to your [hypercorn](https://github.com/pgjones/hypercorn) logs, you'll need to add a log filter and change the log formatting. Here's an example of how to configure hypercorn, if you're running a [FastAPI](https://fastapi.tiangolo.com/deployment/manually/) app:

```python
import asyncio
import logging

import asgi_correlation_id
from fastapi import APIRouter, FastAPI
from hypercorn.asyncio import serve
from hypercorn.config import Config


def configure_logging():
    console_handler = logging.StreamHandler()
    console_handler.addFilter(asgi_correlation_id.CorrelationIdFilter())
    logging.basicConfig(
        handlers=[console_handler],
        level="INFO",
        format="%(levelname)s log [%(correlation_id)s] %(name)s %(message)s",
    )


fastapi_app = FastAPI(on_startup=[configure_logging])
router = APIRouter()


@router.get("/test")
async def test_get():
    logger = logging.getLogger()
    logger.info("test_get")


fastapi_app.include_router(router)
app = asgi_correlation_id.CorrelationIdMiddleware(fastapi_app)


if __name__ == "__main__":
    log_config = {
        "version": 1,
        "filters": {
            "correlation_id": {
                "()": "asgi_correlation_id.CorrelationIdFilter",
            },
        },
        "handlers": {
            "hypercorn.access": {
                "formatter": "hypercorn.access",
                "level": "INFO",
                "class": "logging.StreamHandler",
                "stream": "ext://sys.stdout",
                "filters": ["correlation_id"],
            },
        },
        "formatters": {
            "hypercorn.access": {
                "format": "%(message)s %(correlation_id)s",
            },
        },
        "loggers": {
            "hypercorn.access": {
                "handlers": ["hypercorn.access"],
                "level": "INFO",
            },
        },
    }

    config = Config()
    config.bind = ["localhost:8080"]  # match the port used in the curl command below
    config.accesslog = "-"
    config.logconfig_dict = log_config
    asyncio.run(serve(app, config))
```

```
# run it
$ python3 test.py

# test it:
$ curl http://localhost:8080/test

# log on stdout:
INFO log [7e7ccfff352a428991920d1da2502674] root test_get
127.0.0.1:34754 - - [14/Dec/2023:10:34:08 +0100] "GET /test 1.1" 200 4 "-" "curl/7.76.1" 7e7ccfff352a428991920d1da2502674
```

# Integration with [Uvicorn](https://github.com/encode/uvicorn)

To add a correlation ID to your [uvicorn](https://github.com/encode/uvicorn) logs, you'll need to add a log filter and change the log formatting. Here's an example of how to configure uvicorn, if you're running a [FastAPI](https://fastapi.tiangolo.com/deployment/manually/) app:

```python
import logging
import os

import asgi_correlation_id
import uvicorn
from fastapi import APIRouter, FastAPI
from uvicorn.config import LOGGING_CONFIG


def configure_logging():
    console_handler = logging.StreamHandler()
    console_handler.addFilter(asgi_correlation_id.CorrelationIdFilter())
    logging.basicConfig(
        handlers=[console_handler],
        level="INFO",
        format="%(levelname)s log [%(correlation_id)s] %(name)s %(message)s",
    )


fastapi_app = FastAPI(on_startup=[configure_logging])
router = APIRouter()


@router.get("/test")
async def test_get():
    logger = logging.getLogger()
    logger.info("test_get")


fastapi_app.include_router(router)
app = asgi_correlation_id.CorrelationIdMiddleware(fastapi_app)


if __name__ == "__main__":
    LOGGING_CONFIG["filters"] = {
        "correlation_id": {
            "()": "asgi_correlation_id.CorrelationIdFilter",
        },
    }
    LOGGING_CONFIG["handlers"]["access"]["filters"] = ["correlation_id"]
    LOGGING_CONFIG["formatters"]["access"]["fmt"] = "%(levelname)s access [%(correlation_id)s] %(name)s %(message)s"
    uvicorn.run("test:app", port=8080, log_level=os.environ.get("LOGLEVEL", "DEBUG").lower())
```

```
# run it
python test.py

# test it
curl http://localhost:8080/test

# log on stdout
INFO log [16b61d57f9ff4a85ac80f5cd406e0aa2] root test_get
INFO access [16b61d57f9ff4a85ac80f5cd406e0aa2] uvicorn.access 127.0.0.1:24810 - "GET /test HTTP/1.1" 200
```

# Sentry

If your project has [sentry-sdk](https://pypi.org/project/sentry-sdk/)
installed, correlation IDs will automatically be added to Sentry events as a `transaction_id`.

See
this [blogpost](https://blog.sentry.io/2019/04/04/trace-errors-through-stack-using-unique-identifiers-in-sentry#1-generate-a-unique-identifier-and-set-as-a-sentry-tag-on-issuing-service)
for more detail. The transaction ID is displayed in the event detail view in Sentry and is an easy way
to connect logs to a Sentry event.

# Integration with Celery

Celery workers run as separate processes, so correlation IDs are lost when spawning background tasks from requests.
However, you can transfer correlation IDs to workers using Celery signals.

Add the following to your Celery configuration (e.g., `celery.py`):

```python
from uuid import uuid4

from celery.signals import before_task_publish, task_postrun, task_prerun

from asgi_correlation_id import correlation_id

CORRELATION_ID_HEADER = 'CORRELATION_ID'


@before_task_publish.connect(weak=False)
def transfer_correlation_id(headers: dict[str, str], **kwargs) -> None:
    """Transfer correlation ID from the request to the Celery task headers."""
    cid = correlation_id.get()
    if cid:
        headers[CORRELATION_ID_HEADER] = cid


@task_prerun.connect(weak=False)
def load_correlation_id(task, **kwargs) -> None:
    """Load correlation ID from task headers, or generate a new one."""
    id_value = task.request.get(CORRELATION_ID_HEADER)
    if id_value:
        correlation_id.set(id_value)
    else:
        correlation_id.set(uuid4().hex)


@task_postrun.connect(weak=False)
def cleanup_correlation_id(**kwargs) -> None:
    """Clear the correlation ID after the task completes."""
    correlation_id.set(None)
```
