Compare commits
No commits in common. "b4088c8a93515fc5e14916e6cd1c4565f74128f5" and "bb9d75a26d025c77b7ed524f5e90872dedd11eaf" have entirely different histories.
b4088c8a93
...
bb9d75a26d
@ -1,25 +0,0 @@
|
|||||||
name: Lint and Format
|
|
||||||
|
|
||||||
on:
|
|
||||||
pull_request:
|
|
||||||
branches:
|
|
||||||
- main
|
|
||||||
push:
|
|
||||||
branches:
|
|
||||||
- main
|
|
||||||
|
|
||||||
jobs:
|
|
||||||
lint:
|
|
||||||
runs-on: ubuntu-latest
|
|
||||||
steps:
|
|
||||||
- name: Checkout code
|
|
||||||
uses: actions/checkout@v4
|
|
||||||
|
|
||||||
- name: Install uv
|
|
||||||
uses: astral-sh/setup-uv@v5
|
|
||||||
|
|
||||||
- name: Check formatting with ruff
|
|
||||||
run: uvx ruff format --check src/
|
|
||||||
|
|
||||||
- name: Lint with ruff
|
|
||||||
run: uvx ruff check src/
|
|
||||||
@ -27,18 +27,5 @@ schiphol = "src.main:main"
|
|||||||
requires = ["setuptools>=61.0"]
|
requires = ["setuptools>=61.0"]
|
||||||
build-backend = "setuptools.build_meta"
|
build-backend = "setuptools.build_meta"
|
||||||
|
|
||||||
[tool.setuptools.packages.find]
|
[tool.setuptools]
|
||||||
where = ["."]
|
packages = ["src"]
|
||||||
include = ["src*"]
|
|
||||||
|
|
||||||
[tool.ruff]
|
|
||||||
line-length = 100
|
|
||||||
target-version = "py314"
|
|
||||||
|
|
||||||
[tool.ruff.lint]
|
|
||||||
select = ["E", "F", "I", "UP", "B", "SIM"]
|
|
||||||
ignore = []
|
|
||||||
|
|
||||||
[tool.ruff.format]
|
|
||||||
quote-style = "double"
|
|
||||||
indent-style = "space"
|
|
||||||
|
|||||||
24
src/main.py
24
src/main.py
@ -1,18 +1,16 @@
|
|||||||
import json
|
from src.settings import SchipholApiSettings, KafkaSettings
|
||||||
import logging
|
from requests_ratelimiter import LimiterSession
|
||||||
|
|
||||||
import aiorun
|
|
||||||
from kstreams import create_engine
|
from kstreams import create_engine
|
||||||
from kstreams.backends.kafka import Kafka
|
from kstreams.backends.kafka import Kafka
|
||||||
from opentelemetry import trace
|
from opentelemetry import trace
|
||||||
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
|
|
||||||
from opentelemetry.instrumentation.requests import RequestsInstrumentor
|
|
||||||
from opentelemetry.sdk.resources import Resource
|
from opentelemetry.sdk.resources import Resource
|
||||||
from opentelemetry.sdk.trace import TracerProvider
|
from opentelemetry.sdk.trace import TracerProvider
|
||||||
from opentelemetry.sdk.trace.export import BatchSpanProcessor
|
from opentelemetry.sdk.trace.export import BatchSpanProcessor
|
||||||
from requests_ratelimiter import LimiterSession
|
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
|
||||||
|
from opentelemetry.instrumentation.requests import RequestsInstrumentor
|
||||||
from src.settings import KafkaSettings, SchipholApiSettings
|
import aiorun
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -47,7 +45,9 @@ async def start():
|
|||||||
total_flights = 0
|
total_flights = 0
|
||||||
|
|
||||||
while next_url:
|
while next_url:
|
||||||
with tracer.start_as_current_span("fetch_page", attributes={"http.url": next_url}):
|
with tracer.start_as_current_span(
|
||||||
|
"fetch_page", attributes={"http.url": next_url}
|
||||||
|
):
|
||||||
response = session.get(next_url)
|
response = session.get(next_url)
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
data = response.json()
|
data = response.json()
|
||||||
@ -80,7 +80,9 @@ async def start():
|
|||||||
next_url = link.split(";")[0].strip().strip("<>")
|
next_url = link.split(";")[0].strip().strip("<>")
|
||||||
break
|
break
|
||||||
|
|
||||||
logger.info(f"Published {total_flights} flights to Kafka topic '{kafka_settings.topic}'")
|
logger.info(
|
||||||
|
f"Published {total_flights} flights to Kafka topic '{kafka_settings.topic}'"
|
||||||
|
)
|
||||||
|
|
||||||
await stream_engine.stop()
|
await stream_engine.stop()
|
||||||
|
|
||||||
|
|||||||
@ -1,14 +1,17 @@
|
|||||||
# middleware.py
|
# middleware.py
|
||||||
import logging
|
from kstreams import middleware, ConsumerRecord
|
||||||
|
|
||||||
from kstreams import ConsumerRecord, middleware
|
|
||||||
from schema_registry.client import AsyncSchemaRegistryClient
|
from schema_registry.client import AsyncSchemaRegistryClient
|
||||||
from schema_registry.serializers import AsyncJsonMessageSerializer
|
from schema_registry.serializers import AsyncJsonMessageSerializer
|
||||||
|
from typing import Optional, Dict
|
||||||
|
import logging
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class ConfluentJsonSchemaMiddleware(middleware.BaseMiddleware, AsyncJsonMessageSerializer):
|
class ConfluentJsonSchemaMiddleware(
|
||||||
|
middleware.BaseMiddleware, AsyncJsonMessageSerializer
|
||||||
|
):
|
||||||
"""
|
"""
|
||||||
Middleware to deserialize JSON messages using Confluent Schema Registry
|
Middleware to deserialize JSON messages using Confluent Schema Registry
|
||||||
"""
|
"""
|
||||||
@ -17,13 +20,13 @@ class ConfluentJsonSchemaMiddleware(middleware.BaseMiddleware, AsyncJsonMessageS
|
|||||||
self,
|
self,
|
||||||
*,
|
*,
|
||||||
schema_registry_client: AsyncSchemaRegistryClient,
|
schema_registry_client: AsyncSchemaRegistryClient,
|
||||||
reader_schema: dict | None = None,
|
reader_schema: Optional[Dict] = None,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
):
|
):
|
||||||
super().__init__(**kwargs)
|
super().__init__(**kwargs)
|
||||||
self.schemaregistry_client = schema_registry_client
|
self.schemaregistry_client = schema_registry_client
|
||||||
self.reader_schema = reader_schema
|
self.reader_schema = reader_schema
|
||||||
self.id_to_decoder: dict = {}
|
self.id_to_decoder: Dict = {}
|
||||||
|
|
||||||
async def __call__(self, cr: ConsumerRecord):
|
async def __call__(self, cr: ConsumerRecord):
|
||||||
"""
|
"""
|
||||||
|
|||||||
@ -59,8 +59,12 @@ class FlightDirection(Enum):
|
|||||||
|
|
||||||
class RouteType(BaseModel):
|
class RouteType(BaseModel):
|
||||||
destinations: list[str] | None = None
|
destinations: list[str] | None = None
|
||||||
eu: str | None = Field(None, description="S (Schengen), E (Europe) or N (non-Europe)")
|
eu: str | None = Field(
|
||||||
visa: bool | None = Field(None, description="Indicates if a visum is required for destination")
|
None, description="S (Schengen), E (Europe) or N (non-Europe)"
|
||||||
|
)
|
||||||
|
visa: bool | None = Field(
|
||||||
|
None, description="Indicates if a visum is required for destination"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class AircraftTypeType(BaseModel):
|
class AircraftTypeType(BaseModel):
|
||||||
@ -116,19 +120,35 @@ class CheckinAllocationsType(BaseModel):
|
|||||||
|
|
||||||
class Flight(BaseModel):
|
class Flight(BaseModel):
|
||||||
lastUpdatedAt: datetime | None = None
|
lastUpdatedAt: datetime | None = None
|
||||||
actualLandingTime: datetime | None = Field(None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ")
|
actualLandingTime: datetime | None = Field(
|
||||||
actualOffBlockTime: datetime | None = Field(None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ")
|
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
|
||||||
|
)
|
||||||
|
actualOffBlockTime: datetime | None = Field(
|
||||||
|
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
|
||||||
|
)
|
||||||
aircraftRegistration: str | None = None
|
aircraftRegistration: str | None = None
|
||||||
aircraftType: AircraftTypeType | None = None
|
aircraftType: AircraftTypeType | None = None
|
||||||
baggageClaim: BaggageClaimType | None = None
|
baggageClaim: BaggageClaimType | None = None
|
||||||
checkinAllocations: CheckinAllocationsType | None = None
|
checkinAllocations: CheckinAllocationsType | None = None
|
||||||
codeshares: CodesharesType | None = None
|
codeshares: CodesharesType | None = None
|
||||||
estimatedLandingTime: datetime | None = Field(None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ")
|
estimatedLandingTime: datetime | None = Field(
|
||||||
expectedTimeBoarding: datetime | None = Field(None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ")
|
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
|
||||||
expectedTimeGateClosing: datetime | None = Field(None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ")
|
)
|
||||||
expectedTimeGateOpen: datetime | None = Field(None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ")
|
expectedTimeBoarding: datetime | None = Field(
|
||||||
expectedTimeOnBelt: datetime | None = Field(None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ")
|
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
|
||||||
expectedSecurityFilter: str | None = Field(None, description="expected security filter")
|
)
|
||||||
|
expectedTimeGateClosing: datetime | None = Field(
|
||||||
|
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
|
||||||
|
)
|
||||||
|
expectedTimeGateOpen: datetime | None = Field(
|
||||||
|
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
|
||||||
|
)
|
||||||
|
expectedTimeOnBelt: datetime | None = Field(
|
||||||
|
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
|
||||||
|
)
|
||||||
|
expectedSecurityFilter: str | None = Field(
|
||||||
|
None, description="expected security filter"
|
||||||
|
)
|
||||||
flightDirection: FlightDirection | None = None
|
flightDirection: FlightDirection | None = None
|
||||||
flightName: str | None = None
|
flightName: str | None = None
|
||||||
flightNumber: int | None = None
|
flightNumber: int | None = None
|
||||||
@ -145,12 +165,14 @@ class Flight(BaseModel):
|
|||||||
)
|
)
|
||||||
publicFlightState: PublicFlightStateType | None = None
|
publicFlightState: PublicFlightStateType | None = None
|
||||||
route: RouteType | None = None
|
route: RouteType | None = None
|
||||||
scheduleDateTime: datetime | None = Field(None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ")
|
scheduleDateTime: datetime | None = Field(
|
||||||
|
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
|
||||||
|
)
|
||||||
scheduleDate: date | None = Field(None, description="yyyy-MM-dd")
|
scheduleDate: date | None = Field(None, description="yyyy-MM-dd")
|
||||||
scheduleTime: str | None = Field(None, description="hh:mm:ss")
|
scheduleTime: str | None = Field(None, description="hh:mm:ss")
|
||||||
serviceType: str | None = Field(
|
serviceType: str | None = Field(
|
||||||
None,
|
None,
|
||||||
description="The service type category of the commercial flight.",
|
description="The service type category of the commercial flight. For example: J = Passenger Line, C=Passenger Charter, F = Freight Line and H = Freight Charter etc.",
|
||||||
)
|
)
|
||||||
terminal: int | None = None
|
terminal: int | None = None
|
||||||
transferPositions: TransferPositionsType | None = None
|
transferPositions: TransferPositionsType | None = None
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user