forked from sultansq/kiu
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgit.py
75 lines (67 loc) · 2.33 KB
/
git.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import asyncio
import shlex
from typing import Tuple
from git import Repo
from git.exc import GitCommandError, InvalidGitRepositoryError
import config
from ..logging import LOGGER
def install_req(cmd: str) -> Tuple[str, str, int, int]:
    """Run *cmd* as a subprocess and block until it has finished.

    The command string is tokenised with ``shlex.split`` and executed
    directly (no shell is involved); stdout and stderr are captured.

    Args:
        cmd: Command line to execute, e.g. a ``pip3 install ...`` call.

    Returns:
        A ``(stdout, stderr, returncode, pid)`` tuple. Both streams are
        decoded as UTF-8 (undecodable bytes are replaced) and stripped of
        surrounding whitespace.
    """

    async def install_requirements() -> Tuple[str, str, int, int]:
        args = shlex.split(cmd)
        process = await asyncio.create_subprocess_exec(
            *args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await process.communicate()
        return (
            stdout.decode("utf-8", "replace").strip(),
            stderr.decode("utf-8", "replace").strip(),
            process.returncode,
            process.pid,
        )

    # get_event_loop() raises RuntimeError when the calling thread has no
    # current loop (worker threads; the main thread too on Python 3.14+).
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = None
    if loop is None or loop.is_closed():
        # Install the fresh loop as the current one so that later
        # get_event_loop() callers in this thread keep sharing it, as they
        # did when get_event_loop() created the loop implicitly.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    return loop.run_until_complete(install_requirements())
def git():
    """Bring the working directory in sync with the configured upstream.

    Reads ``config.UPSTREAM_REPO``, ``config.UPSTREAM_BRANCH`` and
    ``config.GIT_TOKEN``. When ``Repo()`` succeeds, only a log line is
    emitted. When it raises ``InvalidGitRepositoryError``, a repository is
    initialised, ``origin`` is fetched, the upstream branch is created,
    checked out (forced) and pulled, and finally the requirements are
    installed through ``install_req``.

    NOTE(review): when ``config.GIT_TOKEN`` is set it is embedded in the
    URL passed to ``create_remote`` - presumably that URL is persisted in
    the repository configuration; confirm this is acceptable.
    """
    repo_link = config.UPSTREAM_REPO
    if config.GIT_TOKEN:
        # Build "https://<user>:<token>@<host>/<user>/<repo>". The user name
        # is the first path component after "com/" in the configured URL.
        git_username = repo_link.split("com/")[1].split("/")[0]
        temp_repo = repo_link.split("https://")[1]
        upstream_repo = (
            f"https://{git_username}:{config.GIT_TOKEN}@{temp_repo}"
        )
    else:
        upstream_repo = config.UPSTREAM_REPO
    try:
        repo = Repo()
        LOGGER(__name__).info("Git Client Found [VPS DEPLOYER]")
    except GitCommandError:
        LOGGER(__name__).info("Invalid Git Command")
    except InvalidGitRepositoryError:
        # NOTE(review): the source this was recovered from had lost its
        # indentation. Everything below is kept inside this handler because
        # it is the only placement where ``repo`` is bound on every path
        # that uses it - confirm against the canonical file.
        repo = Repo.init()
        if "origin" in repo.remotes:
            origin = repo.remote("origin")
        else:
            origin = repo.create_remote("origin", upstream_repo)
        origin.fetch()
        repo.create_head(
            config.UPSTREAM_BRANCH,
            origin.refs[config.UPSTREAM_BRANCH],
        )
        repo.heads[config.UPSTREAM_BRANCH].set_tracking_branch(
            origin.refs[config.UPSTREAM_BRANCH]
        )
        # True == force: local modifications are discarded by the checkout.
        repo.heads[config.UPSTREAM_BRANCH].checkout(True)
        try:
            repo.create_remote("origin", config.UPSTREAM_REPO)
        except GitCommandError:
            # "origin" was found or created above, so git is expected to
            # refuse to add it again. Only that failure is ignored;
            # KeyboardInterrupt, SystemExit and programming errors propagate.
            pass
        nrs = repo.remote("origin")
        nrs.fetch(config.UPSTREAM_BRANCH)
        try:
            nrs.pull(config.UPSTREAM_BRANCH)
        except GitCommandError:
            # The pull failed: fall back to forcing the tree to what was
            # just fetched.
            repo.git.reset("--hard", "FETCH_HEAD")
        _, pip_stderr, pip_returncode, _ = install_req(
            "pip3 install --no-cache-dir -r requirements.txt"
        )
        if pip_returncode != 0:
            # Surface a failed install instead of silently ignoring it.
            # ``.info`` is used because it is the only logger method this
            # file is known to rely on.
            LOGGER(__name__).info(
                "Failed to install requirements "
                f"(exit code {pip_returncode}): {pip_stderr}"
            )
        LOGGER(__name__).info("Fetching updates from AnonXMusic...")