-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmainloop.py
173 lines (133 loc) · 5.24 KB
/
mainloop.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
import asyncio
import os
import signal
import sys
from enum import Enum
from typing import Callable, Optional, Coroutine, Any
from quart import Quart
from database.accesscontrol import UserPrivilegeManager
from database.accounts import AccountManager
from database.core import Database
from telegram.auth.base import StaticSecrets
from telegram.client import TelegramClient
from telegram.manager import TelegramClientManager
from telegram.util import Environment
from telegram.webapp.webapp import WebApp
# dotenv wouldn't install...
def read_env_file(file_path):
env_dict = {}
with open(file_path, 'r') as file:
for line in file:
line = line.strip()
if not line or line.startswith("#"):
continue
key, value = line.split('=', 1)
key = key.strip()
value = value.strip()
if value.startswith('"') and value.endswith('"'):
value = value[1:-1]
env_dict[key] = value
return env_dict
class MainLoop:
def __init__(self, environment: Environment):
self.environment = environment
self.config = dict(os.environ)
# noinspection PyBroadException
try:
extra = read_env_file(".env")
self.config.update(extra)
except Exception as ex:
print("Failed to read .env file: " + str(ex))
try:
if(len(self.config.get('TWO_FACTOR_ENCRYPTION_KEY', '')) != 32):
raise Exception("TWO_FACTOR_ENCRYPTION_KEY must be exactly 32 characters.")
except KeyError:
raise Exception("Must set TWO_FACTOR_ENCRYPTION_KEY within environment file or environment variables.")
# TODO: these do not give useful error messages if they're missing
# TODO: this stuff is gross, use some kind DI or similar
self.db = Database(self.config['DB_DSN'])
self.accounts = AccountManager(self.db, self.config['TWO_FACTOR_ENCRYPTION_KEY'])
self.API_ID = self.config['API_ID']
self.API_HASH = self.config['API_HASH']
self.privmanager = UserPrivilegeManager(self.db, int(self.config.get("PRIVILEGE_REFRESH_TIME_IN_MINUTES", "300")))
self.manager = TelegramClientManager()
self._app: Optional[Quart] = None
self._webapp = None
self._shutting_down = False
self._quart_task = None
self._did_init = False
async def init(self):
if(self._did_init):
raise Exception("Cannot initialize twice.")
self._did_init = True
await self.accounts.init()
await self.privmanager.init()
def _check_init(self):
if(not self._did_init):
raise Exception("Not initialized...")
async def addClient(self, client: TelegramClient):
self._check_init()
await self.manager.add_client(client)
async def run(self, clientGenerator: Callable[[str], TelegramClient]):
self._check_init()
self._webapp = WebApp(self.config, self.manager, self.accounts, clientGenerator, self.environment, self.privmanager)
self._app = self._webapp.app
host = self.config.get("HOST", "localhost").lower()
port = int(self.config.get("PORT", "8888"))
self._quart_task = self._app.run_task(host, port)
# we can't pass this to gather or it will eat control+c events for some reason...
# it's okay because the manager runs forever, and we shut the manager down AFTER we shut the webapp down
#
# we awake .start() because we need the resulting task returned from it, not the async coroutine from the
# async method...
await asyncio.gather(await self.manager.start(), self._quart_task)
async def shutdown(self):
self._check_init()
if(self._shutting_down):
return
self._shutting_down = True
print("Shutting down...")
if(self._app is not None):
await self._app.shutdown()
print("Webapp stopped")
if(self.manager.is_started()):
async for client in self.manager.stop_and_yield():
print(f"Stopped client: {client.auth.phone}")
await self.db.close_all()
# apparently quart is just never going to shutdown properly
# this library sucks
sys.exit(0)
def mainLoop(self, runOnStart: Callable[[], Coroutine[Any, Any, None]], clientGenerator: Callable[[str, Optional[str], Optional[StaticSecrets]], TelegramClient]):
# make aiopg work on windows
if sys.version_info >= (3, 8) and sys.platform.lower().startswith("win"):
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
shut_us_down = None
def graceful_shutdown(*args):
nonlocal shut_us_down
if(self._shutting_down):
return
shut_us_down = asyncio.create_task(self.shutdown())
# we do this because the Quart app sets its own close signal handlers that block ours (???)
# so we take them back after the app's initialization is done
# there is no race condition here as the app initialization happens instantly
# we just need to make sure this happens after self.run(...) does
async def hijack_close_signal():
await asyncio.sleep(0.1)
try:
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, graceful_shutdown)
# doesn't work on windows
# can't use signal.signal because the handlers will be permanently blocked by the asyncio event loop
# and we are allergic to threads
except NotImplementedError:
pass
async def startup():
await self.init()
await runOnStart()
await asyncio.gather(self.run(clientGenerator), hijack_close_signal())
loop.run_until_complete(startup())
if(shut_us_down is not None):
loop.run_until_complete(shut_us_down)
loop.close()