Compare commits

...

1 Commits

Author SHA1 Message Date
Peter
b037797a33 OPT project structure
All checks were successful
SonarQube Scan / sonarqube (pull_request) Successful in 54s
2025-12-29 16:46:41 +01:00
6 changed files with 497 additions and 0 deletions

1
src/__init__.py Normal file
View File

@ -0,0 +1 @@
"""Schiphol flight data producer."""

100
src/main.py Normal file
View File

@ -0,0 +1,100 @@
from src.settings import SchipholApiSettings, KafkaSettings
from requests_ratelimiter import LimiterSession
from kstreams import create_engine
from kstreams.backends.kafka import Kafka
from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.requests import RequestsInstrumentor
import aiorun
import json
import logging
logger = logging.getLogger(__name__)
async def start():
# Initialize OpenTelemetry tracing (minimal, env-driven)
resource = Resource.create({"service.name": "schiphol-producer"})
provider = TracerProvider(resource=resource)
trace.set_tracer_provider(provider)
provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
RequestsInstrumentor().instrument()
tracer = trace.get_tracer("schiphol")
api_settings = SchipholApiSettings()
kafka_settings = KafkaSettings()
backend = Kafka(bootstrap_servers=kafka_settings.bootstrap_servers)
stream_engine = create_engine(title="schiphol-flights-engine", backend=backend)
await stream_engine.start()
session = LimiterSession(per_second=1)
session.headers.update(
{
"Accept": "application/json",
"app_id": api_settings.api_id,
"app_key": api_settings.api_key,
"ResourceVersion": "v4",
}
)
next_url = api_settings.base_url
total_flights = 0
while next_url:
with tracer.start_as_current_span(
"fetch_page", attributes={"http.url": next_url}
):
response = session.get(next_url)
response.raise_for_status()
data = response.json()
flights = data.get("flights", [])
for flight in flights:
flight_id = str(flight.get("id"))
flight_bytes = json.dumps(flight, default=str).encode()
with tracer.start_as_current_span(
"publish_flight",
attributes={
"messaging.system": "kafka",
"messaging.destination": kafka_settings.topic,
"messaging.kafka.message_key": flight_id,
},
):
await stream_engine.send(
kafka_settings.topic,
value=flight_bytes,
key=flight_id,
)
total_flights += len(flights)
next_url = None
if "link" in response.headers:
for link in response.headers["link"].split(","):
if 'rel="next"' in link:
next_url = link.split(";")[0].strip().strip("<>")
break
logger.info(
f"Published {total_flights} flights to Kafka topic '{kafka_settings.topic}'"
)
await stream_engine.stop()
async def shutdown(loop):
logger.info("Shutdown")
def main():
logging.basicConfig(level=logging.INFO)
aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=shutdown)
if __name__ == "__main__":
main()

43
src/middelware.py Normal file
View File

@ -0,0 +1,43 @@
# middleware.py
from kstreams import middleware, ConsumerRecord
from schema_registry.client import AsyncSchemaRegistryClient
from schema_registry.serializers import AsyncJsonMessageSerializer
from typing import Optional, Dict
import logging
logger = logging.getLogger(__name__)
class ConfluentJsonSchemaMiddleware(
middleware.BaseMiddleware, AsyncJsonMessageSerializer
):
"""
Middleware to deserialize JSON messages using Confluent Schema Registry
"""
def __init__(
self,
*,
schema_registry_client: AsyncSchemaRegistryClient,
reader_schema: Optional[Dict] = None,
**kwargs,
):
super().__init__(**kwargs)
self.schemaregistry_client = schema_registry_client
self.reader_schema = reader_schema
self.id_to_decoder: Dict = {}
async def __call__(self, cr: ConsumerRecord):
"""
Deserialize the event from bytes to dict
"""
try:
data = await self.decode_message(cr.value)
cr.value = data
except Exception as e:
logger.error(f"Failed to deserialize message: {e}")
# Optionally: handle error or pass through
raise
return await self.next_call(cr)

183
src/models.py Normal file
View File

