from dagster import (
    sensor,
    RunRequest,
    SkipReason,
    SensorEvaluationContext,
    DefaultSensorStatus,
    SensorResult,
    AssetKey,
)
from dagster_duckdb import DuckDBResource


@sensor(
    name="gtfs_static_hourly_sensor",
    minimum_interval_seconds=3600,  # 60 minutes
    asset_selection=["gtfs_feed_downloads"],
    default_status=DefaultSensorStatus.RUNNING,
)
def gtfs_static_hourly_sensor(
    context: SensorEvaluationContext,
    duckdb: DuckDBResource,
) -> list[RunRequest] | SkipReason:
    """Trigger gtfs_feed_downloads every 60 minutes.

    Fetches feed metadata once from DuckDB and passes it to each
    partitioned run via run_config, so the asset does not have to
    re-query the database.
    """
    # Check that the upstream asset has been materialized at least once.
    # Update this asset key if your upstream asset has a different name.
    upstream_asset_key = AssetKey("gtfs_feed_partitions")
    latest_materialization = context.instance.get_latest_materialization_event(
        upstream_asset_key
    )

    if latest_materialization is None:
        return SkipReason(
            "Waiting for upstream asset 'gtfs_feed_partitions' to be materialized. "
            "Run the upstream assets first."
        )

    try:
        with duckdb.get_connection() as conn:
            # Get all active feeds with their metadata in one query.
            feeds = conn.execute("""
                SELECT feed_id, provider, producer_url
                FROM gtfs_feed_metadata
                WHERE producer_url IS NOT NULL AND producer_url != ''
                ORDER BY feed_id
            """).fetchall()

        if not feeds:
            return SkipReason("No GTFS feeds configured")

        # Create one RunRequest per partition, carrying the feed metadata.
        run_requests = [
            RunRequest(
                partition_key=feed_id,
                run_config={
                    "ops": {
                        "gtfs_feed_downloads": {
                            "config": {
                                "provider": provider,
                                "producer_url": producer_url,
                            }
                        }
                    }
                },
                tags={
                    "feed_id": feed_id,
                    "sensor": "gtfs_static_hourly_sensor",
                },
            )
            for feed_id, provider, producer_url in feeds
        ]

        context.log.info(f"Triggering downloads for {len(run_requests)} GTFS feeds")
        return run_requests

    except Exception as e:
        # The metadata table may not exist yet (e.g. on first deploy);
        # skip this tick instead of failing the sensor.
        context.log.warning(f"Database query failed: {e}")
        return SkipReason(f"Database not ready or query failed: {e}")


@sensor(
    name="gtfs_static_partition_update_sensor",
    asset_selection=["gtfs_feed_downloads"],
    default_status=DefaultSensorStatus.RUNNING,
)
def gtfs_static_partition_update_sensor(
    context: SensorEvaluationContext,
    duckdb: DuckDBResource,
) -> SensorResult | SkipReason:
    """Trigger gtfs_feed_downloads whenever gtfs_feed_partitions is
    materialized, i.e. whenever new partitions are added.
    """
    partition_asset_key = AssetKey("gtfs_feed_partitions")

    # Get the latest materialization of gtfs_feed_partitions.
    last_materialization = context.instance.get_latest_materialization_event(
        partition_asset_key
    )
    if not last_materialization:
        return SkipReason("gtfs_feed_partitions has not been materialized yet")

    # The cursor stores the timestamp of the last materialization we processed.
    cursor = context.cursor
    last_run_timestamp = float(cursor) if cursor else 0.0

    materialization_timestamp = last_materialization.timestamp

    # Skip if there is no new materialization since the last sensor tick.
    if materialization_timestamp <= last_run_timestamp:
        return SkipReason(
            f"No new materialization of gtfs_feed_partitions since last check "
            f"(last: {last_run_timestamp}, current: {materialization_timestamp})"
        )

    context.log.info(
        f"Detected new materialization of gtfs_feed_partitions at {materialization_timestamp}"
    )

    # Get all feeds from the database.
    with duckdb.get_connection() as conn:
        feeds = conn.execute("""
            SELECT feed_id, provider, producer_url
            FROM gtfs_feed_metadata
            WHERE producer_url IS NOT NULL AND producer_url != ''
            ORDER BY feed_id
        """).fetchall()

    if not feeds:
        # Advance the cursor even when there are no feeds, so the same
        # materialization is not re-processed on the next tick.
        return SensorResult(
            skip_reason="No GTFS feeds configured",
            cursor=str(materialization_timestamp),
        )

    run_requests = [
        RunRequest(
            partition_key=feed_id,
            run_config={
                "ops": {
                    "gtfs_feed_downloads": {
                        "config": {
                            "provider": provider,
                            "producer_url": producer_url,
                        }
                    }
                }
            },
            tags={
                "feed_id": feed_id,
                "sensor": "gtfs_static_partition_update_sensor",
                "trigger": "partition_update",
            },
        )
        for feed_id, provider, producer_url in feeds
    ]

    context.log.info(
        f"Triggering downloads for {len(run_requests)} GTFS feeds "
        f"due to partition update"
    )

    # Return the run requests together with the updated cursor.
    return SensorResult(
        run_requests=run_requests,
        cursor=str(materialization_timestamp),
    )