mirror of
https://github.com/dmlc/dgl.git
synced 2026-06-04 19:44:23 +08:00
243 lines
7.1 KiB
Python
243 lines
7.1 KiB
Python
import operator
|
|
|
|
import backend as F
|
|
|
|
import dgl.sparse as dglsp
|
|
import pytest
|
|
import torch
|
|
|
|
from dgl.sparse import diag, power
|
|
|
|
|
|
@pytest.mark.parametrize("opname", ["add", "sub", "mul", "truediv"])
def test_diag_op_diag(opname):
    """Apply the binary operator ``opname`` to two diagonal matrices.

    The values of the result are compared against the same operator applied
    directly to the two value tensors, and the result's shape is compared
    against the shape of the first operand.
    """
    binary_op = getattr(operator, opname)
    device = F.ctx()
    mat_shape = (3, 4)

    lhs = diag(torch.arange(1, 4).to(device), shape=mat_shape)
    rhs = diag(torch.arange(10, 13).to(device), shape=mat_shape)

    out = binary_op(lhs, rhs)
    expected_val = binary_op(lhs.val, rhs.val)

    assert torch.allclose(out.val, expected_val, rtol=1e-4, atol=1e-4)
    assert out.shape == lhs.shape
|
|
|
|
|
|
@pytest.mark.parametrize(
    "v_scalar", [2, 2.5, torch.tensor(2), torch.tensor(2.5)]
)
def test_diag_op_scalar(v_scalar):
    """Exercise diagonal-matrix/scalar arithmetic for ``v_scalar``.

    Multiplication (both operand orders), division and power are checked
    against the same operation applied to the value tensor. Addition and
    subtraction with a scalar, in either operand order, are expected to
    raise ``TypeError``.
    """
    device = F.ctx()
    # Tolerances shared by every value comparison in this test.
    tol = {"rtol": 1e-4, "atol": 1e-4}

    rect = diag(torch.arange(1, 4).to(device), shape=(3, 4))

    # D * v
    right_scaled = rect * v_scalar
    assert torch.allclose(rect.val * v_scalar, right_scaled.val, **tol)
    assert rect.shape == right_scaled.shape

    # v * D
    left_scaled = v_scalar * rect
    assert torch.allclose(v_scalar * rect.val, left_scaled.val, **tol)
    assert rect.shape == left_scaled.shape

    # D / v
    divided = rect / v_scalar
    assert torch.allclose(rect.val / v_scalar, divided.val, **tol)
    assert rect.shape == divided.shape

    # D ^ v -- the power checks build the matrix without an explicit shape.
    base = diag(torch.arange(1, 4).to(device))
    via_operator = base**v_scalar
    assert torch.allclose(base.val**v_scalar, via_operator.val, **tol)
    assert base.shape == via_operator.shape

    # pow(D, v)
    via_function = power(base, v_scalar)
    assert torch.allclose(base.val**v_scalar, via_function.val, **tol)
    assert base.shape == via_function.shape

    # D + v, v + D, D - v and v - D must all be rejected.
    for rejected_op in (operator.add, operator.sub):
        with pytest.raises(TypeError):
            rejected_op(base, v_scalar)
        with pytest.raises(TypeError):
            rejected_op(v_scalar, base)
|
|
|
|
|
|
@pytest.mark.parametrize("val_shape", [(), (2,)])
@pytest.mark.parametrize("opname", ["add", "sub"])
def test_addsub_coo(val_shape, opname):
    """Compare add/sub of two COO-built sparse matrices with dense math.

    Both the Python operator and the ``dglsp`` function named ``opname`` are
    densified and compared against the operator applied to the dense
    operands. Mixing a sparse matrix with the scalar ``2`` is expected to
    raise ``TypeError`` in either operand order.
    """
    py_op = getattr(operator, opname)
    sp_func = getattr(dglsp, opname)
    device = F.ctx()

    # First operand: three entries, shape left for from_coo to determine.
    lhs_row = torch.tensor([1, 0, 2]).to(device)
    lhs_col = torch.tensor([0, 3, 2]).to(device)
    lhs_val = torch.randn(lhs_row.shape + val_shape).to(device)
    lhs = dglsp.from_coo(lhs_row, lhs_col, lhs_val)

    # Second operand: two entries, shape forced to match the first operand.
    rhs_row = torch.tensor([1, 0]).to(device)
    rhs_col = torch.tensor([0, 2]).to(device)
    rhs_val = torch.randn(rhs_row.shape + val_shape).to(device)
    rhs = dglsp.from_coo(rhs_row, rhs_col, rhs_val, shape=lhs.shape)

    sparse_results = [
        py_op(lhs, rhs).to_dense(),
        sp_func(lhs, rhs).to_dense(),
    ]
    reference = py_op(lhs.to_dense(), rhs.to_dense())

    for densified in sparse_results:
        assert torch.allclose(reference, densified)

    with pytest.raises(TypeError):
        py_op(lhs, 2)
    with pytest.raises(TypeError):
        py_op(2, lhs)
