From 8fd1406c325637467c78a66badb75d51dda7bb3a2d3142479645ba8f720f449e Mon Sep 17 00:00:00 2001 From: Ben Varick Date: Thu, 6 Nov 2025 12:31:17 -0800 Subject: [PATCH] added gtfs_feed_metadata --- Dockerfile_user_code_gtfs | 5 +- dagster.yaml | 6 ++ docker-compose.yaml | 9 +++ user_code/__init__.py | 2 + user_code/gtfs/__init__.py | 2 +- user_code/gtfs/assets.py | 123 +++++++++++++++++++++++++++++++++- user_code/gtfs/definitions.py | 14 +++- user_code/gtfs/resources.py | 85 +++++++++++++++++++++++ 8 files changed, 239 insertions(+), 7 deletions(-) create mode 100644 user_code/__init__.py create mode 100644 user_code/gtfs/resources.py diff --git a/Dockerfile_user_code_gtfs b/Dockerfile_user_code_gtfs index c3ae1ea..4095717 100644 --- a/Dockerfile_user_code_gtfs +++ b/Dockerfile_user_code_gtfs @@ -8,7 +8,8 @@ RUN pip install \ dagster-postgres \ dagster-docker \ dagster-duckdb \ - pandas + pandas \ + requests WORKDIR /opt/dagster/app COPY user_code/gtfs /opt/dagster/app @@ -17,4 +18,4 @@ COPY user_code/gtfs /opt/dagster/app EXPOSE 4000 -CMD ["dagster", "api", "grpc", "-h", "0.0.0.0", "-p", "4000", "-f", "definitions.py"] \ No newline at end of file +CMD ["dagster", "api", "grpc", "-h", "0.0.0.0", "-p", "4000", "-f", "definitions.py"] diff --git a/dagster.yaml b/dagster.yaml index bf75022..a170c61 100644 --- a/dagster.yaml +++ b/dagster.yaml @@ -21,6 +21,7 @@ run_launcher: - DAGSTER_POSTGRES_USER - DAGSTER_POSTGRES_PASSWORD - DAGSTER_POSTGRES_DB + - MOBILITY_DB_REFRESH_TOKEN network: dagster container_kwargs: volumes: # Make docker client accessible to any launched containers as well @@ -28,6 +29,11 @@ run_launcher: - /tmp/io_manager_storage:/tmp/io_manager_storage - /home/ben/code/gtfs-dagster/data:/opt/dagster/app/data +auto_materialize: + enabled: true + run_tags: + source: auto-materialize + run_storage: module: dagster_postgres.run_storage class: PostgresRunStorage diff --git a/docker-compose.yaml b/docker-compose.yaml index 6bdc67f..9a98f26 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -11,6 +11,8 @@ services: POSTGRES_USER: ${POSTGRES_USER} POSTGRES_PASSWORD: ${POSTGRES_PASSWORD} POSTGRES_DB: ${POSTGRES_DB} + env_file: + - .env volumes: - ./postgres_data:/var/lib/postgresql/data networks: @@ -39,6 +41,9 @@ services: DAGSTER_POSTGRES_PASSWORD: ${POSTGRES_PASSWORD} DAGSTER_POSTGRES_DB: ${POSTGRES_DB} DAGSTER_CURRENT_IMAGE: 'dagster_user_code_gtfs' + MOBILITY_DB_REFRESH_TOKEN: ${MOBILITY_DB_REFRESH_TOKEN} + env_file: + - .env volumes: - ./data:/opt/dagster/app/data networks: @@ -66,6 +71,8 @@ services: DAGSTER_POSTGRES_USER: ${POSTGRES_USER} DAGSTER_POSTGRES_PASSWORD: ${POSTGRES_PASSWORD} DAGSTER_POSTGRES_DB: ${POSTGRES_DB} + env_file: + - .env volumes: # Make docker client accessible so we can terminate containers from the webserver - /var/run/docker.sock:/var/run/docker.sock - /tmp/io_manager_storage:/tmp/io_manager_storage @@ -92,6 +99,8 @@ services: DAGSTER_POSTGRES_USER: ${POSTGRES_USER} DAGSTER_POSTGRES_PASSWORD: ${POSTGRES_PASSWORD} DAGSTER_POSTGRES_DB: ${POSTGRES_DB} + env_file: + - .env volumes: # Make docker client accessible so we can launch containers using host docker - /var/run/docker.sock:/var/run/docker.sock - /tmp/io_manager_storage:/tmp/io_manager_storage diff --git a/user_code/__init__.py b/user_code/__init__.py new file mode 100644 index 0000000..3a78e18 --- /dev/null +++ b/user_code/__init__.py @@ -0,0 +1,2 @@ +# user_code/__init__.py +# This file makes user_code a package diff --git a/user_code/gtfs/__init__.py b/user_code/gtfs/__init__.py index 4c4960f..ea8ef8a 100644 --- a/user_code/gtfs/__init__.py +++ b/user_code/gtfs/__init__.py @@ -1,2 +1,2 @@ # user_code/gtfs/__init__.py -from .assets import * +from . import assets, resources diff --git a/user_code/gtfs/assets.py b/user_code/gtfs/assets.py index c1b6ab1..e341f54 100644 --- a/user_code/gtfs/assets.py +++ b/user_code/gtfs/assets.py @@ -1,6 +1,14 @@ import pandas as pd -from dagster import asset +from dagster import ( + asset, + AssetExecutionContext, + Output, + MetadataValue, + AutoMaterializePolicy +) from dagster_duckdb import DuckDBResource +from resources import MobilityDatabaseAPI # Direct import instead of relative +import json @asset @@ -16,3 +24,116 @@ def agency_list(duckdb: DuckDBResource) -> None: CREATE OR REPLACE TABLE agency_list AS SELECT * FROM df """) + + +@asset( + deps=["agency_list"], + group_name="gtfs_metadata", + auto_materialize_policy=AutoMaterializePolicy.eager() +) +def gtfs_feed_metadata( + context: AssetExecutionContext, + duckdb: DuckDBResource, + mobility_db: MobilityDatabaseAPI +) -> Output[None]: + """ + Fetch GTFS feed metadata from Mobility Database API for all agencies + and store in DuckDB. + """ + + with duckdb.get_connection() as conn: + # Create the metadata table if it doesn't exist + conn.execute(""" + CREATE TABLE IF NOT EXISTS gtfs_feed_metadata ( + feed_id VARCHAR PRIMARY KEY, + provider VARCHAR, + status VARCHAR, + official BOOLEAN, + producer_url VARCHAR, + authentication_type INTEGER, + authentication_info_url VARCHAR, + api_key_parameter_name VARCHAR, + license_url VARCHAR, + feed_contact_email VARCHAR, + raw_json JSON, + fetched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + """) + + # Get all GTFS feed IDs from agency_list + feed_ids = conn.execute(""" + SELECT DISTINCT GTFS as feed_id + FROM agency_list + WHERE GTFS IS NOT NULL AND GTFS != '' + """).fetchall() + + context.log.info(f"Found {len(feed_ids)} feeds to fetch") + + successful = 0 + failed = 0 + + for (feed_id,) in feed_ids: + try: + feed_info = mobility_db.get_feed_info(feed_id) + + # Extract relevant fields + source_info = feed_info.get("source_info", {}) + + # Insert or update the record + conn.execute(""" + INSERT OR REPLACE INTO gtfs_feed_metadata ( + feed_id, + provider, + status, + official, + producer_url, + authentication_type, + authentication_info_url, + api_key_parameter_name, + license_url, + feed_contact_email, + raw_json + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, [ + feed_id, + feed_info.get("provider"), + feed_info.get("status"), + feed_info.get("official"), + source_info.get("producer_url"), + source_info.get("authentication_type"), + source_info.get("authentication_info_url"), + source_info.get("api_key_parameter_name"), + source_info.get("license_url"), + feed_info.get("feed_contact_email"), + json.dumps(feed_info) + ]) + + context.log.info(f"✓ Fetched and stored metadata for {feed_id}") + successful += 1 + + except Exception as e: + context.log.error(f"✗ Failed to fetch {feed_id}: {e}") + failed += 1 + + # Get summary stats + total_records = conn.execute( + "SELECT COUNT(*) FROM gtfs_feed_metadata" + ).fetchone()[0] + + # Get preview for metadata + preview_df = conn.execute(""" + SELECT feed_id, provider, status, producer_url + FROM gtfs_feed_metadata + LIMIT 5 + """).df() + + return Output( + None, + metadata={ + "total_feeds": len(feed_ids), + "successful": successful, + "failed": failed, + "total_records_in_db": total_records, + "preview": MetadataValue.md(preview_df.to_markdown(index=False)) + } + ) diff --git a/user_code/gtfs/definitions.py b/user_code/gtfs/definitions.py index ef43db8..12cea67 100644 --- a/user_code/gtfs/definitions.py +++ b/user_code/gtfs/definitions.py @@ -1,12 +1,20 @@ -from dagster import Definitions +from dagster import Definitions, load_assets_from_modules, EnvVar from dagster_duckdb import DuckDBResource -from assets import agency_list + +import assets +from resources import MobilityDatabaseAPI + +all_assets = load_assets_from_modules([assets]) defs = Definitions( - assets=[agency_list], + assets=all_assets, resources={ "duckdb": DuckDBResource( database="data/gtfs/gtfs.duckdb" + ), + "mobility_db": MobilityDatabaseAPI( + refresh_token=EnvVar("MOBILITY_DB_REFRESH_TOKEN"), + rate_limit_delay=0.5 ) } ) diff --git a/user_code/gtfs/resources.py b/user_code/gtfs/resources.py new file mode 100644 index 0000000..ce888e1 --- /dev/null +++ b/user_code/gtfs/resources.py @@ -0,0 +1,85 @@ +from dagster import ConfigurableResource +import requests +from typing import Optional +from time import sleep +import logging +from datetime import datetime, timedelta + +logger = logging.getLogger(__name__) + + +class MobilityDatabaseAPI(ConfigurableResource): + """Resource for interacting with the Mobility Database API with OAuth2 token management.""" + + base_url: str = "https://api.mobilitydatabase.org" + refresh_token: str # Long-lived refresh token + rate_limit_delay: float = 0.5 # Seconds between requests + + # These will be set at runtime, not in config + _access_token: Optional[str] = None + _token_expires_at: Optional[datetime] = None + + def _get_access_token(self) -> str: + """ + Get a valid access token, refreshing if necessary. + Access tokens are valid for 1 hour. + """ + # If we have a token and it's not expired (with 5 min buffer), use it + if self._access_token and self._token_expires_at: + if datetime.now() < self._token_expires_at - timedelta(minutes=5): + return self._access_token + + # Need to get a new token + logger.info("Fetching new access token from Mobility Database API") + + url = f"{self.base_url}/v1/tokens" + headers = {"Content-Type": "application/json"} + data = {"refresh_token": self.refresh_token} + + try: + response = requests.post(url, headers=headers, json=data, timeout=30) + response.raise_for_status() + + token_data = response.json() + self._access_token = token_data.get("access_token") + + # Tokens are valid for 1 hour + self._token_expires_at = datetime.now() + timedelta(hours=1) + + logger.info("Successfully obtained new access token") + return self._access_token + + except requests.exceptions.RequestException as e: + logger.error(f"Error fetching access token: {e}") + raise RuntimeError(f"Failed to obtain access token: {e}") + + def get_feed_info(self, feed_id: str) -> dict: + """ + Fetch feed information from the Mobility Database API. + + Args: + feed_id: The MDB feed ID (e.g., 'mdb-394') + + Returns: + Dictionary containing feed information + """ + access_token = self._get_access_token() + + url = f"{self.base_url}/v1/feeds/{feed_id}" + headers = { + "Authorization": f"Bearer {access_token}" + } + + logger.info(f"Fetching feed info for {feed_id}") + + try: + response = requests.get(url, headers=headers, timeout=30) + response.raise_for_status() + + # Rate limiting + sleep(self.rate_limit_delay) + + return response.json() + except requests.exceptions.RequestException as e: + logger.error(f"Error fetching feed {feed_id}: {e}") + raise