Startup and shutdown in middleware

Is there a way to attach app startup / shutdown behaviours in a middleware? I used to do it using lifespan before ASGI 3.0 but it does not work anymore. Any tips?

Any particular reason not to use Starlette’s event hooks for it? https://www.starlette.io/events/

But yeah, sure you can write an ASGI middleware to deal with it.

I guess as a raw ASGI implementation it’d look something like this?..

class EventMiddleware:
    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope['type'] != 'lifespan':
            await self.app(scope, recieve, send)

        recieve = functools.partial(self.receive, parent=receive)
        await self.app(scope, receive, send)

    async def receive(self, parent):
        message = await parent()
        if message == 'lifespan.startup':
            await self.startup()
        elif message == 'lifespan.shutdown'
            await self.shutdown()
        return message

    async def startup(self):
        ...

    async def shutdown(self):
        ...
1 Like

Thanks Tom, works like a charm… Starlette event hooks are fine too :slight_smile: i was curious about how to encapsulate this behaviour in my middleware (instead of having a middleware + the event hooks).