Compare commits
1 Commits
89379eca72
...
b037797a33
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b037797a33 |
1
src/__init__.py
Normal file
1
src/__init__.py
Normal file
@ -0,0 +1 @@
|
||||
"""Schiphol flight data producer."""
|
||||
100
src/main.py
Normal file
100
src/main.py
Normal file
@ -0,0 +1,100 @@
|
||||
from src.settings import SchipholApiSettings, KafkaSettings
|
||||
from requests_ratelimiter import LimiterSession
|
||||
from kstreams import create_engine
|
||||
from kstreams.backends.kafka import Kafka
|
||||
from opentelemetry import trace
|
||||
from opentelemetry.sdk.resources import Resource
|
||||
from opentelemetry.sdk.trace import TracerProvider
|
||||
from opentelemetry.sdk.trace.export import BatchSpanProcessor
|
||||
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
|
||||
from opentelemetry.instrumentation.requests import RequestsInstrumentor
|
||||
import aiorun
|
||||
import json
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def start():
    """Fetch all flight pages from the Schiphol API and publish each flight to Kafka.

    Sets up OpenTelemetry tracing, starts the kstreams engine, then follows the
    API's ``Link: rel="next"`` pagination starting from
    ``SchipholApiSettings.base_url``.  Every flight of every page is sent to
    ``KafkaSettings.topic`` as JSON bytes, keyed by the stringified flight id.

    Raises:
        requests.HTTPError: if a page request returns an error status
            (via ``response.raise_for_status()``).
    """
    # Initialize OpenTelemetry tracing (minimal, env-driven)
    resource = Resource.create({"service.name": "schiphol-producer"})
    provider = TracerProvider(resource=resource)
    trace.set_tracer_provider(provider)
    provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
    RequestsInstrumentor().instrument()
    tracer = trace.get_tracer("schiphol")

    api_settings = SchipholApiSettings()
    kafka_settings = KafkaSettings()

    backend = Kafka(bootstrap_servers=kafka_settings.bootstrap_servers)
    stream_engine = create_engine(title="schiphol-flights-engine", backend=backend)
    await stream_engine.start()

    try:
        # The session is a context manager so its connections are released
        # even when a request or a send fails half-way through.
        with LimiterSession(per_second=1) as session:
            session.headers.update(
                {
                    "Accept": "application/json",
                    "app_id": api_settings.api_id,
                    "app_key": api_settings.api_key,
                    "ResourceVersion": "v4",
                }
            )

            next_url = api_settings.base_url
            total_flights = 0

            while next_url:
                with tracer.start_as_current_span(
                    "fetch_page", attributes={"http.url": next_url}
                ):
                    # NOTE(review): this is a blocking HTTP call inside a
                    # coroutine; it stalls the event loop while it waits.
                    response = session.get(next_url)
                    response.raise_for_status()
                    data = response.json()

                flights = data.get("flights", [])
                for flight in flights:
                    flight_id = str(flight.get("id"))
                    flight_bytes = json.dumps(flight, default=str).encode()

                    with tracer.start_as_current_span(
                        "publish_flight",
                        attributes={
                            "messaging.system": "kafka",
                            "messaging.destination": kafka_settings.topic,
                            "messaging.kafka.message_key": flight_id,
                        },
                    ):
                        await stream_engine.send(
                            kafka_settings.topic,
                            value=flight_bytes,
                            key=flight_id,
                        )

                total_flights += len(flights)

                # requests already parses the Link header; this handles
                # commas inside URLs and unquoted rel values, which naive
                # string splitting does not.
                next_url = response.links.get("next", {}).get("url")

            logger.info(
                "Published %d flights to Kafka topic '%s'",
                total_flights,
                kafka_settings.topic,
            )
    finally:
        # Always stop the engine, including when fetching or sending raised.
        await stream_engine.stop()
|
||||
|
||||
|
||||
async def shutdown(loop):
    """Log that shutdown has started.

    Passed to ``aiorun.run`` as ``shutdown_callback`` in ``main``.  The
    ``loop`` argument is accepted but not used.
    """
    logger.info("Shutdown")
