Compare commits

...

5 Commits

Author SHA1 Message Date
Peter
27e7ed4379 OPT add linting workflow
All checks were successful
Lint and Format / lint (pull_request) Successful in 10s
SonarQube Scan / sonarqube (pull_request) Successful in 49s
2025-12-29 17:15:35 +01:00
Peter
2a67276375 OPT add linting workflow 2025-12-29 17:15:35 +01:00
Peter
f0ce386a78 OPT project structure 2025-12-29 17:15:29 +01:00
Peter
f5d32cfd71 OPT add linting workflow
All checks were successful
Lint and Format / lint (pull_request) Successful in 10s
SonarQube Scan / sonarqube (pull_request) Successful in 50s
2025-12-29 17:09:46 +01:00
Peter
da7b4e07bb OPT add linting workflow
Some checks failed
SonarQube Scan / sonarqube (pull_request) Successful in 53s
Lint and Format / lint (pull_request) Failing after 9s
2025-12-29 17:00:39 +01:00
9 changed files with 83 additions and 1167 deletions

View File

@ -0,0 +1,25 @@
name: Lint and Format
on:
pull_request:
branches:
- main
push:
branches:
- main
jobs:
lint:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Install uv
uses: astral-sh/setup-uv@v5
- name: Check formatting with ruff
run: uvx ruff format --check src/
- name: Lint with ruff
run: uvx ruff check src/

File diff suppressed because it is too large Load Diff

View File

@ -19,3 +19,26 @@ dependencies = [
"opentelemetry-exporter-otlp>=1.27.0", "opentelemetry-exporter-otlp>=1.27.0",
"opentelemetry-instrumentation-requests>=0.48b0", "opentelemetry-instrumentation-requests>=0.48b0",
] ]
[project.scripts]
schiphol = "src.main:main"
[build-system]
requires = ["setuptools>=61.0"]
build-backend = "setuptools.build_meta"
[tool.setuptools.packages.find]
where = ["."]
include = ["src*"]
[tool.ruff]
line-length = 100
target-version = "py314"
[tool.ruff.lint]
select = ["E", "F", "I", "UP", "B", "SIM"]
ignore = []
[tool.ruff.format]
quote-style = "double"
indent-style = "space"

1
src/__init__.py Normal file
View File

@ -0,0 +1 @@
"""Schiphol flight data producer."""

View File

@ -1,16 +1,18 @@
from settings import SchipholApiSettings, KafkaSettings import json
from requests_ratelimiter import LimiterSession import logging
import aiorun
from kstreams import create_engine from kstreams import create_engine
from kstreams.backends.kafka import Kafka from kstreams.backends.kafka import Kafka
from opentelemetry import trace from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter from requests_ratelimiter import LimiterSession
from opentelemetry.instrumentation.requests import RequestsInstrumentor
import aiorun from src.settings import KafkaSettings, SchipholApiSettings
import json
import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -45,9 +47,7 @@ async def start():
total_flights = 0 total_flights = 0
while next_url: while next_url:
with tracer.start_as_current_span( with tracer.start_as_current_span("fetch_page", attributes={"http.url": next_url}):
"fetch_page", attributes={"http.url": next_url}
):
response = session.get(next_url) response = session.get(next_url)
response.raise_for_status() response.raise_for_status()
data = response.json() data = response.json()
@ -80,9 +80,7 @@ async def start():
next_url = link.split(";")[0].strip().strip("<>") next_url = link.split(";")[0].strip().strip("<>")
break break
logger.info( logger.info(f"Published {total_flights} flights to Kafka topic '{kafka_settings.topic}'")
f"Published {total_flights} flights to Kafka topic '{kafka_settings.topic}'"
)
await stream_engine.stop() await stream_engine.stop()
@ -91,6 +89,10 @@ async def shutdown(loop):
logger.info("Shutdown") logger.info("Shutdown")
if __name__ == "__main__": def main():
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=shutdown) aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=shutdown)
if __name__ == "__main__":
main()

View File

