-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathDict_DataFrame_Sqlite.py
141 lines (128 loc) · 4.13 KB
/
Dict_DataFrame_Sqlite.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import os
import sqlite3
import pandas as pd
def create_path(file_path):
if not os.path.exists(file_path):
os.makedirs(file_path)
os.chdir(file_path)
return file_path
# by pandas.to_sql
def dict_to_DataFrame(data_dict: dict):
"""
change dict form data to dataframe form and return
:param data_dict:
:return:
"""
new_dict = dict()
# change to the data from to standard dict {'key': pd.Series([list],index=[index])}
for key in data_dict:
num = len(data_dict[key])
new_dict[key] = pd.Series(list(data_dict[key]), index=list(range(num)))
return pd.DataFrame(new_dict)
def dict_to_SQLTable(data_dict: dict, table_name: str, db_path: os.getcwd(), db_name: str = 'data.db'):
"""
create a table containing dict_data in database=[db_path/db_name]
:param data_dict: dict form data
:param db_path: path to sqlite3 database
:param db_name: database name default='data.db
:param table_name: sql table name for dave
:return: true if success,else false
"""
status = True
if not os.path.exists(db_path):
db_path = os.getcwd()
sql_db = os.path.join(db_path, db_name)
# change data to dict form
pd_data = dict_to_DataFrame(data_dict)
# connect to database
try:
cxn = sqlite3.connect(sql_db)
# cursor = cxn.cursor()
except Exception as e:
print(e)
status = False
else:
pd_data.to_sql(name=table_name, con=cxn, if_exists='replace', index_label='id')
cxn.commit()
cxn.close()
return status
def get_sql_tables(db_path, db_name):
"""
get all tables from sqlite database
:param db_path:
:param db_name:
:return:
"""
table_list = []
if not os.path.exists(db_path):
db_path = os.getcwd()
sql_db = os.path.join(db_path, db_name)
sql_list_tables = "SELECT name FROM sqlite_master WHERE type= 'table' order by name"
try:
cxn = sqlite3.connect(sql_db)
cursor = cxn.cursor()
except Exception as e:
print(e)
else:
cursor.execute(sql_list_tables)
resp = cursor.fetchall()
if not resp:
print(f'cannot find any tables in:{sql_db} ')
else:
print(f'find record: {resp}')
table_list = [item[0] for item in resp]
cxn.commit()
cxn.close()
return table_list
def SQLTable_to_DataFrame(table_name, db_path, db_name):
f"""
extract one table{table_name} from database into pandas dataframe
:param table_name:
:param db_path:
:param db_name:
:return: pd data_frame data
"""
pd_data = pd.DataFrame()
if not os.path.exists(db_path):
db_path = os.getcwd()
sql_db = os.path.join(db_path, db_name)
try:
cxn = sqlite3.connect(sql_db)
# cursor = cxn.cursor()
except Exception as e:
print(e)
else:
sql_select_table = f"SELECT * From {table_name}"
pd_data = pd.read_sql(sql=sql_select_table, con=cxn)
if pd_data.empty:
print(f'cannot find any tables name{table_name} in:{sql_db} ')
else:
print(f'find record,will return')
cxn.commit()
cxn.close()
return pd_data
def save_pd_data(pd_data: pd.DataFrame, path, filename: str):
"""
save pandas DataForm data to excel/csv/json file by path/filename
:param pd_data: pandas dataFrame
:param path:
:param filename:
:return:
"""
save_path = path
if not os.path.isdir(path):
save_path = os.getcwd()
excel_file_path = os.path.join(save_path, filename + '.xlsx')
# excel writer
excel_writer = pd.ExcelWriter(excel_file_path)
pd_data.to_excel(excel_writer)
excel_writer.save()
print(f'save to excel xlsx file {excel_file_path} successfully')
# csv file
csv_file_path = os.path.join(save_path, filename + '.csv')
pd_data.to_csv(csv_file_path)
print(f'save to csv file {csv_file_path} successfully')
# json file
json_file_path = os.path.join(save_path, filename + '.json')
pd_data.to_json(json_file_path)
print(f'save to json file {json_file_path} successfully')