import json

import pandas as pd
from dagster import (
    AssetExecutionContext,
    AutomationCondition,
    MetadataValue,
    Output,
    asset,
)
from dagster_duckdb import DuckDBResource

from resources import MobilityDatabaseAPI

# Agency list CSV; the path is relative to the container working directory.
AGENCY_LIST_CSV = "config/agency_list.csv"

_CREATE_METADATA_TABLE_SQL = """
    CREATE TABLE IF NOT EXISTS gtfs_feed_metadata (
        feed_id VARCHAR PRIMARY KEY,
        provider VARCHAR,
        status VARCHAR,
        official BOOLEAN,
        producer_url VARCHAR,
        authentication_type INTEGER,
        authentication_info_url VARCHAR,
        api_key_parameter_name VARCHAR,
        license_url VARCHAR,
        feed_contact_email VARCHAR,
        raw_json JSON,
        fetched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
"""

# Distinct, non-blank GTFS feed IDs from agency_list. The column is cast to
# VARCHAR and trimmed so stray whitespace in the CSV does not produce IDs the
# API will not recognise; ordering keeps fetch/log order deterministic.
_SELECT_FEED_IDS_SQL = """
    SELECT DISTINCT TRIM(CAST(GTFS AS VARCHAR)) AS feed_id
    FROM agency_list
    WHERE GTFS IS NOT NULL
      AND TRIM(CAST(GTFS AS VARCHAR)) != ''
    ORDER BY feed_id
"""

# fetched_at is set explicitly so every upsert refreshes it, rather than
# depending on how INSERT OR REPLACE treats a column omitted from the list.
_UPSERT_METADATA_SQL = """
    INSERT OR REPLACE INTO gtfs_feed_metadata (
        feed_id, provider, status, official, producer_url,
        authentication_type, authentication_info_url,
        api_key_parameter_name, license_url, feed_contact_email,
        raw_json, fetched_at
    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
"""

_PREVIEW_SQL = """
    SELECT feed_id, provider, status, producer_url
    FROM gtfs_feed_metadata
    ORDER BY feed_id
    LIMIT 5
"""


@asset(
    group_name="gtfs_metadata",
)
def agency_list(duckdb: DuckDBResource) -> None:
    """Load the agency list CSV into the DuckDB table ``agency_list``.

    The table is recreated (``CREATE OR REPLACE``) on every materialization,
    so it always mirrors the current CSV contents.
    """
    df = pd.read_csv(AGENCY_LIST_CSV)

    with duckdb.get_connection() as conn:
        # Register the DataFrame under an explicit name instead of relying on
        # DuckDB's replacement scan discovering the local variable `df`.
        conn.register("agency_list_df", df)
        try:
            conn.execute(
                "CREATE OR REPLACE TABLE agency_list AS SELECT * FROM agency_list_df"
            )
        finally:
            conn.unregister("agency_list_df")


def _build_metadata_row(feed_id: str, feed_info: dict) -> list:
    """Flatten one feed-info payload into parameters for the upsert.

    Values are returned in the column order of ``_UPSERT_METADATA_SQL``
    (excluding ``fetched_at``, which the SQL sets itself). The full payload
    is kept as JSON text in the last position.
    """
    # `.get("source_info", {})` still yields None if the payload contains an
    # explicit null, so coalesce with `or` before reading nested fields.
    source_info = feed_info.get("source_info") or {}
    return [
        feed_id,
        feed_info.get("provider"),
        feed_info.get("status"),
        feed_info.get("official"),
        source_info.get("producer_url"),
        source_info.get("authentication_type"),
        source_info.get("authentication_info_url"),
        source_info.get("api_key_parameter_name"),
        source_info.get("license_url"),
        feed_info.get("feed_contact_email"),
        json.dumps(feed_info),
    ]


def _preview_markdown(preview_df: pd.DataFrame) -> str:
    """Render the preview table as Markdown, degrading to plain text."""
    try:
        return preview_df.to_markdown(index=False)
    except ImportError:
        # DataFrame.to_markdown needs the optional `tabulate` package; don't
        # fail an otherwise-successful run just because it is missing.
        return f"```\n{preview_df.to_string(index=False)}\n```"


@asset(
    deps=["agency_list"],
    group_name="gtfs_metadata",
    automation_condition=AutomationCondition.eager(),
)
def gtfs_feed_metadata(
    context: AssetExecutionContext,
    duckdb: DuckDBResource,
    mobility_db: MobilityDatabaseAPI,
) -> Output[None]:
    """Fetch GTFS feed metadata from the Mobility Database API into DuckDB.

    Reads the distinct GTFS feed IDs from ``agency_list``, calls
    ``mobility_db.get_feed_info`` for each, and upserts one row per feed into
    ``gtfs_feed_metadata`` (created if missing). Per-feed failures are logged
    and counted rather than aborting the run.

    Returns:
        An ``Output(None)`` whose metadata reports feed counts, the IDs that
        failed, the total row count of the table, and a small preview.
    """
    with duckdb.get_connection() as conn:
        conn.execute(_CREATE_METADATA_TABLE_SQL)

        feed_ids = [row[0] for row in conn.execute(_SELECT_FEED_IDS_SQL).fetchall()]
        context.log.info(f"Found {len(feed_ids)} feeds to fetch")

        failed_feeds: list[str] = []
        for feed_id in feed_ids:
            try:
                feed_info = mobility_db.get_feed_info(feed_id)
                conn.execute(
                    _UPSERT_METADATA_SQL, _build_metadata_row(feed_id, feed_info)
                )
            except Exception as e:
                # Deliberately best-effort per feed: one bad feed (API error,
                # unexpected payload) must not abort the rest of the batch.
                context.log.error(f"✗ Failed to fetch {feed_id}: {e}")
                failed_feeds.append(feed_id)
            else:
                context.log.info(f"✓ Fetched and stored metadata for {feed_id}")

        failed = len(failed_feeds)
        successful = len(feed_ids) - failed

        # Counts every row in the table, including feeds stored by earlier
        # runs that may no longer appear in agency_list.
        total_records = conn.execute(
            "SELECT COUNT(*) FROM gtfs_feed_metadata"
        ).fetchone()[0]

        preview_df = conn.execute(_PREVIEW_SQL).df()

    return Output(
        None,
        metadata={
            "total_feeds": len(feed_ids),
            "successful": successful,
            "failed": failed,
            "failed_feeds": MetadataValue.json(failed_feeds),
            "total_records_in_db": total_records,
            "preview": MetadataValue.md(_preview_markdown(preview_df)),
        },
    )