mirror of
https://github.com/NVIDIA/cutlass.git
synced 2026-05-12 17:25:45 +00:00
CUTLASS 3.2.1 (#1113)
* Updates for 3.2.1 release. * Minor fix in gemm op profiler for raster order. * Add scheduler mapping for raster order in the kernels.
This commit is contained in:
284
test/python/cutlass/interface/conv2d_interface.py
Normal file
284
test/python/cutlass/interface/conv2d_interface.py
Normal file
@@ -0,0 +1,284 @@
|
||||
#################################################################################################
|
||||
#
|
||||
# Copyright (c) 2023 - 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
|
||||
# SPDX-License-Identifier: BSD-3-Clause
|
||||
#
|
||||
# Redistribution and use in source and binary forms, with or without
|
||||
# modification, are permitted provided that the following conditions are met:
|
||||
#
|
||||
# 1. Redistributions of source code must retain the above copyright notice, this
|
||||
# list of conditions and the following disclaimer.
|
||||
#
|
||||
# 2. Redistributions in binary form must reproduce the above copyright notice,
|
||||
# this list of conditions and the following disclaimer in the documentation
|
||||
# and/or other materials provided with the distribution.
|
||||
#
|
||||
# 3. Neither the name of the copyright holder nor the names of its
|
||||
# contributors may be used to endorse or promote products derived from
|
||||
# this software without specific prior written permission.
|
||||
#
|
||||
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
||||
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||||
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
||||
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
|
||||
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
||||
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
|
||||
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
|
||||
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
|
||||
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
#
|
||||
#################################################################################################
|
||||
|
||||
"""
|
||||
Tests the high-level Conv2d interface
|
||||
"""
|
||||
|
||||
from math import ceil
|
||||
import unittest
|
||||
|
||||
import cutlass
|
||||
import cutlass.utils.datatypes as datatypes
|
||||
from cutlass.backend.utils.device import device_cc
|
||||
from utils import ExpectException
|
||||
import os
|
||||
|
||||
|
||||
class Conv2dEquivalence:
|
||||
"""
|
||||
Helper class for testing the equivalence of different constructions of the Conv2d interface
|
||||
"""
|
||||
def __init__(self, conv_kind, element_A, element_B, element_C, element_D, element_accumulator,
|
||||
alignment_A, alignment_B, alignment_C):
|
||||
|
||||
self.element_A = element_A
|
||||
self.element_B = element_B
|
||||
self.element_C = element_C
|
||||
self.element_D = element_D
|
||||
self.element_accumulator = element_accumulator
|
||||
self.alignment_A = alignment_A
|
||||
self.alignment_B = alignment_B
|
||||
self.alignment_C = alignment_C
|
||||
|
||||
self.conv_kind = conv_kind
|
||||
|
||||
self.plan = cutlass.op.Conv2d(
|
||||
kind=self.conv_kind, element_A=element_A, element_B=element_B, element_C=element_C,
|
||||
element_D=element_D, element_accumulator=element_accumulator)
|
||||
|
||||
self.op = self.plan.construct(
|
||||
alignment_A=self.alignment_A, alignment_B=self.alignment_B,
|
||||
alignment_C=self.alignment_C)
|
||||
|
||||
def _plans_equal(self, other_plan) -> bool:
|
||||
"""
|
||||
Compares whether two plans are equal
|
||||
|
||||
:param other_plan: plan to compare against the default Conv2d
|
||||
:type other_plan: cutlass.op.Conv2d
|
||||
|
||||
:return: whether `other_plan` is equivalent to `self.plan`
|
||||
:rtype: bool
|
||||
"""
|
||||
other_op = other_plan.construct(
|
||||
alignment_A=self.alignment_A, alignment_B=self.alignment_B,
|
||||
alignment_C=self.alignment_C)
|
||||
|
||||
return self.op.rt_module.emit() == other_op.rt_module.emit()
|
||||
|
||||
def generic_test(self):
|
||||
"""
|
||||
Tests the equivalence of various constructions of the Conv2d interface when using CUTLASS data types
|
||||
and layouts for constructing the Conv2d interface
|
||||
"""
|
||||
if not datatypes.numpy_available:
|
||||
return
|
||||
|
||||
# Test when specifying all parameters
|
||||
plan_other = cutlass.op.Conv2d(
|
||||
kind=self.conv_kind,
|
||||
element_A=self.element_A, element_B=self.element_B, element_C=self.element_C,
|
||||
element_D=self.element_D, element_accumulator=self.element_accumulator)
|
||||
assert self._plans_equal(plan_other)
|
||||
|
||||
# Test when specifying all parameters but A
|
||||
plan_other = cutlass.op.Conv2d(
|
||||
kind=self.conv_kind,
|
||||
element_B=self.element_B, element_C=self.element_C,
|
||||
element_D=self.element_D, element_accumulator=self.element_accumulator,
|
||||
element=self.element_A)
|
||||
assert self._plans_equal(plan_other)
|
||||
|
||||
# Test when specifying all parameters but A and B as tensors using generic element and output
|
||||
plan_other = cutlass.op.Conv2d(
|
||||
kind=self.conv_kind,
|
||||
element_C=self.element_C,
|
||||
element_D=self.element_D, element_accumulator=self.element_accumulator,
|
||||
element=self.element_A)
|
||||
assert self._plans_equal(plan_other)
|
||||
|
||||
# Test without explicit accumulator. Only run if the type of C and the accumulator are equal
|
||||
if self.element_C == self.element_accumulator:
|
||||
plan_other = cutlass.op.Conv2d(
|
||||
kind=self.conv_kind,
|
||||
element_C=self.element_C,
|
||||
element_D=self.element_D,
|
||||
element=self.element_A)
|
||||
assert self._plans_equal(plan_other)
|
||||
|
||||
# Test with only the generic types. Only rune if the types of A, B, C, and D are the same
|
||||
if (self.element_A == self.element_B and self.element_A == self.element_C and self.element_A == self.element_D
|
||||
and self.element_A == self.element_accumulator):
|
||||
plan_other = cutlass.op.Conv2d(kind=self.conv_kind, element=self.element_A)
|
||||
assert self._plans_equal(plan_other)
|
||||
|
||||
def numpy_test(self):
|
||||
"""
|
||||
Tests the equivalence of various constructions of the Conv2d interface when using numpy as a frontend
|
||||
"""
|
||||
if not datatypes.numpy_available:
|
||||
return
|
||||
|
||||
import numpy as np
|
||||
type_A = datatypes.numpy_type(self.element_A)
|
||||
type_B = datatypes.numpy_type(self.element_B)
|
||||
type_C = datatypes.numpy_type(self.element_C)
|
||||
type_D = datatypes.numpy_type(self.element_D)
|
||||
type_accum = datatypes.numpy_type(self.element_accumulator)
|
||||
|
||||
size = (2, 2)
|
||||
A = np.zeros(size, dtype=type_A)
|
||||
B = np.zeros(size, dtype=type_B)
|
||||
C = np.zeros(size, dtype=type_C)
|
||||
D = np.zeros(size, dtype=type_D)
|
||||
|
||||
return self.tensor_test(type_A, type_B, type_C, type_D, type_accum, A, B, C, D)
|
||||
|
||||
def torch_test(self):
|
||||
"""
|
||||
Tests the equivalence of various constructions of the Conv2d interface when using torch as a frontend
|
||||
"""
|
||||
if not datatypes.torch_available:
|
||||
return
|
||||
|
||||
import torch
|
||||
type_A = datatypes.torch_type(self.element_A)
|
||||
type_B = datatypes.torch_type(self.element_B)
|
||||
type_C = datatypes.torch_type(self.element_C)
|
||||
type_D = datatypes.torch_type(self.element_D)
|
||||
type_accum = datatypes.torch_type(self.element_accumulator)
|
||||
|
||||
size = (2, 2)
|
||||
|
||||
A = torch.empty(size, dtype=type_A)
|
||||
B = torch.empty(size, dtype=type_B)
|
||||
C = torch.empty(size, dtype=type_C)
|
||||
D = torch.empty(size, dtype=type_D)
|
||||
|
||||
return self.tensor_test(type_A, type_B, type_C, type_D, type_accum, A, B, C, D)
|
||||
|
||||
def tensor_test(self, type_A, type_B, type_C, type_D, type_accum, A, B, C, D):
|
||||
# Test when specifying all parameters via tensors
|
||||
plan_np = cutlass.op.Conv2d(kind=self.conv_kind, A=A, B=B, C=C, D=D, element_accumulator=type_accum)
|
||||
assert self._plans_equal(plan_np)
|
||||
|
||||
# Test when specifying all parameters but A as tensors
|
||||
plan_np = cutlass.op.Conv2d(kind=self.conv_kind, B=B, C=C, D=D, element_accumulator=type_accum, element_A=type_A)
|
||||
assert self._plans_equal(plan_np)
|
||||
|
||||
# Test when specifying all parameters but A and B as tensors and using generic element and output
|
||||
if type_A == type_B:
|
||||
plan_np = cutlass.op.Conv2d(kind=self.conv_kind, C=C, D=D, element_accumulator=type_accum, element=type_A)
|
||||
assert self._plans_equal(plan_np)
|
||||
|
||||
# Test without explicit accumulator. Only run if the type of C and the accumulator.
|
||||
if type_C == type_accum:
|
||||
plan_np = cutlass.op.Conv2d(kind=self.conv_kind, A=A, B=B, C=C, D=D)
|
||||
assert self._plans_equal(plan_np)
|
||||
|
||||
# Test with only the generic types and layouts. Only run if types and layouts of A, B, C, and D are the same.
|
||||
if (type_A == type_B and type_A == type_C and type_A == type_D and type_A == type_accum):
|
||||
plan_np = cutlass.op.Conv2d(kind=self.conv_kind, element=type_A)
|
||||
assert self._plans_equal(plan_np)
|
||||
|
||||
def test_all(self):
|
||||
"""
|
||||
Runs all tests on the Gemm interface
|
||||
"""
|
||||
self.generic_test()
|
||||
self.numpy_test()
|
||||
self.torch_test()
|
||||
|
||||
|
||||
@unittest.skipIf(device_cc() <= 80, 'Device compute capability is insufficient for SM80 tests.')
|
||||
class ConvEquivalenceTest(unittest.TestCase):
|
||||
"""
|
||||
Tests the equivalence of different constructions of the Conv2d interface
|
||||
"""
|
||||
pass
|
||||
|
||||
type2alignment = {
|
||||
cutlass.DataType.f16: 8,
|
||||
cutlass.DataType.f32: 4
|
||||
}
|
||||
|
||||
def add_test(conv_kind, element_A, element_B, element_C, element_D, element_accumulator):
|
||||
|
||||
test_name = f"test_conv2d_{conv_kind}_{element_A}_{element_B}_{element_C}_{element_D}_{element_accumulator}"
|
||||
|
||||
def run(self):
|
||||
conv2d_eq = Conv2dEquivalence(
|
||||
conv_kind=conv_kind,
|
||||
element_A=element_A, element_B=element_B,
|
||||
element_C=element_C, element_D=element_D,
|
||||
element_accumulator=element_accumulator,
|
||||
alignment_A=type2alignment[element_A], alignment_B=type2alignment[element_B],
|
||||
alignment_C=type2alignment[element_C]
|
||||
)
|
||||
conv2d_eq.test_all()
|
||||
|
||||
setattr(ConvEquivalenceTest, test_name, run)
|
||||
|
||||
for conv_kind in ["fprop", "wgrad", "dgrad"]:
|
||||
for types in [
|
||||
[cutlass.DataType.f16, cutlass.DataType.f16, cutlass.DataType.f16, cutlass.DataType.f16, cutlass.DataType.f16],
|
||||
[cutlass.DataType.f16, cutlass.DataType.f16, cutlass.DataType.f16, cutlass.DataType.f16, cutlass.DataType.f32],
|
||||
[cutlass.DataType.f16, cutlass.DataType.f16, cutlass.DataType.f32, cutlass.DataType.f32, cutlass.DataType.f16],
|
||||
[cutlass.DataType.f16, cutlass.DataType.f16, cutlass.DataType.f32, cutlass.DataType.f32, cutlass.DataType.f32],
|
||||
[cutlass.DataType.f32, cutlass.DataType.f32, cutlass.DataType.f32, cutlass.DataType.f32, cutlass.DataType.f32]
|
||||
]:
|
||||
add_test(conv_kind, types[0], types[1], types[2], types[3], types[4])
|
||||
|
||||
|
||||
@unittest.skipIf(device_cc() <= 80, 'Device compute capability is insufficient for SM80 tests.')
|
||||
class Conv2dErrorTests(unittest.TestCase):
|
||||
"""
|
||||
Tests various error scenarios that arise with the high-level Gemm interface
|
||||
"""
|
||||
|
||||
def test_alignment(self):
|
||||
"""
|
||||
Tests case in which the alignment specified is unsupported
|
||||
"""
|
||||
plan = cutlass.op.Conv2d(kind="fprop", element=cutlass.DataType.f16)
|
||||
|
||||
with ExpectException(True, 'Alignment 3 is not supported for F16. The construction should fail.'):
|
||||
op = plan.construct(alignment_A=3, alignment_B=3, alignment_C=3)
|
||||
|
||||
def test_invalid_tile_description(self):
|
||||
"""
|
||||
Tests scenarios in which an invalid tile description is provided for a given CC
|
||||
"""
|
||||
plan = cutlass.op.Conv2d(kind="fprop", element=cutlass.DataType.f16)
|
||||
|
||||
td = plan.tile_descriptions()[0]
|
||||
td.threadblock_shape=[17, 32, 5]
|
||||
|
||||
plan.tile_description = td
|
||||
with ExpectException(True, 'The threadblock shape is invalid. The compilation should fail.'):
|
||||
plan.compile()
|
||||
# Clean up the error message
|
||||
os.remove("./cutlass_python_compilation_device_error.txt")
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
245
test/python/cutlass/interface/evt_interface.py
Normal file
245
test/python/cutlass/interface/evt_interface.py
Normal file
@@ -0,0 +1,245 @@
|
||||
#################################################################################################
|
||||
#
|
||||
# Copyright (c) 2023 - 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
|
||||
# SPDX-License-Identifier: BSD-3-Clause
|
||||
#
|
||||
# Redistribution and use in source and binary forms, with or without
|
||||
# modification, are permitted provided that the following conditions are met:
|
||||
#
|
||||
# 1. Redistributions of source code must retain the above copyright notice, this
|
||||
# list of conditions and the following disclaimer.
|
||||
#
|
||||
# 2. Redistributions in binary form must reproduce the above copyright notice,
|
||||
# this list of conditions and the following disclaimer in the documentation
|
||||
# and/or other materials provided with the distribution.
|
||||
#
|
||||
# 3. Neither the name of the copyright holder nor the names of its
|
||||
# contributors may be used to endorse or promote products derived from
|
||||
# this software without specific prior written permission.
|
||||
#
|
||||
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
||||
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||||
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
||||
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
|
||||
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
||||
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
|
||||
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
|
||||
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
|
||||
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
#
|
||||
#################################################################################################
|
||||
|
||||
"""
|
||||
Test the EVT interface
|
||||
"""
|
||||
|
||||
import numpy as np
|
||||
import unittest
|
||||
|
||||
import cutlass
|
||||
from cutlass import LayoutType, Tensor
|
||||
from cutlass.backend.utils.device import device_cc
|
||||
from cutlass.epilogue import reshape, permute
|
||||
|
||||
from utils import ExpectException
|
||||
|
||||
|
||||
@unittest.skipIf(device_cc() not in [80, 90], "This unittest is for Sm80 and Sm90 only")
|
||||
class EVTErrorTests(unittest.TestCase):
|
||||
"""
|
||||
Tests various error scenarios that arise with the EVT interface
|
||||
"""
|
||||
@unittest.skipIf(device_cc() != 90, "Only Sm90 EVT requires root node be 'D'")
|
||||
def test_root_not_d(self):
|
||||
"""
|
||||
Test when "D" does not exist in Sm90 EVT
|
||||
"""
|
||||
def evt_root_not_d(accum, alpha):
|
||||
F = accum * alpha
|
||||
return F
|
||||
|
||||
example_tensors = {
|
||||
"accum": self.fake_tensor(np.float16, (6, 512, 512)),
|
||||
"alpha": 1.2,
|
||||
"F": self.fake_tensor(np.float16, (6, 512, 512))
|
||||
}
|
||||
|
||||
with ExpectException(device_cc() == 90,
|
||||
"SyntaxError: Sm90 EVT requires the epilogue to have a returned tensor D, "
|
||||
"but the variable 'D' is not found in the return values.", True):
|
||||
|
||||
cutlass.epilogue.trace(evt_root_not_d, example_tensors)
|
||||
|
||||
def test_no_accum(self):
|
||||
"""
|
||||
Test when "accum" is not in input arguments
|
||||
"""
|
||||
def evt_no_accum(alpha, C):
|
||||
D = alpha * C
|
||||
return D
|
||||
|
||||
example_tensors = {
|
||||
"C": self.fake_tensor(np.float16, (6, 512, 512)),
|
||||
"alpha": 1.2,
|
||||
"D": self.fake_tensor(np.float16, (6, 512, 512))
|
||||
}
|
||||
|
||||
with ExpectException(True, "SyntaxError: Cannot find 'accum' in the argument list.", True):
|
||||
cutlass.epilogue.trace(evt_no_accum, example_tensors)
|
||||
|
||||
@unittest.skipIf(device_cc() != 90, "Only Sm90 EVT has concern on smem size")
|
||||
def test_too_much_shared_memory(self):
|
||||
"""
|
||||
Test when the epilogue consumes too much shared memory
|
||||
"""
|
||||
def evt_too_much_shared_memory(accum, C1, C2, C3, C4, C5):
|
||||
D1 = accum + C1
|
||||
D2 = D1 + C2
|
||||
D3 = D2 + C3
|
||||
D4 = D3 + C4
|
||||
D = D4 + C5
|
||||
return D, D1, D2, D3, D4
|
||||
|
||||
example_tensors = {
|
||||
"accum": self.fake_tensor(np.float16, (6, 512, 512)),
|
||||
"C1": self.fake_tensor(np.float16, (6, 512, 512)),
|
||||
"C2": self.fake_tensor(np.float16, (6, 512, 512)),
|
||||
"C3": self.fake_tensor(np.float16, (6, 512, 512)),
|
||||
"C4": self.fake_tensor(np.float16, (6, 512, 512)),
|
||||
"C5": self.fake_tensor(np.float16, (6, 512, 512)),
|
||||
"D1": self.fake_tensor(np.float16, (6, 512, 512)),
|
||||
"D2": self.fake_tensor(np.float16, (6, 512, 512)),
|
||||
"D3": self.fake_tensor(np.float16, (6, 512, 512)),
|
||||
"D4": self.fake_tensor(np.float16, (6, 512, 512)),
|
||||
"D": self.fake_tensor(np.float16, (6, 512, 512))
|
||||
}
|
||||
|
||||
epilogue_visitor = cutlass.epilogue.trace(evt_too_much_shared_memory, example_tensors)
|
||||
|
||||
plan = cutlass.op.Gemm(
|
||||
element=np.float16, layout=cutlass.LayoutType.RowMajor,
|
||||
element_accumulator=np.float32
|
||||
)
|
||||
|
||||
with ExpectException(True,
|
||||
"RuntimeError: The epilogue consumes too much shared memory. "
|
||||
"No valid tile description is found in the generator.", True):
|
||||
plan.epilogue_visitor = epilogue_visitor
|
||||
|
||||
def test_not_ssa(self):
|
||||
"""
|
||||
Test when the epilogue is not in SSA
|
||||
"""
|
||||
def evt_redefine(accum, C, alpha):
|
||||
F = accum + C
|
||||
F = F * alpha
|
||||
D = F
|
||||
return D, F
|
||||
|
||||
example_tensors = {
|
||||
"accum": self.fake_tensor(np.float16, (6, 512, 512)),
|
||||
"C": self.fake_tensor(np.float16, (6, 512, 512)),
|
||||
"alpha": 1.5,
|
||||
"D": self.fake_tensor(np.float16, (6, 512, 512)),
|
||||
"F": self.fake_tensor(np.float16, (6, 512, 512))
|
||||
}
|
||||
|
||||
with ExpectException(True, "SyntaxError: Variable 'F' cannot be defined twice.", True):
|
||||
cutlass.epilogue.trace(evt_redefine, example_tensors)
|
||||
|
||||
def evt_undefine(accum, alpha):
|
||||
F = accum + C
|
||||
D = F * alpha
|
||||
return D, F
|
||||
|
||||
example_tensors = {
|
||||
"accum": self.fake_tensor(np.float16, (6, 512, 512)),
|
||||
"alpha": 1.5,
|
||||
"D": self.fake_tensor(np.float16, (6, 512, 512)),
|
||||
"F": self.fake_tensor(np.float16, (6, 512, 512))
|
||||
}
|
||||
|
||||
with ExpectException(True, "SyntaxError: Variable 'C' is undefined.", True):
|
||||
cutlass.epilogue.trace(evt_undefine, example_tensors)
|
||||
|
||||
def test_missing_example_tensor(self):
|
||||
"""
|
||||
Test when the example tensor of an input/output variable is not provided
|
||||
"""
|
||||
def evt_missing_example_tensor(accum, C):
|
||||
D = accum + C
|
||||
return D
|
||||
|
||||
example_tensors = {
|
||||
"accum": self.fake_tensor(np.float16, (6, 512, 512)),
|
||||
"C": self.fake_tensor(np.float16, (6, 512, 512)),
|
||||
}
|
||||
|
||||
with ExpectException(True, "RuntimeError: Example input for D is not provided.", True):
|
||||
cutlass.epilogue.trace(evt_missing_example_tensor, example_tensors)
|
||||
|
||||
example_tensors = {
|
||||
"accum": self.fake_tensor(np.float16, (6, 512, 512)),
|
||||
"D": self.fake_tensor(np.float16, (6, 512, 512)),
|
||||
}
|
||||
|
||||
with ExpectException(True, "RuntimeError: Example input for C is not provided.", True):
|
||||
cutlass.epilogue.trace(evt_missing_example_tensor, example_tensors)
|
||||
|
||||
def test_return_expression(self):
|
||||
"""
|
||||
Test when the return value is an expression
|
||||
"""
|
||||
def evt_return_expr(accum, C):
|
||||
return accum + C
|
||||
|
||||
example_tensors = {
|
||||
"accum": self.fake_tensor(np.float16, (6, 512, 512)),
|
||||
"C": self.fake_tensor(np.float16, (6, 512, 512)),
|
||||
}
|
||||
|
||||
with ExpectException(True, "SyntaxError: Return value cannot be an expression", True):
|
||||
cutlass.epilogue.trace(evt_return_expr, example_tensors)
|
||||
|
||||
def test_incompatible_shape(self):
|
||||
"""
|
||||
Test when the shape of example tensors are incompatible
|
||||
"""
|
||||
def evt_incompatible_shape(accum, C):
|
||||
D = accum + C
|
||||
return D
|
||||
|
||||
example_tensors = {
|
||||
"accum": self.fake_tensor(np.float16, (6, 256, 512)),
|
||||
"C": self.fake_tensor(np.float16, (6, 512, 512)),
|
||||
"D": self.fake_tensor(np.float16, (6, 512, 512))
|
||||
}
|
||||
|
||||
with ExpectException(True,
|
||||
"RuntimeError: Dimension mismatch between accum(6, 256, 512), C(6, 512, 512).", True):
|
||||
cutlass.epilogue.trace(evt_incompatible_shape, example_tensors)
|
||||
|
||||
def test_no_matching_impl(self):
|
||||
def evt_no_matching_impl(accum, bias):
|
||||
D = accum + reshape(permute(bias, indices=(1, 0)), new_shape=(512, 1))
|
||||
return D
|
||||
|
||||
example_tensors = {
|
||||
"accum": self.fake_tensor(np.float16, (6, 512, 256)),
|
||||
"bias": self.fake_tensor(np.float16, (16, 32)),
|
||||
"D": self.fake_tensor(np.float16, (6, 512, 256))
|
||||
}
|
||||
|
||||
with ExpectException(True, "NotImplementedError: No matching op for node bias with stride (0, (1, 32), 0).", True):
|
||||
cutlass.epilogue.trace(evt_no_matching_impl, example_tensors)
|
||||
#
|
||||
# Helper functions
|
||||
#
|
||||
|
||||
def fake_tensor(self, element, shape):
|
||||
return Tensor(element=element, shape=shape, layout_tag=LayoutType.RowMajor)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
351
test/python/cutlass/interface/gemm_interface.py
Normal file
351
test/python/cutlass/interface/gemm_interface.py
Normal file
@@ -0,0 +1,351 @@
|
||||
#################################################################################################
|
||||
#
|
||||
# Copyright (c) 2023 - 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
|
||||
# SPDX-License-Identifier: BSD-3-Clause
|
||||
#
|
||||
# Redistribution and use in source and binary forms, with or without
|
||||
# modification, are permitted provided that the following conditions are met:
|
||||
#
|
||||
# 1. Redistributions of source code must retain the above copyright notice, this
|
||||
# list of conditions and the following disclaimer.
|
||||
#
|
||||
# 2. Redistributions in binary form must reproduce the above copyright notice,
|
||||
# this list of conditions and the following disclaimer in the documentation
|
||||
# and/or other materials provided with the distribution.
|
||||
#
|
||||
# 3. Neither the name of the copyright holder nor the names of its
|
||||
# contributors may be used to endorse or promote products derived from
|
||||
# this software without specific prior written permission.
|
||||
#
|
||||
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
||||
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||||
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
||||
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
|
||||
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
||||
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
|
||||
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
|
||||
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
|
||||
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
#
|
||||
#################################################################################################
|
||||
|
||||
"""
|
||||
Tests the high-level GEMM interface
|
||||
"""
|
||||
|
||||
from math import ceil
|
||||
import unittest
|
||||
|
||||
import cutlass
|
||||
import cutlass.utils.datatypes as datatypes
|
||||
from cutlass.backend.utils.device import device_cc
|
||||
from utils import ExpectException
|
||||
|
||||
|
||||
class GemmEquivalence:
|
||||
"""
|
||||
Helper class for testing the equivalence of different constructions of the Gemm interface
|
||||
"""
|
||||
def __init__(self, element_A, element_B, element_C, element_D, element_accumulator,
|
||||
layout_A, layout_B, layout_C, alignment_A, alignment_B, alignment_C):
|
||||
self.element_A = element_A
|
||||
self.element_B = element_B
|
||||
self.element_C = element_C
|
||||
self.element_D = element_D
|
||||
self.element_accumulator = element_accumulator
|
||||
self.layout_A = layout_A
|
||||
self.layout_B = layout_B
|
||||
self.layout_C = layout_C
|
||||
self.alignment_A = alignment_A
|
||||
self.alignment_B = alignment_B
|
||||
self.alignment_C = alignment_C
|
||||
self.plan = cutlass.op.Gemm(element_A=element_A, element_B=element_B, element_C=element_C,
|
||||
element_D=element_D, element_accumulator=element_accumulator,
|
||||
layout_A=layout_A, layout_B=layout_B, layout_C=layout_C)
|
||||
self.op = self.plan.construct(alignment_A=alignment_A, alignment_B=alignment_B, alignment_C=alignment_C)
|
||||
|
||||
def _plans_equal(self, other_plan) -> bool:
|
||||
"""
|
||||
Compares whether two plans are equal
|
||||
|
||||
:param other_plan: plan to compare against the default GEMM
|
||||
:type other_plan: cutlass.op.Gemm
|
||||
|
||||
:return: whether `other_plan` is equivalent to `self.plan`
|
||||
:rtype: bool
|
||||
"""
|
||||
other_op = other_plan.construct(alignment_A=self.alignment_A, alignment_B=self.alignment_B, alignment_C=self.alignment_C)
|
||||
|
||||
# Compare whether the operations are equal by comparing the C++ code that would be emitted for them
|
||||
return self.op.rt_module.emit() == other_op.rt_module.emit()
|
||||
|
||||
def generic_test(self):
|
||||
"""
|
||||
Tests the equivalence of various constructions of the Gemm interface when using CUTLASS data types
|
||||
and layouts for constructing the Gemm interface
|
||||
"""
|
||||
if not datatypes.numpy_available:
|
||||
return
|
||||
|
||||
# Test when specifying all parameters
|
||||
plan_other = cutlass.op.Gemm(element_A=self.element_A, element_B=self.element_B, element_C=self.element_C,
|
||||
element_D=self.element_D, element_accumulator=self.element_accumulator,
|
||||
layout_A=self.layout_A, layout_B=self.layout_B, layout_C=self.layout_C)
|
||||
assert self._plans_equal(plan_other)
|
||||
|
||||
# Test when specifying all parameters but A
|
||||
plan_other = cutlass.op.Gemm(element_B=self.element_B, element_C=self.element_C,
|
||||
element_D=self.element_D, element_accumulator=self.element_accumulator,
|
||||
layout_B=self.layout_B, layout_C=self.layout_C,
|
||||
element=self.element_A, layout=self.layout_A)
|
||||
assert self._plans_equal(plan_other)
|
||||
|
||||
# Test when specifying all parameters but A and B as tensors and using generic element and output
|
||||
# Only run this test if the layouts and types for A and B are equal.
|
||||
if self.element_A == self.element_B and self.layout_A == self.layout_B:
|
||||
plan_other = cutlass.op.Gemm(element_C=self.element_C, element_D=self.element_D, element_accumulator=self.element_accumulator,
|
||||
layout_C=self.layout_C, element=self.element_A, layout=self.layout_A)
|
||||
assert self._plans_equal(plan_other)
|
||||
|
||||
# Test without explicit accumulator. Only run if the type of C and the accumulator.
|
||||
if self.element_C == self.element_accumulator:
|
||||
plan_other = cutlass.op.Gemm(element_A=self.element_A, element_B=self.element_B, element_C=self.element_C,
|
||||
element_D=self.element_D, layout_A=self.layout_A, layout_B=self.layout_B,
|
||||
layout_C=self.layout_C)
|
||||
assert self._plans_equal(plan_other)
|
||||
|
||||
# Test with only the generic types and layouts. Only run if types and layouts of A, B, C, and D are the same.
|
||||
if (self.element_A == self.element_B and self.element_A == self.element_C and self.element_A == self.element_D
|
||||
and self.element_A == self.element_accumulator and
|
||||
self.layout_A == self.layout_B and self.layout_A == self.layout_C):
|
||||
plan_other = cutlass.op.Gemm(element=self.element_A, layout=self.layout_A)
|
||||
assert self._plans_equal(plan_other)
|
||||
|
||||
def numpy_test(self):
|
||||
"""
|
||||
Tests the equivalence of various constructions of the Gemm interface when using numpy as a frontend
|
||||
"""
|
||||
if not datatypes.numpy_available:
|
||||
return
|
||||
|
||||
import numpy as np
|
||||
type_A = datatypes.numpy_type(self.element_A)
|
||||
type_B = datatypes.numpy_type(self.element_B)
|
||||
type_C = datatypes.numpy_type(self.element_C)
|
||||
type_D = datatypes.numpy_type(self.element_D)
|
||||
type_accum = datatypes.numpy_type(self.element_accumulator)
|
||||
|
||||
layout_to_order = {
|
||||
cutlass.LayoutType.RowMajor: 'C',
|
||||
cutlass.LayoutType.ColumnMajor: 'F'
|
||||
}
|
||||
size = (2, 2)
|
||||
A = np.zeros(size, order=layout_to_order[self.layout_A], dtype=type_A)
|
||||
B = np.zeros(size, order=layout_to_order[self.layout_B], dtype=type_B)
|
||||
C = np.zeros(size, order=layout_to_order[self.layout_C], dtype=type_C)
|
||||
D = np.zeros(size, order=layout_to_order[self.layout_C], dtype=type_D)
|
||||
|
||||
# Test when specifying all parameters via tensors
|
||||
plan_np = cutlass.op.Gemm(A=A, B=B, C=C, D=D, element_accumulator=type_accum)
|
||||
assert self._plans_equal(plan_np)
|
||||
|
||||
# Test when specifying all parameters but A as tensors
|
||||
plan_np = cutlass.op.Gemm(B=B, C=C, D=D, element_accumulator=type_accum, element_A=type_A, layout_A=self.layout_A)
|
||||
assert self._plans_equal(plan_np)
|
||||
|
||||
# Test when specifying all parameters but A and B as tensors and using generic element and output
|
||||
# Only run this test if the layouts and types for A and B are equal.
|
||||
if type_A == type_B and self.layout_A == self.layout_B:
|
||||
plan_np = cutlass.op.Gemm(C=C, D=D, element_accumulator=type_accum, element=type_A, layout=self.layout_A)
|
||||
assert self._plans_equal(plan_np)
|
||||
|
||||
# Test without explicit accumulator. Only run if the type of C and the accumulator.
|
||||
if type_C == type_accum:
|
||||
plan_np = cutlass.op.Gemm(A=A, B=B, C=C, D=D)
|
||||
assert self._plans_equal(plan_np)
|
||||
|
||||
# Test with only the generic types and layouts. Only run if types and layouts of A, B, C, and D are the same.
|
||||
if (type_A == type_B and type_A == type_C and type_A == type_D and type_A == type_accum and
|
||||
self.layout_A == self.layout_B and self.layout_A == self.layout_C):
|
||||
plan_np = cutlass.op.Gemm(element=type_A, layout=self.layout_A)
|
||||
assert self._plans_equal(plan_np)
|
||||
|
||||
def test_all(self):
|
||||
"""
|
||||
Runs all tests on the Gemm interface
|
||||
"""
|
||||
self.generic_test()
|
||||
self.numpy_test()
|
||||
|
||||
|
||||
class GemmEquivalenceTest(unittest.TestCase):
|
||||
"""
|
||||
Tests the equivalence of different constructions of the Gemm interface
|
||||
"""
|
||||
@unittest.skipIf(device_cc() < 70, "Device compute capability is insufficient for FP16 Tensor Core tests.")
|
||||
def test_gemm_equivalence_f16_f16_f16_f16_f16_ttt_8_8_8(self):
|
||||
gemm_eq = GemmEquivalence(
|
||||
element_A=cutlass.DataType.f16, element_B=cutlass.DataType.f16, element_C=cutlass.DataType.f16,
|
||||
element_D=cutlass.DataType.f16, element_accumulator=cutlass.DataType.f16,
|
||||
layout_A=cutlass.LayoutType.RowMajor, layout_B=cutlass.LayoutType.RowMajor, layout_C=cutlass.LayoutType.RowMajor,
|
||||
alignment_A=8, alignment_B=8, alignment_C=8)
|
||||
gemm_eq.test_all()
|
||||
|
||||
@unittest.skipIf(device_cc() < 70, "Device compute capability is insufficient for FP16 Tensor Core tests.")
|
||||
def test_gemm_equivalence_f16_f16_f16_f16_f32_ntn_8_8_8(self):
|
||||
gemm_eq = GemmEquivalence(
|
||||
element_A=cutlass.DataType.f16, element_B=cutlass.DataType.f16, element_C=cutlass.DataType.f16,
|
||||
element_D=cutlass.DataType.f16, element_accumulator=cutlass.DataType.f32,
|
||||
layout_A=cutlass.LayoutType.ColumnMajor, layout_B=cutlass.LayoutType.RowMajor, layout_C=cutlass.LayoutType.ColumnMajor,
|
||||
alignment_A=8, alignment_B=8, alignment_C=8)
|
||||
gemm_eq.test_all()
|
||||
|
||||
@unittest.skipIf(device_cc() < 70, "Device compute capability is insufficient for FP16 Tensor Core tests.")
|
||||
def test_gemm_equivalence_f16_f16_f16_f16_f16_ttt_4_4_4(self):
|
||||
gemm_eq = GemmEquivalence(
|
||||
element_A=cutlass.DataType.f16, element_B=cutlass.DataType.f16, element_C=cutlass.DataType.f16,
|
||||
element_D=cutlass.DataType.f16, element_accumulator=cutlass.DataType.f16,
|
||||
layout_A=cutlass.LayoutType.RowMajor, layout_B=cutlass.LayoutType.RowMajor, layout_C=cutlass.LayoutType.RowMajor,
|
||||
alignment_A=8, alignment_B=8, alignment_C=8)
|
||||
gemm_eq.test_all()
|
||||
|
||||
@unittest.skipIf(device_cc() < 80, "Device compute capability is insufficient for F64 Tensor Core tests.")
|
||||
def test_gemm_equivalence_f64_f64_f64_f64_f64_tnt_1_1_1(self):
|
||||
gemm_eq = GemmEquivalence(
|
||||
element_A=cutlass.DataType.f64, element_B=cutlass.DataType.f64, element_C=cutlass.DataType.f64,
|
||||
element_D=cutlass.DataType.f64, element_accumulator=cutlass.DataType.f64,
|
||||
layout_A=cutlass.LayoutType.RowMajor, layout_B=cutlass.LayoutType.ColumnMajor, layout_C=cutlass.LayoutType.RowMajor,
|
||||
alignment_A=1, alignment_B=1, alignment_C=1)
|
||||
gemm_eq.test_all()
|
||||
|
||||
|
||||
class GemmErrorTests(unittest.TestCase):
|
||||
"""
|
||||
Tests various error scenarios that arise with the high-level Gemm interface
|
||||
"""
|
||||
|
||||
def test_alignment(self):
|
||||
"""
|
||||
Tests case in which the alignment specified is unsupported
|
||||
"""
|
||||
plan = cutlass.op.Gemm(element=cutlass.DataType.f16, layout=cutlass.LayoutType.RowMajor)
|
||||
|
||||
with ExpectException(True, 'Alignment 16 is not supported for F16. The construction should fail.'):
|
||||
op = plan.construct(alignment_A=16, alignment_B=16, alignment_C=16)
|
||||
|
||||
def test_tensorop_availability(self):
|
||||
"""
|
||||
Tests case in which only SIMT operations are available but TensorOp is requested
|
||||
"""
|
||||
cc = device_cc()
|
||||
|
||||
# F64 Tensor Core operations are only avaiable on devices with CC >= 80
|
||||
supports_tensorop_f64 = cc >= 80
|
||||
plan = cutlass.op.Gemm(cc=cc, element=cutlass.DataType.f64, layout=cutlass.LayoutType.RowMajor)
|
||||
|
||||
error_msg = f'Incorrectly raised an exception for availability of TensorOp with F64 operands on SM{cc}'
|
||||
with ExpectException(not supports_tensorop_f64, error_msg):
|
||||
plan.opclass = cutlass.OpcodeClass.TensorOp
|
||||
|
||||
expected_opclass = cutlass.OpcodeClass.TensorOp if supports_tensorop_f64 else cutlass.OpcodeClass.Simt
|
||||
assert plan.opclass == expected_opclass, f'Expected opclass to be {expected_opclass}, but received {plan.opclass} for SM{cc}'
|
||||
|
||||
@unittest.skipIf(device_cc() < 70, "Device compute capability is insufficient for F16 Tensor Core tests.")
|
||||
def test_opclass_switch(self):
|
||||
"""
|
||||
Tests cases in which the opcode class in question is switched (e.g., from TensorOp to SIMT)
|
||||
"""
|
||||
plan = cutlass.op.Gemm( element=cutlass.DataType.f16, layout=cutlass.LayoutType.RowMajor)
|
||||
assert plan.opclass == cutlass.OpcodeClass.TensorOp
|
||||
|
||||
# Ensure that all tile descriptions have opclass of TensorOp
|
||||
for td in plan.tile_descriptions():
|
||||
assert td.math_instruction.opcode_class == cutlass.OpcodeClass.TensorOp
|
||||
|
||||
plan.opclass = cutlass.OpcodeClass.Simt
|
||||
|
||||
# Ensure that all tile descriptions have opclass of Simt
|
||||
for td in plan.tile_descriptions():
|
||||
assert td.math_instruction.opcode_class == cutlass.OpcodeClass.Simt
|
||||
|
||||
def test_invalid_tile_description(self):
|
||||
"""
|
||||
Tests scenarios in which an invalid tile description is provided for a given CC
|
||||
"""
|
||||
cc = device_cc()
|
||||
plan = cutlass.op.Gemm(cc=cc, element=cutlass.DataType.f16, layout=cutlass.LayoutType.RowMajor)
|
||||
td = plan.tile_descriptions()[0]
|
||||
stages = td.stages
|
||||
|
||||
# Zero stage count is valid for SM90+, as this is used to indicate that the builder's auto stage
|
||||
# count should be used
|
||||
with ExpectException(cc < 90, f'Requested zero stages'):
|
||||
td.stages = 0
|
||||
plan.construct(td)
|
||||
|
||||
if cc < 90:
|
||||
with ExpectException(cc < 80, f'Requested more than 2 stages on SM{cc}'):
|
||||
td.stages = 3
|
||||
plan.construct(td)
|
||||
else:
|
||||
original_kschedule = td.kernel_schedule
|
||||
original_eschedule = td.epilogue_schedule
|
||||
with ExpectException(False, f'Incorrectly flagged an error for insufficient shared memory'):
|
||||
td.kernel_schedule = cutlass.KernelScheduleType.TmaWarpSpecializedPingpong
|
||||
td.epilogue_schedule = cutlass.EpilogueScheduleType.NoSmemWarpSpecialized
|
||||
td.stages = 3
|
||||
plan.construct(td)
|
||||
|
||||
# Reset schedules
|
||||
td.kernel_schedule = original_kschedule
|
||||
td.epilogue_schedule = original_eschedule
|
||||
|
||||
with ExpectException(True, f'Requested too many stages'):
|
||||
td.stages = 100
|
||||
plan.construct(td)
|
||||
|
||||
# Reset stage count
|
||||
td.stages = stages
|
||||
|
||||
cluster_shape = td.cluster_shape
|
||||
with ExpectException(cc < 90, f'Requested non-unit cluster shape on SM{cc}'):
|
||||
td.cluster_shape = [2, 1, 1]
|
||||
plan.construct(td)
|
||||
|
||||
# Reset cluster shape
|
||||
td.cluster_shape = cluster_shape
|
||||
|
||||
with ExpectException(cc < 90, f'Requested a non-auto schedule on SM{cc}'):
|
||||
td.kernel_schedule = cutlass.KernelScheduleType.TmaWarpSpecializedPingpong
|
||||
td.epilogue_schedule = cutlass.EpilogueScheduleType.TmaWarpSpecialized
|
||||
plan.construct(td)
|
||||
|
||||
with ExpectException(True, f'Requested a non-auto kernel schedule with an auto epilogue schedule'):
|
||||
td.kernel_schedule = cutlass.KernelScheduleType.TmaWarpSpecializedPingpong
|
||||
td.epilogue_schedule = cutlass.EpilogueScheduleType.ScheduleAuto
|
||||
plan.construct(td)
|
||||
|
||||
with ExpectException(True, f'Requested an auto kernel schedule with a non-auto epilogue schedule'):
|
||||
td.kernel_schedule = cutlass.KernelScheduleType.ScheduleAuto
|
||||
td.epilogue_schedule = cutlass.EpilogueScheduleType.TmaWarpSpecialized
|
||||
plan.construct(td)
|
||||
|
||||
with ExpectException(cc < 90, f'Requested a tile scheduler on SM{cc}'):
|
||||
td.kernel_schedule = cutlass.KernelScheduleType.TmaWarpSpecializedCooperative
|
||||
td.epilogue_schedule = cutlass.EpilogueScheduleType.TmaWarpSpecializedCooperative
|
||||
td.tile_scheduler = cutlass.TileSchedulerType.StreamK
|
||||
plan.construct(td)
|
||||
|
||||
# Ensure that all returned tile descriptions are unique
|
||||
ops = {}
|
||||
for i, td in enumerate(plan.tile_descriptions()):
|
||||
op = plan.construct(td)
|
||||
code_str = op.rt_module.emit()
|
||||
if code_str in ops:
|
||||
conflicting_td = ops[code_str]
|
||||
assert False, f'Multiple tile descriptions emitted {code_str}\nTile descriptions are:\n{td}\n{conflicting_td}'
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
69
test/python/cutlass/interface/utils.py
Normal file
69
test/python/cutlass/interface/utils.py
Normal file
@@ -0,0 +1,69 @@
|
||||
#################################################################################################
|
||||
#
|
||||
# Copyright (c) 2023 - 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
|
||||
# SPDX-License-Identifier: BSD-3-Clause
|
||||
#
|
||||
# Redistribution and use in source and binary forms, with or without
|
||||
# modification, are permitted provided that the following conditions are met:
|
||||
#
|
||||
# 1. Redistributions of source code must retain the above copyright notice, this
|
||||
# list of conditions and the following disclaimer.
|
||||
#
|
||||
# 2. Redistributions in binary form must reproduce the above copyright notice,
|
||||
# this list of conditions and the following disclaimer in the documentation
|
||||
# and/or other materials provided with the distribution.
|
||||
#
|
||||
# 3. Neither the name of the copyright holder nor the names of its
|
||||
# contributors may be used to endorse or promote products derived from
|
||||
# this software without specific prior written permission.
|
||||
#
|
||||
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
||||
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||||
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
||||
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
|
||||
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
||||
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
|
||||
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
|
||||
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
|
||||
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
#
|
||||
#################################################################################################
|
||||
|
||||
"""
|
||||
Helper functions & classes for interface test
|
||||
"""
|
||||
class ExpectException:
|
||||
"""
|
||||
Utility class to assert that an exception was raised when expected
|
||||
|
||||
Example:
|
||||
|
||||
.. highlight:: python
|
||||
.. code-block:: python
|
||||
|
||||
with ExceptionExpected(True, 'Division by zero'):
|
||||
x = 1.0 / 0.0
|
||||
|
||||
:param exception_expected: whether an exception is expected to be raised
|
||||
:type exception_expected: bool
|
||||
:param message: message to print if an exception is raised when not expected or vice versa
|
||||
:type message: str
|
||||
"""
|
||||
def __init__(self, exception_expected: bool, message: str = '', verify_msg=False):
|
||||
self.exception_expected = exception_expected
|
||||
self.message = message
|
||||
self.verify_msg = verify_msg
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, traceback):
|
||||
exception_raised = exc_type is not None
|
||||
assert self.exception_expected == exception_raised, self.message
|
||||
if self.verify_msg:
|
||||
exc_message = f"{exc_type.__name__}: {exc_val}"
|
||||
assert exc_message == self.message, f"expect error message {self.message}, got {exc_message}"
|
||||
|
||||
# Suppress the exception
|
||||
return True
|
||||
Reference in New Issue
Block a user