diff --git a/user_code/assets.py b/user_code/assets.py deleted file mode 100644 index 0f19d7d..0000000 --- a/user_code/assets.py +++ /dev/null @@ -1,704 +0,0 @@ -import pandas as pd -from dagster import ( - asset, - AssetExecutionContext, - Output, - MetadataValue, - AutomationCondition, - DynamicPartitionsDefinition, - sensor, - RunRequest, - SkipReason, - SensorEvaluationContext, - DefaultSensorStatus, - Config -) -from dagster_duckdb import DuckDBResource -from resources import MobilityDatabaseAPI -import json -import requests -from pathlib import Path -from datetime import datetime, timedelta -import logging -from typing import Optional - - -@asset( - group_name="config", - ) -def agency_list(duckdb: DuckDBResource) -> None: - """Load agency list from CSV into DuckDB.""" - - # Read the CSV (path is relative to container working directory) - df = pd.read_csv('config/agency_list.csv') - - # Write to DuckDB - with duckdb.get_connection() as conn: - conn.execute(""" - CREATE OR REPLACE TABLE agency_list AS - SELECT * FROM df - """) - - -@asset( - deps=["agency_list"], - group_name="gtfs_metadata", - automation_condition=AutomationCondition.eager() -) -def gtfs_feed_metadata( - context: AssetExecutionContext, - duckdb: DuckDBResource, - mobility_db: MobilityDatabaseAPI -) -> Output[None]: - """ - Fetch GTFS feed metadata from Mobility Database API for all agencies - and store in DuckDB. - """ - - with duckdb.get_connection() as conn: - # Create the metadata table if it doesn't exist - conn.execute(""" - CREATE TABLE IF NOT EXISTS gtfs_feed_metadata ( - feed_id VARCHAR PRIMARY KEY, - provider VARCHAR, - status VARCHAR, - official BOOLEAN, - producer_url VARCHAR, - authentication_type INTEGER, - authentication_info_url VARCHAR, - api_key_parameter_name VARCHAR, - license_url VARCHAR, - feed_contact_email VARCHAR, - raw_json JSON, - fetched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP - ) - """) - - # Get all GTFS feed IDs from agency_list - feed_ids = conn.execute(""" - SELECT DISTINCT GTFS as feed_id - FROM agency_list - WHERE GTFS IS NOT NULL AND GTFS != '' - """).fetchall() - - context.log.info(f"Found {len(feed_ids)} feeds to fetch") - - successful = 0 - failed = 0 - - for (feed_id,) in feed_ids: - try: - feed_info = mobility_db.get_feed_info(feed_id) - - # Extract relevant fields - source_info = feed_info.get("source_info", {}) - - # Insert or update the record - conn.execute(""" - INSERT OR REPLACE INTO gtfs_feed_metadata ( - feed_id, - provider, - status, - official, - producer_url, - authentication_type, - authentication_info_url, - api_key_parameter_name, - license_url, - feed_contact_email, - raw_json - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
- """, [ - feed_id, - feed_info.get("provider"), - feed_info.get("status"), - feed_info.get("official"), - source_info.get("producer_url"), - source_info.get("authentication_type"), - source_info.get("authentication_info_url"), - source_info.get("api_key_parameter_name"), - source_info.get("license_url"), - feed_info.get("feed_contact_email"), - json.dumps(feed_info) - ]) - - context.log.info(f"✓ Fetched and stored metadata for {feed_id}") - successful += 1 - - except Exception as e: - context.log.error(f"✗ Failed to fetch {feed_id}: {e}") - failed += 1 - - # Get summary stats - total_records = conn.execute( - "SELECT COUNT(*) FROM gtfs_feed_metadata" - ).fetchone()[0] - - # Get preview for metadata - preview_df = conn.execute(""" - SELECT feed_id, provider, status, producer_url - FROM gtfs_feed_metadata - LIMIT 5 - """).df() - - return Output( - None, - metadata={ - "total_feeds": len(feed_ids), - "successful": successful, - "failed": failed, - "total_records_in_db": total_records, - "preview": MetadataValue.md(preview_df.to_markdown(index=False)) - } - ) -logger = logging.getLogger(__name__) - -# Dynamic partition definition for GTFS feeds -gtfs_feeds_partitions_def = DynamicPartitionsDefinition(name="gtfs_feeds") - -@asset( - deps=["gtfs_feed_metadata"], - group_name="gtfs_metadata", - automation_condition=AutomationCondition.eager() -) -def gtfs_feed_partitions( - context: AssetExecutionContext, - duckdb: DuckDBResource, -) -> Output[None]: - """ - Update the dynamic partitions based on feeds in gtfs_feed_metadata table. - Creates one partition per feed_id. - """ - with duckdb.get_connection() as conn: - feed_ids = conn.execute(""" - SELECT feed_id - FROM gtfs_feed_metadata - WHERE producer_url IS NOT NULL AND producer_url != '' - ORDER BY feed_id - """).fetchall() - - feed_id_list = [feed_id for (feed_id,) in feed_ids] - - # Update the dynamic partitions - context.instance.add_dynamic_partitions( - partitions_def_name="gtfs_feeds", - partition_keys=feed_id_list - ) - - context.log.info(f"Updated partitions with {len(feed_id_list)} feeds") - - return Output( - None, - metadata={ - "feed_count": len(feed_id_list), - "feeds": MetadataValue.md("\n".join(f"- {f}" for f in feed_id_list[:20])) - } - ) - - -@asset( - partitions_def=gtfs_feeds_partitions_def, - deps=["gtfs_feed_partitions"], - group_name="gtfs_downloads", - automation_condition=AutomationCondition.on_cron("0 * * * *") | AutomationCondition.eager(), -) -def gtfs_feed_downloads( - context: AssetExecutionContext, - duckdb: DuckDBResource, -) -> Output[None]: - """ - Download GTFS feed for each agency partition. - Only downloads if there's a new version available based on Last-Modified header. - Files are saved to data/raw/gtfs// - Runs on the hour and whenever new partitions are added. - """ - feed_id = context.partition_key - - with duckdb.get_connection() as conn: - # Create download tracking table if it doesn't exist - conn.execute(""" - CREATE TABLE IF NOT EXISTS gtfs_download_history ( - feed_id VARCHAR, - download_url VARCHAR, - last_modified TIMESTAMP, - file_path VARCHAR, - file_size_bytes BIGINT, - downloaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (feed_id, last_modified) - ) - """) - - # Get the download URL for this feed - feed_info = conn.execute(""" - SELECT feed_id, provider, producer_url - FROM gtfs_feed_metadata - WHERE feed_id = ? 
- """, [feed_id]).fetchone() - - if not feed_info: - raise ValueError(f"Feed {feed_id} not found in metadata") - - feed_id, provider, download_url = feed_info - - if not download_url: - context.log.warning(f"No download URL for {feed_id}") - return Output(None, metadata={"status": "no_url"}) - - # Check the Last-Modified header without downloading the full file - try: - head_response = requests.head( - download_url, - timeout=30, - allow_redirects=True - ) - head_response.raise_for_status() - - last_modified_str = head_response.headers.get('Last-Modified') - if last_modified_str: - last_modified = datetime.strptime( - last_modified_str, - '%a, %d %b %Y %H:%M:%S GMT' - ) - else: - # If no Last-Modified header, use current time - last_modified = datetime.now() - context.log.warning(f"No Last-Modified header for {feed_id}, using current time") - - except Exception as e: - context.log.error(f"Failed to check headers for {feed_id}: {e}") - return Output(None, metadata={"status": "error", "error": str(e)}) - - # Check if we've already downloaded this version - existing = conn.execute(""" - SELECT file_path, downloaded_at - FROM gtfs_download_history - WHERE feed_id = ? AND last_modified = ? - ORDER BY downloaded_at DESC - LIMIT 1 - """, [feed_id, last_modified]).fetchone() - - if existing: - file_path, downloaded_at = existing - context.log.info( - f"Already have latest version of {feed_id} " - f"(modified: {last_modified}, downloaded: {downloaded_at})" - ) - return Output( - None, - metadata={ - "status": "up_to_date", - "last_modified": last_modified.isoformat(), - "existing_file": file_path, - "downloaded_at": downloaded_at.isoformat() - } - ) - - # Download the file - context.log.info(f"Downloading new version of {feed_id} (modified: {last_modified})") - - # Create directory structure: data/raw/gtfs// - feed_dir = Path(f"data/raw/gtfs/{feed_id}") - feed_dir.mkdir(parents=True, exist_ok=True) - - # Filename format: yyyy-mm-dd-gtfs.zip (using last_modified date) - filename = f"{last_modified.strftime('%Y-%m-%d-%H%M%S')}-gtfs.zip" - file_path = feed_dir / filename - - try: - response = requests.get(download_url, timeout=120, stream=True) - response.raise_for_status() - - # Write file in chunks to handle large files - with open(file_path, 'wb') as f: - for chunk in response.iter_content(chunk_size=8192): - f.write(chunk) - - file_size = file_path.stat().st_size - - # Record the download in history - conn.execute(""" - INSERT INTO gtfs_download_history ( - feed_id, download_url, last_modified, file_path, file_size_bytes - ) VALUES (?, ?, ?, ?, ?) 
- """, [feed_id, download_url, last_modified, str(file_path), file_size]) - - context.log.info( - f"✓ Downloaded {feed_id} to {file_path} ({file_size:,} bytes)" - ) - - return Output( - None, - metadata={ - "status": "downloaded", - "file_path": str(file_path), - "file_size_mb": round(file_size / 1024 / 1024, 2), - "last_modified": last_modified.isoformat(), - "provider": provider, - "download_url": download_url, - } - ) - - except Exception as e: - context.log.error(f"Failed to download {feed_id}: {e}") - # Clean up partial file if it exists - if file_path.exists(): - file_path.unlink() - return Output( - None, - metadata={ - "status": "error", - "error": str(e), - "feed_id": feed_id - } - ) - -@asset( - deps=["agency_list"], - group_name="gtfs_rt_metadata", - automation_condition=AutomationCondition.eager() -) -def gtfs_rt_vehicles_metadata( - context: AssetExecutionContext, - duckdb: DuckDBResource, - mobility_db: MobilityDatabaseAPI -) -> Output[None]: - """ - Fetch GTFS-RT vehicle feed metadata from Mobility Database API for all agencies and store in DuckDB. Create the download history table in DuckDB - """ - - with duckdb.get_connection() as conn: - conn.execute(""" - CREATE TABLE IF NOT EXISTS gtfs_rt_vehicles_metadata ( - feed_id VARCHAR PRIMARY KEY, - provider VARCHAR, - status VARCHAR, - official BOOLEAN, - producer_url VARCHAR, - authentication_type INTEGER, - authentication_info_url VARCHAR, - api_key_parameter_name VARCHAR, - license_url VARCHAR, - feed_contact_email VARCHAR, - raw_json JSON, - fetched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP - ) - """) - - # Create the download history table - conn.execute(""" - CREATE TABLE IF NOT EXISTS gtfs_rt_vehicles_download_history ( - feed_id VARCHAR, - download_url VARCHAR, - last_modified TIMESTAMP, - file_path VARCHAR, - file_size_bytes BIGINT, - downloaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (feed_id, downloaded_at) - ) - """) - - # Get all GTFS-RT vehicle feed IDs from agency_list - feed_ids = conn.execute(""" - SELECT DISTINCT "GTFS-RT_vehicles" as feed_id - FROM agency_list - WHERE "GTFS-RT_vehicles" IS NOT NULL AND "GTFS-RT_vehicles" != '' - """).fetchall() - - context.log.info(f"Found {len(feed_ids)} GTFS-RT vehicle feeds to fetch") - - successful = 0 - failed = 0 - - for (feed_id,) in feed_ids: - try: - feed_info = mobility_db.get_feed_info(feed_id) - - # Extract relevant fields - source_info = feed_info.get("source_info", {}) - - # Insert or update the record - conn.execute(""" - INSERT OR REPLACE INTO gtfs_rt_vehicles_metadata ( - feed_id, - provider, - status, - official, - producer_url, - authentication_type, - authentication_info_url, - api_key_parameter_name, - license_url, - feed_contact_email, - raw_json - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
- """, [ - feed_id, - feed_info.get("provider"), - feed_info.get("status"), - feed_info.get("official"), - source_info.get("producer_url"), - source_info.get("authentication_type"), - source_info.get("authentication_info_url"), - source_info.get("api_key_parameter_name"), - source_info.get("license_url"), - feed_info.get("feed_contact_email"), - json.dumps(feed_info) - ]) - - context.log.info(f"✓ Fetched and stored metadata for {feed_id}") - successful += 1 - - except Exception as e: - context.log.error(f"✗ Failed to fetch {feed_id}: {e}") - failed += 1 - - # Get summary stats - total_records = conn.execute( - "SELECT COUNT(*) FROM gtfs_rt_vehicles_metadata" - ).fetchone()[0] - - # Get preview for metadata - preview_df = conn.execute(""" - SELECT feed_id, provider, status, producer_url - FROM gtfs_rt_vehicles_metadata - LIMIT 5 - """).df() - - return Output( - None, - metadata={ - "total_feeds": len(feed_ids), - "successful": successful, - "failed": failed, - "total_records_in_db": total_records, - "preview": MetadataValue.md(preview_df.to_markdown(index=False)) - } - ) - -# Dynamic partition definition for GTFS-RT vehicle feeds -gtfs_rt_vehicles_partitions_def = DynamicPartitionsDefinition(name="gtfs_rt_vehicles") - - -@asset( - deps=["gtfs_rt_vehicles_metadata"], - group_name="gtfs_rt_metadata", - automation_condition=AutomationCondition.eager() -) -def gtfs_rt_vehicles_partitions( - context: AssetExecutionContext, - duckdb: DuckDBResource, -) -> Output[None]: - """ - Update the dynamic partitions based on feeds in gtfs_rt_vehicles_metadata table. - Creates one partition per feed_id. - """ - with duckdb.get_connection() as conn: - feed_ids = conn.execute(""" - SELECT feed_id - FROM gtfs_rt_vehicles_metadata - WHERE producer_url IS NOT NULL AND producer_url != '' - ORDER BY feed_id - """).fetchall() - - feed_id_list = [feed_id for (feed_id,) in feed_ids] - - # Update the dynamic partitions - context.instance.add_dynamic_partitions( - partitions_def_name="gtfs_rt_vehicles", - partition_keys=feed_id_list - ) - - context.log.info(f"Updated partitions with {len(feed_id_list)} GTFS-RT vehicle feeds") - - return Output( - None, - metadata={ - "feed_count": len(feed_id_list), - "feeds": MetadataValue.md("\n".join(f"- {f}" for f in feed_id_list[:20])) - } - ) - -class GTFSRTDownloadConfig(Config): - provider: str - producer_url: str - -@asset( - partitions_def=gtfs_rt_vehicles_partitions_def, - deps=[gtfs_rt_vehicles_partitions], - group_name="gtfs_rt_downloads", -) -def gtfs_rt_vehicles_downloads( - context: AssetExecutionContext, - config: GTFSRTDownloadConfig, - duckdb: DuckDBResource, -) -> Output[None]: - """ - Download GTFS-RT vehicle feed for each agency partition. - Only downloads if the Last-Modified date has changed from the previous download. - Files are saved to data/raw/gtfs-rt/vehicles//YYYY/MM/DD/.pb - Filename and directory structure use the Last-Modified timestamp. 
- """ - feed_id = context.partition_key - download_url = config.producer_url - provider = config.provider - - with duckdb.get_connection() as conn: - - # Check the Last-Modified header without downloading the full file - try: - head_response = requests.head( - download_url, - timeout=30, - allow_redirects=True - ) - head_response.raise_for_status() - - last_modified_str = head_response.headers.get('Last-Modified') - if last_modified_str: - last_modified = datetime.strptime( - last_modified_str, - '%a, %d %b %Y %H:%M:%S GMT' - ) - else: - # If no Last-Modified header, use current time (will always download) - last_modified = datetime.now() - context.log.warning(f"No Last-Modified header for {feed_id}, will download") - - except Exception as e: - context.log.error(f"Failed to check headers for {feed_id}: {e}") - return Output(None, metadata={"status": "error", "error": str(e)}) - - # Check if we've already downloaded this version - existing = conn.execute(""" - SELECT file_path, downloaded_at - FROM gtfs_rt_vehicles_download_history - WHERE feed_id = ? AND last_modified = ? - ORDER BY downloaded_at DESC - LIMIT 1 - """, [feed_id, last_modified]).fetchone() - - if existing: - file_path, downloaded_at = existing - context.log.info( - f"Already have latest version of {feed_id} " - f"(modified: {last_modified}, downloaded: {downloaded_at})" - ) - return Output( - None, - metadata={ - "status": "up_to_date", - "last_modified": last_modified.isoformat(), - "existing_file": file_path, - "downloaded_at": downloaded_at.isoformat(), - "provider": provider, - } - ) - - # Download the file - download_time = datetime.now() - - context.log.info(f"Downloading new version of {feed_id} (modified: {last_modified})") - - # Create directory structure using last_modified date: data/raw/gtfs-rt/vehicles//YYYY/MM/DD - feed_dir = Path(f"data/raw/gtfs-rt/vehicles/{feed_id}/{last_modified.strftime('%Y/%m/%d')}") - feed_dir.mkdir(parents=True, exist_ok=True) - - # Filename format: HHMMSS.pb (using last_modified time) - filename = f"{last_modified.strftime('%Y%m%d%H%M%S')}.pb" - file_path = feed_dir / filename - - try: - response = requests.get(download_url, timeout=30) - response.raise_for_status() - - # Write the protobuf data - with open(file_path, 'wb') as f: - f.write(response.content) - - file_size = file_path.stat().st_size - - # Record the download in history - conn.execute(""" - INSERT INTO gtfs_rt_vehicles_download_history ( - feed_id, download_url, last_modified, file_path, file_size_bytes, downloaded_at - ) VALUES (?, ?, ?, ?, ?, ?) 
- """, [feed_id, download_url, last_modified, str(file_path), file_size, download_time]) - - return Output( - None, - metadata={ - "status": "downloaded", - "file_path": str(file_path), - "file_size_kb": round(file_size / 1024, 2), - "last_modified": last_modified.isoformat(), - "downloaded_at": download_time.isoformat(), - "provider": provider, - "download_url": download_url - } - ) - - except Exception as e: - context.log.error(f"Failed to download {feed_id}: {e}") - # Clean up partial file if it exists - if file_path.exists(): - file_path.unlink() - return Output( - None, - metadata={ - "status": "error", - "error": str(e), - "feed_id": feed_id - } - ) - -@sensor( - name="gtfs_rt_vehicles_sensor", - minimum_interval_seconds=60, - asset_selection=[gtfs_rt_vehicles_downloads], - default_status=DefaultSensorStatus.RUNNING -) -def gtfs_rt_vehicles_sensor( - context: SensorEvaluationContext, - duckdb: DuckDBResource, -) -> list[RunRequest] | SkipReason: - """ - Sensor that triggers gtfs_rt_vehicles_downloads every 60 seconds. - Fetches feed metadata once and passes it to each partition run. - """ - with duckdb.get_connection() as conn: - # Get all active feeds with their metadata in one query - feeds = conn.execute(""" - SELECT feed_id, provider, producer_url - FROM gtfs_rt_vehicles_metadata - WHERE producer_url IS NOT NULL AND producer_url != '' - ORDER BY feed_id - """).fetchall() - - if not feeds: - return SkipReason("No GTFS-RT vehicle feeds configured") - - # Create a RunRequest for each partition with metadata - run_requests = [ - RunRequest( - partition_key=feed_id, - run_config={ - "ops": { - "gtfs_rt_vehicles_downloads": { - "config": { - "provider": provider, - "producer_url": producer_url, - } - } - } - }, - tags={ - "feed_id": feed_id, - "sensor": "gtfs_rt_vehicles_sensor" - } - ) - for feed_id, provider, producer_url in feeds - ] - - context.log.info(f"Triggering downloads for {len(run_requests)} GTFS-RT vehicle feeds") - - return run_requests \ No newline at end of file diff --git a/user_code/assets/__init__.py b/user_code/assets/__init__.py new file mode 100644 index 0000000..212223f --- /dev/null +++ b/user_code/assets/__init__.py @@ -0,0 +1,23 @@ +""" +GTFS data pipeline assets. 
+""" +from .config import * +from .gtfs_static import * +from .gtfs_realtime import * + +__all__ = [ + # config assets + "agency_list", + + # GTFS assets + "gtfs_feed_metadata", + "gtfs_feed_partitions", + "gtfs_feeds_partitions_def", + "gtfs_feed_downloads", + + # GTFS-RT assets + "gtfs_rt_vehicles_metadata", + "gtfs_rt_vehicles_partitions", + "gtfs_rt_vehicles_partitions_def", + "gtfs_rt_vehicles_downloads", +] diff --git a/user_code/assets/config.py b/user_code/assets/config.py new file mode 100644 index 0000000..bd45f61 --- /dev/null +++ b/user_code/assets/config.py @@ -0,0 +1,23 @@ +import pandas as pd +from dagster import ( + asset, +) +from dagster_duckdb import DuckDBResource + + +@asset( + group_name="config", +) +def agency_list(duckdb: DuckDBResource) -> None: + """Load agency list from CSV into DuckDB.""" + + # Read the CSV (path is relative to container working directory) + df = pd.read_csv('config/agency_list.csv') + + # Write to DuckDB + with duckdb.get_connection() as conn: + conn.execute(""" + CREATE OR REPLACE TABLE agency_list AS + SELECT * FROM df + """) + diff --git a/user_code/assets/gtfs_realtime.py b/user_code/assets/gtfs_realtime.py new file mode 100644 index 0000000..c02bb1e --- /dev/null +++ b/user_code/assets/gtfs_realtime.py @@ -0,0 +1,322 @@ +from dagster import ( + asset, + AssetExecutionContext, + Output, + MetadataValue, + AutomationCondition, + DynamicPartitionsDefinition, + Config, +) +from dagster_duckdb import DuckDBResource +from resources import MobilityDatabaseAPI +import requests +from pathlib import Path +from datetime import datetime +import json +import logging + +logger = logging.getLogger(__name__) + + +@asset( + deps=["agency_list"], + group_name="gtfs_rt_metadata", + automation_condition=AutomationCondition.eager() +) +def gtfs_rt_vehicles_metadata( + context: AssetExecutionContext, + duckdb: DuckDBResource, + mobility_db: MobilityDatabaseAPI +) -> Output[None]: + """ + Fetch GTFS-RT vehicle feed metadata from Mobility Database API for all agencies + and store in DuckDB. Also creates the download history table. 
+ """ + + with duckdb.get_connection() as conn: + # Create metadata table + conn.execute(""" + CREATE TABLE IF NOT EXISTS gtfs_rt_vehicles_metadata ( + feed_id VARCHAR PRIMARY KEY, + provider VARCHAR, + status VARCHAR, + official BOOLEAN, + producer_url VARCHAR, + authentication_type INTEGER, + authentication_info_url VARCHAR, + api_key_parameter_name VARCHAR, + license_url VARCHAR, + feed_contact_email VARCHAR, + raw_json JSON, + fetched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + """) + + # Create download history table + conn.execute(""" + CREATE TABLE IF NOT EXISTS gtfs_rt_vehicles_download_history ( + feed_id VARCHAR, + download_url VARCHAR, + last_modified TIMESTAMP, + file_path VARCHAR, + file_size_bytes BIGINT, + downloaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (feed_id, downloaded_at) + ) + """) + + # Get all GTFS-RT vehicle feed IDs from agency_list + feed_ids = conn.execute(""" + SELECT DISTINCT "GTFS-RT_vehicles" as feed_id + FROM agency_list + WHERE "GTFS-RT_vehicles" IS NOT NULL AND "GTFS-RT_vehicles" != '' + """).fetchall() + + context.log.info(f"Found {len(feed_ids)} GTFS-RT vehicle feeds to fetch") + + successful = 0 + failed = 0 + + for (feed_id,) in feed_ids: + try: + feed_info = mobility_db.get_feed_info(feed_id) + + # Extract relevant fields + source_info = feed_info.get("source_info", {}) + + # Insert or update the record + conn.execute(""" + INSERT OR REPLACE INTO gtfs_rt_vehicles_metadata ( + feed_id, + provider, + status, + official, + producer_url, + authentication_type, + authentication_info_url, + api_key_parameter_name, + license_url, + feed_contact_email, + raw_json + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, [ + feed_id, + feed_info.get("provider"), + feed_info.get("status"), + feed_info.get("official"), + source_info.get("producer_url"), + source_info.get("authentication_type"), + source_info.get("authentication_info_url"), + source_info.get("api_key_parameter_name"), + source_info.get("license_url"), + feed_info.get("feed_contact_email"), + json.dumps(feed_info) + ]) + + context.log.info(f"✓ Fetched and stored metadata for {feed_id}") + successful += 1 + + except Exception as e: + context.log.error(f"✗ Failed to fetch {feed_id}: {e}") + failed += 1 + + # Get summary stats + total_records = conn.execute( + "SELECT COUNT(*) FROM gtfs_rt_vehicles_metadata" + ).fetchone()[0] + + # Get preview for metadata + preview_df = conn.execute(""" + SELECT feed_id, provider, status, producer_url + FROM gtfs_rt_vehicles_metadata + LIMIT 5 + """).df() + + return Output( + None, + metadata={ + "total_feeds": len(feed_ids), + "successful": successful, + "failed": failed, + "total_records_in_db": total_records, + "preview": MetadataValue.md(preview_df.to_markdown(index=False)) + } + ) + + +# Dynamic partition definition for GTFS-RT vehicle feeds +gtfs_rt_vehicles_partitions_def = DynamicPartitionsDefinition(name="gtfs_rt_vehicles") + + +@asset( + deps=["gtfs_rt_vehicles_metadata"], + group_name="gtfs_rt_metadata", + automation_condition=AutomationCondition.eager() +) +def gtfs_rt_vehicles_partitions( + context: AssetExecutionContext, + duckdb: DuckDBResource, +) -> Output[None]: + """ + Update the dynamic partitions based on feeds in gtfs_rt_vehicles_metadata table. + Creates one partition per feed_id. 
+ """ + with duckdb.get_connection() as conn: + feed_ids = conn.execute(""" + SELECT feed_id + FROM gtfs_rt_vehicles_metadata + WHERE producer_url IS NOT NULL AND producer_url != '' + ORDER BY feed_id + """).fetchall() + + feed_id_list = [feed_id for (feed_id,) in feed_ids] + + # Update the dynamic partitions + context.instance.add_dynamic_partitions( + partitions_def_name="gtfs_rt_vehicles", + partition_keys=feed_id_list + ) + + context.log.info(f"Updated partitions with {len(feed_id_list)} GTFS-RT vehicle feeds") + + return Output( + None, + metadata={ + "feed_count": len(feed_id_list), + "feeds": MetadataValue.md("\n".join(f"- {f}" for f in feed_id_list[:20])) + } + ) + + +class GTFSRTDownloadConfig(Config): + provider: str + producer_url: str + + +@asset( + partitions_def=gtfs_rt_vehicles_partitions_def, + deps=[gtfs_rt_vehicles_partitions], + group_name="gtfs_rt_downloads", +) +def gtfs_rt_vehicles_downloads( + context: AssetExecutionContext, + config: GTFSRTDownloadConfig, + duckdb: DuckDBResource, +) -> Output[None]: + """ + Download GTFS-RT vehicle feed for each agency partition. + Only downloads if the Last-Modified date has changed from the previous download. + Files are saved to data/raw/gtfs-rt/vehicles//YYYY/MM/DD/.pb + Filename and directory structure use the Last-Modified timestamp. + """ + feed_id = context.partition_key + download_url = config.producer_url + provider = config.provider + + with duckdb.get_connection() as conn: + # Check the Last-Modified header without downloading the full file + try: + head_response = requests.head( + download_url, + timeout=30, + allow_redirects=True + ) + head_response.raise_for_status() + + last_modified_str = head_response.headers.get('Last-Modified') + if last_modified_str: + last_modified = datetime.strptime( + last_modified_str, + '%a, %d %b %Y %H:%M:%S GMT' + ) + else: + # If no Last-Modified header, use current time (will always download) + last_modified = datetime.now() + context.log.warning(f"No Last-Modified header for {feed_id}, will download") + + except Exception as e: + context.log.error(f"Failed to check headers for {feed_id}: {e}") + return Output(None, metadata={"status": "error", "error": str(e)}) + + # Check if we've already downloaded this version + existing = conn.execute(""" + SELECT file_path, downloaded_at + FROM gtfs_rt_vehicles_download_history + WHERE feed_id = ? AND last_modified = ? 
+ ORDER BY downloaded_at DESC + LIMIT 1 + """, [feed_id, last_modified]).fetchone() + + if existing: + file_path, downloaded_at = existing + context.log.info( + f"Already have latest version of {feed_id} " + f"(modified: {last_modified}, downloaded: {downloaded_at})" + ) + return Output( + None, + metadata={ + "status": "up_to_date", + "last_modified": last_modified.isoformat(), + "existing_file": file_path, + "downloaded_at": downloaded_at.isoformat(), + "provider": provider, + } + ) + + # Download the file + download_time = datetime.now() + + context.log.info(f"Downloading new version of {feed_id} (modified: {last_modified})") + + # Create directory structure using last_modified date: data/raw/gtfs-rt/vehicles/<feed_id>/YYYY/MM/DD + feed_dir = Path(f"data/raw/gtfs-rt/vehicles/{feed_id}/{last_modified.strftime('%Y/%m/%d')}") + feed_dir.mkdir(parents=True, exist_ok=True) + + # Filename format: YYYYMMDDHHMMSS.pb (using last_modified timestamp) + filename = f"{last_modified.strftime('%Y%m%d%H%M%S')}.pb" + file_path = feed_dir / filename + + try: + response = requests.get(download_url, timeout=30) + response.raise_for_status() + + # Write the protobuf data + with open(file_path, 'wb') as f: + f.write(response.content) + + file_size = file_path.stat().st_size + + # Record the download in history + conn.execute(""" + INSERT INTO gtfs_rt_vehicles_download_history ( + feed_id, download_url, last_modified, file_path, file_size_bytes, downloaded_at + ) VALUES (?, ?, ?, ?, ?, ?) + """, [feed_id, download_url, last_modified, str(file_path), file_size, download_time]) + + return Output( + None, + metadata={ + "status": "downloaded", + "file_path": str(file_path), + "file_size_kb": round(file_size / 1024, 2), + "last_modified": last_modified.isoformat(), + "downloaded_at": download_time.isoformat(), + "provider": provider, + "download_url": download_url, + } + ) + + except Exception as e: + context.log.error(f"Failed to download {feed_id}: {e}") + # Clean up partial file if it exists + if file_path.exists(): + file_path.unlink() + return Output( + None, + metadata={ + "status": "error", + "error": str(e), + "feed_id": feed_id + } + ) diff --git a/user_code/assets/gtfs_static.py b/user_code/assets/gtfs_static.py new file mode 100644 index 0000000..ff6cc19 --- /dev/null +++ b/user_code/assets/gtfs_static.py @@ -0,0 +1,336 @@ +from dagster import ( + asset, + AssetExecutionContext, + Output, + MetadataValue, + AutomationCondition, + DynamicPartitionsDefinition, +) +from dagster_duckdb import DuckDBResource +from resources import MobilityDatabaseAPI +import json +import requests +from pathlib import Path +from datetime import datetime +import logging + + +logger = logging.getLogger(__name__) + +@asset( + deps=["agency_list"], + group_name="gtfs_metadata", + automation_condition=AutomationCondition.eager() )
+def gtfs_feed_metadata( + context: AssetExecutionContext, + duckdb: DuckDBResource, + mobility_db: MobilityDatabaseAPI +) -> Output[None]: + """ + Fetch GTFS feed metadata from Mobility Database API for all agencies + and store in DuckDB. Also creates the download history table.
+ """ + + with duckdb.get_connection() as conn: + # Create the metadata table if it doesn't exist + conn.execute(""" + CREATE TABLE IF NOT EXISTS gtfs_feed_metadata ( + feed_id VARCHAR PRIMARY KEY, + provider VARCHAR, + status VARCHAR, + official BOOLEAN, + producer_url VARCHAR, + authentication_type INTEGER, + authentication_info_url VARCHAR, + api_key_parameter_name VARCHAR, + license_url VARCHAR, + feed_contact_email VARCHAR, + raw_json JSON, + fetched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + """) + + # Create download history table for static GTFS + conn.execute(""" + CREATE TABLE IF NOT EXISTS gtfs_download_history ( + feed_id VARCHAR, + download_url VARCHAR, + last_modified TIMESTAMP, + file_path VARCHAR, + file_size_bytes BIGINT, + downloaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (feed_id, last_modified) + ) + """) + + # Get all GTFS feed IDs from agency_list + feed_ids = conn.execute(""" + SELECT DISTINCT GTFS as feed_id + FROM agency_list + WHERE GTFS IS NOT NULL AND GTFS != '' + """).fetchall() + + context.log.info(f"Found {len(feed_ids)} feeds to fetch") + + successful = 0 + failed = 0 + + for (feed_id,) in feed_ids: + try: + feed_info = mobility_db.get_feed_info(feed_id) + + # Extract relevant fields + source_info = feed_info.get("source_info", {}) + + # Insert or update the record + conn.execute(""" + INSERT OR REPLACE INTO gtfs_feed_metadata ( + feed_id, + provider, + status, + official, + producer_url, + authentication_type, + authentication_info_url, + api_key_parameter_name, + license_url, + feed_contact_email, + raw_json + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, [ + feed_id, + feed_info.get("provider"), + feed_info.get("status"), + feed_info.get("official"), + source_info.get("producer_url"), + source_info.get("authentication_type"), + source_info.get("authentication_info_url"), + source_info.get("api_key_parameter_name"), + source_info.get("license_url"), + feed_info.get("feed_contact_email"), + json.dumps(feed_info) + ]) + + context.log.info(f"✓ Fetched and stored metadata for {feed_id}") + successful += 1 + + except Exception as e: + context.log.error(f"✗ Failed to fetch {feed_id}: {e}") + failed += 1 + + # Get summary stats + total_records = conn.execute( + "SELECT COUNT(*) FROM gtfs_feed_metadata" + ).fetchone()[0] + + # Get preview for metadata + preview_df = conn.execute(""" + SELECT feed_id, provider, status, producer_url + FROM gtfs_feed_metadata + LIMIT 5 + """).df() + + return Output( + None, + metadata={ + "total_feeds": len(feed_ids), + "successful": successful, + "failed": failed, + "total_records_in_db": total_records, + "preview": MetadataValue.md(preview_df.to_markdown(index=False)) + } + ) + + +# Dynamic partition definition for GTFS feeds +gtfs_feeds_partitions_def = DynamicPartitionsDefinition(name="gtfs_feeds") + + +@asset( + deps=["gtfs_feed_metadata"], + group_name="gtfs_metadata", + automation_condition=AutomationCondition.eager() +) +def gtfs_feed_partitions( + context: AssetExecutionContext, + duckdb: DuckDBResource, +) -> Output[None]: + """ + Update the dynamic partitions based on feeds in gtfs_feed_metadata table. + Creates one partition per feed_id. 
+ """ + with duckdb.get_connection() as conn: + feed_ids = conn.execute(""" + SELECT feed_id + FROM gtfs_feed_metadata + WHERE producer_url IS NOT NULL AND producer_url != '' + ORDER BY feed_id + """).fetchall() + + feed_id_list = [feed_id for (feed_id,) in feed_ids] + + # Update the dynamic partitions + context.instance.add_dynamic_partitions( + partitions_def_name="gtfs_feeds", + partition_keys=feed_id_list + ) + + context.log.info(f"Updated partitions with {len(feed_id_list)} feeds") + + return Output( + None, + metadata={ + "feed_count": len(feed_id_list), + "feeds": MetadataValue.md("\n".join(f"- {f}" for f in feed_id_list[:20])) + } + ) + + +@asset( + partitions_def=gtfs_feeds_partitions_def, + deps=["gtfs_feed_partitions"], + group_name="gtfs_downloads", + automation_condition=AutomationCondition.on_cron("0 * * * *") | AutomationCondition.eager(), +) +def gtfs_feed_downloads( + context: AssetExecutionContext, + duckdb: DuckDBResource, +) -> Output[None]: + """ + Download GTFS feed for each agency partition. + Only downloads if there's a new version available based on Last-Modified header. + Files are saved to data/raw/gtfs// + Runs on the hour and whenever new partitions are added. + """ + feed_id = context.partition_key + + with duckdb.get_connection() as conn: + # Get the download URL for this feed + feed_info = conn.execute(""" + SELECT feed_id, provider, producer_url + FROM gtfs_feed_metadata + WHERE feed_id = ? + """, [feed_id]).fetchone() + + if not feed_info: + raise ValueError(f"Feed {feed_id} not found in metadata") + + feed_id, provider, download_url = feed_info + + if not download_url: + context.log.warning(f"No download URL for {feed_id}") + return Output(None, metadata={"status": "no_url"}) + + # Check the Last-Modified header without downloading the full file + try: + head_response = requests.head( + download_url, + timeout=30, + allow_redirects=True + ) + head_response.raise_for_status() + + last_modified_str = head_response.headers.get('Last-Modified') + if last_modified_str: + last_modified = datetime.strptime( + last_modified_str, + '%a, %d %b %Y %H:%M:%S GMT' + ) + else: + # If no Last-Modified header, use current time + last_modified = datetime.now() + context.log.warning(f"No Last-Modified header for {feed_id}, using current time") + + except Exception as e: + context.log.error(f"Failed to check headers for {feed_id}: {e}") + return Output(None, metadata={"status": "error", "error": str(e)}) + + # Check if we've already downloaded this version + existing = conn.execute(""" + SELECT file_path, downloaded_at + FROM gtfs_download_history + WHERE feed_id = ? AND last_modified = ? 
+ ORDER BY downloaded_at DESC + LIMIT 1 + """, [feed_id, last_modified]).fetchone() + + if existing: + file_path, downloaded_at = existing + context.log.info( + f"Already have latest version of {feed_id} " + f"(modified: {last_modified}, downloaded: {downloaded_at})" + ) + return Output( + None, + metadata={ + "status": "up_to_date", + "last_modified": last_modified.isoformat(), + "existing_file": file_path, + "downloaded_at": downloaded_at.isoformat() + } + ) + + # Download the file + context.log.info(f"Downloading new version of {feed_id} (modified: {last_modified})") + + # Create directory structure: data/raw/gtfs/<feed_id>/ + feed_dir = Path(f"data/raw/gtfs/{feed_id}") + feed_dir.mkdir(parents=True, exist_ok=True) + + # Filename format: yyyy-mm-dd-gtfs.zip (using last_modified date) + filename = f"{last_modified.strftime('%Y-%m-%d')}-gtfs.zip" + file_path = feed_dir / filename + + # If file exists with same name but different modified time, append time to filename + if file_path.exists(): + filename = f"{last_modified.strftime('%Y-%m-%d-%H%M%S')}-gtfs.zip" + file_path = feed_dir / filename + + try: + response = requests.get(download_url, timeout=120, stream=True) + response.raise_for_status() + + # Write file in chunks to handle large files + with open(file_path, 'wb') as f: + for chunk in response.iter_content(chunk_size=8192): + f.write(chunk) + + file_size = file_path.stat().st_size + + # Record the download in history + conn.execute(""" + INSERT INTO gtfs_download_history ( + feed_id, download_url, last_modified, file_path, file_size_bytes + ) VALUES (?, ?, ?, ?, ?) + """, [feed_id, download_url, last_modified, str(file_path), file_size]) + + context.log.info( + f"✓ Downloaded {feed_id} to {file_path} ({file_size:,} bytes)" + ) + + return Output( + None, + metadata={ + "status": "downloaded", + "file_path": str(file_path), + "file_size_mb": round(file_size / 1024 / 1024, 2), + "last_modified": last_modified.isoformat(), + "provider": provider, + "download_url": download_url, + } + ) + + except Exception as e: + context.log.error(f"Failed to download {feed_id}: {e}") + # Clean up partial file if it exists + if file_path.exists(): + file_path.unlink() + return Output( + None, + metadata={ + "status": "error", + "error": str(e), + "feed_id": feed_id + } + ) diff --git a/user_code/definitions.py b/user_code/definitions.py index f8d4515..3fbae5e 100644 --- a/user_code/definitions.py +++ b/user_code/definitions.py @@ -7,10 +7,16 @@ from dagster import ( ) from dagster_duckdb import DuckDBResource -import assets +# Import asset modules +from assets import config, gtfs_static, gtfs_realtime + +# Import sensor modules +from sensors import gtfs_realtime as gtfs_rt_sensors + from resources import MobilityDatabaseAPI -all_assets = load_assets_from_modules([assets]) +# Load all assets from the modules +all_assets = load_assets_from_modules([config, gtfs_static, gtfs_realtime]) defs = Definitions( assets=all_assets, @@ -20,7 +26,7 @@ defs = Definitions( target="*", default_status=DefaultSensorStatus.RUNNING, ), - assets.gtfs_rt_vehicles_sensor + gtfs_rt_sensors.gtfs_rt_vehicles_sensor, ], resources={ "duckdb": DuckDBResource( diff --git a/user_code/sensors/__init__.py b/user_code/sensors/__init__.py new file mode 100644 index 0000000..9b6b8ec --- /dev/null +++ b/user_code/sensors/__init__.py @@ -0,0 +1,8 @@ +""" +GTFS data pipeline sensors.
+""" +from .gtfs_realtime import * + +__all__ = [ + "gtfs_rt_vehicles_sensor", +] diff --git a/user_code/sensors/gtfs_realtime.py b/user_code/sensors/gtfs_realtime.py new file mode 100644 index 0000000..3e27602 --- /dev/null +++ b/user_code/sensors/gtfs_realtime.py @@ -0,0 +1,61 @@ +from dagster import ( + sensor, + RunRequest, + SkipReason, + SensorEvaluationContext, + DefaultSensorStatus, +) +from dagster_duckdb import DuckDBResource + + +@sensor( + name="gtfs_rt_vehicles_sensor", + minimum_interval_seconds=60, + asset_selection=["gtfs_rt_vehicles_downloads"], + default_status=DefaultSensorStatus.RUNNING +) +def gtfs_rt_vehicles_sensor( + context: SensorEvaluationContext, + duckdb: DuckDBResource, +) -> list[RunRequest] | SkipReason: + """ + Sensor that triggers gtfs_rt_vehicles_downloads every 60 seconds. + Fetches feed metadata once and passes it to each partition run. + """ + with duckdb.get_connection() as conn: + # Get all active feeds with their metadata in one query + feeds = conn.execute(""" + SELECT feed_id, provider, producer_url + FROM gtfs_rt_vehicles_metadata + WHERE producer_url IS NOT NULL AND producer_url != '' + ORDER BY feed_id + """).fetchall() + + if not feeds: + return SkipReason("No GTFS-RT vehicle feeds configured") + + # Create a RunRequest for each partition with metadata + run_requests = [ + RunRequest( + partition_key=feed_id, + run_config={ + "ops": { + "gtfs_rt_vehicles_downloads": { + "config": { + "provider": provider, + "producer_url": producer_url, + } + } + } + }, + tags={ + "feed_id": feed_id, + "sensor": "gtfs_rt_vehicles_sensor" + } + ) + for feed_id, provider, producer_url in feeds + ] + + context.log.info(f"Triggering downloads for {len(run_requests)} GTFS-RT vehicle feeds") + + return run_requests