|
|
|
|
|
|
@pytest.mark.parametrize("val_shape", [(), (2,)])
@pytest.mark.parametrize("opname", ["add", "sub"])
def test_addsub_csr(val_shape, opname):
    """Compare add/sub of two CSR-built sparse matrices with dense math.

    Both the Python operator and the ``dglsp`` function named ``opname`` are
    densified and compared against the operator applied to the dense
    operands. Mixing a sparse matrix with the scalar ``2`` is expected to
    raise ``TypeError`` in either operand order.
    """
    py_op = getattr(operator, opname)
    sp_func = getattr(dglsp, opname)
    device = F.ctx()

    # First operand: three entries, shape left for from_csr to determine.
    lhs_indptr = torch.tensor([0, 1, 2, 3]).to(device)
    lhs_indices = torch.tensor([3, 0, 2]).to(device)
    lhs_val = torch.randn(lhs_indices.shape + val_shape).to(device)
    lhs = dglsp.from_csr(lhs_indptr, lhs_indices, lhs_val)

    # Second operand: two entries, shape forced to match the first operand.
    rhs_indptr = torch.tensor([0, 1, 2, 2]).to(device)
    rhs_indices = torch.tensor([2, 0]).to(device)
    rhs_val = torch.randn(rhs_indices.shape + val_shape).to(device)
    rhs = dglsp.from_csr(rhs_indptr, rhs_indices, rhs_val, shape=lhs.shape)

    sparse_results = [
        py_op(lhs, rhs).to_dense(),
        sp_func(lhs, rhs).to_dense(),
    ]
    reference = py_op(lhs.to_dense(), rhs.to_dense())

    for densified in sparse_results:
        assert torch.allclose(reference, densified)

    with pytest.raises(TypeError):
        py_op(lhs, 2)
    with pytest.raises(TypeError):
        py_op(2, lhs)
|
|
|
|
|
|
@pytest.mark.parametrize("val_shape", [(), (2,)])
@pytest.mark.parametrize("opname", ["add", "sub"])
def test_addsub_csc(val_shape, opname):
    """Compare add/sub of two CSC-built sparse matrices with dense math.

    Both the Python operator and the ``dglsp`` function named ``opname`` are
    densified and compared against the operator applied to the dense
    operands. Mixing a sparse matrix with the scalar ``2`` is expected to
    raise ``TypeError`` in either operand order.
    """
    py_op = getattr(operator, opname)
    sp_func = getattr(dglsp, opname)
    device = F.ctx()

    # First operand: three entries, shape left for from_csc to determine.
    lhs_indptr = torch.tensor([0, 1, 1, 2, 3]).to(device)
    lhs_indices = torch.tensor([1, 2, 0]).to(device)
    lhs_val = torch.randn(lhs_indices.shape + val_shape).to(device)
    lhs = dglsp.from_csc(lhs_indptr, lhs_indices, lhs_val)

    # Second operand: two entries, shape forced to match the first operand.
    rhs_indptr = torch.tensor([0, 1, 1, 2, 2]).to(device)
    rhs_indices = torch.tensor([1, 0]).to(device)
    rhs_val = torch.randn(rhs_indices.shape + val_shape).to(device)
    rhs = dglsp.from_csc(rhs_indptr, rhs_indices, rhs_val, shape=lhs.shape)

    sparse_results = [
        py_op(lhs, rhs).to_dense(),
        sp_func(lhs, rhs).to_dense(),
    ]
    reference = py_op(lhs.to_dense(), rhs.to_dense())

    for densified in sparse_results:
        assert torch.allclose(reference, densified)

    with pytest.raises(TypeError):
        py_op(lhs, 2)
    with pytest.raises(TypeError):
        py_op(2, lhs)
|
|
|
|
|
|
@pytest.mark.parametrize("val_shape", [(), (2,)])
@pytest.mark.parametrize("opname", ["add", "sub"])
def test_addsub_diag(val_shape, opname):
    """Compare add/sub of two diagonal matrices with dense math.

    Both the Python operator and the ``dglsp`` function named ``opname`` are
    densified and compared against the operator applied to the dense
    operands. Mixing a diagonal matrix with the scalar ``2`` is expected to
    raise ``TypeError`` in either operand order.
    """
    py_op = getattr(operator, opname)
    sp_func = getattr(dglsp, opname)
    device = F.ctx()

    mat_shape = (3, 4)
    # One value per row of the matrix, followed by the parametrized
    # trailing dimensions.
    diag_val_shape = (mat_shape[0],) + val_shape
    lhs = dglsp.diag(torch.randn(diag_val_shape).to(device), shape=mat_shape)
    rhs = dglsp.diag(torch.randn(diag_val_shape).to(device), shape=mat_shape)

    sparse_results = [
        py_op(lhs, rhs).to_dense(),
        sp_func(lhs, rhs).to_dense(),
    ]
    reference = py_op(lhs.to_dense(), rhs.to_dense())

    for densified in sparse_results:
        assert torch.allclose(reference, densified)

    with pytest.raises(TypeError):
        py_op(lhs, 2)
    with pytest.raises(TypeError):
        py_op(2, lhs)
