# middleware.py
import logging
from typing import Dict, Optional

from kstreams import ConsumerRecord, middleware
from schema_registry.client import AsyncSchemaRegistryClient
from schema_registry.serializers import AsyncJsonMessageSerializer

logger = logging.getLogger(__name__)


class ConfluentJsonSchemaMiddleware(
    middleware.BaseMiddleware, AsyncJsonMessageSerializer
):
    """
    Middleware to deserialize JSON messages using Confluent Schema Registry.

    The class mixes the kstreams middleware base with the async JSON
    serializer, so ``decode_message`` is available directly on the instance.
    """

    def __init__(
        self,
        *,
        schema_registry_client: AsyncSchemaRegistryClient,
        reader_schema: Optional[Dict] = None,
        **kwargs,
    ):
        """
        Store the registry client and initialise the serializer state.

        Args:
            schema_registry_client: Client used to talk to the schema registry.
            reader_schema: Optional reader schema, stored as-is on the instance.
            **kwargs: Forwarded unchanged to ``super().__init__``.
        """
        super().__init__(**kwargs)

        # The serializer attributes are assigned by hand here.
        # NOTE(review): presumably the cooperative ``super().__init__`` chain
        # does not reach the serializer's own ``__init__`` - confirm.
        self.schemaregistry_client = schema_registry_client
        self.reader_schema = reader_schema

        # NOTE(review): the serializer base class in
        # python-schema-registry-client appears to keep its caches in
        # ``id_to_decoder_func`` / ``id_to_writers`` and to read
        # ``return_record_name``; without them ``decode_message`` would fail
        # with AttributeError. Verify against the installed library version.
        self.id_to_decoder_func: Dict = {}
        self.id_to_writers: Dict = {}
        self.return_record_name = False

        # Kept for backward compatibility with code that read the previous
        # attribute name; it is the very same dict as ``id_to_decoder_func``.
        self.id_to_decoder: Dict = self.id_to_decoder_func

    async def __call__(self, cr: ConsumerRecord):
        """
        Deserialize the event from bytes to dict.

        Replaces ``cr.value`` with the result of ``decode_message`` and hands
        the record to the next callable in the chain.

        Raises:
            Exception: Whatever ``decode_message`` raised; it is logged and
                re-raised unchanged, so the record is not forwarded.
        """
        try:
            decoded = await self.decode_message(cr.value)
        except Exception as e:
            # Lazy %-formatting: the message is only built if it is emitted.
            logger.error("Failed to deserialize message: %s", e)
            raise

        cr.value = decoded
        return await self.next_call(cr)