diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml new file mode 100644 index 0000000..a959058 --- /dev/null +++ b/.github/workflows/python-package.yml @@ -0,0 +1,45 @@ +# https://github.com/actions/starter-workflows/blob/main/ci/python-package.yml +# This workflow will install Python dependencies, run tests and lint with a variety of Python versions +# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python + +name: Python package + +on: + push: + branches: [ main ] + pull_request: + branches: [ main ] + workflow_dispatch: + +jobs: + build: + + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + python-version: ["3.7", "3.8", "3.9", "3.10", "3.11"] + + steps: + - uses: actions/checkout@v3 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: | + python -m pip install --upgrade pip + python -m pip install . + python -m pip install flake8 pytest ruff + if [ -f requirements.txt ]; then pip install -r requirements.txt; fi + - name: Lint with flake8 + run: | + # stop the build if there are Python syntax errors or undefined names + # exit-zero treats all errors as warnings. + flake8 . --count --exit-zero --show-source --statistics + - name: Lint with ruff + run: | + ruff . + - name: Test with pytest + run: | + pytest diff --git a/.github/workflows/python-publish.yml b/.github/workflows/python-publish.yml new file mode 100644 index 0000000..02d5ca0 --- /dev/null +++ b/.github/workflows/python-publish.yml @@ -0,0 +1,40 @@ +# https://github.com/actions/starter-workflows/blob/main/ci/python-publish.yml +# This workflow will upload a Python Package using Twine when a release is created +# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python#publishing-to-package-registries + +# This workflow uses actions that are not certified by GitHub. +# They are provided by a third-party and are governed by +# separate terms of service, privacy policy, and support +# documentation. + +name: Upload Python Package + +on: + release: + types: [published] + +permissions: + contents: read + +jobs: + deploy: + + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v3 + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: '3.x' + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install build + - name: Build package + run: python -m build + - name: Publish package + uses: pypa/gh-action-pypi-publish@release/v1 + with: + user: __token__ + password: ${{ secrets.PYPI_API_TOKEN }} diff --git a/README.md b/README.md index 8513459..c4266c6 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,46 @@ # opower -Python library to get historical and forecasted usage/cost from utilities that use opower.com such as PG&E +A Python library for getting historical and forecasted usage/cost from utilities that use opower.com such as PG&E. + +To add support for a new utility that uses opower JSON API (you can tell if the energy dashboard of your utility is hosted on opower.com, e.g. pge.opower.com) add a file similar to [pge.py](https://github.com/tronikos/opower/blob/main/src/opower/utilities/pge.py). + +## Example + +See [demo.py](https://github.com/tronikos/opower/blob/main/src/demo.py) + +## Development environment + +```sh +python3 -m venv .venv +source .venv/bin/activate +# for Windows CMD: +# .venv\Scripts\activate.bat +# for Windows PowerShell: +# .venv\Scripts\Activate.ps1 + +# Install dependencies +python -m pip install --upgrade pip +python -m pip install . + +# Run formatter +python -m pip install isort black +isort . +black . + +# Run lint +python -m pip install flake8 ruff +flake8 . +ruff . + +# Run tests +python -m pip install pytest +pytest + +# Run demo +python src/demo.py --help +# To output debug logs to a file: +python src/demo.py --verbose 2> out.txt + +# Build package +python -m pip install build +python -m build +``` diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..11e258a --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,78 @@ +[project] +name = "opower" +version = "0.0.1" +license = {text = "Apache-2.0"} +authors = [ + { name="tronikos", email="tronikos@gmail.com" }, +] +description = "A Python library for getting historical and forecasted usage/cost from utilities that use opower.com such as PG&E" +readme = "README.md" +requires-python = ">=3.7" +dependencies = [ + "aiohttp>=3.8", + "asyncio>=3.4.3", +] + +[project.urls] +"Homepage" = "https://github.com/tronikos/opower" +"Bug Tracker" = "https://github.com/tronikos/opower/issues" + +[build-system] +requires = ["setuptools"] +build-backend = "setuptools.build_meta" + +[tool.black] +extend-exclude = "_pb2.py|_pb2_grpc.py" + +[tool.isort] +profile = "black" +force_sort_within_sections = true +combine_as_imports = true +extend_skip_glob = ["*_pb2.py", "*_pb2_grpc.py"] + +[tool.ruff] +target-version = "py311" +exclude = ["*_pb2.py", "*_pb2_grpc.py", "*.pyi"] +line-length = 127 + +select = [ + "B007", # Loop control variable {name} not used within loop body + "B014", # Exception handler with duplicate exception + "C", # complexity + "D", # docstrings + "E", # pycodestyle + "F", # pyflakes/autoflake + "ICN001", # import concentions; {name} should be imported as {asname} + "PGH004", # Use specific rule codes when using noqa + "PLC0414", # Useless import alias. Import alias does not rename original package. + "SIM105", # Use contextlib.suppress({exception}) instead of try-except-pass + "SIM117", # Merge with-statements that use the same scope + "SIM118", # Use {key} in {dict} instead of {key} in {dict}.keys() + "SIM201", # Use {left} != {right} instead of not {left} == {right} + "SIM212", # Use {a} if {a} else {b} instead of {b} if not {a} else {a} + "SIM300", # Yoda conditions. Use 'age == 42' instead of '42 == age'. + "SIM401", # Use get from dict with default instead of an if block + "T20", # flake8-print + "TRY004", # Prefer TypeError exception for invalid type + "RUF006", # Store a reference to the return value of asyncio.create_task + "UP", # pyupgrade + "W", # pycodestyle +] + +ignore = [ + "D203", # 1 blank line required before class docstring + "D213", # Multi-line docstring summary should start at the second line +] + +[tool.ruff.flake8-pytest-style] +fixture-parentheses = false + +[tool.ruff.pyupgrade] +keep-runtime-typing = true + +[tool.ruff.per-file-ignores] +# Allow for demo script to write to stdout +"demo.py" = ["T201"] + +[tool.ruff.mccabe] +max-complexity = 25 diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..9e7b286 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,17 @@ +[flake8] +exclude = .venv,.git,docs,venv,bin,lib,deps,build,*_pb2.py,*_pb2_grpc.py +max-complexity = 25 +doctests = True +# To work with Black +# E501: line too long +# W503: Line break occurred before a binary operator +# E203: Whitespace before ':' +# D202 No blank lines allowed after function docstring +# W504 line break after binary operator +ignore = + E501, + W503, + E203, + D202, + W504 +noqa-require-code = True diff --git a/src/demo.py b/src/demo.py new file mode 100644 index 0000000..7083cbe --- /dev/null +++ b/src/demo.py @@ -0,0 +1,138 @@ +"""Demo usage of Opower library.""" + +import argparse +import asyncio +from datetime import datetime, timedelta +from getpass import getpass +import logging + +import aiohttp + +from opower import AggregateType, Opower, get_supported_utility_subdomains + + +async def _main(): + parser = argparse.ArgumentParser() + parser.add_argument( + "--utility", + help="Utility (subdomain of opower.com). Defaults to pge", + choices=get_supported_utility_subdomains(), + default="pge", + ) + parser.add_argument( + "--username", + help="Username for logging into the utility's website. " + "If not provided, you will be asked for it", + ) + parser.add_argument( + "--password", + help="Password for logging into the utility's website. " + "If not provided, you will be asked for it", + ) + parser.add_argument( + "--aggregate_type", + help="How to aggregate historical data. Defaults to day", + choices=list(AggregateType), + default=AggregateType.DAY, + ) + parser.add_argument( + "--start_date", + help="Start datetime for historical data. Defaults to 7 days ago", + type=lambda s: datetime.fromisoformat(s), + default=datetime.now() - timedelta(days=7), + ) + parser.add_argument( + "--end_date", + help="end datetime for historical data. Defaults to now", + type=lambda s: datetime.fromisoformat(s), + default=datetime.now(), + ) + parser.add_argument( + "--usage_only", + help="If true will output usage only, not cost", + action="store_true", + ) + parser.add_argument( + "-v", "--verbose", help="enable verbose logging", action="store_true" + ) + args = parser.parse_args() + + logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO) + + username = args.username or input("Username: ") + password = args.password or getpass("Password: ") + + async with aiohttp.ClientSession() as session: + opower = Opower(session, args.utility, username, password) + await opower.async_login() + forecasts = await opower.async_get_forecast() + for forecast in forecasts: + print("\nData for meter:", forecast.account.meter_type) + print("\nCurrent bill forecast:", forecast) + print( + "\nGetting historical data: aggregate_type=", + args.aggregate_type, + "start_date=", + args.start_date, + "end_date=", + args.end_date, + ) + if args.usage_only: + usage_data = await opower.async_get_usage_reads( + forecast.account, + args.aggregate_type, + args.start_date, + args.end_date, + ) + prev_end = None + print( + "start_time\tend_time\tconsumption" + "\tstart_minus_prev_end\tend_minus_prev_end" + ) + for usage_read in usage_data: + start_minus_prev_end = ( + None if prev_end is None else usage_read.start_time - prev_end + ) + end_minus_prev_end = ( + None if prev_end is None else usage_read.end_time - prev_end + ) + prev_end = usage_read.end_time + print( + f"{usage_read.start_time}" + f"\t{usage_read.end_time}" + f"\t{usage_read.consumption}" + f"\t{start_minus_prev_end}" + f"\t{end_minus_prev_end}" + ) + else: + cost_data = await opower.async_get_cost_reads( + forecast.account, + args.aggregate_type, + args.start_date, + args.end_date, + ) + prev_end = None + print( + "start_time\tend_time\tconsumption\tprovided_cost" + "\tstart_minus_prev_end\tend_minus_prev_end" + ) + for cost_read in cost_data: + start_minus_prev_end = ( + None if prev_end is None else cost_read.start_time - prev_end + ) + end_minus_prev_end = ( + None if prev_end is None else cost_read.end_time - prev_end + ) + prev_end = cost_read.end_time + print( + f"{cost_read.start_time}" + f"\t{cost_read.end_time}" + f"\t{cost_read.consumption}" + f"\t{cost_read.provided_cost}" + f"\t{start_minus_prev_end}" + f"\t{end_minus_prev_end}" + ) + print() + + +asyncio.run(_main()) diff --git a/src/opower/__init__.py b/src/opower/__init__.py new file mode 100644 index 0000000..7b7000b --- /dev/null +++ b/src/opower/__init__.py @@ -0,0 +1,27 @@ +"""Library for getting historical and forecasted usage/cost from an utility using opower.com JSON API.""" + +from .opower import ( + Account, + AggregateType, + CostRead, + Forecast, + MeterType, + Opower, + UnitOfMeasure, + UsageRead, + get_supported_utility_names, + get_supported_utility_subdomains, +) + +__all__ = [ + "Account", + "AggregateType", + "CostRead", + "Forecast", + "MeterType", + "Opower", + "UnitOfMeasure", + "UsageRead", + "get_supported_utility_names", + "get_supported_utility_subdomains", +] diff --git a/src/opower/opower.py b/src/opower/opower.py new file mode 100644 index 0000000..8d669fe --- /dev/null +++ b/src/opower/opower.py @@ -0,0 +1,395 @@ +"""Implementation of opower.com JSON API.""" + +import dataclasses +from datetime import date, datetime, timedelta +from enum import Enum +import json +import logging +import re +from typing import Any +from urllib.parse import urlencode + +import aiohttp +from multidict import CIMultiDict + +from .utilities import UtilityBase + +_LOGGER = logging.getLogger(__file__) + + +class MeterType(Enum): + """Meter type. Electric or gas.""" + + ELEC = "ELEC" + GAS = "GAS" + + def __str__(self): + """Return the value of the enum.""" + return self.value + + +class UnitOfMeasure(Enum): + """Unit of measure for the associated meter type. kWh for electricity or Therm for gas,.""" + + KWH = "KWH" + THERM = "THERM" + + def __str__(self): + """Return the value of the enum.""" + return self.value + + +class AggregateType(Enum): + """How to aggregate historical data.""" + + BILL = "bill" + DAY = "day" + HOUR = "hour" + + def __str__(self): + """Return the value of the enum.""" + return self.value + + +@dataclasses.dataclass +class Account: + """Data about an account.""" + + uuid: str + utility_account_id: str + meter_type: MeterType + + +@dataclasses.dataclass +class Forecast: + """Forecast data for an account.""" + + account: Account + start_date: date + end_date: date + current_date: date + unit_of_measure: UnitOfMeasure + usage_to_date: float + cost_to_date: float + forecasted_usage: float + forecasted_cost: float + typical_usage: float + typical_cost: float + + +@dataclasses.dataclass +class CostRead: + """A read from the meter that has both consumption and cost data.""" + + start_time: datetime + end_time: datetime + consumption: float # taken from value field, in KWH or THERM + provided_cost: float # in $ + + +@dataclasses.dataclass +class UsageRead: + """A read from the meter that has consumption data.""" + + start_time: datetime + end_time: datetime + consumption: float # taken from consumption.value field, in KWH or THERM + + +def get_supported_utility_names() -> list[str]: + """Return a list of names of all supported utilities.""" + return [utility.name() for utility in UtilityBase.subclasses] + + +def get_supported_utility_subdomains() -> list[str]: + """Return a list of subdomains of all supported utilities.""" + return [utility.subdomain() for utility in UtilityBase.subclasses] + + +def _select_utility(name_or_subdomain: str) -> type[UtilityBase]: + """Return the utility with the given name or subdomain.""" + for utility in UtilityBase.subclasses: + if name_or_subdomain in [utility.name(), utility.subdomain()]: + return utility + raise ValueError(f"Utility {name_or_subdomain} not found") + + +def _get_form_action_url_and_hidden_inputs(html: str): + """Return the URL and hidden inputs from the single form in a page.""" + match = re.search(r'action="([^"]*)"', html) + if not match: + return None, None + action_url = match.group(1) + inputs = {} + for match in re.finditer( + r'input\s*type="hidden"\s*name="([^"]*)"\s*value="([^"]*)"', html + ): + inputs[match.group(1)] = match.group(2) + return action_url, inputs + + +def _strip_time(date_time: datetime): + return date_time.astimezone().replace(hour=0, minute=0, second=0, microsecond=0) + + +class Opower: + """Class that can get historical and forecasted usage/cost from an utility.""" + + def __init__( + self, session: aiohttp.ClientSession, utility: str, username: str, password: str + ) -> None: + """Initialize.""" + self.session = session + self.session._default_headers = CIMultiDict({"User-Agent": "Mozilla/5.0"}) + self.session._raise_for_status = True + self.utility: type[UtilityBase] = _select_utility(utility) + self.username = username + self.password = password + self.customer = None + + async def async_login(self) -> None: + """Login to the utility website and authorize opower.com for access.""" + url = await self.utility.login(self.session, self.username, self.password) + await self._async_authorize(url) + + async def _async_authorize(self, url: str) -> None: + # Fetch the URL on the utility website to get RelayState and SAMLResponse. + async with self.session.get(url) as resp: + result = await resp.text() + action_url, hidden_inputs = _get_form_action_url_and_hidden_inputs(result) + assert action_url == "https://sso2.opower.com/sp/ACS.saml2" + assert set(hidden_inputs.keys()) == {"RelayState", "SAMLResponse"} + + # Pass them to https://sso2.opower.com/sp/ACS.saml2 to get opentoken. + async with self.session.post(action_url, data=hidden_inputs) as resp: + result = await resp.text() + action_url, hidden_inputs = _get_form_action_url_and_hidden_inputs(result) + assert set(hidden_inputs.keys()) == {"opentoken"} + + # Pass it back to the utility website. + async with self.session.post(action_url, data=hidden_inputs) as resp: + pass + + async def async_get_accounts(self) -> list[Account]: + """Get a list of accounts for the signed in user. + + Typically one account for electricity and one for gas. + """ + customer = await self._async_get_customer() + accounts = [] + for account in customer["utilityAccounts"]: + accounts.append( + Account( + uuid=account["uuid"], + utility_account_id=account["preferredUtilityAccountId"], + meter_type=MeterType(account["meterType"]), + ) + ) + return accounts + + async def async_get_forecast(self) -> list[Forecast]: + """Get current and forecasted usage and cost for the current monthly bill. + + One forecast for each account, typically one for electricity, one for gas. + """ + customer = await self._async_get_customer() + customer_uuid = customer["uuid"] + url = ( + "https://" + f"{self.utility.subdomain()}" + ".opower.com/ei/edge/apis/bill-forecast-cws-v1/cws/" + f"{self.utility.subdomain()}" + "/customers/" + f"{customer_uuid}" + "/combined-forecast" + ) + _LOGGER.debug("Fetching: %s", url) + async with self.session.get(url) as resp: + result = await resp.json() + _LOGGER.debug("Fetched: %s", json.dumps(result, indent=2)) + forecasts = [] + for forecast in result["accountForecasts"]: + forecasts.append( + Forecast( + account=Account( + uuid=forecast["accountUuids"][0], + utility_account_id=str(forecast["preferredUtilityAccountId"]), + meter_type=MeterType(forecast["meterType"]), + ), + start_date=date.fromisoformat(forecast["startDate"]), + end_date=date.fromisoformat(forecast["endDate"]), + current_date=date.fromisoformat(forecast["currentDate"]), + unit_of_measure=UnitOfMeasure(forecast["unitOfMeasure"]), + usage_to_date=float(forecast["usageToDate"]), + cost_to_date=float(forecast["costToDate"]), + forecasted_usage=float(forecast["forecastedUsage"]), + forecasted_cost=float(forecast["forecastedCost"]), + typical_usage=float(forecast["typicalUsage"]), + typical_cost=float(forecast["typicalCost"]), + ) + ) + return forecasts + + async def _async_get_customer(self): + """Get information about the current customer.""" + # Cache the customer data + if not self.customer: + async with self.session.get( + "https://" + f"{self.utility.subdomain()}" + ".opower.com/ei/edge/apis/multi-account-v1/cws/" + f"{self.utility.subdomain()}" + # Alternative to get multiple accounts: + # /customers?offset=0&batchSize=100&addressFilter= + "/customers/current" + ) as resp: + self.customer = await resp.json() + _LOGGER.debug("Fetched: %s", json.dumps(self.customer, indent=2)) + assert self.customer + return self.customer + + async def async_get_cost_reads( + self, + account: Account, + aggregate_type: AggregateType, + start_date: datetime | None = None, + end_date: datetime | None = None, + ) -> list[CostRead]: + """Get cost data for the selected account in the given date range aggregated by bill/day/hour. + + The resolution for gas is typically 'day' while for electricity it's hour or quarter hour. + Opower typically keeps historical cost data for 3 years. + """ + url = ( + "https://" + f"{self.utility.subdomain()}" + ".opower.com/ei/edge/apis/DataBrowser-v1/cws/cost/utilityAccount/" + f"{account.uuid}" + ) + reads = await self._async_get_dated_data( + url, aggregate_type, start_date, end_date + ) + result = [] + for read in reads: + result.append( + CostRead( + start_time=datetime.fromisoformat(read["startTime"]), + end_time=datetime.fromisoformat(read["endTime"]), + consumption=read["value"], + provided_cost=read["providedCost"], + ) + ) + # Remove last entries with 0 values + while result: + last = result.pop() + if last.provided_cost != 0 or last.consumption != 0: + result.append(last) + break + return result + + async def async_get_usage_reads( + self, + account: Account, + aggregate_type: AggregateType, + start_date: datetime | None = None, + end_date: datetime | None = None, + ) -> list[UsageRead]: + """Get usage data for the selected account in the given date range aggregated by bill/day/hour. + + The resolution for gas is typically 'day' while for electricity it's hour or quarter hour. + Opower typically keeps historical usage data for a bit over 3 years. + """ + url = ( + "https://" + f"{self.utility.subdomain()}" + ".opower.com/ei/edge/apis/DataBrowser-v1/cws/utilities/" + f"{self.utility.subdomain()}" + "/utilityAccounts/" + f"{account.uuid}" + "/reads" + ) + reads = await self._async_get_dated_data( + url, aggregate_type, start_date, end_date + ) + result = [] + for read in reads: + result.append( + UsageRead( + start_time=datetime.fromisoformat(read["startTime"]), + end_time=datetime.fromisoformat(read["endTime"]), + consumption=read["consumption"]["value"], + ) + ) + return result + + async def _async_get_dated_data( + self, + url: str, + aggregate_type: AggregateType, + start_date: datetime | None = None, + end_date: datetime | None = None, + ) -> list[Any]: + """Wrap _async_fetch by breaking requests for big date ranges to smaller ones to satisfy opower imposed limits.""" + if start_date is None: + if aggregate_type == AggregateType.BILL: + return await self._async_fetch( + url, aggregate_type, start_date, end_date + ) + raise ValueError("Missing start_date") + + if end_date is None: + end_date = datetime.max + end_date = min(datetime.now(), end_date) + + start_date = _strip_time(start_date) + end_date = _strip_time(end_date) + timedelta(days=1) + assert start_date + assert end_date + + max_request_days = None + if aggregate_type == AggregateType.DAY: + max_request_days = 363 + elif aggregate_type == AggregateType.HOUR: + max_request_days = 26 + + # Fetch data in batches in reverse chronological order + # until we reach start_date or there is no fetched data + # (non bill data are available up to 3 years ago). + result: list[Any] = [] + req_end = end_date + while True: + req_start = start_date + if max_request_days: + req_start = max(start_date, req_end - timedelta(days=max_request_days)) + if req_start >= req_end: + return result + reads = await self._async_fetch(url, aggregate_type, req_start, req_end) + if not reads: + return result + result = reads + result + req_end = req_start - timedelta(days=1) + + async def _async_fetch( + self, + url: str, + aggregate_type: AggregateType, + start_date: datetime | None = None, + end_date: datetime | None = None, + ) -> list[Any]: + convert_to_date = not ( + aggregate_type == AggregateType.DAY and "/cws/cost/" in url + ) + params = {"aggregateType": aggregate_type.value} + if start_date: + params["startDate"] = ( + start_date.date() if convert_to_date else start_date + ).isoformat() + if end_date: + params["endDate"] = ( + end_date.date() if convert_to_date else end_date + ).isoformat() + _LOGGER.debug("Fetching: %s?%s", url, urlencode(params)) + async with self.session.get(url, params=params) as resp: + result = await resp.json() + _LOGGER.debug("Fetched: %s", json.dumps(result, indent=2)) + return result["reads"] diff --git a/src/opower/utilities/__init__.py b/src/opower/utilities/__init__.py new file mode 100644 index 0000000..295863d --- /dev/null +++ b/src/opower/utilities/__init__.py @@ -0,0 +1,16 @@ +"""Directory of all supported utility websites that use opower.com API.""" + +from .base import UtilityBase + +__all__ = ["UtilityBase"] + +# Import all modules in the current directory. +from importlib import import_module +from pathlib import Path + +for f in Path(__file__).parent.glob("*.py"): + module_name = f.stem + if (not module_name.startswith("_")) and (module_name not in globals()): + import_module(f".{module_name}", __package__) + del f, module_name +del import_module, Path diff --git a/src/opower/utilities/base.py b/src/opower/utilities/base.py new file mode 100644 index 0000000..4250325 --- /dev/null +++ b/src/opower/utilities/base.py @@ -0,0 +1,35 @@ +"""Base class that each utility needs to extend.""" + + +import aiohttp + + +class UtilityBase: + """Base class that each utility needs to extend.""" + + subclasses: list[type["UtilityBase"]] = [] + + def __init_subclass__(cls, **kwargs) -> None: + """Keep track of all subclass implementations.""" + super().__init_subclass__(**kwargs) + cls.subclasses.append(cls) + + @staticmethod + def name() -> str: + """Distinct recognizable name of the utility.""" + raise NotImplementedError + + @staticmethod + def subdomain() -> str: + """Return the opower.com subdomain for this utility.""" + raise NotImplementedError + + @staticmethod + async def login( + session: aiohttp.ClientSession, username: str, password: str + ) -> str: + """Login to the utility website and return a URL where we can authorize opower.com. + + Any failure to login should raise ClientResponseError with status 401 or 403. + """ + raise NotImplementedError diff --git a/src/opower/utilities/pge.py b/src/opower/utilities/pge.py new file mode 100644 index 0000000..7d215dd --- /dev/null +++ b/src/opower/utilities/pge.py @@ -0,0 +1,91 @@ +"""Pacific Gas & Electric (PG&E).""" + +import aiohttp +from aiohttp.client_exceptions import ClientResponseError + +from .base import UtilityBase + + +class PGE(UtilityBase): + """Pacific Gas & Electric (PG&E).""" + + @staticmethod + def name() -> str: + """Distinct recognizable name of the utility.""" + return "Pacific Gas & Electric" + + @staticmethod + def subdomain() -> str: + """Return the opower.com subdomain for this utility.""" + return "pge" + + @staticmethod + async def login( + session: aiohttp.ClientSession, username: str, password: str + ) -> str: + """Login to the utility website and return a URL where we can authorize opower.com.""" + # 1st way of login + async with session.post( + "https://www.pge.com/eimpapi/auth/login", + json={ + "username": username, + "password": password, + "appName": "CustomerSSO", + }, + ) as resp: + result = await resp.json() + if "errorMsg" in result: + raise ClientResponseError( + resp.request_info, + resp.history, + status=403, + message=result["errorMsg"], + ) + + # 2nd way of login + # async with session.get( + # "https://apigprd.cloud.pge.com/myaccount/v1/login", + # headers={ + # "Authorization": "Basic " + # + base64.b64encode(f"{username}:{password}".encode()).decode(), + # }, + # ) as resp: + # await resp.json() + + # Skip the following 2 requests since emToolUrl is constant. + # If it ever changes consider uncommenting. + # These only work with the 2nd way of login. + + # async with session.get( + # "https://apigprd.cloud.pge.com/myaccount/v1/cocaccount/secure/account/retrieveMyEnergyAccounts", + # params=(("userId", username),), + # ) as resp: + # energy_accounts = await resp.json() + + # for energy_account in energy_accounts["accounts"]: + # accountNumber = energy_account["accountNumber"] + # addressAsString = energy_account["accountAddress"]["addressAsString"] + # print(accountNumber) + # print(addressAsString) + # break + + # async with session.get( + # f"https://apigprd.cloud.pge.com/myaccount/v1/cocaccount/secure/retrieveEnergyManagementInfo/{accountNumber}/myusage" + # ) as resp: + # result = await resp.json() + + # for energyManagementInfo in result["energyManagementInfoList"]: + # if energyManagementInfo["vendorType"] == "OPOWER": + # emToolUrl = energyManagementInfo["emToolUrl"] + # break + # assert emToolUrl == ( + # "https://itiamping.cloud.pge.com/idp/startSSO.ping?" + # "PartnerSpId=sso.opower.com&TargetResource=" + # "https%3A%2F%2Fpge.opower.com%2Fei%2Fapp%2Fr%2Fenergy-usage-details" + # ) + + return ( + "https://itiamping.cloud.pge.com/idp/startSSO.ping?" + "PartnerSpId=sso.opower.com&TargetResource=" + "https%3A%2F%2Fpge.opower.com%2Fei%2Fapp%2Fr%2Fenergy-usage-details" + ) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..a9740a0 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1 @@ +"""Tests for opower.""" diff --git a/tests/test_opower.py b/tests/test_opower.py new file mode 100644 index 0000000..42b2c5f --- /dev/null +++ b/tests/test_opower.py @@ -0,0 +1,6 @@ +"""Tests for Opower.""" + + +def test_dummy(): + """Test dummy.""" + assert True