Compare commits

...

5 Commits

Author SHA1 Message Date
Peter
27e7ed4379 OPT add linting workflow
All checks were successful
Lint and Format / lint (pull_request) Successful in 10s
SonarQube Scan / sonarqube (pull_request) Successful in 49s
2025-12-29 17:15:35 +01:00
Peter
2a67276375 OPT add linting workflow 2025-12-29 17:15:35 +01:00
Peter
f0ce386a78 OPT project structure 2025-12-29 17:15:29 +01:00
Peter
f5d32cfd71 OPT add linting workflow
All checks were successful
Lint and Format / lint (pull_request) Successful in 10s
SonarQube Scan / sonarqube (pull_request) Successful in 50s
2025-12-29 17:09:46 +01:00
Peter
da7b4e07bb OPT add linting workflow
Some checks failed
SonarQube Scan / sonarqube (pull_request) Successful in 53s
Lint and Format / lint (pull_request) Failing after 9s
2025-12-29 17:00:39 +01:00
9 changed files with 83 additions and 1167 deletions

View File

@ -0,0 +1,25 @@
name: Lint and Format
on:
pull_request:
branches:
- main
push:
branches:
- main
jobs:
lint:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Install uv
uses: astral-sh/setup-uv@v5
- name: Check formatting with ruff
run: uvx ruff format --check src/
- name: Lint with ruff
run: uvx ruff check src/

File diff suppressed because it is too large Load Diff

View File

@ -19,3 +19,26 @@ dependencies = [
"opentelemetry-exporter-otlp>=1.27.0",
"opentelemetry-instrumentation-requests>=0.48b0",
]
[project.scripts]
schiphol = "src.main:main"
[build-system]
requires = ["setuptools>=61.0"]
build-backend = "setuptools.build_meta"
[tool.setuptools.packages.find]
where = ["."]
include = ["src*"]
[tool.ruff]
line-length = 100
target-version = "py314"
[tool.ruff.lint]
select = ["E", "F", "I", "UP", "B", "SIM"]
ignore = []
[tool.ruff.format]
quote-style = "double"
indent-style = "space"

1
src/__init__.py Normal file
View File

@ -0,0 +1 @@
"""Schiphol flight data producer."""

View File

@ -1,16 +1,18 @@
from settings import SchipholApiSettings, KafkaSettings
from requests_ratelimiter import LimiterSession
import json
import logging
import aiorun
from kstreams import create_engine
from kstreams.backends.kafka import Kafka
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.requests import RequestsInstrumentor
import aiorun
import json
import logging
from requests_ratelimiter import LimiterSession
from src.settings import KafkaSettings, SchipholApiSettings
logger = logging.getLogger(__name__)
@ -45,9 +47,7 @@ async def start():
total_flights = 0
while next_url:
with tracer.start_as_current_span(
"fetch_page", attributes={"http.url": next_url}
):
with tracer.start_as_current_span("fetch_page", attributes={"http.url": next_url}):
response = session.get(next_url)
response.raise_for_status()
data = response.json()
@ -80,9 +80,7 @@ async def start():
next_url = link.split(";")[0].strip().strip("<>")
break
logger.info(
f"Published {total_flights} flights to Kafka topic '{kafka_settings.topic}'"
)
logger.info(f"Published {total_flights} flights to Kafka topic '{kafka_settings.topic}'")
await stream_engine.stop()
@ -91,6 +89,10 @@ async def shutdown(loop):
logger.info("Shutdown")
if __name__ == "__main__":
def main():
logging.basicConfig(level=logging.INFO)
aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=shutdown)
if __name__ == "__main__":
main()

View File

@ -1,17 +1,14 @@
# middleware.py
from kstreams import middleware, ConsumerRecord
from schema_registry.client import AsyncSchemaRegistryClient
from schema_registry.serializers import AsyncJsonMessageSerializer
from typing import Optional, Dict
import logging
from kstreams import ConsumerRecord, middleware
from schema_registry.client import AsyncSchemaRegistryClient
from schema_registry.serializers import AsyncJsonMessageSerializer
logger = logging.getLogger(__name__)
class ConfluentJsonSchemaMiddleware(
middleware.BaseMiddleware, AsyncJsonMessageSerializer
):
class ConfluentJsonSchemaMiddleware(middleware.BaseMiddleware, AsyncJsonMessageSerializer):
"""
Middleware to deserialize JSON messages using Confluent Schema Registry
"""
@ -20,13 +17,13 @@ class ConfluentJsonSchemaMiddleware(
self,
*,
schema_registry_client: AsyncSchemaRegistryClient,
reader_schema: Optional[Dict] = None,
reader_schema: dict | None = None,
**kwargs,
):
super().__init__(**kwargs)
self.schemaregistry_client = schema_registry_client
self.reader_schema = reader_schema
self.id_to_decoder: Dict = {}
self.id_to_decoder: dict = {}
async def __call__(self, cr: ConsumerRecord):
"""

View File

@ -59,12 +59,8 @@ class FlightDirection(Enum):
class RouteType(BaseModel):
destinations: list[str] | None = None
eu: str | None = Field(
None, description="S (Schengen), E (Europe) or N (non-Europe)"
)
visa: bool | None = Field(
None, description="Indicates if a visum is required for destination"
)
eu: str | None = Field(None, description="S (Schengen), E (Europe) or N (non-Europe)")
visa: bool | None = Field(None, description="Indicates if a visum is required for destination")
class AircraftTypeType(BaseModel):
@ -120,35 +116,19 @@ class CheckinAllocationsType(BaseModel):
class Flight(BaseModel):
lastUpdatedAt: datetime | None = None
actualLandingTime: datetime | None = Field(
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
)
actualOffBlockTime: datetime | None = Field(
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
)
actualLandingTime: datetime | None = Field(None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ")
actualOffBlockTime: datetime | None = Field(None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ")
aircraftRegistration: str | None = None
aircraftType: AircraftTypeType | None = None
baggageClaim: BaggageClaimType | None = None
checkinAllocations: CheckinAllocationsType | None = None
codeshares: CodesharesType | None = None
estimatedLandingTime: datetime | None = Field(
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
)
expectedTimeBoarding: datetime | None = Field(
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
)
expectedTimeGateClosing: datetime | None = Field(
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
)
expectedTimeGateOpen: datetime | None = Field(
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
)
expectedTimeOnBelt: datetime | None = Field(
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
)
expectedSecurityFilter: str | None = Field(
None, description="expected security filter"
)
estimatedLandingTime: datetime | None = Field(None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ")
expectedTimeBoarding: datetime | None = Field(None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ")
expectedTimeGateClosing: datetime | None = Field(None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ")
expectedTimeGateOpen: datetime | None = Field(None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ")
expectedTimeOnBelt: datetime | None = Field(None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ")
expectedSecurityFilter: str | None = Field(None, description="expected security filter")
flightDirection: FlightDirection | None = None
flightName: str | None = None
flightNumber: int | None = None
@ -165,14 +145,12 @@ class Flight(BaseModel):
)
publicFlightState: PublicFlightStateType | None = None
route: RouteType | None = None
scheduleDateTime: datetime | None = Field(
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
)
scheduleDateTime: datetime | None = Field(None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ")
scheduleDate: date | None = Field(None, description="yyyy-MM-dd")
scheduleTime: str | None = Field(None, description="hh:mm:ss")
serviceType: str | None = Field(
None,
description="The service type category of the commercial flight. For example: J = Passenger Line, C=Passenger Charter, F = Freight Line and H = Freight Charter etc.",
description="The service type category of the commercial flight.",
)
terminal: int | None = None
transferPositions: TransferPositionsType | None = None