|
||||
|
||||
|
||||
def main():
    """Configure INFO-level logging and run ``start()`` under aiorun."""
    logging.basicConfig(level=logging.INFO)
    # NOTE(review): aiorun.run presumably keeps the loop alive until a signal
    # or loop.stop(); confirm the process exits once start() has finished.
    aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=shutdown)


if __name__ == "__main__":
    main()
|
||||
43
src/middelware.py
Normal file
43
src/middelware.py
Normal file
@ -0,0 +1,43 @@
|
||||
# middleware.py
|
||||
from kstreams import middleware, ConsumerRecord
|
||||
from schema_registry.client import AsyncSchemaRegistryClient
|
||||
from schema_registry.serializers import AsyncJsonMessageSerializer
|
||||
from typing import Optional, Dict
|
||||
import logging
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ConfluentJsonSchemaMiddleware(
    middleware.BaseMiddleware, AsyncJsonMessageSerializer
):
    """
    Middleware to deserialize JSON messages using Confluent Schema Registry
    """

    def __init__(
        self,
        *,
        schema_registry_client: AsyncSchemaRegistryClient,
        reader_schema: Optional[Dict] = None,
        **kwargs,
    ):
        """Store the registry client and reader schema on the instance.

        Args:
            schema_registry_client: client used to look up schemas.
            reader_schema: optional schema to read messages with.
            **kwargs: forwarded unchanged to the parent initializer.
        """
        super().__init__(**kwargs)
        # Presumably these are the attributes the serializer mixin's
        # decode_message relies on — TODO confirm against the library.
        self.schemaregistry_client = schema_registry_client
        self.reader_schema = reader_schema
        self.id_to_decoder: Dict = {}

    async def __call__(self, cr: ConsumerRecord):
        """
        Deserialize the event from bytes to dict

        The decoded value replaces ``cr.value`` before the record is handed
        to the next middleware.  Decoding errors are logged with their
        traceback and re-raised.
        """
        try:
            cr.value = await self.decode_message(cr.value)
        except Exception as e:
            # logger.exception keeps the traceback, which a plain error log drops.
            logger.exception("Failed to deserialize message: %s", e)
            raise

        return await self.next_call(cr)
|
||||
183
src/models.py
Normal file
183
src/models.py
Normal file
@ -0,0 +1,183 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import date, datetime
|
||||
from enum import Enum
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
# Baggage claim information: an optional list of belt identifiers (strings).
class BaggageClaimType(BaseModel):
    belts: list[str] | None = None
|
||||
|
||||
|
||||
# Wrapper around an optional list of remark strings.
class RemarksType(BaseModel):
    remarks: list[str] | None = None
|
||||
|
||||
|
||||
# Wrapper around an optional list of codeshare strings.
class CodesharesType(BaseModel):
    codeshares: list[str] | None = None
|
||||
|
||||
|
||||
# Public-facing name with optional Dutch and English variants.
class PublicName(BaseModel):
    dutch: str | None = None
    english: str | None = None
|
||||
|
||||
|
||||
# A destination: optional city, country, IATA code and localized public name.
class Destination(BaseModel):
    city: str | None = None
    country: str | None = None
    iata: str | None = None
    publicName: PublicName | None = None
|
||||
|
||||
|
||||
# An airline: optional IATA and ICAO codes, numeric `nvls` value and public name.
class Airline(BaseModel):
    iata: str | None = None
    icao: str | None = None
    nvls: int | None = None
    publicName: str | None = None
|
||||
|
||||
|
||||
# Wrapper around an optional list of Airline entries.
class AirlineList(BaseModel):
    airlines: list[Airline] | None = None
|
||||
|
||||
|
||||
# An aircraft type: optional IATA main/sub codes plus long and short descriptions.
class AircraftType(BaseModel):
    iataMain: str | None = None
    iataSub: str | None = None
    longDescription: str | None = None
    shortDescription: str | None = None
|
||||
|
||||
|
||||
# Wrapper around an optional list of AircraftType entries.
class AircraftTypeList(BaseModel):
    aircraftTypes: list[AircraftType] | None = None
|
||||
|
||||
|
||||
# Allowed values for Flight.flightDirection.
# NOTE(review): presumably "A" = arrival and "D" = departure — confirm against the API docs.
class FlightDirection(Enum):
    A = "A"
    D = "D"
