-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathNotifAnalyzer.py
140 lines (130 loc) · 4.94 KB
/
NotifAnalyzer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import argparse
import sys
import time
import sqlite3
import json
# It's only used for formatting, so we don't want to crash due to it.
try:
from lxml import etree
except ImportError:
pass
import re
PRAGMA_USER_VERSION = 'PRAGMA user_version'
USER_VERSION = 'user_version'
QUERY_ASSETS = 'SELECT RecordId, PrimaryId, nh.ParentId, AssetKey, AssetValue, WNSId, HandlerType, WNFEventName, \
SystemDataPropertySet, nh.CreatedTime, nh.ModifiedTime, n.Payload, n.Type, n.ArrivalTime, \
n.PayloadType, n.ExpiryTime, n.Id \
FROM NotificationHandler nh \
LEFT JOIN HandlerAssets ha ON ha.HandlerId = nh.RecordId \
LEFT JOIN Notification n ON nh.RecordId = n.HandlerId \
ORDER BY RecordId'
ASSETS = "assets"
def main(args):
start_time = time.time()
path = args.path
jpath = args.json
if not path:
print("Path is required.")
exit()
if not jpath:
print("JSON result path is required.")
exit()
data = process_db(path)
print(str(data))
with open(jpath, 'w') as fp:
json.dump(data, fp, indent=4)
total_time = round(time.time() - start_time, 2)
print('Elapsed time: ' + str(total_time) + 's')
try:
from win10toast import ToastNotifier
ToastNotifier().show_toast('NotifAnalyzer', f'Finished processing notifications! Took {str(total_time)}s', duration = None)
except Exception as e:
pass
def process_db(file):
db_info = {}
try:
db_conn = sqlite3.connect(file)
db_conn.row_factory = sqlite3.Row
c = db_conn.cursor()
c.execute(PRAGMA_USER_VERSION)
db_info[USER_VERSION] = c.fetchone()[0]
c.execute(QUERY_ASSETS)
asset_data = [dict(row) for row in c.fetchall()]
db_info[ASSETS] = process_assets(asset_data)
except Exception as e:
db_info = None
print(str(e))
finally:
c.close()
db_conn.close()
return db_info
def process_assets(assets):
processed_assets = {}
for asset in assets:
id = asset["RecordId"]
if id in processed_assets:
# Not new asset
process_asset_key(asset, processed_assets[id])
process_notification(asset, dict_asset)
else:
# New asset
dict_asset = {}
dict_asset["HandlerId"] = id
dict_asset["HandlerPrimaryId"] = asset["PrimaryId"]
dict_asset["ParentId"] = asset["ParentId"]
dict_asset["WNSId"] = asset["WNSId"]
dict_asset["HandlerType"] = asset["HandlerType"]
dict_asset["WNFEventName"] = asset["WNFEventName"]
dict_asset["SystemDataPropertySet"] = asset["SystemDataPropertySet"]
dict_asset["CreatedTime"] = asset["CreatedTime"]
dict_asset["ModifiedTime"] = asset["ModifiedTime"]
dict_asset["OtherAssets"] = []
dict_asset["Notifications"] = []
process_asset_key(asset, dict_asset)
process_notification(asset, dict_asset)
processed_assets[id] = dict_asset
return processed_assets
def process_asset_key(asset, dict_asset):
if "AssetKey" not in asset:
return
asset_key = asset["AssetKey"]
if asset_key == "DisplayName":
dict_asset["AppName"] = asset["AssetValue"]
elif asset_key:
asset_pair = {asset_key: asset["AssetValue"]}
if asset_pair not in dict_asset["OtherAssets"]:
dict_asset["OtherAssets"].append(asset_pair)
def process_notification(asset, dict_asset):
if "Payload" not in asset:
return
payload = asset["Payload"]
if payload:
try:
root = etree.fromstring(payload)
payload = etree.tostring(root, pretty_print=True).decode()
except Exception as e:
print("Failed to format XML due to " + str(e) + ". Falling back newline after '>'")
try:
payload = re.sub(r'(>)', r'\1\n', payload.decode())
except Exception as e:
print("Failed to format XML due to " + str(e) + ". Falling back to Payload as string")
payload = str(payload)
notif = {
"Id": asset["Id"],
"Payload": payload,
"Type": asset["Type"],
"ExpiryTime": asset["ExpiryTime"],
"ArrivalTime": asset["ArrivalTime"],
"PayloadType": asset["PayloadType"]
}
# Verify that there are no duplicate notifications
if not any(x["Id"] == asset["Id"] for x in dict_asset["Notifications"]):
dict_asset["Notifications"].append(notif)
def setup_args():
parser = argparse.ArgumentParser()
parser.add_argument('-p', '--path', type=str, help='Path to Notifications DB (wpndatabase.db)')
parser.add_argument('-j', '--json', type=str, help='Path to result file in JSON')
return parser.parse_args()
if __name__ == "__main__":
args = setup_args()
main(args)