@ -1,17 +1,14 @@
# middleware.py # middleware.py
from kstreams import middleware, ConsumerRecord
from schema_registry.client import AsyncSchemaRegistryClient
from schema_registry.serializers import AsyncJsonMessageSerializer
from typing import Optional, Dict
import logging import logging
from kstreams import ConsumerRecord, middleware
from schema_registry.client import AsyncSchemaRegistryClient
from schema_registry.serializers import AsyncJsonMessageSerializer
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class ConfluentJsonSchemaMiddleware( class ConfluentJsonSchemaMiddleware(middleware.BaseMiddleware, AsyncJsonMessageSerializer):
middleware.BaseMiddleware, AsyncJsonMessageSerializer
):
""" """
Middleware to deserialize JSON messages using Confluent Schema Registry Middleware to deserialize JSON messages using Confluent Schema Registry
""" """
@ -20,13 +17,13 @@ class ConfluentJsonSchemaMiddleware(
self, self,
*, *,
schema_registry_client: AsyncSchemaRegistryClient, schema_registry_client: AsyncSchemaRegistryClient,
reader_schema: Optional[Dict] = None, reader_schema: dict | None = None,
**kwargs, **kwargs,
): ):
super().__init__(**kwargs) super().__init__(**kwargs)
self.schemaregistry_client = schema_registry_client self.schemaregistry_client = schema_registry_client
self.reader_schema = reader_schema self.reader_schema = reader_schema
self.id_to_decoder: Dict = {} self.id_to_decoder: dict = {}
async def __call__(self, cr: ConsumerRecord): async def __call__(self, cr: ConsumerRecord):
""" """

View File

@ -59,12 +59,8 @@ class FlightDirection(Enum):
class RouteType(BaseModel): class RouteType(BaseModel):
destinations: list[str] | None = None destinations: list[str] | None = None
eu: str | None = Field( eu: str | None = Field(None, description="S (Schengen), E (Europe) or N (non-Europe)")
None, description="S (Schengen), E (Europe) or N (non-Europe)" visa: bool | None = Field(None, description="Indicates if a visum is required for destination")
)
visa: bool | None = Field(
None, description="Indicates if a visum is required for destination"
)
class AircraftTypeType(BaseModel): class AircraftTypeType(BaseModel):
@ -120,35 +116,19 @@ class CheckinAllocationsType(BaseModel):
class Flight(BaseModel): class Flight(BaseModel):
lastUpdatedAt: datetime | None = None lastUpdatedAt: datetime | None = None
actualLandingTime: datetime | None = Field( actualLandingTime: datetime | None = Field(None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ")
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ" actualOffBlockTime: datetime | None = Field(None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ")
)
actualOffBlockTime: datetime | None = Field(
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
)
aircraftRegistration: str | None = None aircraftRegistration: str | None = None
aircraftType: AircraftTypeType | None = None aircraftType: AircraftTypeType | None = None
baggageClaim: BaggageClaimType | None = None baggageClaim: BaggageClaimType | None = None
checkinAllocations: CheckinAllocationsType | None = None checkinAllocations: CheckinAllocationsType | None = None
codeshares: CodesharesType | None = None codeshares: CodesharesType | None = None
estimatedLandingTime: datetime | None = Field( estimatedLandingTime: datetime | None = Field(None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ")
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ" expectedTimeBoarding: datetime | None = Field(None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ")
) expectedTimeGateClosing: datetime | None = Field(None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ")
expectedTimeBoarding: datetime | None = Field( expectedTimeGateOpen: datetime | None = Field(None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ")
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ" expectedTimeOnBelt: datetime | None = Field(None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ")
) expectedSecurityFilter: str | None = Field(None, description="expected security filter")
expectedTimeGateClosing: datetime | None = Field(
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
)
expectedTimeGateOpen: datetime | None = Field(
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
)
expectedTimeOnBelt: datetime | None = Field(
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
)
expectedSecurityFilter: str | None = Field(
None, description="expected security filter"
)
flightDirection: FlightDirection | None = None flightDirection: FlightDirection | None = None
flightName: str | None = None flightName: str | None = None
flightNumber: int | None = None flightNumber: int | None = None
@ -165,14 +145,12 @@ class Flight(BaseModel):
) )
publicFlightState: PublicFlightStateType | None = None publicFlightState: PublicFlightStateType | None = None
route: RouteType | None = None route: RouteType | None = None
scheduleDateTime: datetime | None = Field( scheduleDateTime: datetime | None = Field(None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ")
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
)
scheduleDate: date | None = Field(None, description="yyyy-MM-dd") scheduleDate: date | None = Field(None, description="yyyy-MM-dd")
scheduleTime: str | None = Field(None, description="hh:mm:ss") scheduleTime: str | None = Field(None, description="hh:mm:ss")
serviceType: str | None = Field( serviceType: str | None = Field(
None, None,
description="The service type category of the commercial flight. For example: J = Passenger Line, C=Passenger Charter, F = Freight Line and H = Freight Charter etc.", description="The service type category of the commercial flight.",
) )
terminal: int | None = None terminal: int | None = None
transferPositions: TransferPositionsType | None = None transferPositions: TransferPositionsType | None = None