|
||||
|
||||
|
||||
# Route information: optional destination strings, a region indicator (`eu`)
# and a visa-required flag.
class RouteType(BaseModel):
    destinations: list[str] | None = None
    eu: str | None = Field(
        None, description="S (Schengen), E (Europe) or N (non-Europe)"
    )
    visa: bool | None = Field(
        None, description="Indicates if a visum is required for destination"
    )
|
||||
|
||||
|
||||
# Aircraft type as embedded in a Flight: only the optional IATA main/sub codes.
class AircraftTypeType(BaseModel):
    iataMain: str | None = None
    iataSub: str | None = None
|
||||
|
||||
|
||||
# Check-in class: optional code and description.
class CheckinClassType(BaseModel):
    code: str | None = None
    description: str | None = None
|
||||
|
||||
|
||||
# Wrapper around an optional list of integer transfer positions.
class TransferPositionsType(BaseModel):
    transferPositions: list[int] | None = None
|
||||
|
||||
|
||||
# Wrapper around an optional list of flight state strings.
class PublicFlightStateType(BaseModel):
    flightStates: list[str] | None = None
|
||||
|
||||
|
||||
# Wrapper around an optional list of Destination entries.
class DestinationList(BaseModel):
    destinations: list[Destination] | None = None
|
||||
|
||||
|
||||
# A check-in desk: optional check-in class and integer position.
class DeskType(BaseModel):
    checkinClass: CheckinClassType | None = None
    position: int | None = None
|
||||
|
||||
|
||||
# Wrapper around an optional list of DeskType entries.
class DesksType(BaseModel):
    desks: list[DeskType] | None = None
|
||||
|
||||
|
||||
# A check-in row: optional position (string) and the desks in that row.
class RowType(BaseModel):
    position: str | None = None
    desks: DesksType | None = None
|
||||
|
||||
|
||||
# Wrapper around an optional list of RowType entries.
class RowsType(BaseModel):
    rows: list[RowType] | None = None
|
||||
|
||||
|
||||
# One check-in allocation: optional start/end datetimes and the allocated rows.
class CheckinAllocationType(BaseModel):
    endTime: datetime | None = None
    rows: RowsType | None = None
    startTime: datetime | None = None
|
||||
|
||||
|
||||
# Check-in allocations for a flight: optional allocation list plus optional remarks.
class CheckinAllocationsType(BaseModel):
    checkinAllocations: list[CheckinAllocationType] | None = None
    remarks: RemarksType | None = None
|
||||
|
||||
|
||||
# A single flight record.  Every field is optional and defaults to None, so a
# payload that omits fields still validates.  Field names are camelCase,
# presumably to mirror the API's JSON keys — confirm against the API docs.
class Flight(BaseModel):
    lastUpdatedAt: datetime | None = None
    actualLandingTime: datetime | None = Field(
        None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
    )
    actualOffBlockTime: datetime | None = Field(
        None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
    )
    aircraftRegistration: str | None = None
    aircraftType: AircraftTypeType | None = None
    baggageClaim: BaggageClaimType | None = None
    checkinAllocations: CheckinAllocationsType | None = None
    codeshares: CodesharesType | None = None
    estimatedLandingTime: datetime | None = Field(
        None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
    )
    expectedTimeBoarding: datetime | None = Field(
        None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
    )
    expectedTimeGateClosing: datetime | None = Field(
        None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
    )
    expectedTimeGateOpen: datetime | None = Field(
        None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
    )
    expectedTimeOnBelt: datetime | None = Field(
        None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
    )
    expectedSecurityFilter: str | None = Field(
        None, description="expected security filter"
    )
    flightDirection: FlightDirection | None = None
    flightName: str | None = None
    flightNumber: int | None = None
    gate: str | None = None
    pier: str | None = None
    # The producer in src/main.py uses this id (stringified) as the Kafka message key.
    id: str | None = None
    isOperationalFlight: bool | None = None
    mainFlight: str | None = None
    prefixIATA: str | None = None
    prefixICAO: str | None = None
    airlineCode: int | None = None
    publicEstimatedOffBlockTime: datetime | None = Field(
        None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
    )
    publicFlightState: PublicFlightStateType | None = None
    route: RouteType | None = None
    scheduleDateTime: datetime | None = Field(
        None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
    )
    scheduleDate: date | None = Field(None, description="yyyy-MM-dd")
    # Kept as a plain string rather than a time object.
    scheduleTime: str | None = Field(None, description="hh:mm:ss")
    serviceType: str | None = Field(
        None,
        description="The service type category of the commercial flight. For example: J = Passenger Line, C=Passenger Charter, F = Freight Line and H = Freight Charter etc.",
    )
    terminal: int | None = None
    transferPositions: TransferPositionsType | None = None
    schemaVersion: str | None = None
