85 lines
2.8 KiB
Python
85 lines
2.8 KiB
Python
from dagster import ConfigurableResource
|
|
import requests
|
|
from typing import Optional
|
|
from time import sleep
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class MobilityDatabaseAPI(ConfigurableResource):
|
|
"""Resource for interacting with the Mobility Database API with OAuth2 token management."""
|
|
|
|
base_url: str = "https://api.mobilitydatabase.org"
|
|
refresh_token: str # Long-lived refresh token
|
|
rate_limit_delay: float = 0.5 # Seconds between requests
|
|
|
|
# These will be set at runtime, not in config
|
|
_access_token: Optional[str] = None
|
|
_token_expires_at: Optional[datetime] = None
|
|
|
|
def _get_access_token(self) -> str:
|
|
"""
|
|
Get a valid access token, refreshing if necessary.
|
|
Access tokens are valid for 1 hour.
|
|
"""
|
|
# If we have a token and it's not expired (with 5 min buffer), use it
|
|
if self._access_token and self._token_expires_at:
|
|
if datetime.now() < self._token_expires_at - timedelta(minutes=5):
|
|
return self._access_token
|
|
|
|
# Need to get a new token
|
|
logger.info("Fetching new access token from Mobility Database API")
|
|
|
|
url = f"{self.base_url}/v1/tokens"
|
|
headers = {"Content-Type": "application/json"}
|
|
data = {"refresh_token": self.refresh_token}
|
|
|
|
try:
|
|
response = requests.post(url, headers=headers, json=data, timeout=30)
|
|
response.raise_for_status()
|
|
|
|
token_data = response.json()
|
|
self._access_token = token_data.get("access_token")
|
|
|
|
# Tokens are valid for 1 hour
|
|
self._token_expires_at = datetime.now() + timedelta(hours=1)
|
|
|
|
logger.info("Successfully obtained new access token")
|
|
return self._access_token
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
logger.error(f"Error fetching access token: {e}")
|
|
raise RuntimeError(f"Failed to obtain access token: {e}")
|
|
|
|
def get_feed_info(self, feed_id: str) -> dict:
|
|
"""
|
|
Fetch feed information from the Mobility Database API.
|
|
|
|
Args:
|
|
feed_id: The MDB feed ID (e.g., 'mdb-394')
|
|
|
|
Returns:
|
|
Dictionary containing feed information
|
|
"""
|
|
access_token = self._get_access_token()
|
|
|
|
url = f"{self.base_url}/v1/feeds/{feed_id}"
|
|
headers = {
|
|
"Authorization": f"Bearer {access_token}"
|
|
}
|
|
|
|
logger.info(f"Fetching feed info for {feed_id}")
|
|
|
|
try:
|
|
response = requests.get(url, headers=headers, timeout=30)
|
|
response.raise_for_status()
|
|
|
|
# Rate limiting
|
|
sleep(self.rate_limit_delay)
|
|
|
|
return response.json()
|
|
except requests.exceptions.RequestException as e:
|
|
logger.error(f"Error fetching feed {feed_id}: {e}")
|
|
raise
|