@ -0,0 +1,183 @@
from __future__ import annotations
from datetime import date, datetime
from enum import Enum
from pydantic import BaseModel, Field
class BaggageClaimType(BaseModel):
belts: list[str] | None = None
class RemarksType(BaseModel):
remarks: list[str] | None = None
class CodesharesType(BaseModel):
codeshares: list[str] | None = None
class PublicName(BaseModel):
dutch: str | None = None
english: str | None = None
class Destination(BaseModel):
city: str | None = None
country: str | None = None
iata: str | None = None
publicName: PublicName | None = None
class Airline(BaseModel):
iata: str | None = None
icao: str | None = None
nvls: int | None = None
publicName: str | None = None
class AirlineList(BaseModel):
airlines: list[Airline] | None = None
class AircraftType(BaseModel):
iataMain: str | None = None
iataSub: str | None = None
longDescription: str | None = None
shortDescription: str | None = None
class AircraftTypeList(BaseModel):
aircraftTypes: list[AircraftType] | None = None
class FlightDirection(Enum):
A = "A"
D = "D"
class RouteType(BaseModel):
destinations: list[str] | None = None
eu: str | None = Field(
None, description="S (Schengen), E (Europe) or N (non-Europe)"
)
visa: bool | None = Field(
None, description="Indicates if a visum is required for destination"
)
class AircraftTypeType(BaseModel):
iataMain: str | None = None
iataSub: str | None = None
class CheckinClassType(BaseModel):
code: str | None = None
description: str | None = None
class TransferPositionsType(BaseModel):
transferPositions: list[int] | None = None
class PublicFlightStateType(BaseModel):
flightStates: list[str] | None = None
class DestinationList(BaseModel):
destinations: list[Destination] | None = None
class DeskType(BaseModel):
checkinClass: CheckinClassType | None = None
position: int | None = None
class DesksType(BaseModel):
desks: list[DeskType] | None = None
class RowType(BaseModel):
position: str | None = None
desks: DesksType | None = None
class RowsType(BaseModel):
rows: list[RowType] | None = None
class CheckinAllocationType(BaseModel):
endTime: datetime | None = None
rows: RowsType | None = None
startTime: datetime | None = None
class CheckinAllocationsType(BaseModel):
checkinAllocations: list[CheckinAllocationType] | None = None
remarks: RemarksType | None = None
class Flight(BaseModel):
lastUpdatedAt: datetime | None = None
actualLandingTime: datetime | None = Field(
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
)
actualOffBlockTime: datetime | None = Field(
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
)
aircraftRegistration: str | None = None
aircraftType: AircraftTypeType | None = None
baggageClaim: BaggageClaimType | None = None
checkinAllocations: CheckinAllocationsType | None = None
codeshares: CodesharesType | None = None
estimatedLandingTime: datetime | None = Field(
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
)
expectedTimeBoarding: datetime | None = Field(
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
)
expectedTimeGateClosing: datetime | None = Field(
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
)
expectedTimeGateOpen: datetime | None = Field(
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
)
expectedTimeOnBelt: datetime | None = Field(
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
)
expectedSecurityFilter: str | None = Field(
None, description="expected security filter"
)
flightDirection: FlightDirection | None = None
flightName: str | None = None
flightNumber: int | None = None
gate: str | None = None
pier: str | None = None
id: str | None = None
isOperationalFlight: bool | None = None
mainFlight: str | None = None
prefixIATA: str | None = None
prefixICAO: str | None = None
airlineCode: int | None = None
publicEstimatedOffBlockTime: datetime | None = Field(
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
)
publicFlightState: PublicFlightStateType | None = None
route: RouteType | None = None
scheduleDateTime: datetime | None = Field(
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
)
scheduleDate: date | None = Field(None, description="yyyy-MM-dd")
scheduleTime: str | None = Field(None, description="hh:mm:ss")
serviceType: str | None = Field(
None,
description="The service type category of the commercial flight. For example: J = Passenger Line, C=Passenger Charter, F = Freight Line and H = Freight Charter etc.",
)
terminal: int | None = None
transferPositions: TransferPositionsType | None = None
schemaVersion: str | None = None
class FlightList(BaseModel):
flights: list[Flight] | None = None

28
src/settings.py Normal file
View File

@ -0,0 +1,28 @@
from pydantic_settings import BaseSettings, SettingsConfigDict
class SchipholApiSettings(BaseSettings):
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
env_prefix="SCHIPHOL_",
extra="ignore",
)
api_id: str | None = ""
api_key: str | None = ""
base_url: str | None = ""
class KafkaSettings(BaseSettings):
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
env_prefix="KAFKA_",
extra="ignore",
)
bootstrap_servers: list[str] = []
schema_registry_url: str = ""
topic: str = "flights"
group_id: str = "schiphol-flights-group"

