Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup ndsl/dsl/dace/utils.py #96

Merged
merged 3 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ndsl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from .dsl.dace.utils import (
ArrayReport,
DaCeProgress,
MaxBandwithBenchmarkProgram,
MaxBandwidthBenchmarkProgram,
StorageReport,
)
from .dsl.dace.wrapped_halo_exchange import WrappedHaloUpdater
Expand Down
86 changes: 43 additions & 43 deletions ndsl/dsl/dace/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,22 @@
from ndsl.optional_imports import cupy as cp


# ----------------------------------------------------------
# Rough timer & log for major operations of DaCe build stack
# ----------------------------------------------------------
class DaCeProgress:
"""Timer and log to track build progress"""
"""Rough timer & log for major operations of DaCe build stack."""

def __init__(self, config: DaceConfig, label: str):
def __init__(self, config: DaceConfig, label: str) -> None:
self.prefix = DaCeProgress.default_prefix(config)
self.prefix = f"[{config.get_orchestrate()}]"
self.label = label

@classmethod
def default_prefix(cls, config: DaceConfig) -> str:
return f"[{config.get_orchestrate()}]"

def __enter__(self):
def __enter__(self) -> None:
ndsl_log.debug(f"{self.prefix} {self.label}...")
self.start = time.time()

def __exit__(self, _type, _val, _traceback):
def __exit__(self, _type, _val, _traceback) -> None:
elapsed = time.time() - self.start
ndsl_log.debug(f"{self.prefix} {self.label}...{elapsed}s.")

