#!/usr/bin/env python3 import json import requests from typing import Dict, Any from copy import deepcopy # Configuration SCHEMA_REGISTRY_URL = "http://192.168.1.78:8081" OPENAPI_FILE = "openapi.json" def resolve_refs(schema: Any, all_schemas: Dict[str, Any], visited: set = None) -> Any: """ Recursively resolve $ref references in a schema. """ if visited is None: visited = set() if isinstance(schema, dict): if "$ref" in schema: ref = schema["$ref"] # Extract schema name from #/components/schemas/SchemaName if ref.startswith("#/components/schemas/"): schema_name = ref. split("/")[-1] # Prevent infinite recursion if schema_name in visited: return {"type": "object", "description": f"Circular reference to {schema_name}"} visited.add(schema_name) if schema_name in all_schemas: # Recursively resolve the referenced schema resolved = resolve_refs(deepcopy(all_schemas[schema_name]), all_schemas, visited. copy()) return resolved else: print(f" Warning: Referenced schema '{schema_name}' not found") return schema else: return schema else: # Recursively resolve refs in nested objects return {k: resolve_refs(v, all_schemas, visited. copy()) for k, v in schema.items()} elif isinstance(schema, list): return [resolve_refs(item, all_schemas, visited.copy()) for item in schema] else: return schema def convert_openapi_to_json_schema(schema_def: Dict[str, Any], all_schemas: Dict[str, Any]) -> Dict[str, Any]: """ Convert OpenAPI schema to JSON Schema format with resolved references. """ # First resolve all $ref references resolved_schema = resolve_refs(deepcopy(schema_def), all_schemas) # Add JSON Schema metadata json_schema = { "$schema": "http://json-schema.org/draft-07/schema#", **resolved_schema } return json_schema def register_schema(subject: str, schema_def: Dict[str, Any], all_schemas: Dict[str, Any]) -> bool: """Register a schema with the Schema Registry.""" try: # Convert to JSON Schema with resolved references json_schema = convert_openapi_to_json_schema(schema_def, all_schemas) # Schema Registry expects the schema as a JSON string schema_str = json.dumps(json_schema) # Prepare payload for Schema Registry payload = { "schemaType": "JSON", "schema": schema_str } # Register schema url = f"{SCHEMA_REGISTRY_URL}/subjects/{subject}/versions" response = requests.post( url, headers={"Content-Type": "application/vnd.schemaregistry.v1+json"}, json=payload ) if response. status_code == 200: result = response.json() print(f"✓ Registered '{subject}'") print(f" Schema ID: {result['id']}") return True else: print(f"✗ Failed to register '{subject}'") print(f" Status: {response.status_code}") print(f" Error: {response.text}") return False except Exception as e: print(f"✗ Error registering '{subject}': {str(e)}") return False def main(): # Load OpenAPI file with open(OPENAPI_FILE, 'r') as f: openapi = json.load(f) # Extract schemas schemas = openapi.get('components', {}).get('schemas', {}) if not schemas: print("No schemas found in OpenAPI file!") return print(f"Found {len(schemas)} schemas in OpenAPI file\n") print("=" * 70) # Register each schema success_count = 0 failed_count = 0 for schema_name, schema_def in schemas.items(): # Create subject name subject = f"schiphol.{schema_name. lower()}-value" if register_schema(subject, schema_def, schemas): success_count += 1 else: failed_count += 1 print() print("=" * 70) print(f"\nRegistration complete!") print(f" ✓ Success: {success_count}") print(f" ✗ Failed: {failed_count}") print(f"\nTo list all registered subjects:") print(f" curl {SCHEMA_REGISTRY_URL}/subjects | jq .") print(f"\nTo view a specific schema:") print(f" curl {SCHEMA_REGISTRY_URL}/subjects/schiphol.flight-value/versions/latest | jq -r '.schema | fromjson'") if __name__ == "__main__": main()