142
tools/create_schema.py Normal file
View File

@ -0,0 +1,142 @@
#!/usr/bin/env python3
import json
import requests
from typing import Dict, Any
from copy import deepcopy
# Configuration
SCHEMA_REGISTRY_URL = "http://192.168.1.78:8081"
OPENAPI_FILE = "openapi.json"
def resolve_refs(schema: Any, all_schemas: Dict[str, Any], visited: set = None) -> Any:
"""
Recursively resolve $ref references in a schema.
"""
if visited is None:
visited = set()
if isinstance(schema, dict):
if "$ref" in schema:
ref = schema["$ref"]
# Extract schema name from #/components/schemas/SchemaName
if ref.startswith("#/components/schemas/"):
schema_name = ref. split("/")[-1]
# Prevent infinite recursion
if schema_name in visited:
return {"type": "object", "description": f"Circular reference to {schema_name}"}
visited.add(schema_name)
if schema_name in all_schemas:
# Recursively resolve the referenced schema
resolved = resolve_refs(deepcopy(all_schemas[schema_name]), all_schemas, visited. copy())
return resolved
else:
print(f" Warning: Referenced schema '{schema_name}' not found")
return schema
else:
return schema
else:
# Recursively resolve refs in nested objects
return {k: resolve_refs(v, all_schemas, visited. copy()) for k, v in schema.items()}
elif isinstance(schema, list):
return [resolve_refs(item, all_schemas, visited.copy()) for item in schema]
else:
return schema
def convert_openapi_to_json_schema(schema_def: Dict[str, Any], all_schemas: Dict[str, Any]) -> Dict[str, Any]:
"""
Convert OpenAPI schema to JSON Schema format with resolved references.
"""
# First resolve all $ref references
resolved_schema = resolve_refs(deepcopy(schema_def), all_schemas)
# Add JSON Schema metadata
json_schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
**resolved_schema
}
return json_schema
def register_schema(subject: str, schema_def: Dict[str, Any], all_schemas: Dict[str, Any]) -> bool:
"""Register a schema with the Schema Registry."""
try:
# Convert to JSON Schema with resolved references
json_schema = convert_openapi_to_json_schema(schema_def, all_schemas)
# Schema Registry expects the schema as a JSON string
schema_str = json.dumps(json_schema)
# Prepare payload for Schema Registry
payload = {
"schemaType": "JSON",
"schema": schema_str
}
# Register schema
url = f"{SCHEMA_REGISTRY_URL}/subjects/{subject}/versions"
response = requests.post(
url,
headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
json=payload
)
if response. status_code == 200:
result = response.json()
print(f"✓ Registered '{subject}'")
print(f" Schema ID: {result['id']}")
return True
else:
print(f"✗ Failed to register '{subject}'")
print(f" Status: {response.status_code}")
print(f" Error: {response.text}")
return False
except Exception as e:
print(f"✗ Error registering '{subject}': {str(e)}")
return False
def main():
# Load OpenAPI file
with open(OPENAPI_FILE, 'r') as f:
openapi = json.load(f)
# Extract schemas
schemas = openapi.get('components', {}).get('schemas', {})
if not schemas:
print("No schemas found in OpenAPI file!")
return
print(f"Found {len(schemas)} schemas in OpenAPI file\n")
print("=" * 70)
# Register each schema
success_count = 0
failed_count = 0
for schema_name, schema_def in schemas.items():
# Create subject name
subject = f"schiphol.{schema_name. lower()}-value"
if register_schema(subject, schema_def, schemas):
success_count += 1
else:
failed_count += 1
print()
print("=" * 70)
print(f"\nRegistration complete!")
print(f" ✓ Success: {success_count}")
print(f" ✗ Failed: {failed_count}")
print(f"\nTo list all registered subjects:")
print(f" curl {SCHEMA_REGISTRY_URL}/subjects | jq .")
print(f"\nTo view a specific schema:")
print(f" curl {SCHEMA_REGISTRY_URL}/subjects/schiphol.flight-value/versions/latest | jq -r '.schema | fromjson'")
if __name__ == "__main__":
main()