Expand Down Expand Up @@ -81,7 +77,7 @@ def memory_static_analysis(
"""Analysis an SDFG for memory pressure.

The results split memory by type (dace.StorageType) and account for
allocated, unreferenced and top lovel (e.g. top-most SDFG) memory
allocated, unreferenced and top level (e.g. top-most SDFG) memory
"""
# We report all allocation type
allocations: Dict[dace.StorageType, StorageReport] = {}
Expand All @@ -92,7 +88,7 @@ def memory_static_analysis(
array_size_in_bytes = arr.total_size * arr.dtype.bytes
ref = _is_ref(sd, aname)

# Transient in maps (refrence and not referenced)
# Transient in maps (reference and not referenced)
if sd is not sdfg and arr.transient:
if arr.pool:
allocations[arr.storage].in_pooled_in_bytes += array_size_in_bytes
Expand All @@ -111,7 +107,7 @@ def memory_static_analysis(
else:
allocations[arr.storage].unreferenced_in_bytes += array_size_in_bytes

# SDFG-level memory (refrence, not referenced and pooled)
# SDFG-level memory (reference, not referenced and pooled)
elif sd is sdfg:
if arr.pool:
allocations[arr.storage].in_pooled_in_bytes += array_size_in_bytes
Expand All @@ -137,22 +133,22 @@ def memory_static_analysis(
def report_memory_static_analysis(
sdfg: dace.sdfg.SDFG,
allocations: Dict[dace.StorageType, StorageReport],
detail_report=False,
detail_report: bool = False,
) -> str:
"""Create a human readable report form the memory analysis results"""
report = f"{sdfg.name}:\n"
for storage, allocs in allocations.items():
alloc_in_mb = float(allocs.referenced_in_bytes / (1024 * 1024))
unref_alloc_in_mb = float(allocs.unreferenced_in_bytes / (1024 * 1024))
in_pooled_in_mb = float(allocs.in_pooled_in_bytes / (1024 * 1024))
toplvlalloc_in_mb = float(allocs.top_level_in_bytes / (1024 * 1024))
if alloc_in_mb or toplvlalloc_in_mb > 0:
top_level_alloc_in_mb = float(allocs.top_level_in_bytes / (1024 * 1024))
if alloc_in_mb or top_level_alloc_in_mb > 0:
report += (
f"{storage}:\n"
f" Alloc ref {alloc_in_mb:.2f} mb\n"
f" Alloc unref {unref_alloc_in_mb:.2f} mb\n"
f" Pooled {in_pooled_in_mb:.2f} mb\n"
f" Top lvl alloc: {toplvlalloc_in_mb:.2f}mb\n"
f" Top lvl alloc: {top_level_alloc_in_mb:.2f}mb\n"
)
if detail_report:
report += "\n"
Expand All @@ -172,7 +168,9 @@ def report_memory_static_analysis(
return report


def memory_static_analysis_from_path(sdfg_path: str, detail_report=False) -> str:
def memory_static_analysis_from_path(
sdfg_path: str, detail_report: bool = False
) -> str:
"""Open a SDFG and report the memory analysis"""
sdfg = dace.SDFG.from_file(sdfg_path)
return report_memory_static_analysis(
Expand All @@ -183,53 +181,55 @@ def memory_static_analysis_from_path(sdfg_path: str, detail_report=False) -> str


# ----------------------------------------------------------
# Theoritical bandwith from SDFG
# Theoretical bandwidth from SDFG
# ----------------------------------------------------------
def copy_defn(q_in: FloatField, q_out: FloatField):
def copy_kernel(q_in: FloatField, q_out: FloatField) -> None:
with computation(PARALLEL), interval(...):
q_in = q_out


class MaxBandwithBenchmarkProgram:
class MaxBandwidthBenchmarkProgram:
def __init__(self, size, backend) -> None:
from ndsl.dsl.dace.orchestration import DaCeOrchestration, orchestrate

dconfig = DaceConfig(None, backend, orchestration=DaCeOrchestration.BuildAndRun)
dace_config = DaceConfig(
None, backend, orchestration=DaCeOrchestration.BuildAndRun
)
c = CompilationConfig(backend=backend)
s = StencilConfig(dace_config=dconfig, compilation_config=c)
s = StencilConfig(dace_config=dace_config, compilation_config=c)
self.copy_stencil = FrozenStencil(
func=copy_defn,
func=copy_kernel,
origin=(0, 0, 0),
domain=size,
stencil_config=s,
)
orchestrate(obj=self, config=dconfig)
orchestrate(obj=self, config=dace_config)

def __call__(self, A, B, n: int):
def __call__(self, A, B, n: int) -> None:
for i in dace.nounroll(range(n)):
self.copy_stencil(A, B)


def kernel_theoretical_timing(
sdfg: dace.sdfg.SDFG,
hardware_bw_in_GB_s=None,
backend=None,
hardware_bw_in_GB_s: Optional[float] = None,
backend: Optional[str] = None,
) -> Dict[str, float]:
"""Compute a lower timing bound for kernels with the following hypothesis:

- Performance is memory bound, e.g. arithmetic intensity isn't counted
    - Hardware bandwidth comes from a GT4Py/DaCe test rather than a spec sheet
for higher accuracy. Best is to run a copy_stencils on a full domain
for higher accuracy. Best is to run a copy_stencil on a full domain
- Memory pressure is mostly in read/write from global memory, inner scalar & shared
memory is not counted towards memory movement.
"""
if not hardware_bw_in_GB_s:
if hardware_bw_in_GB_s is None:
size = np.array(sdfg.arrays["__g_self__w"].shape)
print(
f"Calculating experimental hardware bandwith on {size}"
f"Calculating experimental hardware bandwidth on {size}"
f" arrays at {Float} precision..."
)
bench = MaxBandwithBenchmarkProgram(size, backend)
bench = MaxBandwidthBenchmarkProgram(size, backend)
if backend == "dace:gpu":
A = cp.ones(size, dtype=Float)
B = cp.ones(size, dtype=Float)
Expand All @@ -248,13 +248,19 @@ def kernel_theoretical_timing(
bench(A, B, n)
dt.append((time.time() - s) / n)
memory_size_in_b = np.prod(size) * np.dtype(Float).itemsize * 8
bandwidth_in_bytes_s = memory_size_in_b / np.median(dt)
print(
f"Hardware bandwith computed: {bandwidth_in_bytes_s/(1024*1024*1024)} GB/s"
)
else:
bandwidth_in_bytes_s = hardware_bw_in_GB_s * 1024 * 1024 * 1024
print(f"Given hardware bandwith: {bandwidth_in_bytes_s/(1024*1024*1024)} GB/s")
measured_bandwidth_in_bytes_s = memory_size_in_b / np.median(dt)

bandwidth_in_bytes_s = (
measured_bandwidth_in_bytes_s
if hardware_bw_in_GB_s is None
else hardware_bw_in_GB_s * 1024 * 1024 * 1024
)
label = (
"Hardware bandwidth computed"
if hardware_bw_in_GB_s
else "Given hardware bandwidth"
)
print(f"{label}: {bandwidth_in_bytes_s/(1024*1024*1024)} GB/s")

allmaps = [
(me, state)
Expand Down Expand Up @@ -307,12 +313,6 @@ def kernel_theoretical_timing(
except TypeError:
pass

# Bad expansion
if not isinstance(newresult_in_us, sympy.core.numbers.Float) and not isinstance(
newresult_in_us, float
):
continue

romanc marked this conversation as resolved.
Show resolved Hide resolved
result[node.label] = float(newresult_in_us)

return result
Expand Down
Loading