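"""Dagster assets that load the GTFS agency list into DuckDB and fetch
per-feed metadata from the Mobility Database API for each agency."""
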
import pandas as pd
from dagster import (
    asset,
    AssetExecutionContext,
    Output,
    MetadataValue,
    AutoMaterializePolicy,
)
from dagster_duckdb import DuckDBResource
from resources import MobilityDatabaseAPI  # Direct import instead of relative
import json


@asset
def agency_list(duckdb: DuckDBResource) -> None:
    """Load the agency list from CSV into DuckDB."""
    # Read the CSV (path is relative to the container working directory)
    df = pd.read_csv('data/gtfs/agency_list.csv')

    # Write to DuckDB; the local pandas DataFrame `df` is resolved by name
    # through DuckDB's replacement scan
    with duckdb.get_connection() as conn:
        conn.execute("""
            CREATE OR REPLACE TABLE agency_list AS
            SELECT * FROM df
        """)


@asset(
    deps=["agency_list"],
    group_name="gtfs_metadata",
    auto_materialize_policy=AutoMaterializePolicy.eager(),
)
def gtfs_feed_metadata(
    context: AssetExecutionContext,
    duckdb: DuckDBResource,
    mobility_db: MobilityDatabaseAPI,
) -> Output[None]:
    """
    Fetch GTFS feed metadata from the Mobility Database API for all agencies
    and store it in DuckDB.
    """
    with duckdb.get_connection() as conn:
        # Create the metadata table if it doesn't exist
        conn.execute("""
            CREATE TABLE IF NOT EXISTS gtfs_feed_metadata (
                feed_id VARCHAR PRIMARY KEY,
                provider VARCHAR,
                status VARCHAR,
                official BOOLEAN,
                producer_url VARCHAR,
                authentication_type INTEGER,
                authentication_info_url VARCHAR,
                api_key_parameter_name VARCHAR,
                license_url VARCHAR,
                feed_contact_email VARCHAR,
                raw_json JSON,
                fetched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

        # Get all GTFS feed IDs from agency_list
        feed_ids = conn.execute("""
            SELECT DISTINCT GTFS AS feed_id
            FROM agency_list
            WHERE GTFS IS NOT NULL AND GTFS != ''
        """).fetchall()

        context.log.info(f"Found {len(feed_ids)} feeds to fetch")

        successful = 0
        failed = 0

        for (feed_id,) in feed_ids:
            try:
                feed_info = mobility_db.get_feed_info(feed_id)

                # Extract relevant fields
                source_info = feed_info.get("source_info", {})

                # Insert or update the record
                conn.execute("""
                    INSERT OR REPLACE INTO gtfs_feed_metadata (
                        feed_id,
                        provider,
                        status,
                        official,
                        producer_url,
                        authentication_type,
                        authentication_info_url,
                        api_key_parameter_name,
                        license_url,
                        feed_contact_email,
                        raw_json
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """, [
                    feed_id,
                    feed_info.get("provider"),
                    feed_info.get("status"),
                    feed_info.get("official"),
                    source_info.get("producer_url"),
                    source_info.get("authentication_type"),
                    source_info.get("authentication_info_url"),
                    source_info.get("api_key_parameter_name"),
                    source_info.get("license_url"),
                    feed_info.get("feed_contact_email"),
                    json.dumps(feed_info),
                ])

                context.log.info(f"✓ Fetched and stored metadata for {feed_id}")
                successful += 1

            except Exception as e:
                context.log.error(f"✗ Failed to fetch {feed_id}: {e}")
                failed += 1

        # Get summary stats
        total_records = conn.execute(
            "SELECT COUNT(*) FROM gtfs_feed_metadata"
        ).fetchone()[0]

        # Get a preview for the asset metadata
        preview_df = conn.execute("""
            SELECT feed_id, provider, status, producer_url
            FROM gtfs_feed_metadata
            LIMIT 5
        """).df()

    return Output(
        None,
        metadata={
            "total_feeds": len(feed_ids),
            "successful": successful,
            "failed": failed,
            "total_records_in_db": total_records,
            "preview": MetadataValue.md(preview_df.to_markdown(index=False)),
        },
    )
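

# A minimal sketch (not part of this module) of how these assets could be wired
# into a Dagster code location. The resource keys must match the asset argument
# names ("duckdb" and "mobility_db"); the database path and MobilityDatabaseAPI
# constructor arguments below are assumptions, not values taken from this repo.
#
#   from dagster import Definitions
#
#   defs = Definitions(
#       assets=[agency_list, gtfs_feed_metadata],
#       resources={
#           "duckdb": DuckDBResource(database="data/gtfs/warehouse.duckdb"),  # assumed path
#           "mobility_db": MobilityDatabaseAPI(),  # constructor arguments assumed
#       },
#   )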