|
||||
|
||||
|
||||
# Wrapper around an optional list of Flight entries, stored under the key
# "flights" (the same key the producer in src/main.py reads from each page).
class FlightList(BaseModel):
    flights: list[Flight] | None = None
|
||||
28
src/settings.py
Normal file
28
src/settings.py
Normal file
@ -0,0 +1,28 @@
|
||||
from pydantic_settings import BaseSettings, SettingsConfigDict
|
||||
|
||||
|
||||
# Schiphol API credentials and entry URL, loaded from the environment and from
# a UTF-8 `.env` file using the `SCHIPHOL_` prefix; unknown keys are ignored.
class SchipholApiSettings(BaseSettings):
    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        env_prefix="SCHIPHOL_",
        extra="ignore",
    )

    # All three default to an empty string, so a missing variable does not
    # fail validation here.
    api_id: str | None = ""
    api_key: str | None = ""
    base_url: str | None = ""
|
||||
|
||||
|
||||
# Kafka connection settings, loaded from the environment and from a UTF-8
# `.env` file using the `KAFKA_` prefix; unknown keys are ignored.
class KafkaSettings(BaseSettings):
    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        env_prefix="KAFKA_",
        extra="ignore",
    )

    # Defaults to an empty list when not configured.
    bootstrap_servers: list[str] = []
    schema_registry_url: str = ""
    topic: str = "flights"
    group_id: str = "schiphol-flights-group"
|
||||
142
tools/create_schema.py
Normal file
142
tools/create_schema.py
Normal file
@ -0,0 +1,142 @@
|
||||
#!/usr/bin/env python3
|
||||
import json
|
||||
import requests
|
||||
from typing import Dict, Any
|
||||
from copy import deepcopy
|
||||
|
||||
# Configuration
# Base URL of the Schema Registry that schemas are posted to.
# NOTE(review): hard-coded private-network address; consider reading it from
# the environment so the tool works outside this network.
SCHEMA_REGISTRY_URL = "http://192.168.1.78:8081"
# Path of the OpenAPI document to read, relative to the working directory.
OPENAPI_FILE = "openapi.json"
|
||||
|
||||
def resolve_refs(schema: Any, all_schemas: Dict[str, Any], visited: "set | None" = None) -> Any:
    """
    Recursively resolve $ref references in a schema.

    Args:
        schema: any JSON-like value (dict, list or scalar) to resolve.
        all_schemas: mapping of schema name to definition, used to look up
            ``#/components/schemas/<name>`` references.
        visited: names already being expanded on the current path; used to
            detect cycles.  It is never mutated, so callers may reuse it.

    Returns:
        A new structure in which every resolvable local reference is replaced
        by its (recursively resolved) definition.  A reference that would
        recurse into itself becomes a placeholder object schema.  References
        that do not point at ``#/components/schemas/`` and references to
        unknown names are returned unchanged (the latter with a warning).
    """
    if visited is None:
        visited = set()

    if isinstance(schema, list):
        return [resolve_refs(item, all_schemas, visited) for item in schema]

    if not isinstance(schema, dict):
        return schema

    ref = schema.get("$ref")
    if not isinstance(ref, str):
        # Either there is no "$ref" at all, or "$ref" is just an ordinary key
        # (for example a property literally named "$ref"): recurse into values.
        return {key: resolve_refs(value, all_schemas, visited) for key, value in schema.items()}

    if not ref.startswith("#/components/schemas/"):
        return schema

    # Extract schema name from #/components/schemas/SchemaName
    schema_name = ref.split("/")[-1]

    # Prevent infinite recursion
    if schema_name in visited:
        return {"type": "object", "description": f"Circular reference to {schema_name}"}

    if schema_name not in all_schemas:
        print(f" Warning: Referenced schema '{schema_name}' not found")
        return schema

    # Build a new set rather than adding to `visited`, so sibling branches
    # (and the caller's own set) do not see this path's names.
    return resolve_refs(
        deepcopy(all_schemas[schema_name]), all_schemas, visited | {schema_name}
    )