|
|
|
|
|
|
@pytest.mark.parametrize("val_shape", [(), (2,)])
def test_add_sparse_diag(val_shape):
    """Add a COO-built sparse matrix and a diagonal matrix in every form.

    The ``+`` operator and ``dglsp.add`` are each tried with both operand
    orders; every densified result is compared against the sum of the two
    densified operands.
    """
    device = F.ctx()

    coo_row = torch.tensor([1, 0, 2]).to(device)
    coo_col = torch.tensor([0, 3, 2]).to(device)
    coo_val = torch.randn(coo_row.shape + val_shape).to(device)
    sparse_mat = dglsp.from_coo(coo_row, coo_col, coo_val)

    mat_shape = (3, 4)
    diag_val_shape = (mat_shape[0],) + val_shape
    diag_mat = dglsp.diag(
        torch.randn(diag_val_shape).to(device), shape=mat_shape
    )

    sparse_sums = [
        (sparse_mat + diag_mat).to_dense(),
        (diag_mat + sparse_mat).to_dense(),
        dglsp.add(sparse_mat, diag_mat).to_dense(),
        dglsp.add(diag_mat, sparse_mat).to_dense(),
    ]
    reference = sparse_mat.to_dense() + diag_mat.to_dense()

    for densified in sparse_sums:
        assert torch.allclose(reference, densified)
|
|
|
|
|
|
@pytest.mark.parametrize("val_shape", [(), (2,)])
def test_sub_sparse_diag(val_shape):
    """Subtract a COO-built sparse matrix and a diagonal matrix both ways.

    The ``-`` operator and ``dglsp.sub`` are each tried with both operand
    orders. Results computed as "diagonal minus sparse" are negated before
    being compared against the dense "sparse minus diagonal" reference.
    """
    device = F.ctx()

    coo_row = torch.tensor([1, 0, 2]).to(device)
    coo_col = torch.tensor([0, 3, 2]).to(device)
    coo_val = torch.randn(coo_row.shape + val_shape).to(device)
    sparse_mat = dglsp.from_coo(coo_row, coo_col, coo_val)

    mat_shape = (3, 4)
    diag_val_shape = (mat_shape[0],) + val_shape
    diag_mat = dglsp.diag(
        torch.randn(diag_val_shape).to(device), shape=mat_shape
    )

    op_sparse_minus_diag = (sparse_mat - diag_mat).to_dense()
    op_diag_minus_sparse = (diag_mat - sparse_mat).to_dense()
    fn_sparse_minus_diag = dglsp.sub(sparse_mat, diag_mat).to_dense()
    fn_diag_minus_sparse = dglsp.sub(diag_mat, sparse_mat).to_dense()
    reference = sparse_mat.to_dense() - diag_mat.to_dense()

    # Flip the sign of the reversed-order results so that all four
    # candidates are comparable with the single reference.
    candidates = (
        op_sparse_minus_diag,
        -op_diag_minus_sparse,
        fn_sparse_minus_diag,
        -fn_diag_minus_sparse,
    )
    for candidate in candidates:
        assert torch.allclose(reference, candidate)
|
|
|
|
|
|
@pytest.mark.parametrize("op", ["pow"])
def test_error_op_sparse_diag(op):
    """Expect ``TypeError`` when ``op`` mixes a sparse and a diagonal matrix.

    The operator is looked up by name in the ``operator`` module and applied
    to a COO-built matrix and a diagonal matrix in both operand orders.
    """
    device = F.ctx()
    binary_op = getattr(operator, op)

    coo_row = torch.tensor([1, 0, 2]).to(device)
    coo_col = torch.tensor([0, 3, 2]).to(device)
    coo_val = torch.randn(coo_row.shape).to(device)
    sparse_mat = dglsp.from_coo(coo_row, coo_col, coo_val)

    # The diagonal holds one value per COO entry.
    diag_val = torch.randn(coo_row.shape[0]).to(device)
    diag_mat = dglsp.diag(diag_val, shape=(3, 4))

    for first, second in ((sparse_mat, diag_mat), (diag_mat, sparse_mat)):
        with pytest.raises(TypeError):
            binary_op(first, second)
|