-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathautoddnsv6.py
192 lines (173 loc) · 7.32 KB
/
autoddnsv6.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
import os
import requests
import logging
import sys
# 配置日志
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[
logging.FileHandler("dns_update.log"),
logging.StreamHandler()
]
)
# 从环境变量获取 Cloudflare API 配置信息
API_KEY = os.getenv("CLOUDFLARE_API_KEY")
EMAIL = os.getenv("CLOUDFLARE_EMAIL")
ZONE_ID = os.getenv("CLOUDFLARE_ZONE_ID")
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID")
# 确保从环境变量中获取到了这些信息
if not all([API_KEY, EMAIL, ZONE_ID, TELEGRAM_BOT_TOKEN, TELEGRAM_CHAT_ID]):
logging.error("缺少必要的配置信息,请确保在 GitHub Secrets 中设置了 CLOUDFLARE_API_KEY, CLOUDFLARE_EMAIL, CLOUDFLARE_ZONE_ID, TELEGRAM_BOT_TOKEN 和 TELEGRAM_CHAT_ID。")
sys.exit(1)
# 发送消息到 Telegram
def send_to_telegram(message):
url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
payload = {
"chat_id": TELEGRAM_CHAT_ID,
"text": message
}
try:
response = requests.post(url, json=payload)
if response.status_code != 200:
logging.error(f"发送消息到 Telegram 失败: {response.status_code}, {response.text}")
except requests.exceptions.RequestException as e:
logging.error(f"请求失败: {e}")
# 自定义日志处理器,将日志信息发送到 Telegram
class TelegramLogHandler(logging.Handler):
def emit(self, record):
log_entry = self.format(record)
send_to_telegram(log_entry)
# 添加 Telegram 日志处理器
telegram_handler = TelegramLogHandler()
telegram_handler.setLevel(logging.INFO)
telegram_handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
logging.getLogger().addHandler(telegram_handler)
# 域名与标记映射关系
LOCATION_TO_DOMAIN = {
# 示例映射(可根据实际需求调整)
# 美国
"SJC": "usv6.616049.xyz", # 圣何塞
"LAX": "usv6.616049.xyz", # 洛杉矶
"SEA": "usv6.616049.xyz", # 西雅图
"JFK": "usv6.616049.xyz", # 纽约 - 肯尼迪国际机场
"ORD": "usv6.616049.xyz", # 芝加哥 - 奥黑尔国际机场
"US": "usv6.616049.xyz", # 美国
# 德国
"FRA": "dev6.616049.xyz", # 法兰克福机场
"DE": "dev6.616049.xyz", # 德国
# 英国
"LHR": "ukv6.616049.xyz", # 伦敦
"UK": "ukv6.616049.xyz", # 英国
# 日本
"NRT": "jpv6.616049.xyz", # 东京成田
"HND": "jpv6.616049.xyz", # 东京羽田
"JP": "jpv6.616049.xyz", # 日本
# 香港
"HKG": "hkv6.616049.xyz", # 香港国际机场
"HK": "hkv6.616049.xyz", # 香港
# 韩国
"ICN": "krv6.616049.xyz", # 仁川国际机场
"KR": "krv6.616049.xyz", # 韩国
# 台湾
"TPE": "twv6.616049.xyz", # 台北桃园机场
"KHH": "proxy.tw.616049.xyz", # 台湾高雄
"TW": "twv6.616049.xyz", # 台湾
# 新加坡
"SIN": "sgv6.616049.xyz", # 樟宜机场
"SG": "sgv6.616049.xyz", # 新加坡
# CF优选
"CFIPV6优选": "cfv6.616049.xyz" # CF优选
}
# 从 cfipv6.txt 文件中读取前十个 IPv6 和标记
def get_ips_from_file(file_path, limit=10):
ip_data = []
try:
with open(file_path, "r") as file:
for line in file:
if "#" in line:
ip, location = line.strip().split("#")
ip_data.append((ip.strip(), location.strip()))
if len(ip_data) >= limit:
break
return ip_data
except FileNotFoundError:
logging.error(f"文件未找到: {file_path}")
return []
# 删除相同前缀的所有 DNS 记录(完全匹配)
def delete_dns_records_with_prefix(prefix):
try:
url = f"https://api.cloudflare.com/client/v4/zones/{ZONE_ID}/dns_records"
headers = {
"X-Auth-Email": EMAIL,
"X-Auth-Key": API_KEY,
"Content-Type": "application/json"
}
response = requests.get(url, headers=headers)
response.raise_for_status()
records = response.json().get("result", [])
logging.info(f"找到 {len(records)} 条 DNS 记录,开始删除与 {prefix} 完全匹配的记录...")
for record in records:
# 提取记录名称的前缀(例如 "proxy.us.616049.xyz" 的前缀是 "proxy")
record_prefix = record["name"].split(".")[0]
# 仅删除与给定前缀完全匹配的记录
if record_prefix == prefix:
# 打印即将删除的 DNS 记录的详细信息
logging.info(f"即将删除记录: 名称={record['name']}, 类型={record['type']}, 内容={record['content']}, TTL={record['ttl']}, 代理={record['proxied']}")
record_id = record["id"]
delete_url = f"{url}/{record_id}"
delete_response = requests.delete(delete_url, headers=headers)
if delete_response.status_code == 200:
logging.info(f"已删除记录: {record['name']} -> {record['content']}")
else:
logging.error(f"删除失败: {record['name']} -> {record['content']}, 错误信息: {delete_response.status_code}, {delete_response.text}")
except requests.exceptions.RequestException as e:
logging.error(f"请求失败: {e}")
sys.exit(1)
# 批量添加 DNS 记录
def add_dns_records_bulk(ip_data):
url = f"https://api.cloudflare.com/client/v4/zones/{ZONE_ID}/dns_records"
headers = {
"X-Auth-Email": EMAIL,
"X-Auth-Key": API_KEY,
"Content-Type": "application/json"
}
# 记录已经删除过哪些前缀
deleted_prefixes = set()
for ip, location in ip_data:
domain = LOCATION_TO_DOMAIN.get(location)
if domain:
# 提取前缀(例如 "usv6.616049.xyz" 的前缀是 "usv6")
prefix = domain.split(".")[0]
# 如果该前缀没有被删除过,则删除该前缀的所有 DNS 记录
if prefix not in deleted_prefixes:
delete_dns_records_with_prefix(prefix)
deleted_prefixes.add(prefix) # 标记该前缀已删除
data = {
"type": "AAAA", # IPv6 记录类型
"name": domain,
"content": ip,
"ttl": 1,
"proxied": False
}
try:
response = requests.post(url, headers=headers, json=data)
if response.status_code == 200:
logging.info(f"添加成功: {domain} -> {ip}")
elif response.status_code == 409:
logging.info(f"记录已存在: {domain} -> {ip}")
else:
logging.error(f"添加失败: {domain} -> {ip}, 错误信息: {response.status_code}, {response.text}")
except requests.exceptions.RequestException as e:
logging.error(f"请求失败: {e}")
else:
logging.warning(f"未找到标记 {location} 对应的域名映射,跳过。")
# 主程序
if __name__ == "__main__":
# 添加新的 DNS 记录
ip_data = get_ips_from_file("cfip/ipv6.txt")
if not ip_data:
logging.error("未读取到 IP 数据,请检查 cfipv6.txt 文件格式是否正确。")
else:
add_dns_records_bulk(ip_data)