import pandas as pd
from dagster import (
    asset,
    AssetExecutionContext,
    Output,
    MetadataValue,
    AutomationCondition,
    DynamicPartitionsDefinition,
    sensor,
    RunRequest,
    SkipReason,
    SensorEvaluationContext,
    DefaultSensorStatus,
    Config,
)
from dagster_duckdb import DuckDBResource
from resources import MobilityDatabaseAPI
import json
import requests
from pathlib import Path
from datetime import datetime, timedelta
import logging
from typing import Optional


@asset(
    group_name="config",
)
def agency_list(duckdb: DuckDBResource) -> None:
    """Load agency list from CSV into DuckDB."""
    # Read the CSV (path is relative to container working directory)
    df = pd.read_csv('config/agency_list.csv')

    # Write to DuckDB
    with duckdb.get_connection() as conn:
        conn.execute("""
            CREATE OR REPLACE TABLE agency_list AS
            SELECT * FROM df
        """)


@asset(
    deps=["agency_list"],
    group_name="gtfs_metadata",
    automation_condition=AutomationCondition.eager()
)
def gtfs_feed_metadata(
    context: AssetExecutionContext,
    duckdb: DuckDBResource,
    mobility_db: MobilityDatabaseAPI
) -> Output[None]:
    """
    Fetch GTFS feed metadata from Mobility Database API for all agencies
    and store in DuckDB.
    """
    with duckdb.get_connection() as conn:
        # Create the metadata table if it doesn't exist
        conn.execute("""
            CREATE TABLE IF NOT EXISTS gtfs_feed_metadata (
                feed_id VARCHAR PRIMARY KEY,
                provider VARCHAR,
                status VARCHAR,
                official BOOLEAN,
                producer_url VARCHAR,
                authentication_type INTEGER,
                authentication_info_url VARCHAR,
                api_key_parameter_name VARCHAR,
                license_url VARCHAR,
                feed_contact_email VARCHAR,
                raw_json JSON,
                fetched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

        # Get all GTFS feed IDs from agency_list
        feed_ids = conn.execute("""
            SELECT DISTINCT GTFS as feed_id
            FROM agency_list
            WHERE GTFS IS NOT NULL AND GTFS != ''
        """).fetchall()

        context.log.info(f"Found {len(feed_ids)} feeds to fetch")

        successful = 0
        failed = 0

        for (feed_id,) in feed_ids:
            try:
                feed_info = mobility_db.get_feed_info(feed_id)

                # Extract relevant fields
                source_info = feed_info.get("source_info", {})

                # Insert or update the record
                conn.execute("""
                    INSERT OR REPLACE INTO gtfs_feed_metadata (
                        feed_id, provider, status, official, producer_url,
                        authentication_type, authentication_info_url,
                        api_key_parameter_name, license_url,
                        feed_contact_email, raw_json
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
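                    -- INSERT OR REPLACE overwrites any existing row with the same feed_id (the PRIMARY KEY)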
""", [ feed_id, feed_info.get("provider"), feed_info.get("status"), feed_info.get("official"), source_info.get("producer_url"), source_info.get("authentication_type"), source_info.get("authentication_info_url"), source_info.get("api_key_parameter_name"), source_info.get("license_url"), feed_info.get("feed_contact_email"), json.dumps(feed_info) ]) context.log.info(f"✓ Fetched and stored metadata for {feed_id}") successful += 1 except Exception as e: context.log.error(f"✗ Failed to fetch {feed_id}: {e}") failed += 1 # Get summary stats total_records = conn.execute( "SELECT COUNT(*) FROM gtfs_feed_metadata" ).fetchone()[0] # Get preview for metadata preview_df = conn.execute(""" SELECT feed_id, provider, status, producer_url FROM gtfs_feed_metadata LIMIT 5 """).df() return Output( None, metadata={ "total_feeds": len(feed_ids), "successful": successful, "failed": failed, "total_records_in_db": total_records, "preview": MetadataValue.md(preview_df.to_markdown(index=False)) } ) logger = logging.getLogger(__name__) # Dynamic partition definition for GTFS feeds gtfs_feeds_partitions_def = DynamicPartitionsDefinition(name="gtfs_feeds") @asset( deps=["gtfs_feed_metadata"], group_name="gtfs_metadata", automation_condition=AutomationCondition.eager() ) def gtfs_feed_partitions( context: AssetExecutionContext, duckdb: DuckDBResource, ) -> Output[None]: """ Update the dynamic partitions based on feeds in gtfs_feed_metadata table. Creates one partition per feed_id. """ with duckdb.get_connection() as conn: feed_ids = conn.execute(""" SELECT feed_id FROM gtfs_feed_metadata WHERE producer_url IS NOT NULL AND producer_url != '' ORDER BY feed_id """).fetchall() feed_id_list = [feed_id for (feed_id,) in feed_ids] # Update the dynamic partitions context.instance.add_dynamic_partitions( partitions_def_name="gtfs_feeds", partition_keys=feed_id_list ) context.log.info(f"Updated partitions with {len(feed_id_list)} feeds") return Output( None, metadata={ "feed_count": len(feed_id_list), "feeds": MetadataValue.md("\n".join(f"- {f}" for f in feed_id_list[:20])) } ) @asset( partitions_def=gtfs_feeds_partitions_def, deps=["gtfs_feed_partitions"], group_name="gtfs_downloads", automation_condition=AutomationCondition.on_cron("0 * * * *") | AutomationCondition.eager(), ) def gtfs_feed_downloads( context: AssetExecutionContext, duckdb: DuckDBResource, ) -> Output[None]: """ Download GTFS feed for each agency partition. Only downloads if there's a new version available based on Last-Modified header. Files are saved to data/raw/gtfs// Runs on the hour and whenever new partitions are added. """ feed_id = context.partition_key with duckdb.get_connection() as conn: # Create download tracking table if it doesn't exist conn.execute(""" CREATE TABLE IF NOT EXISTS gtfs_download_history ( feed_id VARCHAR, download_url VARCHAR, last_modified TIMESTAMP, file_path VARCHAR, file_size_bytes BIGINT, downloaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (feed_id, last_modified) ) """) # Get the download URL for this feed feed_info = conn.execute(""" SELECT feed_id, provider, producer_url FROM gtfs_feed_metadata WHERE feed_id = ? 
""", [feed_id]).fetchone() if not feed_info: raise ValueError(f"Feed {feed_id} not found in metadata") feed_id, provider, download_url = feed_info if not download_url: context.log.warning(f"No download URL for {feed_id}") return Output(None, metadata={"status": "no_url"}) # Check the Last-Modified header without downloading the full file try: head_response = requests.head( download_url, timeout=30, allow_redirects=True ) head_response.raise_for_status() last_modified_str = head_response.headers.get('Last-Modified') if last_modified_str: last_modified = datetime.strptime( last_modified_str, '%a, %d %b %Y %H:%M:%S GMT' ) else: # If no Last-Modified header, use current time last_modified = datetime.now() context.log.warning(f"No Last-Modified header for {feed_id}, using current time") except Exception as e: context.log.error(f"Failed to check headers for {feed_id}: {e}") return Output(None, metadata={"status": "error", "error": str(e)}) # Check if we've already downloaded this version existing = conn.execute(""" SELECT file_path, downloaded_at FROM gtfs_download_history WHERE feed_id = ? AND last_modified = ? ORDER BY downloaded_at DESC LIMIT 1 """, [feed_id, last_modified]).fetchone() if existing: file_path, downloaded_at = existing context.log.info( f"Already have latest version of {feed_id} " f"(modified: {last_modified}, downloaded: {downloaded_at})" ) return Output( None, metadata={ "status": "up_to_date", "last_modified": last_modified.isoformat(), "existing_file": file_path, "downloaded_at": downloaded_at.isoformat() } ) # Download the file context.log.info(f"Downloading new version of {feed_id} (modified: {last_modified})") # Create directory structure: data/raw/gtfs// feed_dir = Path(f"data/raw/gtfs/{feed_id}") feed_dir.mkdir(parents=True, exist_ok=True) # Filename format: yyyy-mm-dd-gtfs.zip (using last_modified date) filename = f"{last_modified.strftime('%Y-%m-%d-%H%M%S')}-gtfs.zip" file_path = feed_dir / filename try: response = requests.get(download_url, timeout=120, stream=True) response.raise_for_status() # Write file in chunks to handle large files with open(file_path, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) file_size = file_path.stat().st_size # Record the download in history conn.execute(""" INSERT INTO gtfs_download_history ( feed_id, download_url, last_modified, file_path, file_size_bytes ) VALUES (?, ?, ?, ?, ?) """, [feed_id, download_url, last_modified, str(file_path), file_size]) context.log.info( f"✓ Downloaded {feed_id} to {file_path} ({file_size:,} bytes)" ) return Output( None, metadata={ "status": "downloaded", "file_path": str(file_path), "file_size_mb": round(file_size / 1024 / 1024, 2), "last_modified": last_modified.isoformat(), "provider": provider, "download_url": download_url, } ) except Exception as e: context.log.error(f"Failed to download {feed_id}: {e}") # Clean up partial file if it exists if file_path.exists(): file_path.unlink() return Output( None, metadata={ "status": "error", "error": str(e), "feed_id": feed_id } ) @asset( deps=["agency_list"], group_name="gtfs_rt_metadata", automation_condition=AutomationCondition.eager() ) def gtfs_rt_vehicles_metadata( context: AssetExecutionContext, duckdb: DuckDBResource, mobility_db: MobilityDatabaseAPI ) -> Output[None]: """ Fetch GTFS-RT vehicle feed metadata from Mobility Database API for all agencies and store in DuckDB. 
    Also creates the download history table in DuckDB.
    """
    with duckdb.get_connection() as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS gtfs_rt_vehicles_metadata (
                feed_id VARCHAR PRIMARY KEY,
                provider VARCHAR,
                status VARCHAR,
                official BOOLEAN,
                producer_url VARCHAR,
                authentication_type INTEGER,
                authentication_info_url VARCHAR,
                api_key_parameter_name VARCHAR,
                license_url VARCHAR,
                feed_contact_email VARCHAR,
                raw_json JSON,
                fetched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

        # Create the download history table
        conn.execute("""
            CREATE TABLE IF NOT EXISTS gtfs_rt_vehicles_download_history (
                feed_id VARCHAR,
                download_url VARCHAR,
                last_modified TIMESTAMP,
                file_path VARCHAR,
                file_size_bytes BIGINT,
                downloaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                PRIMARY KEY (feed_id, downloaded_at)
            )
        """)

        # Get all GTFS-RT vehicle feed IDs from agency_list
        feed_ids = conn.execute("""
            SELECT DISTINCT "GTFS-RT_vehicles" as feed_id
            FROM agency_list
            WHERE "GTFS-RT_vehicles" IS NOT NULL AND "GTFS-RT_vehicles" != ''
        """).fetchall()

        context.log.info(f"Found {len(feed_ids)} GTFS-RT vehicle feeds to fetch")

        successful = 0
        failed = 0

        for (feed_id,) in feed_ids:
            try:
                feed_info = mobility_db.get_feed_info(feed_id)

                # Extract relevant fields
                source_info = feed_info.get("source_info", {})

                # Insert or update the record
                conn.execute("""
                    INSERT OR REPLACE INTO gtfs_rt_vehicles_metadata (
                        feed_id, provider, status, official, producer_url,
                        authentication_type, authentication_info_url,
                        api_key_parameter_name, license_url,
                        feed_contact_email, raw_json
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """, [
                    feed_id,
                    feed_info.get("provider"),
                    feed_info.get("status"),
                    feed_info.get("official"),
                    source_info.get("producer_url"),
                    source_info.get("authentication_type"),
                    source_info.get("authentication_info_url"),
                    source_info.get("api_key_parameter_name"),
                    source_info.get("license_url"),
                    feed_info.get("feed_contact_email"),
                    json.dumps(feed_info)
                ])

                context.log.info(f"✓ Fetched and stored metadata for {feed_id}")
                successful += 1

            except Exception as e:
                context.log.error(f"✗ Failed to fetch {feed_id}: {e}")
                failed += 1

        # Get summary stats
        total_records = conn.execute(
            "SELECT COUNT(*) FROM gtfs_rt_vehicles_metadata"
        ).fetchone()[0]

        # Get preview for metadata
        preview_df = conn.execute("""
            SELECT feed_id, provider, status, producer_url
            FROM gtfs_rt_vehicles_metadata
            LIMIT 5
        """).df()

        return Output(
            None,
            metadata={
                "total_feeds": len(feed_ids),
                "successful": successful,
                "failed": failed,
                "total_records_in_db": total_records,
                "preview": MetadataValue.md(preview_df.to_markdown(index=False))
            }
        )


# Dynamic partition definition for GTFS-RT vehicle feeds
gtfs_rt_vehicles_partitions_def = DynamicPartitionsDefinition(name="gtfs_rt_vehicles")


@asset(
    deps=["gtfs_rt_vehicles_metadata"],
    group_name="gtfs_rt_metadata",
    automation_condition=AutomationCondition.eager()
)
def gtfs_rt_vehicles_partitions(
    context: AssetExecutionContext,
    duckdb: DuckDBResource,
) -> Output[None]:
    """
    Update the dynamic partitions based on feeds in the gtfs_rt_vehicles_metadata table.
    Creates one partition per feed_id.
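    Note: add_dynamic_partitions only adds keys; feeds removed upstream are not pruned here.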
""" with duckdb.get_connection() as conn: feed_ids = conn.execute(""" SELECT feed_id FROM gtfs_rt_vehicles_metadata WHERE producer_url IS NOT NULL AND producer_url != '' ORDER BY feed_id """).fetchall() feed_id_list = [feed_id for (feed_id,) in feed_ids] # Update the dynamic partitions context.instance.add_dynamic_partitions( partitions_def_name="gtfs_rt_vehicles", partition_keys=feed_id_list ) context.log.info(f"Updated partitions with {len(feed_id_list)} GTFS-RT vehicle feeds") return Output( None, metadata={ "feed_count": len(feed_id_list), "feeds": MetadataValue.md("\n".join(f"- {f}" for f in feed_id_list[:20])) } ) class GTFSRTDownloadConfig(Config): provider: str producer_url: str @asset( partitions_def=gtfs_rt_vehicles_partitions_def, deps=[gtfs_rt_vehicles_partitions], group_name="gtfs_rt_downloads", ) def gtfs_rt_vehicles_downloads( context: AssetExecutionContext, config: GTFSRTDownloadConfig, duckdb: DuckDBResource, ) -> Output[None]: """ Download GTFS-RT vehicle feed for each agency partition. Only downloads if the Last-Modified date has changed from the previous download. Files are saved to data/raw/gtfs-rt/vehicles//YYYY/MM/DD/.pb Filename and directory structure use the Last-Modified timestamp. """ feed_id = context.partition_key download_url = config.producer_url provider = config.provider with duckdb.get_connection() as conn: # Check the Last-Modified header without downloading the full file try: head_response = requests.head( download_url, timeout=30, allow_redirects=True ) head_response.raise_for_status() last_modified_str = head_response.headers.get('Last-Modified') if last_modified_str: last_modified = datetime.strptime( last_modified_str, '%a, %d %b %Y %H:%M:%S GMT' ) else: # If no Last-Modified header, use current time (will always download) last_modified = datetime.now() context.log.warning(f"No Last-Modified header for {feed_id}, will download") except Exception as e: context.log.error(f"Failed to check headers for {feed_id}: {e}") return Output(None, metadata={"status": "error", "error": str(e)}) # Check if we've already downloaded this version existing = conn.execute(""" SELECT file_path, downloaded_at FROM gtfs_rt_vehicles_download_history WHERE feed_id = ? AND last_modified = ? 
            ORDER BY downloaded_at DESC
            LIMIT 1
        """, [feed_id, last_modified]).fetchone()

        if existing:
            file_path, downloaded_at = existing
            context.log.info(
                f"Already have latest version of {feed_id} "
                f"(modified: {last_modified}, downloaded: {downloaded_at})"
            )
            return Output(
                None,
                metadata={
                    "status": "up_to_date",
                    "last_modified": last_modified.isoformat(),
                    "existing_file": file_path,
                    "downloaded_at": downloaded_at.isoformat(),
                    "provider": provider,
                }
            )

        # Download the file
        download_time = datetime.now()
        context.log.info(f"Downloading new version of {feed_id} (modified: {last_modified})")

        # Create the directory structure using the last_modified date:
        # data/raw/gtfs-rt/vehicles/<feed_id>/YYYY/MM/DD
        feed_dir = Path(f"data/raw/gtfs-rt/vehicles/{feed_id}/{last_modified.strftime('%Y/%m/%d')}")
        feed_dir.mkdir(parents=True, exist_ok=True)

        # Filename format: YYYYMMDDHHMMSS.pb (using the last_modified timestamp)
        filename = f"{last_modified.strftime('%Y%m%d%H%M%S')}.pb"
        file_path = feed_dir / filename

        try:
            response = requests.get(download_url, timeout=30)
            response.raise_for_status()

            # Write the protobuf data
            with open(file_path, 'wb') as f:
                f.write(response.content)

            file_size = file_path.stat().st_size

            # Record the download in history
            conn.execute("""
                INSERT INTO gtfs_rt_vehicles_download_history (
                    feed_id, download_url, last_modified, file_path,
                    file_size_bytes, downloaded_at
                ) VALUES (?, ?, ?, ?, ?, ?)
            """, [feed_id, download_url, last_modified, str(file_path), file_size, download_time])

            return Output(
                None,
                metadata={
                    "status": "downloaded",
                    "file_path": str(file_path),
                    "file_size_kb": round(file_size / 1024, 2),
                    "last_modified": last_modified.isoformat(),
                    "downloaded_at": download_time.isoformat(),
                    "provider": provider,
                    "download_url": download_url
                }
            )

        except Exception as e:
            context.log.error(f"Failed to download {feed_id}: {e}")

            # Clean up the partial file if it exists
            if file_path.exists():
                file_path.unlink()

            return Output(
                None,
                metadata={
                    "status": "error",
                    "error": str(e),
                    "feed_id": feed_id
                }
            )


@sensor(
    name="gtfs_rt_vehicles_sensor",
    minimum_interval_seconds=60,
    asset_selection=[gtfs_rt_vehicles_downloads],
    default_status=DefaultSensorStatus.RUNNING
)
def gtfs_rt_vehicles_sensor(
    context: SensorEvaluationContext,
    duckdb: DuckDBResource,
) -> list[RunRequest] | SkipReason:
    """
    Sensor that triggers gtfs_rt_vehicles_downloads every 60 seconds.
    Fetches feed metadata once and passes it to each partition run.
    """
    with duckdb.get_connection() as conn:
        # Get all active feeds with their metadata in one query
        feeds = conn.execute("""
            SELECT feed_id, provider, producer_url
            FROM gtfs_rt_vehicles_metadata
            WHERE producer_url IS NOT NULL AND producer_url != ''
            ORDER BY feed_id
        """).fetchall()

    if not feeds:
        return SkipReason("No GTFS-RT vehicle feeds configured")

    # Create a RunRequest for each partition with its feed metadata
    run_requests = [
        RunRequest(
            partition_key=feed_id,
            run_config={
                "ops": {
                    "gtfs_rt_vehicles_downloads": {
                        "config": {
                            "provider": provider,
                            "producer_url": producer_url,
                        }
                    }
                }
            },
            tags={
                "feed_id": feed_id,
                "sensor": "gtfs_rt_vehicles_sensor"
            }
        )
        for feed_id, provider, producer_url in feeds
    ]

    context.log.info(f"Triggering downloads for {len(run_requests)} GTFS-RT vehicle feeds")
    return run_requests
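

# ---------------------------------------------------------------------------
# Hypothetical wiring sketch (an assumption, not confirmed by this module):
# the assets and sensor above expect a DuckDB resource bound to the key
# "duckdb" and a Mobility Database client bound to "mobility_db" that exposes
# get_feed_info(feed_id) returning a dict with "provider", "status",
# "official", "feed_contact_email", and a nested "source_info" mapping.
# A project's definitions module might wire this up roughly as follows; the
# module name "assets", the DuckDB file path, and the MobilityDatabaseAPI
# constructor arguments are all assumptions for illustration.
#
#   from dagster import Definitions, load_assets_from_modules
#   from dagster_duckdb import DuckDBResource
#
#   import assets  # the module containing the assets and sensor above
#   from resources import MobilityDatabaseAPI
#
#   defs = Definitions(
#       assets=load_assets_from_modules([assets]),
#       sensors=[assets.gtfs_rt_vehicles_sensor],
#       resources={
#           "duckdb": DuckDBResource(database="data/warehouse.duckdb"),  # assumed path
#           "mobility_db": MobilityDatabaseAPI(),  # constructor arguments unknown
#       },
#   )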