Commit ba84cd3: Convert uploader to http client
strtgbb committed Jan 16, 2025
1 parent 771f68e
Showing 2 changed files with 51 additions and 44 deletions.
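The uploader previously inserted results over ClickHouse's native protocol with the async asynch client. It now talks to the HTTP interface through a plain requests session: credentials and the target database travel as X-ClickHouse-* headers, rows are posted in chunks of 100 via INSERT ... FORMAT JSON, and missing values stay None (JSON null, backed by input_format_null_as_default=1) instead of being coerced to empty strings. Accordingly, the CI script's --db-port moves from 9440 (secure native protocol) to 8443 (HTTPS).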
93 changes: 50 additions & 43 deletions .github/upload_results_to_database.py
@@ -7,10 +7,8 @@
 import os
 from datetime import datetime
 from pprint import pprint
-import asyncio
 
 import requests
-import asynch
 
 from testflows.core import *
 from testflows._core.transform.log.pipeline import ResultsLogPipeline
@@ -49,7 +47,7 @@
         "commit_hash": "commit.hash",
         "job_url": "job.url",
         "report_url": "report.url",
-        "start_time": "start_datetime",
+        "start_time": "start_time",
         "scheduled": "job.is_scheduled",
     },
     "test_results": {
@@ -258,7 +256,7 @@ def report_url(self) -> str:
         """
         This is a fallback if test_attributes report.url does not exist.
         """
-        return os.getenv(REPORT_URL_VAR, "")
+        return os.getenv(REPORT_URL_VAR)
 
     def read_raw_log(self, log_lines=None):
         """
@@ -327,10 +325,10 @@ def get_common_attributes(self):
         common_attributes = {}
 
         for key, value in table_schema_attr_map["pr_info"].items():
-            common_attributes[key] = self.pr_info.get(value, "")
+            common_attributes[key] = self.pr_info.get(value, None)
 
         for key, value in table_schema_attr_map["test_attributes"].items():
-            common_attributes[key] = self.test_attributes.get(value, "")
+            common_attributes[key] = self.test_attributes.get(value, None)
 
         if common_attributes["report_url"] is None:
             url = self.report_url()
@@ -339,7 +337,7 @@ def get_common_attributes(self):
             common_attributes["report_url"] = url
 
         for key in table_schema_attr_map["test_results"].keys():
-            common_attributes[key] = ""
+            common_attributes[key] = None
 
         return common_attributes
 
@@ -351,10 +349,7 @@ def iter_formatted_test_results(self, common_attributes):
             row = common_attributes.copy()
             for schema_attr, test_attr in table_schema_attr_map["test_results"].items():
                 if test_attr in test_result:
-                    value = test_result[test_attr]
-                    if value is None:
-                        value = ""
-                    row[schema_attr] = value
+                    row[schema_attr] = test_result[test_attr]
             yield row
 
     def write_csv(self):
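With the None-to-empty-string coercion removed, rows keep None for absent values; requests serializes these as JSON null, and the upload path below passes input_format_null_as_default=1 so ClickHouse writes column defaults rather than rejecting nulls for non-Nullable columns.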
@@ -375,7 +370,7 @@ def write_csv(self):
             for test_result in self.iter_formatted_test_results(common_attributes):
                 writer.writerow(test_result)
 
-    async def upload_results(
+    def upload_results(
         self,
         db,
         table,
@@ -387,7 +382,7 @@ async def upload_results(
         verify=None,
     ):
 
-        async with Given("database credentials"):
+        with Given("database credentials"):
             if db_host is None:
                 db_host = os.getenv(DATABASE_HOST_VAR)
             assert db_host, "Failed to get database host from environment"
@@ -398,32 +393,47 @@ async def upload_results(
         if db_password is None:
             db_password = os.getenv(DATABASE_PASSWORD_VAR, "")
 
-        async with And("common attributes for this test run"):
+        if db_port is None:
+            db_port = 8443 if secure else 8123
+
+        with And("common attributes for this test run"):
             common_attributes = self.get_common_attributes()
 
-        async with And("a list of all test results"):
+        with And("a list of all test results"):
             rows = self.iter_formatted_test_results(common_attributes)
             rows = list(rows)
             note(f"There are {len(rows)} records to insert")
 
-        try:
-            async with Given("a database client"):
-                client = await asynch.connect(
-                    host=db_host,
-                    user=db_user,
-                    password=db_password,
-                    port=db_port,
-                    secure=secure,
-                )
-
-            async with Then("inserting test results"):
-                async with client.cursor(cursor=asynch.cursors.DictCursor) as cursor:
-                    r = await cursor.execute(f"INSERT INTO `{db}`.{table} VALUES", rows)
-                    note(f"Inserted {r} records")
-        finally:
-            async with Finally("closing database client"):
-                await client.close()
+        with And("a database client"):
+            session = requests.Session()
+            session.headers.update(
+                {
+                    "X-ClickHouse-User": db_user,
+                    "X-ClickHouse-Key": db_password,
+                    "X-ClickHouse-Database": db,
+                }
+            )
+
+        with Then("inserting test results"):
+            if secure:
+                url = f"https://{db_host}:{db_port}/"
+            else:
+                url = f"http://{db_host}:{db_port}/"
+
+            chunk_size = 100
+            for i in range(0, len(rows), chunk_size):
+                chunk = rows[i : i + chunk_size]
+                r = session.post(
+                    url,
+                    params={
+                        "query": f"INSERT INTO {table} FORMAT JSON",
+                        "input_format_null_as_default": 1,
+                    },
+                    json=chunk,
+                    verify=verify,
+                )
+
+                assert r.status_code == 200, f"Failed to insert records: {r.text}"
+                note(f"Uploaded {i + len(chunk)} of {len(rows)} records")
 
     def report_from_compressed_log(self, log_path=None):
         args = namedtuple(
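For reference, the request shape introduced above can be exercised standalone against any reachable ClickHouse server. A minimal sketch mirroring the commit's pattern; the endpoint, credentials, and table t are placeholders, and the target table must already exist:

    import requests

    # Placeholder connection details -- substitute real values.
    session = requests.Session()
    session.headers.update(
        {
            "X-ClickHouse-User": "default",
            "X-ClickHouse-Key": "",
            "X-ClickHouse-Database": "default",
        }
    )

    rows = [{"id": 1, "name": "a"}, {"id": 2, "name": None}]  # None -> JSON null

    r = session.post(
        "http://localhost:8123/",  # https://...:8443/ when TLS is enabled
        params={
            "query": "INSERT INTO t FORMAT JSON",
            # let the server substitute column defaults where the payload has null
            "input_format_null_as_default": 1,
        },
        json=rows,  # requests sends the list as a JSON array body
    )
    assert r.status_code == 200, f"Insert failed: {r.text}"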
@@ -509,22 +519,19 @@ def run_upload(
         if self.debug:
             with And("printing debug info"):
                 pprint(self.pr_info, indent=2)
+                pprint(self.get_common_attributes(), indent=2)
                 pprint(self.test_attributes, indent=2)
                 pprint(self.test_results[-1], indent=2)
 
         with And("uploading results"):
-            asyncio.run(
-                self.upload_results(
-                    db=db,
-                    table=table,
-                    db_host=db_host,
-                    db_user=db_user,
-                    db_password=db_password,
-                    db_port=db_port,
-                    secure=secure,
-                    verify=verify,
-                )
-            )
+            self.upload_results(
+                db=db,
+                table=table,
+                db_host=db_host,
+                db_user=db_user,
+                db_password=db_password,
+                db_port=db_port,
+                secure=secure,
+                verify=verify,
+            )
2 changes: 1 addition & 1 deletion .github/upload_results_to_database.sh
@@ -5,6 +5,6 @@ set -x
 
 if [[ $1 == 1 ]];
 then
-    ./.github/upload_results_to_database.py -o nice --log-file raw.log --db-name="gh-data" --db-port=9440 --secure --no-verify --table="clickhouse_regression_results" --log uploader.log
+    ./.github/upload_results_to_database.py -o nice --log-file raw.log --db-name="gh-data" --db-port=8443 --secure --no-verify --table="clickhouse_regression_results" --log uploader.log
     aws s3 cp uploader.log $SUITE_REPORT_BUCKET_PATH/uploader.log
 fi
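The port change follows from the protocol switch: 9440 is ClickHouse's secure native-protocol port, which the old asynch client dialed, while 8443 serves the HTTPS interface that the requests session now targets, matching the 8443/8123 fallback added in upload_results.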
