-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
169 lines (145 loc) · 5.48 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
"""
This is the main file of the project.
It runs different scheme in different configurations, and evaluates their errors.
It was tested in Pycharm on Windows.
"""
import os
import sys
import numpy as np
from numpy import linalg as la
from matplotlib import pyplot as plt
import toml
import logging
from finite_differences import FiniteDifferences
from fit import fit_linear
from problems.second_order import (
SecondOrderForwardEuler,
SecondOrderModifiedForwardEuler,
SecondOrderBackwardEuler,
SecondOrderCrankNicholson,
SecondOrderLeapFrog,
SecondOrderDuFort,
)
from utils import RESULTS_PATH
NDIM = 2
logging.basicConfig(
stream=sys.stdout, level=logging.INFO, format="%(asctime)s: %(message)s"
)
def get_time_step(fill_distance, config):
power = config["k_h_power"]
return config["ratio"] * (fill_distance ** power)
def get_config(filename="config.toml"):
config = toml.load(os.path.join("config", filename))
return config
def evaluate_on_sites(sites, solution, t):
# print("sites: ", sites[-1])
evaluation = np.array(
[solution(sites[index], t) for index in np.ndindex(sites.shape)]
)
return np.ravel(evaluation)
def evaluate_error(numerical, analytical, fill_distance):
"""
Evaluating the error between numerical and analytical solutions.
:param numerical:
:param analytical:
:param fill_distance: Required for norm_h calculation
:return:
"""
# print("shape: ", numerical.shape)
if NDIM > 1:
numerical.resize(int(numerical.shape[0] / NDIM), NDIM)
analytical.resize(int(analytical.shape[0] / NDIM), NDIM)
differences = la.norm(numerical - analytical, axis=1)
error = np.sqrt(np.sum((np.abs(differences) ** 2) * fill_distance))
plt.figure()
plt.plot(numerical.real[:, 0], label="real(numerical[0])")
plt.plot(analytical.real[:, 0], label="real(analytical[0])")
plt.legend()
plt.title(f"re 0 n = {numerical.shape[0]}, error = {error}")
plt.show(block=False)
plt.figure()
plt.plot(numerical.real[:, 1], label="real(numerical[1])")
plt.plot(analytical.real[:, 1], label="real(analytical[1])")
plt.legend()
plt.title(f"re 1 n = {numerical.shape[0]}, error = {error}")
plt.show(block=False)
plt.figure()
plt.plot(numerical.imag[:, 0], label="imag(Numerical[0])")
plt.plot(analytical.imag[:, 0], label="imag(Analytical[0])")
plt.legend()
plt.title(f"Im 0 N = {numerical.shape[0]}, error = {error}")
plt.show(block=False)
plt.figure()
plt.plot(numerical.imag[:, 0], 'o--', label="Numerical")
plt.plot(analytical.imag[:, 0], label="Analytical")
plt.legend()
plt.show(block=False)
plt.figure()
plt.plot(numerical.imag[:, 1], label="imag(Numerical[1])")
plt.plot(analytical.imag[:, 1], label="imag(Analytical[1])")
plt.legend()
plt.title(f"Im 1 N = {numerical.shape[0]}, error = {error}")
plt.show(block=True)
logging.info(f"N={numerical.shape[0]}")
return error
def run_problem(problem_type: type, config):
errors = list()
for number_of_sites in config["number_of_sites"]:
# print("start")
logging.info("============New cycle=============")
sites = np.linspace(0, 1, number_of_sites, endpoint=False)
fill_distance = sites[1] - sites[0]
time_step = get_time_step(fill_distance, config)
logging.info(f"h = {fill_distance}, k = {time_step}")
problem = problem_type(sites, fill_distance, time_step, config)
general_initial_function = lambda x, t: problem.initial_function(x)
initial_evaluation = evaluate_on_sites(sites, general_initial_function, 0)
engine = FiniteDifferences(
fill_distance,
time_step,
problem.operator,
initial_evaluation,
)
engine.run(config["t_f"])
final_state = engine.state
# print("t_f", engine.time)
# solution = evaluate_on_sites(sites, problem.solution, 1.0051)
solution = evaluate_on_sites(sites, problem.solution, config["t_f"])
error = evaluate_error(final_state, solution, fill_distance)
logging.info(f"Error: {error}")
errors.append(error)
fit_linear(
np.log(np.array(config["number_of_sites"])) / np.log(10),
np.log(errors) / np.log(10),
f"{config['name']}_lambda_{config['ratio']}_power_{'k_h_power'}_{problem.special_config}",
)
def run():
"""
This is the main runner of the problems.
It configures which scheme to run, and in what configuration.
"""
# To run a single experiment, You can comment out the others.
problems = [
# (SecondOrderForwardEuler, "so_forward_euler_2.toml"),
# To run the modified FE, You have to configure sigma
# (SecondOrderModifiedForwardEuler, "so_forward_euler_mod_2.toml"),
# (SecondOrderBackwardEuler, "so_backward_euler_2.toml"),
# (SecondOrderLeapFrog, "so_leap_frog.toml"),
# (SecondOrderCrankNicholson, "so_crank_2.toml"),
(SecondOrderDuFort, "so_du_fort.toml"),
]
for problem, config_file in problems:
config = get_config(config_file)
logging.info(f"=================Running {config['name']}===============")
try:
run_problem(problem, config)
except Exception as e:
logging.info(f"Error in run {config['name']}, with exception {e}")
def main():
path = RESULTS_PATH
if not os.path.isdir(path):
os.mkdir(path)
run()
if __name__ == "__main__":
main()
input("Done?")