OPT add linting workflow #3
@ -27,8 +27,9 @@ schiphol = "src.main:main"
|
||||
requires = ["setuptools>=61.0"]
|
||||
build-backend = "setuptools.build_meta"
|
||||
|
||||
[tool.setuptools]
|
||||
packages = ["src"]
|
||||
[tool.setuptools.packages.find]
|
||||
where = ["."]
|
||||
include = ["src*"]
|
||||
|
||||
[tool.ruff]
|
||||
line-length = 100
|
||||
|
||||
24
src/main.py
24
src/main.py
@ -1,16 +1,18 @@
|
||||
from src.settings import SchipholApiSettings, KafkaSettings
|
||||
from requests_ratelimiter import LimiterSession
|
||||
import json
|
||||
import logging
|
||||
|
||||
import aiorun
|
||||
from kstreams import create_engine
|
||||
from kstreams.backends.kafka import Kafka
|
||||
from opentelemetry import trace
|
||||
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
|
||||
from opentelemetry.instrumentation.requests import RequestsInstrumentor
|
||||
from opentelemetry.sdk.resources import Resource
|
||||
from opentelemetry.sdk.trace import TracerProvider
|
||||
from opentelemetry.sdk.trace.export import BatchSpanProcessor
|
||||
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
|
||||
from opentelemetry.instrumentation.requests import RequestsInstrumentor
|
||||
import aiorun
|
||||
import json
|
||||
import logging
|
||||
from requests_ratelimiter import LimiterSession
|
||||
|
||||
from src.settings import KafkaSettings, SchipholApiSettings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -45,9 +47,7 @@ async def start():
|
||||
total_flights = 0
|
||||
|
||||
while next_url:
|
||||
with tracer.start_as_current_span(
|
||||
"fetch_page", attributes={"http.url": next_url}
|
||||
):
|
||||
with tracer.start_as_current_span("fetch_page", attributes={"http.url": next_url}):
|
||||
response = session.get(next_url)
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
@ -80,9 +80,7 @@ async def start():
|
||||
next_url = link.split(";")[0].strip().strip("<>")
|
||||
break
|
||||
|
||||
logger.info(
|
||||
f"Published {total_flights} flights to Kafka topic '{kafka_settings.topic}'"
|
||||
)
|
||||
logger.info(f"Published {total_flights} flights to Kafka topic '{kafka_settings.topic}'")
|
||||
|
||||
await stream_engine.stop()
|
||||
|
||||
|
||||
@ -1,17 +1,14 @@
|
||||
# middleware.py
|
||||
from kstreams import middleware, ConsumerRecord
|
||||
from schema_registry.client import AsyncSchemaRegistryClient
|
||||
from schema_registry.serializers import AsyncJsonMessageSerializer
|
||||
from typing import Optional, Dict
|
||||
import logging
|
||||
|
||||
from kstreams import ConsumerRecord, middleware
|
||||
from schema_registry.client import AsyncSchemaRegistryClient
|
||||
from schema_registry.serializers import AsyncJsonMessageSerializer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ConfluentJsonSchemaMiddleware(
|
||||
middleware.BaseMiddleware, AsyncJsonMessageSerializer
|
||||
):
|
||||
class ConfluentJsonSchemaMiddleware(middleware.BaseMiddleware, AsyncJsonMessageSerializer):
|
||||
"""
|
||||
Middleware to deserialize JSON messages using Confluent Schema Registry
|
||||
"""
|
||||
@ -20,13 +17,13 @@ class ConfluentJsonSchemaMiddleware(
|
||||
self,
|
||||
*,
|
||||
schema_registry_client: AsyncSchemaRegistryClient,
|
||||
reader_schema: Optional[Dict] = None,
|
||||
reader_schema: dict | None = None,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(**kwargs)
|
||||
self.schemaregistry_client = schema_registry_client
|
||||
self.reader_schema = reader_schema
|
||||
self.id_to_decoder: Dict = {}
|
||||
self.id_to_decoder: dict = {}
|
||||
|
||||
async def __call__(self, cr: ConsumerRecord):
|
||||
"""
|
||||
|
||||
@ -59,12 +59,8 @@ class FlightDirection(Enum):
|
||||
|
||||
class RouteType(BaseModel):
|
||||
destinations: list[str] | None = None
|
||||
eu: str | None = Field(
|
||||
None, description="S (Schengen), E (Europe) or N (non-Europe)"
|
||||
)
|
||||
visa: bool | None = Field(
|
||||
None, description="Indicates if a visum is required for destination"
|
||||
)
|
||||
eu: str | None = Field(None, description="S (Schengen), E (Europe) or N (non-Europe)")
|
||||
visa: bool | None = Field(None, description="Indicates if a visum is required for destination")
|
||||
|
||||
|
||||
class AircraftTypeType(BaseModel):
|
||||
@ -120,35 +116,19 @@ class CheckinAllocationsType(BaseModel):
|
||||
|
||||
class Flight(BaseModel):
|
||||
lastUpdatedAt: datetime | None = None
|
||||
actualLandingTime: datetime | None = Field(
|
||||
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
|
||||
)
|
||||
actualOffBlockTime: datetime | None = Field(
|
||||
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
|
||||
)
|
||||
actualLandingTime: datetime | None = Field(None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ")
|
||||
actualOffBlockTime: datetime | None = Field(None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ")
|
||||
aircraftRegistration: str | None = None
|
||||
aircraftType: AircraftTypeType | None = None
|
||||
baggageClaim: BaggageClaimType | None = None
|
||||
checkinAllocations: CheckinAllocationsType | None = None
|
||||
codeshares: CodesharesType | None = None
|
||||
estimatedLandingTime: datetime | None = Field(
|
||||
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
|
||||
)
|
||||
expectedTimeBoarding: datetime | None = Field(
|
||||
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
|
||||
)
|
||||
expectedTimeGateClosing: datetime | None = Field(
|
||||
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
|
||||
)
|
||||
expectedTimeGateOpen: datetime | None = Field(
|
||||
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
|
||||
)
|
||||
expectedTimeOnBelt: datetime | None = Field(
|
||||
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
|
||||
)
|
||||
expectedSecurityFilter: str | None = Field(
|
||||
None, description="expected security filter"
|
||||
)
|
||||
estimatedLandingTime: datetime | None = Field(None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ")
|
||||
expectedTimeBoarding: datetime | None = Field(None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ")
|
||||
expectedTimeGateClosing: datetime | None = Field(None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ")
|
||||
expectedTimeGateOpen: datetime | None = Field(None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ")
|
||||
expectedTimeOnBelt: datetime | None = Field(None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ")
|
||||
expectedSecurityFilter: str | None = Field(None, description="expected security filter")
|
||||
flightDirection: FlightDirection | None = None
|
||||
flightName: str | None = None
|
||||
flightNumber: int | None = None
|
||||
@ -165,14 +145,12 @@ class Flight(BaseModel):
|
||||
)
|
||||
publicFlightState: PublicFlightStateType | None = None
|
||||
route: RouteType | None = None
|
||||
scheduleDateTime: datetime | None = Field(
|
||||
None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ"
|
||||
)
|
||||
scheduleDateTime: datetime | None = Field(None, description="yyyy-MM-dd'T'HH:mm:ss.SSSZ")
|
||||
scheduleDate: date | None = Field(None, description="yyyy-MM-dd")
|
||||
scheduleTime: str | None = Field(None, description="hh:mm:ss")
|
||||
serviceType: str | None = Field(
|
||||
None,
|
||||
description="The service type category of the commercial flight. For example: J = Passenger Line, C=Passenger Charter, F = Freight Line and H = Freight Charter etc.",
|
||||
description="The service type category of the commercial flight.",
|
||||
)
|
||||
terminal: int | None = None
|
||||
transferPositions: TransferPositionsType | None = None
|
||||
|
||||
Loading…
Reference in New Issue
Block a user