Files
dgl/tests/python/common/test_heterograph-misc.py
2023-08-07 10:46:35 +08:00

539 lines
16 KiB
Python

import math
import numbers
import backend as F
import dgl
import networkx as nx
import numpy as np
import pytest
import scipy.sparse as sp
from dgl import DGLError
# graph generation: a random graph with 10 nodes
# and 20 edges.
# - has self loop
# - no multi edge
def edge_pair_input(sort=False):
if sort:
src = [0, 0, 0, 1, 1, 2, 2, 3, 3, 4, 4, 4, 4, 5, 5, 6, 7, 7, 7, 9]
dst = [4, 6, 9, 3, 5, 3, 7, 5, 8, 1, 3, 4, 9, 1, 9, 6, 2, 8, 9, 2]
return src, dst
else:
src = [0, 0, 4, 5, 0, 4, 7, 4, 4, 3, 2, 7, 7, 5, 3, 2, 1, 9, 6, 1]
dst = [9, 6, 3, 9, 4, 4, 9, 9, 1, 8, 3, 2, 8, 1, 5, 7, 3, 2, 6, 5]
return src, dst
def nx_input():
g = nx.DiGraph()
src, dst = edge_pair_input()
for i, e in enumerate(zip(src, dst)):
g.add_edge(*e, id=i)
return g
def elist_input():
src, dst = edge_pair_input()
return list(zip(src, dst))
def scipy_coo_input():
src, dst = edge_pair_input()
return sp.coo_matrix((np.ones((20,)), (src, dst)), shape=(10, 10))
def scipy_csr_input():
src, dst = edge_pair_input()
csr = sp.coo_matrix((np.ones((20,)), (src, dst)), shape=(10, 10)).tocsr()
csr.sort_indices()
# src = [0 0 0 1 1 2 2 3 3 4 4 4 4 5 5 6 7 7 7 9]
# dst = [4 6 9 3 5 3 7 5 8 1 3 4 9 1 9 6 2 8 9 2]
return csr
def gen_by_mutation():
g = dgl.graph([])
src, dst = edge_pair_input()
g.add_nodes(10)
g.add_edges(src, dst)
return g
def test_query():
def _test_one(g):
assert g.num_nodes() == 10
assert g.num_edges() == 20
for i in range(10):
assert g.has_nodes(i)
assert not g.has_nodes(11)
assert F.allclose(g.has_nodes([0, 2, 10, 11]), F.tensor([1, 1, 0, 0]))
src, dst = edge_pair_input()
for u, v in zip(src, dst):
assert g.has_edges_between(u, v)
assert not g.has_edges_between(0, 0)
assert F.allclose(
g.has_edges_between([0, 0, 3], [0, 9, 8]), F.tensor([0, 1, 1])
)
assert set(F.asnumpy(g.predecessors(9))) == set([0, 5, 7, 4])
assert set(F.asnumpy(g.successors(2))) == set([7, 3])
assert g.edge_ids(4, 4) == 5
assert F.allclose(g.edge_ids([4, 0], [4, 9]), F.tensor([5, 0]))
src, dst = g.find_edges([3, 6, 5])
assert F.allclose(src, F.tensor([5, 7, 4]))
assert F.allclose(dst, F.tensor([9, 9, 4]))
src, dst, eid = g.in_edges(9, form="all")
tup = list(zip(F.asnumpy(src), F.asnumpy(dst), F.asnumpy(eid)))
assert set(tup) == set([(0, 9, 0), (5, 9, 3), (7, 9, 6), (4, 9, 7)])
src, dst, eid = g.in_edges(
[9, 0, 8], form="all"
) # test node#0 has no in edges
tup = list(zip(F.asnumpy(src), F.asnumpy(dst), F.asnumpy(eid)))
assert set(tup) == set(
[(0, 9, 0), (5, 9, 3), (7, 9, 6), (4, 9, 7), (3, 8, 9), (7, 8, 12)]
)
src, dst, eid = g.out_edges(0, form="all")
tup = list(zip(F.asnumpy(src), F.asnumpy(dst), F.asnumpy(eid)))
assert set(tup) == set([(0, 9, 0), (0, 6, 1), (0, 4, 4)])
src, dst, eid = g.out_edges(
[0, 4, 8], form="all"
) # test node#8 has no out edges
tup = list(zip(F.asnumpy(src), F.asnumpy(dst), F.asnumpy(eid)))
assert set(tup) == set(
[
(0, 9, 0),
(0, 6, 1),
(0, 4, 4),
(4, 3, 2),
(4, 4, 5),
(4, 9, 7),
(4, 1, 8),
]
)
src, dst, eid = g.edges("all", "eid")
t_src, t_dst = edge_pair_input()
t_tup = list(zip(t_src, t_dst, list(range(20))))
tup = list(zip(F.asnumpy(src), F.asnumpy(dst), F.asnumpy(eid)))
assert set(tup) == set(t_tup)
assert list(F.asnumpy(eid)) == list(range(20))
src, dst, eid = g.edges("all", "srcdst")
t_src, t_dst = edge_pair_input()
t_tup = list(zip(t_src, t_dst, list(range(20))))
tup = list(zip(F.asnumpy(src), F.asnumpy(dst), F.asnumpy(eid)))
assert set(tup) == set(t_tup)
assert list(F.asnumpy(src)) == sorted(list(F.asnumpy(src)))
assert g.in_degrees(0) == 0
assert g.in_degrees(9) == 4
assert F.allclose(g.in_degrees([0, 9]), F.tensor([0, 4]))
assert g.out_degrees(8) == 0
assert g.out_degrees(9) == 1
assert F.allclose(g.out_degrees([8, 9]), F.tensor([0, 1]))
assert np.array_equal(
F.sparse_to_numpy(g.adj_external(transpose=True)),
scipy_coo_input().toarray().T,
)
assert np.array_equal(
F.sparse_to_numpy(g.adj_external(transpose=False)),
scipy_coo_input().toarray(),
)
def _test(g):
# test twice to see whether the cached format works or not
_test_one(g)
_test_one(g)
def _test_csr_one(g):
assert g.num_nodes() == 10
assert g.num_edges() == 20
for i in range(10):
assert g.has_nodes(i)
assert not g.has_nodes(11)
assert F.allclose(g.has_nodes([0, 2, 10, 11]), F.tensor([1, 1, 0, 0]))
src, dst = edge_pair_input(sort=True)
for u, v in zip(src, dst):
assert g.has_edges_between(u, v)
assert not g.has_edges_between(0, 0)
assert F.allclose(
g.has_edges_between([0, 0, 3], [0, 9, 8]), F.tensor([0, 1, 1])
)
assert set(F.asnumpy(g.predecessors(9))) == set([0, 5, 7, 4])
assert set(F.asnumpy(g.successors(2))) == set([7, 3])
# src = [0 0 0 1 1 2 2 3 3 4 4 4 4 5 5 6 7 7 7 9]
# dst = [4 6 9 3 5 3 7 5 8 1 3 4 9 1 9 6 2 8 9 2]
# eid = [0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9]
assert g.edge_ids(4, 4) == 11
assert F.allclose(g.edge_ids([4, 0], [4, 9]), F.tensor([11, 2]))
src, dst = g.find_edges([3, 6, 5])
assert F.allclose(src, F.tensor([1, 2, 2]))
assert F.allclose(dst, F.tensor([3, 7, 3]))
src, dst, eid = g.in_edges(9, form="all")
tup = list(zip(F.asnumpy(src), F.asnumpy(dst), F.asnumpy(eid)))
assert set(tup) == set([(0, 9, 2), (5, 9, 14), (7, 9, 18), (4, 9, 12)])
src, dst, eid = g.in_edges(
[9, 0, 8], form="all"
) # test node#0 has no in edges
tup = list(zip(F.asnumpy(src), F.asnumpy(dst), F.asnumpy(eid)))
assert set(tup) == set(
[
(0, 9, 2),
(5, 9, 14),
(7, 9, 18),
(4, 9, 12),
(3, 8, 8),
(7, 8, 17),
]
)
src, dst, eid = g.out_edges(0, form="all")
tup = list(zip(F.asnumpy(src), F.asnumpy(dst), F.asnumpy(eid)))
assert set(tup) == set([(0, 9, 2), (0, 6, 1), (0, 4, 0)])
src, dst, eid = g.out_edges(
[0, 4, 8], form="all"
) # test node#8 has no out edges
tup = list(zip(F.asnumpy(src), F.asnumpy(dst), F.asnumpy(eid)))
assert set(tup) == set(
[
(0, 9, 2),
(0, 6, 1),
(0, 4, 0),
(4, 3, 10),
(4, 4, 11),
(4, 9, 12),
(4, 1, 9),
]
)
src, dst, eid = g.edges("all", "eid")
t_src, t_dst = edge_pair_input(sort=True)
t_tup = list(zip(t_src, t_dst, list(range(20))))
tup = list(zip(F.asnumpy(src), F.asnumpy(dst), F.asnumpy(eid)))
assert set(tup) == set(t_tup)
assert list(F.asnumpy(eid)) == list(range(20))
src, dst, eid = g.edges("all", "srcdst")
t_src, t_dst = edge_pair_input(sort=True)
t_tup = list(zip(t_src, t_dst, list(range(20))))
tup = list(zip(F.asnumpy(src), F.asnumpy(dst), F.asnumpy(eid)))
assert set(tup) == set(t_tup)
assert list(F.asnumpy(src)) == sorted(list(F.asnumpy(src)))
assert g.in_degrees(0) == 0
assert g.in_degrees(9) == 4
assert F.allclose(g.in_degrees([0, 9]), F.tensor([0, 4]))
assert g.out_degrees(8) == 0
assert g.out_degrees(9) == 1
assert F.allclose(g.out_degrees([8, 9]), F.tensor([0, 1]))
assert np.array_equal(
F.sparse_to_numpy(g.adj_external(transpose=True)),
scipy_coo_input().toarray().T,
)
assert np.array_equal(
F.sparse_to_numpy(g.adj_external(transpose=False)),
scipy_coo_input().toarray(),
)
def _test_csr(g):
# test twice to see whether the cached format works or not
_test_csr_one(g)
_test_csr_one(g)
def _test_edge_ids():
g = gen_by_mutation()
eids = g.edge_ids([4, 0], [4, 9])
assert eids.shape[0] == 2
eid = g.edge_ids(4, 4)
assert isinstance(eid, numbers.Number)
with pytest.raises(DGLError):
eids = g.edge_ids([9, 0], [4, 9])
with pytest.raises(DGLError):
eid = g.edge_ids(4, 5)
g.add_edges(0, 4)
eids = g.edge_ids([0, 0], [4, 9])
eid = g.edge_ids(0, 4)
_test(gen_by_mutation())
_test(dgl.graph(elist_input()))
_test(dgl.from_scipy(scipy_coo_input()))
_test_csr(dgl.from_scipy(scipy_csr_input()))
_test_edge_ids()
def test_mutation():
g = dgl.graph([])
g = g.to(F.ctx())
# test add nodes with data
g.add_nodes(5)
g.add_nodes(5, {"h": F.ones((5, 2))})
ans = F.cat([F.zeros((5, 2)), F.ones((5, 2))], 0)
assert F.allclose(ans, g.ndata["h"])
g.ndata["w"] = 2 * F.ones((10, 2))
assert F.allclose(2 * F.ones((10, 2)), g.ndata["w"])
# test add edges with data
g.add_edges([2, 3], [3, 4])
g.add_edges([0, 1], [1, 2], {"m": F.ones((2, 2))})
ans = F.cat([F.zeros((2, 2)), F.ones((2, 2))], 0)
assert F.allclose(ans, g.edata["m"])
def test_scipy_adjmat():
g = dgl.graph([])
g.add_nodes(10)
g.add_edges(range(9), range(1, 10))
adj_0 = g.adj_external(scipy_fmt="csr")
adj_1 = g.adj_external(scipy_fmt="coo")
assert np.array_equal(adj_0.toarray(), adj_1.toarray())
adj_t0 = g.adj_external(transpose=False, scipy_fmt="csr")
adj_t_1 = g.adj_external(transpose=False, scipy_fmt="coo")
assert np.array_equal(adj_0.toarray(), adj_1.toarray())
def test_incmat():
g = dgl.graph([])
g.add_nodes(4)
g.add_edges(0, 1) # 0
g.add_edges(0, 2) # 1
g.add_edges(0, 3) # 2
g.add_edges(2, 3) # 3
g.add_edges(1, 1) # 4
inc_in = F.sparse_to_numpy(g.incidence_matrix("in"))
inc_out = F.sparse_to_numpy(g.incidence_matrix("out"))
inc_both = F.sparse_to_numpy(g.incidence_matrix("both"))
print(inc_in)
print(inc_out)
print(inc_both)
assert np.allclose(
inc_in,
np.array(
[
[0.0, 0.0, 0.0, 0.0, 0.0],
[1.0, 0.0, 0.0, 0.0, 1.0],
[0.0, 1.0, 0.0, 0.0, 0.0],
[0.0, 0.0, 1.0, 1.0, 0.0],
]
),
)
assert np.allclose(
inc_out,
np.array(
[
[1.0, 1.0, 1.0, 0.0, 0.0],
[0.0, 0.0, 0.0, 0.0, 1.0],
[0.0, 0.0, 0.0, 1.0, 0.0],
[0.0, 0.0, 0.0, 0.0, 0.0],
]
),
)
assert np.allclose(
inc_both,
np.array(
[
[-1.0, -1.0, -1.0, 0.0, 0.0],
[1.0, 0.0, 0.0, 0.0, 0.0],
[0.0, 1.0, 0.0, -1.0, 0.0],
[0.0, 0.0, 1.0, 1.0, 0.0],
]
),
)
def test_find_edges():
g = dgl.graph([])
g.add_nodes(10)
g.add_edges(range(9), range(1, 10))
e = g.find_edges([1, 3, 2, 4])
assert (
F.asnumpy(e[0][0]) == 1
and F.asnumpy(e[0][1]) == 3
and F.asnumpy(e[0][2]) == 2
and F.asnumpy(e[0][3]) == 4
)
assert (
F.asnumpy(e[1][0]) == 2
and F.asnumpy(e[1][1]) == 4
and F.asnumpy(e[1][2]) == 3
and F.asnumpy(e[1][3]) == 5
)
try:
g.find_edges([10])
fail = False
except DGLError:
fail = True
finally:
assert fail
def test_ismultigraph():
g = dgl.graph([])
g.add_nodes(10)
assert g.is_multigraph == False
g.add_edges([0], [0])
assert g.is_multigraph == False
g.add_edges([1], [2])
assert g.is_multigraph == False
g.add_edges([0, 2], [0, 3])
assert g.is_multigraph == True
def test_hypersparse_query():
g = dgl.graph([])
g = g.to(F.ctx())
g.add_nodes(1000001)
g.add_edges([0], [1])
for i in range(10):
assert g.has_nodes(i)
assert not g.has_nodes(1000002)
assert g.edge_ids(0, 1) == 0
src, dst = g.find_edges([0])
src, dst, eid = g.in_edges(1, form="all")
src, dst, eid = g.out_edges(0, form="all")
src, dst = g.edges()
assert g.in_degrees(0) == 0
assert g.in_degrees(1) == 1
assert g.out_degrees(0) == 1
assert g.out_degrees(1) == 0
def test_empty_data_initialized():
g = dgl.graph([])
g = g.to(F.ctx())
g.ndata["ha"] = F.tensor([])
g.add_nodes(1, {"hb": F.tensor([1])})
assert "ha" in g.ndata
assert len(g.ndata["ha"]) == 1
def test_is_sorted():
u_src, u_dst = edge_pair_input(False)
s_src, s_dst = edge_pair_input(True)
u_src = F.tensor(u_src, dtype=F.int32)
u_dst = F.tensor(u_dst, dtype=F.int32)
s_src = F.tensor(s_src, dtype=F.int32)
s_dst = F.tensor(s_dst, dtype=F.int32)
src_sorted, dst_sorted = dgl.utils.is_sorted_srcdst(u_src, u_dst)
assert src_sorted == False
assert dst_sorted == False
src_sorted, dst_sorted = dgl.utils.is_sorted_srcdst(s_src, s_dst)
assert src_sorted == True
assert dst_sorted == True
src_sorted, dst_sorted = dgl.utils.is_sorted_srcdst(u_src, u_dst)
assert src_sorted == False
assert dst_sorted == False
src_sorted, dst_sorted = dgl.utils.is_sorted_srcdst(s_src, u_dst)
assert src_sorted == True
assert dst_sorted == False
def test_default_types():
dg = dgl.graph([])
g = dgl.graph(([], []))
assert dg.ntypes == g.ntypes
assert dg.etypes == g.etypes
def test_formats():
g = dgl.rand_graph(10, 20)
# in_degrees works if coo or csc available
# out_degrees works if coo or csr available
try:
g.in_degrees()
g.out_degrees()
g.formats("coo").in_degrees()
g.formats("coo").out_degrees()
g.formats("csc").in_degrees()
g.formats("csr").out_degrees()
fail = False
except DGLError:
fail = True
finally:
assert not fail
# in_degrees NOT works if csc available only
try:
g.formats("csc").out_degrees()
fail = True
except DGLError:
fail = False
finally:
assert not fail
# out_degrees NOT works if csr available only
try:
g.formats("csr").in_degrees()
fail = True
except DGLError:
fail = False
finally:
assert not fail
# If the intersection of created formats and allowed formats is
# not empty, then retain the intersection.
# Case1: intersection is not empty and intersected is equal to
# created formats.
g = g.formats(["coo", "csr"])
g.create_formats_()
g = g.formats(["coo", "csr", "csc"])
assert sorted(g.formats()["created"]) == sorted(["coo", "csr"])
assert sorted(g.formats()["not created"]) == sorted(["csc"])
# Case2: intersection is not empty and intersected is not equal
# to created formats.
g = g.formats(["coo", "csr"])
g.create_formats_()
g = g.formats(["coo", "csc"])
assert sorted(g.formats()["created"]) == sorted(["coo"])
assert sorted(g.formats()["not created"]) == sorted(["csc"])
# If the intersection of created formats and allowed formats is
# empty, then create a format in the order of `coo` -> `csr` ->
# `csc`.
# Case1: intersection is empty and just one format is allowed.
g = g.formats(["coo", "csr"])
g.create_formats_()
g = g.formats(["csc"])
assert sorted(g.formats()["created"]) == sorted(["csc"])
assert sorted(g.formats()["not created"]) == sorted([])
# Case2: intersection is empty and more than one format is allowed.
g = g.formats("csc")
g.create_formats_()
g = g.formats(["csr", "coo"])
assert sorted(g.formats()["created"]) == sorted(["coo"])
assert sorted(g.formats()["not created"]) == sorted(["csr"])
if __name__ == "__main__":
test_query()
test_mutation()
test_scipy_adjmat()
test_incmat()
test_find_edges()
test_hypersparse_query()
test_is_sorted()
test_default_types()
test_formats()