"""Publish flights fetched from the Schiphol API to a Kafka topic."""

import json
import logging

import aiorun
from kstreams import create_engine
from kstreams.backends.kafka import Kafka
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from requests_ratelimiter import LimiterSession

from settings import KafkaSettings, SchipholApiSettings

logger = logging.getLogger(__name__)

# Upper bound, in seconds, for a single HTTP request. Without a timeout a
# stalled connection would block the producer indefinitely.
REQUEST_TIMEOUT_SECONDS = 30


def _configure_tracing():
    """Install the global tracer provider and return the "schiphol" tracer."""
    # Initialize OpenTelemetry tracing (minimal, env-driven)
    resource = Resource.create({"service.name": "schiphol-producer"})
    provider = TracerProvider(resource=resource)
    trace.set_tracer_provider(provider)
    provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
    RequestsInstrumentor().instrument()
    return trace.get_tracer("schiphol")


def _build_session(api_settings):
    """Return a rate-limited HTTP session carrying the API request headers."""
    session = LimiterSession(per_second=1)
    session.headers.update(
        {
            "Accept": "application/json",
            "app_id": api_settings.api_id,
            "app_key": api_settings.api_key,
            "ResourceVersion": "v4",
        }
    )
    return session


async def start():
    """Fetch every page of flights and publish each flight to Kafka.

    Starting at ``api_settings.base_url``, follows the ``rel="next"`` link of
    each response until none is left. Each flight is serialized to JSON bytes
    and sent to ``kafka_settings.topic`` keyed by the string form of its
    ``id`` field.

    The HTTP session is closed and the stream engine is stopped even when a
    request or a send fails; the exception then propagates to the caller.

    Raises:
        requests.HTTPError: If the API answers with an error status.
    """
    tracer = _configure_tracing()

    api_settings = SchipholApiSettings()
    kafka_settings = KafkaSettings()

    backend = Kafka(bootstrap_servers=kafka_settings.bootstrap_servers)
    stream_engine = create_engine(title="schiphol-flights-engine", backend=backend)
    await stream_engine.start()

    try:
        with _build_session(api_settings) as session:
            next_url = api_settings.base_url
            total_flights = 0

            while next_url:
                # The span covers only the HTTP fetch and the JSON decode.
                with tracer.start_as_current_span(
                    "fetch_page", attributes={"http.url": next_url}
                ):
                    # NOTE(review): this is a blocking call inside a
                    # coroutine; the event loop is stalled while a page is
                    # being fetched.
                    response = session.get(
                        next_url, timeout=REQUEST_TIMEOUT_SECONDS
                    )
                    response.raise_for_status()
                    data = response.json()

                flights = data.get("flights", [])
                for flight in flights:
                    flight_id = str(flight.get("id"))
                    flight_bytes = json.dumps(flight, default=str).encode()
                    with tracer.start_as_current_span(
                        "publish_flight",
                        attributes={
                            "messaging.system": "kafka",
                            "messaging.destination": kafka_settings.topic,
                            "messaging.kafka.message_key": flight_id,
                        },
                    ):
                        await stream_engine.send(
                            kafka_settings.topic,
                            value=flight_bytes,
                            key=flight_id,
                        )
                total_flights += len(flights)

                # requests parses the Link header into a dict keyed by rel;
                # a missing "next" entry ends the pagination loop.
                next_url = response.links.get("next", {}).get("url")

            logger.info(
                "Published %s flights to Kafka topic '%s'",
                total_flights,
                kafka_settings.topic,
            )
    finally:
        # Stop the engine on both the success and the failure path.
        await stream_engine.stop()


async def shutdown(loop):
    """Log that shutdown was requested; passed to aiorun as shutdown callback."""
    logger.info("Shutdown")


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    # NOTE(review): aiorun appears to keep the loop running after start()
    # returns, until a signal arrives -- confirm that is the intended
    # behaviour for this producer.
    aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=shutdown)