Compare commits
1 Commits
89379eca72
...
b037797a33
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b037797a33 |
1
src/__init__.py
Normal file
1
src/__init__.py
Normal file
@ -0,0 +1 @@
|
|||||||
|
"""Schiphol flight data producer."""
|
||||||
100
src/main.py
Normal file
100
src/main.py
Normal file
@ -0,0 +1,100 @@
|
|||||||
|
from src.settings import SchipholApiSettings, KafkaSettings
|
||||||
|
from requests_ratelimiter import LimiterSession
|
||||||
|
from kstreams import create_engine
|
||||||
|
from kstreams.backends.kafka import Kafka
|
||||||
|
from opentelemetry import trace
|
||||||
|
from opentelemetry.sdk.resources import Resource
|
||||||
|
from opentelemetry.sdk.trace import TracerProvider
|
||||||
|
from opentelemetry.sdk.trace.export import BatchSpanProcessor
|
||||||
|
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
|
||||||
|
from opentelemetry.instrumentation.requests import RequestsInstrumentor
|
||||||
|
import aiorun
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
async def start():
|
||||||
|
# Initialize OpenTelemetry tracing (minimal, env-driven)
|
||||||
|
resource = Resource.create({"service.name": "schiphol-producer"})
|
||||||
|
provider = TracerProvider(resource=resource)
|
||||||
|
trace.set_tracer_provider(provider)
|
||||||
|
provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
|
||||||
|
RequestsInstrumentor().instrument()
|
||||||
|
tracer = trace.get_tracer("schiphol")
|
||||||
|
|
||||||
|
api_settings = SchipholApiSettings()
|
||||||
|
kafka_settings = KafkaSettings()
|
||||||
|
|
||||||
|
backend = Kafka(bootstrap_servers=kafka_settings.bootstrap_servers)
|
||||||
|
stream_engine = create_engine(title="schiphol-flights-engine", backend=backend)
|
||||||
|
await stream_engine.start()
|
||||||
|
|
||||||
|
session = LimiterSession(per_second=1)
|
||||||
|
session.headers.update(
|
||||||
|
{
|
||||||
|
"Accept": "application/json",
|
||||||
|
"app_id": api_settings.api_id,
|
||||||
|
"app_key": api_settings.api_key,
|
||||||
|
"ResourceVersion": "v4",
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
next_url = api_settings.base_url
|
||||||
|
total_flights = 0
|
||||||
|
|
||||||
|
while next_url:
|
||||||
|
with tracer.start_as_current_span(
|
||||||
|
"fetch_page", attributes={"http.url": next_url}
|
||||||
|
):
|
||||||
|
response = session.get(next_url)
|
||||||
|
response.raise_for_status()
|
||||||
|
data = response.json()
|
||||||
|
|
||||||
|
flights = data.get("flights", [])
|
||||||
|
for flight in flights:
|
||||||
|
flight_id = str(flight.get("id"))
|
||||||
|
flight_bytes = json.dumps(flight, default=str).encode()
|
||||||
|
|
||||||
|
with tracer.start_as_current_span(
|
||||||
|
"publish_flight",
|
||||||
|
attributes={
|
||||||
|
"messaging.system": "kafka",
|
||||||
|
"messaging.destination": kafka_settings.topic,
|
||||||
|
"messaging.kafka.message_key": flight_id,
|
||||||
|
},
|
||||||
|
):
|
||||||
|
await stream_engine.send(
|
||||||
|
kafka_settings.topic,
|
||||||
|
value=flight_bytes,
|
||||||
|
key=flight_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
total_flights += len(flights)
|
||||||
|
|
||||||
|
next_url = None
|
||||||
|
if "link" in response.headers:
|
||||||
|
for link in response.headers["link"].split(","):
|
||||||
|
if 'rel="next"' in link:
|
||||||
|
next_url = link.split(";")[0].strip().strip("<>")
|
||||||
|
break
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
f"Published {total_flights} flights to Kafka topic '{kafka_settings.topic}'"
|
||||||
|
)
|
||||||
|
|
||||||
|
await stream_engine.stop()
|
||||||
|
|
||||||
|
|
||||||
|
async def shutdown(loop):
|
||||||
|
logger.info("Shutdown")
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
logging.basicConfig(level=logging.INFO)
|
||||||
|
aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=shutdown)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
43
src/middelware.py
Normal file
43
src/middelware.py
Normal file
@ -0,0 +1,43 @@
|
|||||||
|
# middleware.py
|
||||||
|
from kstreams import middleware, ConsumerRecord
|
||||||
|
from schema_registry.client import AsyncSchemaRegistryClient
|
||||||
|
from schema_registry.serializers import AsyncJsonMessageSerializer
|
||||||
|
from typing import Optional, Dict
|
||||||
|
import logging
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class ConfluentJsonSchemaMiddleware(
|
||||||
|
middleware.BaseMiddleware, AsyncJsonMessageSerializer
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Middleware to deserialize JSON messages using Confluent Schema Registry
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
schema_registry_client: AsyncSchemaRegistryClient,
|
||||||
|
reader_schema: Optional[Dict] = None,
|
||||||
|
**kwargs,
|
||||||
|
):
|
||||||
|
super().__init__(**kwargs)
|
||||||
|
self.schemaregistry_client = schema_registry_client
|
||||||
|
self.reader_schema = reader_schema
|
||||||
|
self.id_to_decoder: Dict = {}
|
||||||
|
|
||||||
|
async def __call__(self, cr: ConsumerRecord):
|
||||||
|
"""
|
||||||
|
Deserialize the event from bytes to dict
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
data = await self.decode_message(cr.value)
|
||||||
|
cr.value = data
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to deserialize message: {e}")
|
||||||
|
# Optionally: handle error or pass through
|
||||||
|
raise
|
||||||
|
|
||||||
|
return await self.next_call(cr)
|
||||||
183
src/models.py
Normal file
183
src/models.py
Normal file
@ -0,0 +1,183 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from datetime import date, datetime
|
||||||
|
from enum import Enum
|
||||||
|
|
||||||
|
from pydantic import BaseModel, Field
|
||||||
|
|
||||||
|
|
||||||
|
class BaggageClaimType(BaseModel):
|
||||||
|
belts: list[str] | None = None
|
||||||
|
|
||||||
|
|
||||||
|
class RemarksType(BaseModel):
|
||||||
|
remarks: list[str] | None = None
|
||||||
|
|
||||||
|
|
||||||
|
class CodesharesType(BaseModel):
|
||||||
|
codeshares: list[str] | None = None
|
||||||
|
|
||||||
|
|
||||||
|
class PublicName(BaseModel):
|
||||||
|
dutch: str | None = None
|
||||||
|
english: str | None = None
|
||||||
|
|
||||||
|
|
||||||
|
class Destination(BaseModel):
|
||||||
|
city: str | None = None
|
||||||
|
country: str | None = None
|
||||||
|
iata: str | None = None
|
||||||
|
publicName: PublicName | None = None
|
||||||
|
|
||||||
|
|
||||||
|
class Airline(BaseModel):
|
||||||
|
iata: str | None = None
|
||||||
|
icao: str | None = None
|
||||||
|
nvls: int | None = None
|
||||||
|
publicName: str | None = None
|
||||||
|
|
||||||
|
|
||||||
|
class AirlineList(BaseModel):
|
||||||
|
airlines: list[Airline] | None = None
|
||||||
|
|
||||||
|
|
||||||
|
class AircraftType(BaseModel):
|
||||||
|
iataMain: str | None = None
|
||||||
|
iataSub: str | None = None
|
||||||
|
longDescription: str | None = None
|
||||||
|
shortDescription: str | None = None
|
||||||
|
|
||||||
|
|
||||||
|
class AircraftTypeList(BaseModel):
|
||||||
|
aircraftTypes: list[AircraftType] | None = None
|
||||||
|
|
||||||
|
|
||||||
|
class FlightDirection(Enum):
|
||||||
|
A = "A"
|
||||||
|
D = "D"
|
||||||
|
|
||||||
|
|
||||||
|
class RouteType(BaseModel):
|
||||||
|
destinations: list[str] | None = None
|
||||||
|
eu: str | None = Field(
|
||||||
|
None, description="S (Schengen), E (Europe) or N (non-Europe)"
|
||||||
|
)
|
||||||
|
visa: bool | None = Field(
|
||||||
|
None, description="Indicates if a visum is required for destination"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class AircraftTypeType(BaseModel):
|
||||||
|
iataMain: str | None = None
|
||||||
|
iataSub: str | None = None
|
||||||
|
|
||||||
|
|
||||||
|
class CheckinClassType(BaseModel):
|
||||||
|
code: str | None = None
|
||||||
|
description: str | None = None
|
||||||
|
|
||||||
|
|
||||||
|
class TransferPositionsType(BaseModel):
|
||||||
|
transferPositions: list[int] | None = None
|
||||||
|
|
||||||
|
|
||||||
|
class PublicFlightStateType(BaseModel):
|
||||||
|
flightStates: list[str] | None = None
|
||||||
|
|
||||||
|
|
||||||
|
class DestinationList(BaseModel):
|
||||||
|
destinations: list[Destination] | None = None
|
||||||
|
|
||||||
|
|
||||||
|
class DeskType(BaseModel):
|
||||||
|
checkinClass: CheckinClassType | None = None
|
||||||
|
position: int | None = None
|
||||||
|
|
||||||
|
|
||||||
|
class DesksType(BaseModel):
|
||||||
|
desks: list[DeskType] | None = None
|
||||||
|
|
||||||
|
|
||||||
|
class RowType(BaseModel):
|
||||||
|
position: str | None = None
|
||||||
|
desks: DesksType | None = None
|
||||||
|
|
||||||
|
|
||||||
|
class RowsType(BaseModel):
|
||||||
|
rows: list[RowType] | None = None
|
||||||
|
|
||||||
|
|
||||||
|
class CheckinAllocationType(BaseModel):
|
||||||
|
endTime: datetime | None = None
|
||||||
|
rows: RowsType | None = None
|
||||||
|
startTime: datetime | None = None
|
||||||
|
|
||||||
|
|
||||||
|
class CheckinAllocationsType(BaseModel):
|
||||||
|
checkinAllocations: list[CheckinAllocationType] | None = None
|
||||||
|
remarks: RemarksType | None = None
|
||||||
|
|
||||||
|
|
||||||
|
class Flight(BaseModel):
|
||||||
|
lastUpdatedAt: datetime | None = None
|
||||||
|
actualLandingTime: datetime | None = Field(
|
||||||
|
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
|
||||||
|
)
|
||||||
|
actualOffBlockTime: datetime | None = Field(
|
||||||
|
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
|
||||||
|
)
|
||||||
|
aircraftRegistration: str | None = None
|
||||||
|
aircraftType: AircraftTypeType | None = None
|
||||||
|
baggageClaim: BaggageClaimType | None = None
|
||||||
|
checkinAllocations: CheckinAllocationsType | None = None
|
||||||
|
codeshares: CodesharesType | None = None
|
||||||
|
estimatedLandingTime: datetime | None = Field(
|
||||||
|
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
|
||||||
|
)
|
||||||
|
expectedTimeBoarding: datetime | None = Field(
|
||||||
|
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
|
||||||
|
)
|
||||||
|
expectedTimeGateClosing: datetime | None = Field(
|
||||||
|
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
|
||||||
|
)
|
||||||
|
expectedTimeGateOpen: datetime | None = Field(
|
||||||
|
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
|
||||||
|
)
|
||||||
|
expectedTimeOnBelt: datetime | None = Field(
|
||||||
|
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
|
||||||
|
)
|
||||||
|
expectedSecurityFilter: str | None = Field(
|
||||||
|
None, description="expected security filter"
|
||||||
|
)
|
||||||
|
flightDirection: FlightDirection | None = None
|
||||||
|
flightName: str | None = None
|
||||||
|
flightNumber: int | None = None
|
||||||
|
gate: str | None = None
|
||||||
|
pier: str | None = None
|
||||||
|
id: str | None = None
|
||||||
|
isOperationalFlight: bool | None = None
|
||||||
|
mainFlight: str | None = None
|
||||||
|
prefixIATA: str | None = None
|
||||||
|
prefixICAO: str | None = None
|
||||||
|
airlineCode: int | None = None
|
||||||
|
publicEstimatedOffBlockTime: datetime | None = Field(
|
||||||
|
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
|
||||||
|
)
|
||||||
|
publicFlightState: PublicFlightStateType | None = None
|
||||||
|
route: RouteType | None = None
|
||||||
|
scheduleDateTime: datetime | None = Field(
|
||||||
|
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
|
||||||
|
)
|
||||||
|
scheduleDate: date | None = Field(None, description="yyyy-MM-dd")
|
||||||
|
scheduleTime: str | None = Field(None, description="hh:mm:ss")
|
||||||
|
serviceType: str | None = Field(
|
||||||
|
None,
|
||||||
|
description="The service type category of the commercial flight. For example: J = Passenger Line, C=Passenger Charter, F = Freight Line and H = Freight Charter etc.",
|
||||||
|
)
|
||||||
|
terminal: int | None = None
|
||||||
|
transferPositions: TransferPositionsType | None = None
|
||||||
|
schemaVersion: str | None = None
|
||||||
|
|
||||||
|
|
||||||
|
class FlightList(BaseModel):
|
||||||
|
flights: list[Flight] | None = None
|
||||||
28
src/settings.py
Normal file
28
src/settings.py
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
from pydantic_settings import BaseSettings, SettingsConfigDict
|
||||||
|
|
||||||
|
|
||||||
|
class SchipholApiSettings(BaseSettings):
|
||||||
|
model_config = SettingsConfigDict(
|
||||||
|
env_file=".env",
|
||||||
|
env_file_encoding="utf-8",
|
||||||
|
env_prefix="SCHIPHOL_",
|
||||||
|
extra="ignore",
|
||||||
|
)
|
||||||
|
|
||||||
|
api_id: str | None = ""
|
||||||
|
api_key: str | None = ""
|
||||||
|
base_url: str | None = ""
|
||||||
|
|
||||||
|
|
||||||
|
class KafkaSettings(BaseSettings):
|
||||||
|
model_config = SettingsConfigDict(
|
||||||
|
env_file=".env",
|
||||||
|
env_file_encoding="utf-8",
|
||||||
|
env_prefix="KAFKA_",
|
||||||
|
extra="ignore",
|
||||||
|
)
|
||||||
|
|
||||||
|
bootstrap_servers: list[str] = []
|
||||||
|
schema_registry_url: str = ""
|
||||||
|
topic: str = "flights"
|
||||||
|
group_id: str = "schiphol-flights-group"
|
||||||
142
tools/create_schema.py
Normal file
142
tools/create_schema.py
Normal file
@ -0,0 +1,142 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
import json
|
||||||
|
import requests
|
||||||
|
from typing import Dict, Any
|
||||||
|
from copy import deepcopy
|
||||||
|
|
||||||
|
# Configuration
|
||||||
|
SCHEMA_REGISTRY_URL = "http://192.168.1.78:8081"
|
||||||
|
OPENAPI_FILE = "openapi.json"
|
||||||
|
|
||||||
|
def resolve_refs(schema: Any, all_schemas: Dict[str, Any], visited: set = None) -> Any:
|
||||||
|
"""
|
||||||
|
Recursively resolve $ref references in a schema.
|
||||||
|
"""
|
||||||
|
if visited is None:
|
||||||
|
visited = set()
|
||||||
|
|
||||||
|
if isinstance(schema, dict):
|
||||||
|
if "$ref" in schema:
|
||||||
|
ref = schema["$ref"]
|
||||||
|
|
||||||
|
# Extract schema name from #/components/schemas/SchemaName
|
||||||
|
if ref.startswith("#/components/schemas/"):
|
||||||
|
schema_name = ref. split("/")[-1]
|
||||||
|
|
||||||
|
# Prevent infinite recursion
|
||||||
|
if schema_name in visited:
|
||||||
|
return {"type": "object", "description": f"Circular reference to {schema_name}"}
|
||||||
|
|
||||||
|
visited.add(schema_name)
|
||||||
|
|
||||||
|
if schema_name in all_schemas:
|
||||||
|
# Recursively resolve the referenced schema
|
||||||
|
resolved = resolve_refs(deepcopy(all_schemas[schema_name]), all_schemas, visited. copy())
|
||||||
|
return resolved
|
||||||
|
else:
|
||||||
|
print(f" Warning: Referenced schema '{schema_name}' not found")
|
||||||
|
return schema
|
||||||
|
else:
|
||||||
|
return schema
|
||||||
|
else:
|
||||||
|
# Recursively resolve refs in nested objects
|
||||||
|
return {k: resolve_refs(v, all_schemas, visited. copy()) for k, v in schema.items()}
|
||||||
|
elif isinstance(schema, list):
|
||||||
|
return [resolve_refs(item, all_schemas, visited.copy()) for item in schema]
|
||||||
|
else:
|
||||||
|
return schema
|
||||||
|
|
||||||
|
def convert_openapi_to_json_schema(schema_def: Dict[str, Any], all_schemas: Dict[str, Any]) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Convert OpenAPI schema to JSON Schema format with resolved references.
|
||||||
|
"""
|
||||||
|
# First resolve all $ref references
|
||||||
|
resolved_schema = resolve_refs(deepcopy(schema_def), all_schemas)
|
||||||
|
|
||||||
|
# Add JSON Schema metadata
|
||||||
|
json_schema = {
|
||||||
|
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||||
|
**resolved_schema
|
||||||
|
}
|
||||||
|
|
||||||
|
return json_schema
|
||||||
|
|
||||||
|
def register_schema(subject: str, schema_def: Dict[str, Any], all_schemas: Dict[str, Any]) -> bool:
|
||||||
|
"""Register a schema with the Schema Registry."""
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Convert to JSON Schema with resolved references
|
||||||
|
json_schema = convert_openapi_to_json_schema(schema_def, all_schemas)
|
||||||
|
|
||||||
|
# Schema Registry expects the schema as a JSON string
|
||||||
|
schema_str = json.dumps(json_schema)
|
||||||
|
|
||||||
|
# Prepare payload for Schema Registry
|
||||||
|
payload = {
|
||||||
|
"schemaType": "JSON",
|
||||||
|
"schema": schema_str
|
||||||
|
}
|
||||||
|
|
||||||
|
# Register schema
|
||||||
|
url = f"{SCHEMA_REGISTRY_URL}/subjects/{subject}/versions"
|
||||||
|
|
||||||
|
response = requests.post(
|
||||||
|
url,
|
||||||
|
headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
|
||||||
|
json=payload
|
||||||
|
)
|
||||||
|
|
||||||
|
if response. status_code == 200:
|
||||||
|
result = response.json()
|
||||||
|
print(f"✓ Registered '{subject}'")
|
||||||
|
print(f" Schema ID: {result['id']}")
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
print(f"✗ Failed to register '{subject}'")
|
||||||
|
print(f" Status: {response.status_code}")
|
||||||
|
print(f" Error: {response.text}")
|
||||||
|
return False
|
||||||
|
except Exception as e:
|
||||||
|
print(f"✗ Error registering '{subject}': {str(e)}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
def main():
|
||||||
|
# Load OpenAPI file
|
||||||
|
with open(OPENAPI_FILE, 'r') as f:
|
||||||
|
openapi = json.load(f)
|
||||||
|
|
||||||
|
# Extract schemas
|
||||||
|
schemas = openapi.get('components', {}).get('schemas', {})
|
||||||
|
|
||||||
|
if not schemas:
|
||||||
|
print("No schemas found in OpenAPI file!")
|
||||||
|
return
|
||||||
|
|
||||||
|
print(f"Found {len(schemas)} schemas in OpenAPI file\n")
|
||||||
|
print("=" * 70)
|
||||||
|
|
||||||
|
# Register each schema
|
||||||
|
success_count = 0
|
||||||
|
failed_count = 0
|
||||||
|
|
||||||
|
for schema_name, schema_def in schemas.items():
|
||||||
|
# Create subject name
|
||||||
|
subject = f"schiphol.{schema_name. lower()}-value"
|
||||||
|
|
||||||
|
if register_schema(subject, schema_def, schemas):
|
||||||
|
success_count += 1
|
||||||
|
else:
|
||||||
|
failed_count += 1
|
||||||
|
print()
|
||||||
|
|
||||||
|
print("=" * 70)
|
||||||
|
print(f"\nRegistration complete!")
|
||||||
|
print(f" ✓ Success: {success_count}")
|
||||||
|
print(f" ✗ Failed: {failed_count}")
|
||||||
|
print(f"\nTo list all registered subjects:")
|
||||||
|
print(f" curl {SCHEMA_REGISTRY_URL}/subjects | jq .")
|
||||||
|
print(f"\nTo view a specific schema:")
|
||||||
|
print(f" curl {SCHEMA_REGISTRY_URL}/subjects/schiphol.flight-value/versions/latest | jq -r '.schema | fromjson'")
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
Loading…
Reference in New Issue
Block a user