forked from Study-Together-Org/time_counter
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtests_bots.py
156 lines (131 loc) · 5.52 KB
/
tests_bots.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
import os
from datetime import timedelta
from distest import TestCollector
from distest import run_command_line_bot
from discord import Embed
from sqlalchemy.orm import sessionmaker
from models import Action, User
import utilities
test_collector = TestCollector()
bot = None
guild = None
bot_id = None
time_to_stay = 3600 / (10 ** int(os.getenv("test_display_num_decimal")))
db_tolerance = timedelta(seconds=.2)
redis_tolerance = 3.6 * 1.5
discord_delay = 2 # discord api, when slow, could take 5 seconds to send messages
past_time = "2pm" # specify some timepoint for commands that support it
me_command = f"{os.getenv('prefix')}me {past_time}"
timepoint = None
redis_client = utilities.get_redis_client()
engine = utilities.get_engine()
Session = sessionmaker(bind=engine)
sqlalchemy_session = Session()
@test_collector()
async def test_init(interface):
# A workaround for shared info
# TODO test - Get rid of this function by calling these for each test, so we can specify running just one test if we want
global bot, guild, bot_id, timepoint
bot = interface.client
guild = bot.guilds[0]
bot_id = bot.user.id
timepoint, display_timezone, display_timepoint = await utilities.get_user_timeinfo(guild.system_channel, bot.user, past_time)
@test_collector()
async def test_start_end_channel_incr(interface):
await guild.system_channel.send(me_command)
prev_stats = await utilities.get_user_stats(redis_client, bot_id, timepoint=timepoint)
voice_channel = [channel for channel in guild.voice_channels if "screen/cam" in channel.name][1]
voice_client = await voice_channel.connect()
start_channel_time = utilities.get_time()
utilities.sleep(time_to_stay)
await voice_client.disconnect()
end_channel_time = utilities.get_time()
await guild.system_channel.send(me_command)
utilities.sleep(discord_delay)
cur_stats = await utilities.get_user_stats(redis_client, bot_id, timepoint=timepoint)
# TODO test - use fields to check description?
# reply = await guild.system_channel.history(limit=1).flatten()[0].embeds[0].description
assert (utilities.check_stats_diff(prev_stats, cur_stats, time_to_stay, 1, redis_tolerance))
# Check SQL
records = sqlalchemy_session.query(Action) \
.filter(Action.user_id == bot_id) \
.filter(Action.category.in_(["end channel", "start channel"])) \
.order_by(Action.creation_time.desc()).limit(2).all()
records.reverse()
assert (records[0].category == "start channel")
assert (records[0].detail == records[1].detail == voice_channel.id)
assert (records[0].creation_time - start_channel_time <= db_tolerance)
assert (records[1].creation_time - end_channel_time <= db_tolerance)
@test_collector()
async def test_p(interface):
embed = Embed(title=utilities.config["embed_titles"]["p"])
await interface.assert_reply_embed_equals(os.getenv("prefix") + "p", embed, attributes_to_check=["title"])
@test_collector()
async def test_lb(interface):
embed = Embed(title=f'{utilities.config["embed_titles"]["lb"]} ({utilities.get_month()})')
await interface.assert_reply_embed_equals(os.getenv("prefix") + "lb", embed, attributes_to_check=["title"])
@test_collector()
async def test_lb_with_page(interface):
embed = Embed(title=f'{utilities.config["embed_titles"]["lb"]} ({utilities.get_month()})')
await interface.assert_reply_embed_equals(os.getenv("prefix") + "lb - 4000000", embed, attributes_to_check=["title"])
@test_collector()
async def test_me(interface):
embed = Embed(title=utilities.config["embed_titles"]["me"])
await interface.assert_reply_embed_equals(os.getenv("prefix") + "me", embed, attributes_to_check=["title"])
utilities.sleep(discord_delay)
@test_collector()
async def test_in_session(interface):
# TODO test - find out why this test can't be before test_p
await guild.system_channel.send(me_command)
prev_stats = await utilities.get_user_stats(redis_client, bot_id, timepoint=timepoint)
voice_channel = [channel for channel in guild.voice_channels if "screen/cam" in channel.name][1]
voice_client = await voice_channel.connect()
multiplier = 2
# multiplier = 175
utilities.sleep(time_to_stay * multiplier)
await guild.system_channel.send(me_command)
utilities.sleep(discord_delay)
mid_stats = await utilities.get_user_stats(redis_client, bot_id, timepoint=timepoint)
assert (utilities.check_stats_diff(prev_stats, mid_stats, time_to_stay, multiplier, redis_tolerance))
multiplier = 5
# multiplier = 2147
utilities.sleep(time_to_stay * multiplier)
await voice_client.disconnect()
await guild.system_channel.send(me_command)
utilities.sleep(discord_delay)
cur_stats = await utilities.get_user_stats(redis_client, bot_id, timepoint=timepoint)
assert (utilities.check_stats_diff(mid_stats, cur_stats, time_to_stay, multiplier, redis_tolerance))
# TODO test - using large numbers as inputs to see if CPU hangs; it has caused discord.py not to log in the past
# TODO test - Write case for new member + each role...
# start id_1
# start id_1
#
# #done
# start id_1
# start id_2
#
# end id_1
# end id_1
#
# end id_1
# end id_2
#
# #done
# start id_1
# end id_1
#
# #done
# start id_1
# end id_2
#
# #done
# end id_1
# start id_1
#
# #done
# end id_1
# start id_2
if __name__ == "__main__":
run_command_line_bot(target=int(os.getenv("test_bot_id")), token=os.getenv("test_bot_token"),
channel_id=int(os.getenv("test_channel_id")), tests="all",
stats=True, timeout=5, collector=test_collector)