Skip to content

Commit

Permalink
add proxy config
Browse files Browse the repository at this point in the history
  • Loading branch information
[email protected] committed Jan 10, 2025
1 parent eec189b commit f5a6316
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 36 deletions.
24 changes: 21 additions & 3 deletions packages/opal-client/opal_client/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ class OpalClientConfig(Confi):
"http://localhost:8181",
description="The URL of the policy store (e.g., OPA agent).",
)

POLICY_STORE_PROXY_URL = confi.str(
"POLICY_STORE_PROXY_URL",
None,
description="The URL of the proxy to use for the policy store.",
)
POLICY_STORE_AUTH_TYPE = confi.enum(
"POLICY_STORE_AUTH_TYPE",
PolicyStoreAuth,
Expand Down Expand Up @@ -96,7 +100,21 @@ class OpalClientConfig(Confi):
},
description="Retry options when connecting to the base data source (e.g. an external API server which returns data snapshot)",
)

DATA_FETCHER_PROXY_URL = confi.str(
"DATA_FETCHER_PROXY_URL",
None,
description="The URL of the proxy to use for the data fetcher.",
)
POLICY_PROXY_URL = confi.str(
"POLICY_PROXY_URL",
None,
description="The URL of the proxy to use for the policy.",
)
LIMITER_SERVER_PROXY_URL = confi.str(
"LIMITER_PROXY_URL",
None,
description="The URL of the proxy to use for the limiter.",
)
POLICY_STORE_POLICY_PATHS_TO_IGNORE = confi.list(
"POLICY_STORE_POLICY_PATHS_TO_IGNORE",
[],
Expand Down Expand Up @@ -389,4 +407,4 @@ def on_load(self):
self.DATA_UPDATER_CONN_RETRY = self.DATA_STORE_CONN_RETRY


opal_client_config = OpalClientConfig(prefix="OPAL_")
opal_client_config = OpalClientConfig(prefix="OPAL_")
6 changes: 5 additions & 1 deletion packages/opal-client/opal_client/data/updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ def __init__(
self._polling_update_tasks = []
self._on_connect_callbacks = on_connect or []
self._on_disconnect_callbacks = on_disconnect or []
self._aiohttp_client_session_args = {}
if opal_client_config.DATA_FETCHER_PROXY_URL:
_aiohttp_client_session_args['proxy'] = opal_client_config.DATA_FETCHER_PROXY_URL

async def __aenter__(self):
await self.start()
Expand Down Expand Up @@ -183,7 +186,8 @@ async def get_policy_data_config(self, url: str = None) -> DataSourceConfig:
url = self._data_sources_config_url
logger.info("Getting data-sources configuration from '{source}'", source=url)
try:
async with ClientSession(headers=self._extra_headers) as session:
_aiohttp_client_session_args['headers'] = self._extra_headers
async with ClientSession(**self._aiohttp_client_session_args) as session:
response = await session.get(url, **self._ssl_context_kwargs)
if response.status == 200:
return DataSourceConfig.parse_obj(await response.json())
Expand Down
5 changes: 4 additions & 1 deletion packages/opal-client/opal_client/limiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,14 @@ def __init__(self, backend_url=None, token=None):
if self._custom_ssl_context is not None
else {}
)
self._aiohttp_client_session_args = {}
if opal_client_config.LIMITER_SERVER_PROXY_URL:
_aiohttp_client_session_args['proxy'] = opal_client_config.LIMITER_SERVER_PROXY_URL

@retry(wait=wait_random_exponential(max=10), stop=stop.stop_never, reraise=True)
async def wait_for_server_ready(self):
logger.info("Trying to get server's load limit pass")
async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session:
try:
async with session.get(
self._loadlimit_endpoint_url,
Expand Down
5 changes: 4 additions & 1 deletion packages/opal-client/opal_client/policy/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ def __init__(self, backend_url=None, token=None):
if self._custom_ssl_context is not None
else {}
)
self._aiohttp_client_session_args = {}
if opal_client_config.POLICY_PROXY_URL:
_aiohttp_client_session_args['proxy'] = opal_client_config.POLICY_PROXY_URL

@property
def policy_endpoint_url(self):
Expand Down Expand Up @@ -85,7 +88,7 @@ async def _fetch_policy_bundle(
params = {"path": directories}
if base_hash is not None:
params["base_hash"] = base_hash
async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session:
logger.info(
"Fetching policy bundle from {url}",
url=self._policy_endpoint_url,
Expand Down
23 changes: 11 additions & 12 deletions packages/opal-client/opal_client/policy_store/cedar_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ def __init__(
self._had_successful_policy_transaction = False
self._most_recent_data_transaction: Optional[StoreTransaction] = None
self._most_recent_policy_transaction: Optional[StoreTransaction] = None
self._aiohttp_client_session_args = {}
if opal_client_config.POLICY_STORE_PROXY_URL:
_aiohttp_client_session_args['proxy'] = opal_client_config.POLICY_STORE_PROXY_URL

if auth_type == PolicyStoreAuth.TOKEN:
if self._token is None:
Expand Down Expand Up @@ -75,7 +78,7 @@ async def set_policy(
f"Ignoring setting policy - {policy_id}, set in POLICY_STORE_POLICY_PATHS_TO_IGNORE."
)
return
async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session:
try:
headers = await self._get_auth_headers()
async with session.put(
Expand All @@ -99,7 +102,7 @@ async def set_policy(
@fail_silently()
@retry(**RETRY_CONFIG)
async def get_policy(self, policy_id: str) -> Optional[str]:
async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session:
try:
headers = await self._get_auth_headers()

Expand All @@ -116,7 +119,7 @@ async def get_policy(self, policy_id: str) -> Optional[str]:
@fail_silently()
@retry(**RETRY_CONFIG)
async def get_policies(self) -> Optional[Dict[str, str]]:
async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session:
try:
headers = await self._get_auth_headers()

Expand All @@ -140,8 +143,7 @@ async def delete_policy(self, policy_id: str, transaction_id: Optional[str] = No
f"Ignoring deleting policy - {policy_id}, set in POLICY_STORE_POLICY_PATHS_TO_IGNORE."
)
return

async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session:
try:
headers = await self._get_auth_headers()

Expand Down Expand Up @@ -175,8 +177,7 @@ async def set_policy_data(
logger.warning(
"OPAL client was instructed to put something that is not a list on Cedar. This will probably not work."
)

async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session:
try:
headers = await self._get_auth_headers()
async with session.put(
Expand Down Expand Up @@ -204,8 +205,7 @@ async def delete_policy_data(
):
if path != "":
raise ValueError("Cedar can only change the entire data structure at once.")

async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session:
try:
headers = await self._get_auth_headers()

Expand Down Expand Up @@ -237,8 +237,7 @@ async def get_data(self, path: str) -> Dict:
raise ValueError("Cedar can only change the entire data structure at once.")
try:
headers = await self._get_auth_headers()

async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session:
async with session.get(
f"{self._cedar_url}/data", headers=headers
) as cedar_response:
Expand Down Expand Up @@ -312,4 +311,4 @@ async def set_policies(
for module_id in deleted_modules:
print(module_id)
await self.delete_policy(policy_id=module_id)
self._policy_version = bundle.hash
self._policy_version = bundle.hash
32 changes: 14 additions & 18 deletions packages/opal-client/opal_client/policy_store/opa_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ def __init__(
self._last_failed_policy_transaction: Optional[StoreTransaction] = None
self._last_data_transaction: Optional[StoreTransaction] = None
self._last_failed_data_transaction: Optional[StoreTransaction] = None
self._aiohttp_client_session_args = {}
if opal_client_config.POLICY_STORE_PROXY_URL:
self._aiohttp_client_session_args['proxy'] = opal_client_config.POLICY_STORE_PROXY_URL

@property
def ready(self) -> bool:
Expand Down Expand Up @@ -410,8 +413,7 @@ async def get_policy_version(self) -> Optional[str]:
@retry(**RETRY_CONFIG)
async def _get_oauth_token(self):
logger.info("Retrieving a new OAuth access_token.")

async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session:
try:
async with session.post(
self._oauth_server,
Expand Down Expand Up @@ -475,7 +477,7 @@ async def set_policy(
f"Ignoring setting policy - {policy_id}, set in POLICY_STORE_POLICY_PATHS_TO_IGNORE."
)
return
async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session:
try:
headers = await self._get_auth_headers()

Expand All @@ -500,7 +502,7 @@ async def set_policy(
@fail_silently()
@retry(**RETRY_CONFIG)
async def get_policy(self, policy_id: str) -> Optional[str]:
async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session:
try:
headers = await self._get_auth_headers()

Expand All @@ -518,7 +520,7 @@ async def get_policy(self, policy_id: str) -> Optional[str]:
@fail_silently()
@retry(**RETRY_CONFIG)
async def get_policies(self) -> Optional[Dict[str, str]]:
async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session:
try:
headers = await self._get_auth_headers()

Expand All @@ -544,8 +546,7 @@ async def delete_policy(self, policy_id: str, transaction_id: Optional[str] = No
f"Ignoring deleting policy - {policy_id}, set in POLICY_STORE_POLICY_PATHS_TO_IGNORE."
)
return

async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session:
try:
headers = await self._get_auth_headers()

Expand Down Expand Up @@ -758,8 +759,7 @@ async def set_policy_data(
"OPAL client was instructed to put a list on OPA's root document. In OPA the root document must be an object so the original value was wrapped."
)
policy_data = {"items": policy_data}

async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session:
try:
headers = await self._get_auth_headers()
data = json.dumps(exclude_none_fields(policy_data))
Expand Down Expand Up @@ -799,8 +799,7 @@ async def patch_policy_data(
"OPAL client was instructed to put a list on OPA's root document. In OPA the root document must be an object so the original value was wrapped."
)
policy_data = {"items": policy_data}

async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session:
try:
headers = await self._get_auth_headers()
headers["Content-Type"] = "application/json-patch+json"
Expand Down Expand Up @@ -833,8 +832,7 @@ async def delete_policy_data(
path = self._safe_data_module_path(path)
if not path:
return await self.set_policy_data({})

async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session:
try:
headers = await self._get_auth_headers()

Expand Down Expand Up @@ -871,8 +869,7 @@ async def get_data(self, path: str) -> Dict:
path = "/" + path
try:
headers = await self._get_auth_headers()

async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session:
async with session.get(
f"{self._opa_url}/data{path}",
headers=headers,
Expand Down Expand Up @@ -901,8 +898,7 @@ async def get_data_with_input(self, path: str, input: BaseModel) -> Dict:
path = path[1:]
try:
headers = await self._get_auth_headers()

async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(**self._aiohttp_client_session_args) as session:
async with session.post(
f"{self._opa_url}/data/{path}",
data=json.dumps(opa_input),
Expand Down Expand Up @@ -968,4 +964,4 @@ async def full_import(self, reader: AsyncTextIOWrapper) -> None:
]
)

await self.set_policy_data(import_data["data"])
await self.set_policy_data(import_data["data"])

0 comments on commit f5a6316

Please sign in to comment.