From 7aa43b18d85a7d40c4369dd3fe3f8500847c0e6e Mon Sep 17 00:00:00 2001
From: Oleg Klimov
Date: Sat, 20 Apr 2024 16:45:58 +0200
Subject: [PATCH 1/9] headless operation

---
 .../inference/stream_results.py              |  6 ++++--
 self_hosting_machinery/scripts/first_run.py  | 18 +++++-------------
 .../watchdog/docker_watchdog.py              | 17 +++++++++++------
 3 files changed, 20 insertions(+), 21 deletions(-)

diff --git a/self_hosting_machinery/inference/stream_results.py b/self_hosting_machinery/inference/stream_results.py
index f77730dc..2f9815c2 100644
--- a/self_hosting_machinery/inference/stream_results.py
+++ b/self_hosting_machinery/inference/stream_results.py
@@ -108,10 +108,12 @@ def completions_wait_batch(req_session: requests.Session, my_desc, verbose=False
     if json_resp is None:
         return "ERROR", []
     t1 = time.time()
-    logger.info("%0.1fms %s %s" % (1000*(t1 - t0), url, termcolor.colored(json_resp.get("retcode", "no retcode"), "green")))
+    retcode = json_resp.get("retcode", "ERROR")
+    if retcode != "WAIT":
+        logger.info("%0.1fms %s %s" % (1000*(t1 - t0), url, termcolor.colored(retcode, "green")))
     if verbose or "retcode" not in json_resp:
         logger.warning("%s unrecognized json: %s" % (url, json.dumps(json_resp, indent=4)))
-    return json_resp.get("retcode", "ERROR"), json_resp.get("batch", [])
+    return retcode, json_resp.get("batch", [])
 
 
 def head_and_tail(base: str, modified: str):

diff --git a/self_hosting_machinery/scripts/first_run.py b/self_hosting_machinery/scripts/first_run.py
index fcd57d40..f2716008 100644
--- a/self_hosting_machinery/scripts/first_run.py
+++ b/self_hosting_machinery/scripts/first_run.py
@@ -5,13 +5,13 @@
 from refact_utils.scripts import env
 
 
-def copy_watchdog_configs_if_first_run_detected(model_assigner: ModelAssigner):
+def assign_gpus_if_first_run_detected(model_assigner: ModelAssigner):
     if not os.path.exists(env.CONFIG_ENUM_GPUS):
         enum_gpus.enum_gpus()
-        model_assigner.first_run()
+        model_assigner.first_run()  # has models_to_watchdog_configs() inside
 
 
-def convert_old_configs(model_assigner: ModelAssigner):
+def convert_old_configs():
     # longthink.cfg and openai_api_worker.cfg are deprecated watchdog configs
     old_longthink = os.path.join(env.DIR_WATCHDOG_D, "longthink.cfg")
     if os.path.exists(old_longthink):
@@ -20,16 +20,8 @@ def convert_old_configs(model_assigner: ModelAssigner):
     if os.path.exists(openai_watchdog_cfg_fn):
         os.unlink(openai_watchdog_cfg_fn)
 
-    for gpu in range(16):
-        fn = os.path.join(env.DIR_WATCHDOG_D, "model-gpu%d.cfg" % gpu)
-        if not os.path.exists(fn):
-            continue
-        text = open(fn).read()
-
-    model_assigner.models_to_watchdog_configs()
-
 
 if __name__ == '__main__':
+    convert_old_configs()
     model_assigner = ModelAssigner()
-    convert_old_configs(model_assigner)
-    copy_watchdog_configs_if_first_run_detected(model_assigner)
+    assign_gpus_if_first_run_detected(model_assigner)

diff --git a/self_hosting_machinery/watchdog/docker_watchdog.py b/self_hosting_machinery/watchdog/docker_watchdog.py
index 3c4e3bf8..dc3fdd2c 100644
--- a/self_hosting_machinery/watchdog/docker_watchdog.py
+++ b/self_hosting_machinery/watchdog/docker_watchdog.py
@@ -15,6 +15,7 @@
 from refact_utils.scripts import env
 
 
+HEADLESS = ("--headless" in sys.argv)  # for ui-less docker: skip all standard cfg-s (web ui, enum gpus, etc), skip first_run script
 FIRST_RUN_CMDLINE = [sys.executable, "-m", "self_hosting_machinery.scripts.first_run"]
 
 
@@ -304,9 +305,8 @@ def __str__(self):
             f"    remove: {self.remove_this}\n" \
             f"    status: {self.status_from_stderr}\n"
 
-
 tracked: Dict[str, TrackedJob] = {}
-watchdog_templates = list(Path(env.DIR_WATCHDOG_TEMPLATES).iterdir())
+watchdog_templates = list(Path(env.DIR_WATCHDOG_TEMPLATES).iterdir()) if not HEADLESS else []
 
 
 def create_tracked_jobs_from_configs():
@@ -446,14 +446,19 @@ def factory_reset():
 
 
 def first_run():
+    if HEADLESS:
+        return
     subprocess.check_call(FIRST_RUN_CMDLINE)
 
 
 def main_loop():
-    # Generate a random SMALLCLOUD_API_KEY, it will be inherited by subprocesses,
-    # this allows inference_worker to authorize on the local web server (both use
-    # this variable), and work safely even if we expose http port to the world.
-    os.environ["SMALLCLOUD_API_KEY"] = str(uuid.uuid4())
+    if not HEADLESS:
+        # Generate a random SMALLCLOUD_API_KEY, it will be inherited by subprocesses,
+        # this allows inference_worker to authorize on the local web server (both use
+        # this variable), and work safely even if we expose http port to the world.
+        os.environ["SMALLCLOUD_API_KEY"] = str(uuid.uuid4())
+    else:
+        assert "SMALLCLOUD_API_KEY" in os.environ
 
     first_run()
     while 1:

From 28c3cd5ae124bd78da086f1b7039186c48e6ea09 Mon Sep 17 00:00:00 2001
From: Oleg Klimov
Date: Sun, 21 Apr 2024 09:49:41 +0200
Subject: [PATCH 2/9] user agent version

---
 refact_webgui/webgui/selfhost_fastapi_completions.py | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/refact_webgui/webgui/selfhost_fastapi_completions.py b/refact_webgui/webgui/selfhost_fastapi_completions.py
index 5c12e476..8ba0dba2 100644
--- a/refact_webgui/webgui/selfhost_fastapi_completions.py
+++ b/refact_webgui/webgui/selfhost_fastapi_completions.py
@@ -276,7 +276,12 @@ async def _coding_assistant_caps(self):
             log(f"Your refact-lsp version is deprecated, finetune is unavailable. Please update your plugin.")
         return Response(content=json.dumps(self._caps_base_data(), indent=4), media_type="application/json")
 
-    async def _caps(self, authorization: str = Header(None)):
+    async def _caps(self, authorization: str = Header(None), user_agent: str = Header(None)):
+        if isinstance(user_agent, str):
+            m = re.match(r"^refact-lsp (\d+)\.(\d+)\.(\d+)$", user_agent)
+            if m:
+                major, minor, patch = map(int, m.groups())
+                log("user version %d.%d.%d" % (major, minor, patch))
         data = self._caps_base_data()
         running = running_models_and_loras(self._model_assigner)
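
[Reviewer note on PATCH 2/9 — illustration only, not part of the series] The parse
works because re.match() is anchored at the start of the string, so only a bare
"refact-lsp X.Y.Z" User-Agent matches. A minimal standalone sketch of the same
parse; the helper name and the sample values are invented:

    import re

    def parse_refact_lsp_version(user_agent: str):
        # Mirrors the regex added in PATCH 2/9; returns (major, minor, patch) or None.
        m = re.match(r"^refact-lsp (\d+)\.(\d+)\.(\d+)$", user_agent)
        return tuple(map(int, m.groups())) if m else None

    assert parse_refact_lsp_version("refact-lsp 0.9.7") == (0, 9, 7)
    assert parse_refact_lsp_version("Mozilla/5.0") is None
    # Int tuples compare lexicographically, so a minimum-version gate is one comparison:
    assert (0, 9, 7) >= (0, 9, 0)
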
From 7484a9b6c7589bad9bb982dd77d7d19105b73d81 Mon Sep 17 00:00:00 2001
From: Oleg Klimov
Date: Tue, 23 Apr 2024 08:48:19 +0200
Subject: [PATCH 3/9] embedding model logs

---
 .../inference/inference_embeddings.py | 21 ++++++++++++++-------
 .../inference/inference_worker.py     |  5 +++--
 2 files changed, 17 insertions(+), 9 deletions(-)

diff --git a/self_hosting_machinery/inference/inference_embeddings.py b/self_hosting_machinery/inference/inference_embeddings.py
index c7b832a1..43a477bb 100644
--- a/self_hosting_machinery/inference/inference_embeddings.py
+++ b/self_hosting_machinery/inference/inference_embeddings.py
@@ -15,6 +15,8 @@
 from self_hosting_machinery.inference import InferenceBase
 from self_hosting_machinery.inference.lora_loader_mixin import LoraLoaderMixin
 
+log = logging.getLogger("MODEL").info
+
 
 class InferenceEmbeddings(InferenceBase, LoraLoaderMixin):
     def __init__(
@@ -34,7 +36,7 @@ def __init__(
         for local_files_only in [True, False]:
             try:
                 # WARNING: this may not work if you have no access to the web as it may try to download tokenizer
-                logging.getLogger("MODEL").info("loading model local_files_only=%i" % local_files_only)
+                log("loading model local_files_only=%i" % local_files_only)
                 if local_files_only:
                     self._model = SentenceTransformer(
                         os.path.join(self.cache_dir, self._model_dir),
@@ -71,17 +73,22 @@ def cache_dir(self) -> str:
         return env.DIR_WEIGHTS
 
     def infer(self, request: Dict[str, Any], upload_proxy: Any, upload_proxy_args: Dict, log=print):
-
         request_id = request["id"]
         try:
+            inputs = request["inputs"]
+            B = len(inputs)
+            log("embeddings B=%d" % B)
             upload_proxy_args["ts_prompt"] = time.time()
             if request_id in upload_proxy.check_cancelled():
                 return
-
+            t0 = time.time()
             files = {
-                "results": json.dumps(self._model.encode(request["inputs"]).tolist()),
+                "results": json.dumps(self._model.encode(inputs).tolist()),
             }
-
+            log("/embeddings %0.3fs" % (time.time() - t0))
+            # 8 => 0.141s 0.023s
+            # 64 => 0.166s 0.060s
+            # 128 => 0.214s 0.120s    *1024 => 1.600s
             upload_proxy_args["ts_batch_finished"] = time.time()
             finish_reason = 'DONE'
             upload_proxy.upload_result(
@@ -94,5 +101,5 @@ def infer(self, request: Dict[str, Any], upload_proxy: Any, upload_proxy_args: D
             )
 
         except Exception as e:  # noqa
-            logging.getLogger("MODEL").error(e)
-            logging.getLogger("MODEL").error(traceback.format_exc())
+            log.error(e)
+            log.error(traceback.format_exc())

diff --git a/self_hosting_machinery/inference/inference_worker.py b/self_hosting_machinery/inference/inference_worker.py
index 59bb2fd3..c9265ce9 100644
--- a/self_hosting_machinery/inference/inference_worker.py
+++ b/self_hosting_machinery/inference/inference_worker.py
@@ -43,7 +43,7 @@ def worker_loop(model_name: str, models_db: Dict[str, Any], supported_models: Di
         dummy_call = {
             'id': 'emb-legit-42',
             'function': 'embeddings',
-            'inputs': "Common Knowledge",
+            'inputs': 128*["A"*8000],  # max size validated at 9000 chars, 128 batch size
             'created': time.time(),
         }
     else:
@@ -73,7 +73,8 @@ def check_cancelled(*args, **kwargs):
         return set()
 
     log("STATUS test batch")
-    inference_model.infer(dummy_call, DummyUploadProxy, {})
+    for _ in range(10):
+        inference_model.infer(dummy_call, DummyUploadProxy, {})
     if compile:
         return

From 65010f214caff91d20d9558caac661b7cf1ea53d Mon Sep 17 00:00:00 2001
From: Oleg Klimov
Date: Tue, 23 Apr 2024 09:14:17 +0200
Subject: [PATCH 4/9] logs

---
 .../inference/inference_embeddings.py | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/self_hosting_machinery/inference/inference_embeddings.py b/self_hosting_machinery/inference/inference_embeddings.py
index 43a477bb..8e1b642a 100644
--- a/self_hosting_machinery/inference/inference_embeddings.py
+++ b/self_hosting_machinery/inference/inference_embeddings.py
@@ -15,7 +15,9 @@
 from self_hosting_machinery.inference import InferenceBase
 from self_hosting_machinery.inference.lora_loader_mixin import LoraLoaderMixin
 
-log = logging.getLogger("MODEL").info
+
+def log(*args):
+    logging.getLogger("MODEL").info(*args)
 
 
 class InferenceEmbeddings(InferenceBase, LoraLoaderMixin):
@@ -72,7 +74,7 @@ def model_dict(self) -> Dict[str, Any]:
     def cache_dir(self) -> str:
         return env.DIR_WEIGHTS
 
-    def infer(self, request: Dict[str, Any], upload_proxy: Any, upload_proxy_args: Dict, log=print):
+    def infer(self, request: Dict[str, Any], upload_proxy: Any, upload_proxy_args: Dict):
         request_id = request["id"]
         try:
@@ -101,5 +103,5 @@ def infer(self, request: Dict[str, Any], upload_proxy: Any, upload_proxy_args: D
             )
 
         except Exception as e:  # noqa
-            log.error(e)
-            log.error(traceback.format_exc())
+            log(e)
+            log(traceback.format_exc())

From 5e9311db9eaa531724cacf25089ebd3289efa412 Mon Sep 17 00:00:00 2001
From: Oleg Klimov
Date: Thu, 2 May 2024 07:53:56 +0200
Subject: [PATCH 5/9] watchdog: catch more psutil.NoSuchProcess

---
 self_hosting_machinery/watchdog/docker_watchdog.py | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/self_hosting_machinery/watchdog/docker_watchdog.py b/self_hosting_machinery/watchdog/docker_watchdog.py
index dc3fdd2c..4c899789 100644
--- a/self_hosting_machinery/watchdog/docker_watchdog.py
+++ b/self_hosting_machinery/watchdog/docker_watchdog.py
@@ -252,6 +252,12 @@ def maybe_send_usr1(self, sigkill_timeout=30):
                 pass
             try:
                 self.p.kill()
+                itself = psutil.Process(self.p.pid)
+                for child in itself.children(recursive=True):
+                    try:
+                        child.kill()
+                    except psutil.NoSuchProcess:
+                        pass
             except psutil.NoSuchProcess:
                 pass
 

From 1db39a5e1b2e90628ab6183e764ad9d180389e03 Mon Sep 17 00:00:00 2001
From: Oleg Klimov
Date: Sat, 4 May 2024 06:51:15 +0200
Subject: [PATCH 6/9] remove HEADLESS

---
 .../watchdog/docker_watchdog.py | 21 ++++++---------------
 1 file changed, 6 insertions(+), 15 deletions(-)

diff --git a/self_hosting_machinery/watchdog/docker_watchdog.py b/self_hosting_machinery/watchdog/docker_watchdog.py
index 4c899789..155e6681 100644
--- a/self_hosting_machinery/watchdog/docker_watchdog.py
+++ b/self_hosting_machinery/watchdog/docker_watchdog.py
@@ -8,14 +8,11 @@
 import time
 import uuid
 import psutil
-
 from pathlib import Path
-
 from typing import Dict, Optional, List
 
 from refact_utils.scripts import env
 
-HEADLESS = ("--headless" in sys.argv)  # for ui-less docker: skip all standard cfg-s (web ui, enum gpus, etc), skip first_run script
 FIRST_RUN_CMDLINE = [sys.executable, "-m", "self_hosting_machinery.scripts.first_run"]
 
 
@@ -312,7 +309,7 @@ def __str__(self):
             f"    status: {self.status_from_stderr}\n"
 
 tracked: Dict[str, TrackedJob] = {}
-watchdog_templates = list(Path(env.DIR_WATCHDOG_TEMPLATES).iterdir()) if not HEADLESS else []
+watchdog_templates = list(Path(env.DIR_WATCHDOG_TEMPLATES).iterdir())
 
 
 def create_tracked_jobs_from_configs():
@@ -452,21 +449,10 @@ def factory_reset():
 
 
 def first_run():
-    if HEADLESS:
-        return
     subprocess.check_call(FIRST_RUN_CMDLINE)
 
 
 def main_loop():
-    if not HEADLESS:
-        # Generate a random SMALLCLOUD_API_KEY, it will be inherited by subprocesses,
-        # this allows inference_worker to authorize on the local web server (both use
-        # this variable), and work safely even if we expose http port to the world.
-        os.environ["SMALLCLOUD_API_KEY"] = str(uuid.uuid4())
-    else:
-        assert "SMALLCLOUD_API_KEY" in os.environ
-
-    first_run()
     while 1:
         main_loop_body()
         time.sleep(1)
@@ -479,4 +465,9 @@ def main_loop():
 
 
 if __name__ == '__main__':
+    # Generate a random SMALLCLOUD_API_KEY, it will be inherited by subprocesses,
+    # this allows inference_worker to authorize on the local web server (both use
+    # this variable), and work safely even if we expose http port to the world.
+    os.environ["SMALLCLOUD_API_KEY"] = str(uuid.uuid4())
+    first_run()
     main_loop()

From 5babd6dafb87c64f0b6aa034b86a8f130844a751 Mon Sep 17 00:00:00 2001
From: Oleg Klimov
Date: Thu, 9 May 2024 08:53:37 +0200
Subject: [PATCH 7/9] ooops search is correct

---
 refact_webgui/webgui/webgui.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/refact_webgui/webgui/webgui.py b/refact_webgui/webgui/webgui.py
index 69fe0601..8ba55ea1 100644
--- a/refact_webgui/webgui/webgui.py
+++ b/refact_webgui/webgui/webgui.py
@@ -149,9 +149,9 @@ def setup_logger():
     class CustomHandler(logging.Handler):
         def emit(self, record):
             log_entry = self.format(record)
-            if boring1.match(log_entry):
+            if boring1.search(log_entry):
                 return
-            if boring2.match(log_entry):
+            if boring2.search(log_entry):
                 return
             sys.stderr.write(log_entry)
             sys.stderr.write("\n")

From ddf52a4f897112c41de135ecaee05de21bf0917f Mon Sep 17 00:00:00 2001
From: Oleg Klimov
Date: Thu, 9 May 2024 08:54:55 +0200
Subject: [PATCH 8/9] repeat batch 2 times

---
 self_hosting_machinery/inference/inference_worker.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/self_hosting_machinery/inference/inference_worker.py b/self_hosting_machinery/inference/inference_worker.py
index c9265ce9..ece4add7 100644
--- a/self_hosting_machinery/inference/inference_worker.py
+++ b/self_hosting_machinery/inference/inference_worker.py
@@ -73,7 +73,7 @@ def check_cancelled(*args, **kwargs):
         return set()
 
     log("STATUS test batch")
-    for _ in range(10):
+    for _ in range(2):
         inference_model.infer(dummy_call, DummyUploadProxy, {})
     if compile:
         return
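
[Reviewer note on PATCH 7/9 above — illustration only, not part of the series]
The "ooops" is the classic re.match() vs re.search() mix-up: match() is anchored
at position 0, so a filter pattern that occurs mid-line never fires on timestamped
log entries. A standalone sketch with an invented pattern (the real boring1/boring2
are defined elsewhere in webgui.py):

    import re

    boring = re.compile(r'"GET /ping')  # hypothetical stand-in for boring1/boring2

    line = '127.0.0.1:52998 - "GET /ping HTTP/1.1" 200'
    assert boring.match(line) is None        # anchored at the start of the line: misses
    assert boring.search(line) is not None   # scans the whole line: the filter fires
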
From 14a635485158f10f8b38390c62657b82560addc3 Mon Sep 17 00:00:00 2001
From: Oleg Klimov
Date: Mon, 20 May 2024 18:02:05 +0200
Subject: [PATCH 9/9] this works in cloud

---
 self_hosting_machinery/inference/stream_results.py | 4 ++--
 self_hosting_machinery/scripts/first_run.py        | 1 +
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/self_hosting_machinery/inference/stream_results.py b/self_hosting_machinery/inference/stream_results.py
index 2f9815c2..475b86c4 100644
--- a/self_hosting_machinery/inference/stream_results.py
+++ b/self_hosting_machinery/inference/stream_results.py
@@ -53,7 +53,7 @@ def url_complain_doesnt_work():
 
 
 def model_guid_allowed_characters(name):
-    return re.sub(r"[^a-zA-Z0-9_]", "_", name)
+    return re.sub(r"[^a-zA-Z0-9_\.]", "_", name)
 
 
 def validate_description_dict(
@@ -242,7 +242,7 @@ def upload_result(
                 progress[original_batch[b]["id"]] = tmp
         upload_dict["progress"] = progress
         upload_dict["check_cancelled"] = [call["id"] for call in original_batch]
-        upload_dict["model_name"] = description_dict["model"]
+        upload_dict["model_name"] = description_dict["model"].replace("/vllm", "")
         self.upload_q.put(copy.deepcopy(upload_dict))
         if DEBUG_UPLOAD_NOT_SEPARATE_PROCESS:
             _upload_results_loop(self.upload_q, self.cancelled_q)

diff --git a/self_hosting_machinery/scripts/first_run.py b/self_hosting_machinery/scripts/first_run.py
index f2716008..dfae7b85 100644
--- a/self_hosting_machinery/scripts/first_run.py
+++ b/self_hosting_machinery/scripts/first_run.py
@@ -25,3 +25,4 @@ def convert_old_configs():
     convert_old_configs()
     model_assigner = ModelAssigner()
     assign_gpus_if_first_run_detected(model_assigner)
+    model_assigner.models_to_watchdog_configs()  # removes deprecated models
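
[Reviewer note on PATCH 9/9 — illustration only, not part of the series]
Widening the GUID character class to keep dots, plus stripping the "/vllm"
backend suffix before upload, is what lets cloud-side model names round-trip.
A quick standalone check; the sample model names are invented:

    import re

    def model_guid_allowed_characters(name):
        # Copied from PATCH 9/9: dots now survive, anything else unusual becomes "_".
        return re.sub(r"[^a-zA-Z0-9_\.]", "_", name)

    assert model_guid_allowed_characters("Refact/1.6B") == "Refact_1.6B"
    # The watchdog-facing model name drops the "/vllm" suffix in upload_result():
    assert "some-model/vllm".replace("/vllm", "") == "some-model"
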