Compare commits
6 Commits
bb9d75a26d
...
b4088c8a93
| Author | SHA1 | Date | |
|---|---|---|---|
| b4088c8a93 | |||
|
|
2ee0d0f439 | ||
|
|
b573128b15 | ||
|
|
8228c9eb7d | ||
|
|
273d0abb25 | ||
|
|
2df6791eca |
25
.gitea/workflows/lint.yaml
Normal file
25
.gitea/workflows/lint.yaml
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
name: Lint and Format
|
||||||
|
|
||||||
|
on:
|
||||||
|
pull_request:
|
||||||
|
branches:
|
||||||
|
- main
|
||||||
|
push:
|
||||||
|
branches:
|
||||||
|
- main
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
lint:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- name: Checkout code
|
||||||
|
uses: actions/checkout@v4
|
||||||
|
|
||||||
|
- name: Install uv
|
||||||
|
uses: astral-sh/setup-uv@v5
|
||||||
|
|
||||||
|
- name: Check formatting with ruff
|
||||||
|
run: uvx ruff format --check src/
|
||||||
|
|
||||||
|
- name: Lint with ruff
|
||||||
|
run: uvx ruff check src/
|
||||||
@ -27,5 +27,18 @@ schiphol = "src.main:main"
|
|||||||
requires = ["setuptools>=61.0"]
|
requires = ["setuptools>=61.0"]
|
||||||
build-backend = "setuptools.build_meta"
|
build-backend = "setuptools.build_meta"
|
||||||
|
|
||||||
[tool.setuptools]
|
[tool.setuptools.packages.find]
|
||||||
packages = ["src"]
|
where = ["."]
|
||||||
|
include = ["src*"]
|
||||||
|
|
||||||
|
[tool.ruff]
|
||||||
|
line-length = 100
|
||||||
|
target-version = "py314"
|
||||||
|
|
||||||
|
[tool.ruff.lint]
|
||||||
|
select = ["E", "F", "I", "UP", "B", "SIM"]
|
||||||
|
ignore = []
|
||||||
|
|
||||||
|
[tool.ruff.format]
|
||||||
|
quote-style = "double"
|
||||||
|
indent-style = "space"
|
||||||
|
|||||||
24
src/main.py
24
src/main.py
@ -1,16 +1,18 @@
|
|||||||
from src.settings import SchipholApiSettings, KafkaSettings
|
import json
|
||||||
from requests_ratelimiter import LimiterSession
|
import logging
|
||||||
|
|
||||||
|
import aiorun
|
||||||
from kstreams import create_engine
|
from kstreams import create_engine
|
||||||
from kstreams.backends.kafka import Kafka
|
from kstreams.backends.kafka import Kafka
|
||||||
from opentelemetry import trace
|
from opentelemetry import trace
|
||||||
|
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
|
||||||
|
from opentelemetry.instrumentation.requests import RequestsInstrumentor
|
||||||
from opentelemetry.sdk.resources import Resource
|
from opentelemetry.sdk.resources import Resource
|
||||||
from opentelemetry.sdk.trace import TracerProvider
|
from opentelemetry.sdk.trace import TracerProvider
|
||||||
from opentelemetry.sdk.trace.export import BatchSpanProcessor
|
from opentelemetry.sdk.trace.export import BatchSpanProcessor
|
||||||
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
|
from requests_ratelimiter import LimiterSession
|
||||||
from opentelemetry.instrumentation.requests import RequestsInstrumentor
|
|
||||||
import aiorun
|
from src.settings import KafkaSettings, SchipholApiSettings
|
||||||
import json
|
|
||||||
import logging
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -45,9 +47,7 @@ async def start():
|
|||||||
total_flights = 0
|
total_flights = 0
|
||||||
|
|
||||||
while next_url:
|
while next_url:
|
||||||
with tracer.start_as_current_span(
|
with tracer.start_as_current_span("fetch_page", attributes={"http.url": next_url}):
|
||||||
"fetch_page", attributes={"http.url": next_url}
|
|
||||||
):
|
|
||||||
response = session.get(next_url)
|
response = session.get(next_url)
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
data = response.json()
|
data = response.json()
|
||||||
@ -80,9 +80,7 @@ async def start():
|
|||||||
next_url = link.split(";")[0].strip().strip("<>")
|
next_url = link.split(";")[0].strip().strip("<>")
|
||||||
break
|
break
|
||||||
|
|
||||||
logger.info(
|
logger.info(f"Published {total_flights} flights to Kafka topic '{kafka_settings.topic}'")
|
||||||
f"Published {total_flights} flights to Kafka topic '{kafka_settings.topic}'"
|
|
||||||
)
|
|
||||||
|
|
||||||
await stream_engine.stop()
|
await stream_engine.stop()
|
||||||
|
|
||||||
|
|||||||
@ -1,17 +1,14 @@
|
|||||||
# middleware.py
|
# middleware.py
|
||||||
from kstreams import middleware, ConsumerRecord
|
|
||||||
from schema_registry.client import AsyncSchemaRegistryClient
|
|
||||||
from schema_registry.serializers import AsyncJsonMessageSerializer
|
|
||||||
from typing import Optional, Dict
|
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
|
from kstreams import ConsumerRecord, middleware
|
||||||
|
from schema_registry.client import AsyncSchemaRegistryClient
|
||||||
|
from schema_registry.serializers import AsyncJsonMessageSerializer
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class ConfluentJsonSchemaMiddleware(
|
class ConfluentJsonSchemaMiddleware(middleware.BaseMiddleware, AsyncJsonMessageSerializer):
|
||||||
middleware.BaseMiddleware, AsyncJsonMessageSerializer
|
|
||||||
):
|
|
||||||
"""
|
"""
|
||||||
Middleware to deserialize JSON messages using Confluent Schema Registry
|
Middleware to deserialize JSON messages using Confluent Schema Registry
|
||||||
"""
|
"""
|
||||||
@ -20,13 +17,13 @@ class ConfluentJsonSchemaMiddleware(
|
|||||||
self,
|
self,
|
||||||
*,
|
*,
|
||||||
schema_registry_client: AsyncSchemaRegistryClient,
|
schema_registry_client: AsyncSchemaRegistryClient,
|
||||||
reader_schema: Optional[Dict] = None,
|
reader_schema: dict | None = None,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
):
|
):
|
||||||
super().__init__(**kwargs)
|
super().__init__(**kwargs)
|
||||||
self.schemaregistry_client = schema_registry_client
|
self.schemaregistry_client = schema_registry_client
|
||||||
self.reader_schema = reader_schema
|
self.reader_schema = reader_schema
|
||||||
self.id_to_decoder: Dict = {}
|
self.id_to_decoder: dict = {}
|
||||||
|
|
||||||
async def __call__(self, cr: ConsumerRecord):
|
async def __call__(self, cr: ConsumerRecord):
|
||||||
"""
|
"""
|
||||||
|
|||||||
@ -59,12 +59,8 @@ class FlightDirection(Enum):
|
|||||||
|
|
||||||
class RouteType(BaseModel):
|
class RouteType(BaseModel):
|
||||||
destinations: list[str] | None = None
|
destinations: list[str] | None = None
|
||||||
eu: str | None = Field(
|
eu: str | None = Field(None, description="S (Schengen), E (Europe) or N (non-Europe)")
|
||||||
None, description="S (Schengen), E (Europe) or N (non-Europe)"
|
visa: bool | None = Field(None, description="Indicates if a visum is required for destination")
|
||||||
)
|
|
||||||
visa: bool | None = Field(
|
|
||||||
None, description="Indicates if a visum is required for destination"
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class AircraftTypeType(BaseModel):
|
class AircraftTypeType(BaseModel):
|
||||||
@ -120,35 +116,19 @@ class CheckinAllocationsType(BaseModel):
|
|||||||
|
|
||||||
class Flight(BaseModel):
|
class Flight(BaseModel):
|
||||||
lastUpdatedAt: datetime | None = None
|
lastUpdatedAt: datetime | None = None
|
||||||
actualLandingTime: datetime | None = Field(
|
actualLandingTime: datetime | None = Field(None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ")
|
||||||
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
|
actualOffBlockTime: datetime | None = Field(None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ")
|
||||||
)
|
|
||||||
actualOffBlockTime: datetime | None = Field(
|
|
||||||
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
|
|
||||||
)
|
|
||||||
aircraftRegistration: str | None = None
|
aircraftRegistration: str | None = None
|
||||||
aircraftType: AircraftTypeType | None = None
|
aircraftType: AircraftTypeType | None = None
|
||||||
baggageClaim: BaggageClaimType | None = None
|
baggageClaim: BaggageClaimType | None = None
|
||||||
checkinAllocations: CheckinAllocationsType | None = None
|
checkinAllocations: CheckinAllocationsType | None = None
|
||||||
codeshares: CodesharesType | None = None
|
codeshares: CodesharesType | None = None
|
||||||
estimatedLandingTime: datetime | None = Field(
|
estimatedLandingTime: datetime | None = Field(None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ")
|
||||||
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
|
expectedTimeBoarding: datetime | None = Field(None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ")
|
||||||
)
|
expectedTimeGateClosing: datetime | None = Field(None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ")
|
||||||
expectedTimeBoarding: datetime | None = Field(
|
expectedTimeGateOpen: datetime | None = Field(None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ")
|
||||||
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
|
expectedTimeOnBelt: datetime | None = Field(None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ")
|
||||||
)
|
expectedSecurityFilter: str | None = Field(None, description="expected security filter")
|
||||||
expectedTimeGateClosing: datetime | None = Field(
|
|
||||||
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
|
|
||||||
)
|
|
||||||
expectedTimeGateOpen: datetime | None = Field(
|
|
||||||
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
|
|
||||||
)
|
|
||||||
expectedTimeOnBelt: datetime | None = Field(
|
|
||||||
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
|
|
||||||
)
|
|
||||||
expectedSecurityFilter: str | None = Field(
|
|
||||||
None, description="expected security filter"
|
|
||||||
)
|
|
||||||
flightDirection: FlightDirection | None = None
|
flightDirection: FlightDirection | None = None
|
||||||
flightName: str | None = None
|
flightName: str | None = None
|
||||||
flightNumber: int | None = None
|
flightNumber: int | None = None
|
||||||
@ -165,14 +145,12 @@ class Flight(BaseModel):
|
|||||||
)
|
)
|
||||||
publicFlightState: PublicFlightStateType | None = None
|
publicFlightState: PublicFlightStateType | None = None
|
||||||
route: RouteType | None = None
|
route: RouteType | None = None
|
||||||
scheduleDateTime: datetime | None = Field(
|
scheduleDateTime: datetime | None = Field(None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ")
|
||||||
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
|
|
||||||
)
|
|
||||||
scheduleDate: date | None = Field(None, description="yyyy-MM-dd")
|
scheduleDate: date | None = Field(None, description="yyyy-MM-dd")
|
||||||
scheduleTime: str | None = Field(None, description="hh:mm:ss")
|
scheduleTime: str | None = Field(None, description="hh:mm:ss")
|
||||||
serviceType: str | None = Field(
|
serviceType: str | None = Field(
|
||||||
None,
|
None,
|
||||||
description="The service type category of the commercial flight. For example: J = Passenger Line, C=Passenger Charter, F = Freight Line and H = Freight Charter etc.",
|
description="The service type category of the commercial flight.",
|
||||||
)
|
)
|
||||||
terminal: int | None = None
|
terminal: int | None = None
|
||||||
transferPositions: TransferPositionsType | None = None
|
transferPositions: TransferPositionsType | None = None
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user