ABS Data API Client.
Handles fetching and parsing Wage Price Index (WPI) data for the health sector.
Classes
ABSApiClient
Client for fetching data from the Australian Bureau of Statistics Data API.
Source code in src/nhra_gt/domain/abs_api.py
| class ABSApiClient:
"""Client for fetching data from the Australian Bureau of Statistics Data API."""
def __init__(self, cache_dir: Path | str = "data/raw"):
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(parents=True, exist_ok=True)
def fetch_wpi_health(self, use_cache: bool = True) -> pd.DataFrame:
"""
Fetches the WPI for Health Care and Social Assistance.
Returns:
DataFrame with columns ['year', 'wpi_health_index'] normalized to 2011=100.
"""
cache_path = self.cache_dir / "abs_wpi_health_raw.csv"
if use_cache and cache_path.exists():
logger.info(f"Using cached WPI data from {cache_path}")
df_raw = pd.read_csv(cache_path)
else:
url = f"{ABS_API_BASE_URL}/{WPI_DATAFLOW}/{WPI_HEALTH_KEY}"
headers = {"Accept": "application/vnd.sdmx.data+csv"}
logger.info(f"Fetching WPI data from {url}")
try:
response = requests.get(url, headers=headers, timeout=30)
response.raise_for_status()
with open(cache_path, "w") as f:
f.write(response.text)
df_raw = pd.read_csv(cache_path)
except Exception as e:
logger.error(f"Failed to fetch WPI data from ABS API: {e}")
if cache_path.exists():
logger.warning("Falling back to existing cache.")
df_raw = pd.read_csv(cache_path)
else:
raise
# Process: Convert TIME_PERIOD (YYYY-QX) to year and average
df_raw["year"] = df_raw["TIME_PERIOD"].str[:4].astype(int)
# Annual average
df_annual = df_raw.groupby("year")["OBS_VALUE"].mean().reset_index()
df_annual = df_annual.rename(columns={"OBS_VALUE": "wpi_health_index"})
# Normalize to 2011 = 100.0
if 2011 in df_annual["year"].values:
base_val = df_annual.loc[df_annual["year"] == 2011, "wpi_health_index"].values[0]
df_annual["wpi_health_index"] = (df_annual["wpi_health_index"] / base_val) * 100.0
else:
logger.warning("2011 not found in WPI data. Normalization might be inconsistent.")
return df_annual.sort_values("year")
|
Functions
fetch_wpi_health(use_cache=True)
Fetches the WPI for Health Care and Social Assistance.
Returns:
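| Type | Description |
| --- | --- |
| DataFrame | DataFrame with columns ['year', 'wpi_health_index'] normalized to 2011=100 (returned un-normalized, with a logged warning, if 2011 is not present in the data). |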
Source code in src/nhra_gt/domain/abs_api.py
def fetch_wpi_health(self, use_cache: bool = True) -> pd.DataFrame:
    """Fetch the WPI for Health Care and Social Assistance as annual averages.

    The raw data is read from ``abs_wpi_health_raw.csv`` in the cache
    directory when ``use_cache`` is true and that file exists; otherwise it is
    requested from the ABS API as SDMX-CSV. A freshly downloaded payload is
    only written to the cache after it has been parsed successfully, so a bad
    response can never overwrite a good cache file.

    Args:
        use_cache: Read the cached CSV instead of calling the API when the
            cache file exists.

    Returns:
        DataFrame with columns ['year', 'wpi_health_index'], sorted by year.
        The index is rescaled so that 2011 = 100; if 2011 is absent from the
        data a warning is logged and the annual averages are returned
        un-normalized.

    Raises:
        Exception: Whatever the download or parse step raised, when the
            request fails and no cache file exists to fall back to.
    """
    # Local import: the file's top-level import block is not part of this
    # listing, so the new dependency is brought into scope here.
    import io

    cache_path = self.cache_dir / "abs_wpi_health_raw.csv"

    if use_cache and cache_path.exists():
        logger.info("Using cached WPI data from %s", cache_path)
        df_raw = pd.read_csv(cache_path)
    else:
        url = f"{ABS_API_BASE_URL}/{WPI_DATAFLOW}/{WPI_HEALTH_KEY}"
        headers = {"Accept": "application/vnd.sdmx.data+csv"}
        logger.info("Fetching WPI data from %s", url)
        try:
            response = requests.get(url, headers=headers, timeout=30)
            response.raise_for_status()
            # Parse from memory BEFORE touching the cache: an empty or
            # malformed payload must not clobber a previously good file.
            df_raw = pd.read_csv(io.StringIO(response.text))
            missing = {"TIME_PERIOD", "OBS_VALUE"} - set(df_raw.columns)
            if missing:
                raise KeyError(
                    f"ABS response is missing required columns: {sorted(missing)}"
                )
        except Exception as e:
            logger.error("Failed to fetch WPI data from ABS API: %s", e)
            if not cache_path.exists():
                raise
            logger.warning("Falling back to existing cache.")
            df_raw = pd.read_csv(cache_path)
        else:
            # The payload is valid; persist it. A failed cache write is not a
            # reason to throw away data we already hold in memory.
            try:
                # Explicit UTF-8 so the file round-trips through pd.read_csv
                # regardless of the platform default encoding.
                cache_path.write_text(response.text, encoding="utf-8")
            except OSError as e:
                logger.warning("Could not update WPI cache at %s: %s", cache_path, e)

    # Process: convert TIME_PERIOD (YYYY-QX) to a calendar year. The cast to
    # str keeps this working if pandas inferred a numeric column.
    df_raw["year"] = df_raw["TIME_PERIOD"].astype(str).str[:4].astype(int)

    # Annual average of the observations within each year.
    df_annual = df_raw.groupby("year")["OBS_VALUE"].mean().reset_index()
    df_annual = df_annual.rename(columns={"OBS_VALUE": "wpi_health_index"})

    # Normalize to 2011 = 100.0
    base = df_annual.loc[df_annual["year"] == 2011, "wpi_health_index"]
    if base.empty:
        logger.warning("2011 not found in WPI data. Normalization might be inconsistent.")
    else:
        df_annual["wpi_health_index"] = (
            df_annual["wpi_health_index"] / base.iloc[0]
        ) * 100.0

    return df_annual.sort_values("year")
|