|
||||
|
||||
def convert_openapi_to_json_schema(schema_def: Dict[str, Any], all_schemas: Dict[str, Any]) -> Dict[str, Any]:
    """
    Build a JSON Schema document from an OpenAPI schema definition.

    The definition is copied, its $ref references are resolved against
    ``all_schemas``, and the result is merged on top of a draft-07
    ``$schema`` marker.
    """
    # Start from the JSON Schema metadata; keys of the resolved schema are
    # layered on top, so they win on conflict.
    document: Dict[str, Any] = {"$schema": "http://json-schema.org/draft-07/schema#"}
    document.update(resolve_refs(deepcopy(schema_def), all_schemas))
    return document
|
||||
|
||||
def register_schema(subject: str, schema_def: Dict[str, Any], all_schemas: Dict[str, Any]) -> bool:
    """Register a schema with the Schema Registry.

    Args:
        subject: Schema Registry subject to register the schema under.
        schema_def: OpenAPI schema definition to convert and register.
        all_schemas: all OpenAPI schemas, used to resolve $ref references.

    Returns:
        True when the registry answers with HTTP 200, False on any other
        status or on any exception (the failure is printed, never raised).
    """
    try:
        # Convert to JSON Schema with resolved references
        json_schema = convert_openapi_to_json_schema(schema_def, all_schemas)

        # Schema Registry expects the schema as a JSON string
        schema_str = json.dumps(json_schema)

        # Prepare payload for Schema Registry
        payload = {
            "schemaType": "JSON",
            "schema": schema_str,
        }

        # Register schema
        url = f"{SCHEMA_REGISTRY_URL}/subjects/{subject}/versions"

        response = requests.post(
            url,
            headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
            json=payload,
            # Without a timeout, requests waits forever on an unreachable
            # registry; a timeout surfaces as a reported failure instead.
            timeout=10,
        )

        if response.status_code == 200:
            result = response.json()
            print(f"✓ Registered '{subject}'")
            print(f" Schema ID: {result['id']}")
            return True

        print(f"✗ Failed to register '{subject}'")
        print(f" Status: {response.status_code}")
        print(f" Error: {response.text}")
        return False
    except Exception as e:
        # Deliberately best-effort: report the problem and let the caller
        # carry on with the remaining schemas.
        print(f"✗ Error registering '{subject}': {str(e)}")
        return False
|
||||
|
||||
def main():
    """Register every schema found in the OpenAPI file and print a summary.

    Reads ``OPENAPI_FILE``, takes the schemas under ``components.schemas``
    and registers each one under the subject
    ``schiphol.<lowercased schema name>-value``.  Returns early, after
    printing a message, when the file contains no schemas.
    """
    # Load OpenAPI file (explicit encoding: the platform default may not be UTF-8)
    with open(OPENAPI_FILE, 'r', encoding="utf-8") as f:
        openapi = json.load(f)

    # Extract schemas
    schemas = openapi.get('components', {}).get('schemas', {})

    if not schemas:
        print("No schemas found in OpenAPI file!")
        return

    print(f"Found {len(schemas)} schemas in OpenAPI file\n")
    print("=" * 70)

    # Register each schema
    success_count = 0
    failed_count = 0

    for schema_name, schema_def in schemas.items():
        # Create subject name
        subject = f"schiphol.{schema_name.lower()}-value"

        if register_schema(subject, schema_def, schemas):
            success_count += 1
        else:
            failed_count += 1
        print()

    print("=" * 70)
    print("\nRegistration complete!")
    print(f" ✓ Success: {success_count}")
    print(f" ✗ Failed: {failed_count}")
    print("\nTo list all registered subjects:")
    print(f" curl {SCHEMA_REGISTRY_URL}/subjects | jq .")
    print("\nTo view a specific schema:")
    print(f" curl {SCHEMA_REGISTRY_URL}/subjects/schiphol.flight-value/versions/latest | jq -r '.schema | fromjson'")


if __name__ == "__main__":
    main()
|
||||
Loading…
Reference in New Issue
Block a user