from concurrent.futures import ThreadPoolExecutor, as_completed
from io import StringIO
from time import sleep
from urllib.error import HTTPError, URLError
from urllib.parse import urljoin
from urllib.request import urlopen
import geopandas as gpd
import pandas as pd
from geopandas.base import GeoPandasBase
from tqdm.auto import tqdm
from .misc import daterange, proj, to_crs
[docs]
class DataDownloadError(RuntimeError):
"""Raised when station data cannot be fetched or parsed."""
[docs]
class IsdLite:
"""
A client for accessing NOAA's ISD-Lite (Integrated Surface Data Lite) weather dataset.
ISD-Lite provides global hourly observations of temperature, dew point, pressure, wind,
sky coverage, and precipitation from weather stations worldwide. This client handles
downloading and processing the data, with support for spatial and temporal filtering.
The data fields available are:
- temp: Air temperature (Celsius)
- dewtemp: Dew point temperature (Celsius)
- pressure: Sea level pressure (hectopascals)
- winddirection: Wind direction (degrees)
- windspeed: Wind speed (meters/second)
- skycoverage: Sky coverage/ceiling (code)
- precipitation-1h: One-hour precipitation (mm)
- precipitation-6h: Six-hour precipitation (mm)
Args:
crs (int or str, optional): Coordinate reference system for spatial operations.
Defaults to 4326 (WGS 84).
verbose (int, optional): Verbosity level for progress reporting.
0 for silent, 1 for progress bars. Defaults to 0.
metadata_retries (int, optional): Number of metadata download attempts before failing.
Defaults to 3.
metadata_timeout (int or float, optional): Metadata request timeout in seconds.
Defaults to 2.
metadata_retry_delay (int or float, optional): Base delay between metadata retries.
Retries use exponential backoff. Defaults to 1.
Examples:
.. code-block:: python
# Initialize the client
isd = IsdLite()
# Get data for all US stations for January 2020
data = isd.get_data(
start='2020-01-01',
end='2020-01-31',
countries='US'
)
# Get data within a specific region, organized by weather variable
texas_data = isd.get_data(
start='2020-01-01',
geometry=(-106.6, 25.8, -93.5, 36.5), # Texas bounding box
organize_by='field'
)
# Use with geopandas for spatial filtering
import geopandas as gpd
city = gpd.read_file('city_boundary.geojson')
city_data = isd.get_data(
start='2020-01-01',
geometry=city
)
# Access specific weather variables
temperatures = texas_data['temp'] # When organize_by='field'
# Or
station_data = data['724940'] # When organize_by='location'
station_temp = station_data['temp']
"""
metadata_url = "https://www.ncei.noaa.gov/pub/data/noaa/isd-history.txt"
data_url = "https://www.ncei.noaa.gov/pub/data/noaa/isd-lite/{year}/"
fields = (
"temp",
"dewtemp",
"pressure",
"winddirection",
"windspeed",
"skycoverage",
"precipitation-1h",
"precipitation-6h",
)
default_metadata_retries = 3
default_metadata_timeout = 2
default_metadata_retry_delay = 1
def __init__(self, crs=4326, verbose=0, metadata_retries=None, metadata_timeout=None, metadata_retry_delay=None):
crs_value = to_crs(crs)
if crs_value is None:
raise TypeError("crs must not be None")
self.crs = crs_value
self.verbose = verbose
self.metadata_retries = self.default_metadata_retries if metadata_retries is None else metadata_retries
self.metadata_timeout = self.default_metadata_timeout if metadata_timeout is None else metadata_timeout
self.metadata_retry_delay = (
self.default_metadata_retry_delay if metadata_retry_delay is None else metadata_retry_delay
)
self._raw_metadata = None
if self.metadata_retries < 1:
raise ValueError("metadata_retries must be at least 1")
if self.metadata_timeout <= 0:
raise ValueError("metadata_timeout must be positive")
if self.metadata_retry_delay < 0:
raise ValueError("metadata_retry_delay must be non-negative")
@property
def raw_metadata(self):
"""Weather station metadata, loaded lazily on first access."""
return self._load_metadata()
@raw_metadata.setter
def raw_metadata(self, value):
self._raw_metadata = value
def _get_raw_metadata(self):
"""Backward-compatible wrapper for refreshing station metadata."""
return self.refresh_metadata()
def _download_metadata_text(self):
with urlopen(self.metadata_url, timeout=self.metadata_timeout) as response:
return response.read().decode("utf-8")
def _parse_metadata(self, content):
metadata = (
pd.read_fwf(StringIO(content), skiprows=20, header=0, dtype={"USAF": str, "WBAN": str})
.dropna(subset=["LAT", "LON"])
.query("not (LON == 0 and LAT == 0)")
)
metadata["x"], metadata["y"] = proj(metadata["LON"], metadata["LAT"], 4326, self.crs)
metadata[["BEGIN", "END"]] = metadata[["BEGIN", "END"]].astype(str).apply(pd.to_datetime)
return gpd.GeoDataFrame(
metadata.drop(columns=["LON", "LAT"]),
geometry=gpd.points_from_xy(metadata.x, metadata.y, crs=self.crs),
)
def _load_metadata(self, force=False):
"""Retrieve and cache weather station metadata from NOAA sources."""
if self._raw_metadata is not None and not force:
return self._raw_metadata
last_error = None
for attempt in range(self.metadata_retries):
try:
self._raw_metadata = self._parse_metadata(self._download_metadata_text())
return self._raw_metadata
except URLError as e:
last_error = e
if attempt < self.metadata_retries - 1 and self.metadata_retry_delay:
sleep(self.metadata_retry_delay * (2**attempt))
raise MetadataDownloadError(
f"Failed to download metadata from {self.metadata_url} after {self.metadata_retries} attempts"
) from last_error
def _filter_metadata(self, countries, geometry):
"""
Internal: filter raw_metadata by country or geometry and return list of (USAF, WBAN) tuples.
"""
filt = self.raw_metadata
if countries is not None:
if isinstance(countries, str):
countries = (countries,)
filt = filt[filt["CTRY"].isin(countries)]
if geometry is not None:
if isinstance(geometry, GeoPandasBase):
geometry = geometry.to_crs(self.crs)
filt = gpd.clip(filt, geometry)
# Extract unique station identifier pairs
pairs = filt.drop_duplicates(subset=["USAF", "WBAN"])[["USAF", "WBAN"]].values
return [(str(usaf), str(wban)) for usaf, wban in pairs]
@classmethod
def _download_read(cls, url):
time_features = ["year", "month", "day", "hour"]
df = pd.read_csv(url, sep="\\s+", header=None, na_values=-9999)
df.columns = time_features + list(cls.fields)
df[["temp", "dewtemp", "pressure", "windspeed"]] /= 10.0
df.index = pd.to_datetime(df[time_features])
df = df.drop(columns=time_features)
return df
@classmethod
def _download_data_id(cls, usaf_id, wban_id, years):
station_id = f"{usaf_id}-{wban_id}"
frames = []
for year in years:
url = urljoin(cls.data_url.format(year=year), f"{usaf_id}-{wban_id}-{year}.gz")
try:
frames.append(cls._download_read(url))
except Exception as exc:
if isinstance(exc, HTTPError) and exc.code == 404:
continue
raise DataDownloadError(
f"Failed to download data for station {station_id} in {year} from {url}"
) from exc
return pd.concat(frames) if frames else pd.DataFrame()
[docs]
def get_data(
self,
start,
end=None,
station_id=None,
countries=None,
geometry=None,
organize_by="location",
n_jobs=6,
):
"""
Fetches weather data from the ISD-Lite dataset for the specified time range and location.
Args:
start (datetime): The start date for the data retrieval.
end (datetime, optional): The end date for the data retrieval. If not provided, defaults to the start date.
station_id (str, optional): A specific weather station ID in the format 'USAF-WBAN' to retrieve data for.
If provided, overrides any spatial or country filters. If None, data for all stations will
countries (str or iterable of str, optional): Country code(s) to filter stations by. Must be valid codes from
the ISD-Lite metadata (found in raw_metadata['CTRY']). Can be either a single country code as string
or multiple codes as an iterable. If None, stations from all countries will be considered. When used
together with ``geometry``, both filters are applied.
geometry (GeoSeries or tuple, optional): A spatial filter for the stations. Can be either:
- A GeoSeries or geometry object to filter stations by spatial location
- A tuple of (xmin, ymin, xmax, ymax) defining a bounding box
If None, data for all stations will be retrieved. Defaults to None. When used together with
``countries``, only stations matching both filters are considered.
organize_by (str, optional): Determines how the resulting data is organized. Options are:
- 'location': Organize data by weather station.
- 'field': Organize data by weather variable.
Defaults to 'location'.
n_jobs (int, optional): The number of threads to use for parallel data downloads. Defaults to 6.
Returns:
dict: A dictionary containing the weather data. The structure of the dictionary depends on the
`organize_by` parameter:
- If 'location': Keys are station IDs, and values are DataFrames with weather data.
- If 'field': Keys are weather variables, and values are DataFrames with stations as columns.
If no data is available, each field maps to an empty DataFrame indexed by the requested time range.
Raises:
ValueError: If `organize_by` is not one of the allowed options.
DataDownloadError: If station data cannot be downloaded or parsed.
Examples:
.. code-block:: python
# Get data for a single country
data = isd.get_data(start='2020-01-01', end='2020-12-31', countries='US')
# Get data for multiple countries
data = isd.get_data(start='2020-01-01', countries=['US', 'CA', 'MX'])
# Get data within a bounding box
data = isd.get_data(start='2020-01-01', geometry=(-100, 30, -90, 40))
"""
if organize_by not in {"field", "location"}:
raise ValueError(f"`{organize_by}` is not a recognized argument, must be one of field, location!")
time = daterange(start, end, freq="h")
years = [int(year) for year in dict.fromkeys(time.strftime("%Y"))]
# Determine station list: optional single station override
if station_id is not None:
try:
usaf_id, wban_id = station_id.split("-", 1)
except ValueError:
raise ValueError("station_id must be in format 'USAF-WBAN'")
stations = [(usaf_id, wban_id)]
else:
stations = self._filter_metadata(countries=countries, geometry=geometry)
def fetch_data(station):
usaf_id, wban_id = station
df = self._download_data_id(usaf_id=usaf_id, wban_id=wban_id, years=years)
return usaf_id, wban_id, df.reindex(index=time)
ret = {}
with ThreadPoolExecutor(max_workers=n_jobs) as executor:
# Launch fetch tasks for each station tuple
futures = {executor.submit(fetch_data, station): station for station in stations}
for future in tqdm(as_completed(futures), total=len(futures), disable=(not self.verbose)):
usaf_id, wban_id, data = future.result()
if data.size > 0:
ret[f"{usaf_id}-{wban_id}"] = data
if organize_by == "field":
if not ret:
return {field: pd.DataFrame(index=time) for field in self.fields}
ret = {
field: pd.concat([ret[station_id][field].rename(station_id) for station_id in ret], axis=1)
for field in self.fields
}
return ret