added asset to read agency_list.csv and add it to table in gtfs.duckdb

This commit is contained in:
Ben Varick 2025-11-05 18:22:26 -08:00
parent 7791d034ae
commit 2b47a45b8f
Signed by: ben
SSH key fingerprint: SHA256:jWnpFDAcacYM5aPFpYRqlsamlDyKNpSj3jj+k4ojtUo
9 changed files with 44 additions and 4 deletions

5
.gitignore vendored
View file

@ -5,3 +5,8 @@
postgres_data postgres_data
postgres_data/* postgres_data/*
#Exclude data directory
data
#except for agency_list.csv
!data/gtfs/agency_list.csv

View file

@ -6,13 +6,13 @@ FROM python:3.10-slim
RUN pip install \ RUN pip install \
dagster \ dagster \
dagster-postgres \ dagster-postgres \
dagster-docker dagster-docker \
dagster-duckdb \
pandas
WORKDIR /opt/dagster/app WORKDIR /opt/dagster/app
COPY user_code/gtfs /opt/dagster/app COPY user_code/gtfs /opt/dagster/app
COPY definitions.py /opt/dagster/app
# Run dagster gRPC server on port 4000 # Run dagster gRPC server on port 4000
EXPOSE 4000 EXPOSE 4000

View file

@ -26,6 +26,7 @@ run_launcher:
volumes: # Make docker client accessible to any launched containers as well volumes: # Make docker client accessible to any launched containers as well
- /var/run/docker.sock:/var/run/docker.sock - /var/run/docker.sock:/var/run/docker.sock
- /tmp/io_manager_storage:/tmp/io_manager_storage - /tmp/io_manager_storage:/tmp/io_manager_storage
- /home/ben/code/gtfs-dagster/data:/opt/dagster/app/data
run_storage: run_storage:
module: dagster_postgres.run_storage module: dagster_postgres.run_storage

View file

View file

@ -39,6 +39,8 @@ services:
DAGSTER_POSTGRES_PASSWORD: ${POSTGRES_PASSWORD} DAGSTER_POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
DAGSTER_POSTGRES_DB: ${POSTGRES_DB} DAGSTER_POSTGRES_DB: ${POSTGRES_DB}
DAGSTER_CURRENT_IMAGE: 'dagster_user_code_gtfs' DAGSTER_CURRENT_IMAGE: 'dagster_user_code_gtfs'
volumes:
- ./data:/opt/dagster/app/data
networks: networks:
- dagster - dagster

View file

@ -0,0 +1,2 @@
# user_code/gtfs/__init__.py
from .assets import *

18
user_code/gtfs/assets.py Normal file
View file

@ -0,0 +1,18 @@
import pandas as pd
from dagster import asset
from dagster_duckdb import DuckDBResource
@asset
def agency_list(duckdb: DuckDBResource) -> None:
"""Load agency list from CSV into DuckDB."""
# Read the CSV (path is relative to container working directory)
df = pd.read_csv('data/gtfs/agency_list.csv')
# Write to DuckDB
with duckdb.get_connection() as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS agency_list AS
SELECT * FROM df
""")

View file

@ -0,0 +1,12 @@
from dagster import Definitions
from dagster_duckdb import DuckDBResource
from assets import agency_list
defs = Definitions(
assets=[agency_list],
resources={
"duckdb": DuckDBResource(
database="data/gtfs/gtfs.duckdb"
)
}
)

View file

@ -3,4 +3,4 @@ load_from:
- grpc_server: - grpc_server:
host: dagster_user_code_gtfs host: dagster_user_code_gtfs
port: 4000 port: 4000
location_name: "gtfs" location_name: "gtfs_user_code"