[DistGB] modify partition.py to save FusedCSCSamplingGraph directly (#7727)

Co-authored-by: Ubuntu <ubuntu@ip-172-31-8-126.us-west-2.compute.internal>
Co-authored-by: Ubuntu <ubuntu@ip-172-31-52-174.us-west-2.compute.internal>
Co-authored-by: Rhett Ying <85214957+Rhett-Ying@users.noreply.github.com>
This commit is contained in:
Wenxuan Cao
2024-09-12 16:55:33 +08:00
committed by GitHub
parent 53e70c57fb
commit 165e2507e7

View File

@@ -88,24 +88,26 @@ def _dump_part_config(part_config, part_metadata):
json.dump(part_metadata, outfile, sort_keys=False, indent=4)
def _save_graphs(filename, g_list, formats=None, sort_etypes=False):
def process_partitions(g, formats=None, sort_etypes=False):
"""Preprocess partitions before saving:
1. format data types.
2. sort csc/csr by tag.
"""
for g in g_list:
for k, dtype in RESERVED_FIELD_DTYPE.items():
if k in g.ndata:
g.ndata[k] = F.astype(g.ndata[k], dtype)
if k in g.edata:
g.edata[k] = F.astype(g.edata[k], dtype)
for g in g_list:
if (not sort_etypes) or (formats is None):
continue
for k, dtype in RESERVED_FIELD_DTYPE.items():
if k in g.ndata:
g.ndata[k] = F.astype(g.ndata[k], dtype)
if k in g.edata:
g.edata[k] = F.astype(g.edata[k], dtype)
if (sort_etypes) and (formats is not None):
if "csr" in formats:
g = sort_csr_by_tag(g, tag=g.edata[ETYPE], tag_type="edge")
if "csc" in formats:
g = sort_csc_by_tag(g, tag=g.edata[ETYPE], tag_type="edge")
return g
def _save_dgl_graphs(filename, g_list, formats=None):
save_graphs(filename, g_list, formats=formats)
@@ -332,9 +334,10 @@ def load_partition(part_config, part_id, load_feats=True, use_graphbolt=False):
"part-{}".format(part_id) in part_metadata
), "part-{} does not exist".format(part_id)
part_files = part_metadata["part-{}".format(part_id)]
part_graph_field = "part_graph"
if use_graphbolt:
part_graph_field = "part_graph_graphbolt"
else:
part_graph_field = "part_graph"
assert (
part_graph_field in part_files
), f"the partition does not contain graph structure: {part_graph_field}"
@@ -461,7 +464,7 @@ def load_partition_feats(
return node_feats, edge_feats
def load_partition_book(part_config, part_id):
def load_partition_book(part_config, part_id, part_metadata=None):
"""Load a graph partition book from the partition config file.
Parameters
@@ -470,6 +473,8 @@ def load_partition_book(part_config, part_id):
The path of the partition config file.
part_id : int
The partition ID.
part_metadata : dict
The meta data of partition.
Returns
-------
@@ -482,7 +487,8 @@ def load_partition_book(part_config, part_id):
dict
The edge types
"""
part_metadata = _load_part_config(part_config)
if part_metadata is None:
part_metadata = _load_part_config(part_config)
assert "num_parts" in part_metadata, "num_parts does not exist."
assert (
part_metadata["num_parts"] > part_id
@@ -666,6 +672,38 @@ def _set_trainer_ids(g, sim_g, node_parts):
g.edges[c_etype].data["trainer_id"] = trainer_id
def _partition_to_graphbolt(
parts,
part_i,
part_config,
part_metadata,
*,
store_eids=True,
store_inner_node=False,
store_inner_edge=False,
graph_formats=None,
):
gpb, _, ntypes, etypes = load_partition_book(
part_config=part_config, part_id=part_i, part_metadata=part_metadata
)
graph = parts[part_i]
csc_graph = gb_convert_single_dgl_partition(
ntypes=ntypes,
etypes=etypes,
gpb=gpb,
part_meta=part_metadata,
graph=graph,
store_eids=store_eids,
store_inner_edge=store_inner_edge,
store_inner_node=store_inner_node,
graph_formats=graph_formats,
)
rel_path_result = _save_graph_gb(
part_config=part_config, part_id=part_i, csc_graph=csc_graph
)
part_metadata[f"part-{part_i}"]["part_graph_graphbolt"] = rel_path_result
def _update_node_edge_map(node_map_val, edge_map_val, g, num_parts):
"""
If the original graph contains few nodes or edges for specific node/edge
@@ -1303,6 +1341,7 @@ def partition_graph(
"ntypes": ntypes,
"etypes": etypes,
}
part_config = os.path.join(out_path, graph_name + ".json")
for part_id in range(num_parts):
part = parts[part_id]
@@ -1425,30 +1464,54 @@ def partition_graph(
part_dir = os.path.join(out_path, "part" + str(part_id))
node_feat_file = os.path.join(part_dir, "node_feat.dgl")
edge_feat_file = os.path.join(part_dir, "edge_feat.dgl")
part_graph_file = os.path.join(part_dir, "graph.dgl")
part_metadata["part-{}".format(part_id)] = {
"node_feats": os.path.relpath(node_feat_file, out_path),
"edge_feats": os.path.relpath(edge_feat_file, out_path),
"part_graph": os.path.relpath(part_graph_file, out_path),
}
os.makedirs(part_dir, mode=0o775, exist_ok=True)
save_tensors(node_feat_file, node_feats)
save_tensors(edge_feat_file, edge_feats)
part_metadata["part-{}".format(part_id)] = {
"node_feats": os.path.relpath(node_feat_file, out_path),
"edge_feats": os.path.relpath(edge_feat_file, out_path),
}
sort_etypes = len(g.etypes) > 1
_save_graphs(
part_graph_file,
[part],
formats=graph_formats,
sort_etypes=sort_etypes,
)
print(
"Save partitions: {:.3f} seconds, peak memory: {:.3f} GB".format(
time.time() - start, get_peak_mem()
)
)
part = process_partitions(part, graph_formats, sort_etypes)
# transmit to graphbolt and save graph
if use_graphbolt:
# save FusedCSCSamplingGraph
kwargs["graph_formats"] = graph_formats
n_jobs = kwargs.pop("n_jobs", 1)
mp_ctx = mp.get_context("spawn")
with concurrent.futures.ProcessPoolExecutor( # pylint: disable=unexpected-keyword-arg
max_workers=min(num_parts, n_jobs),
mp_context=mp_ctx,
) as executor:
for part_id in range(num_parts):
executor.submit(
_partition_to_graphbolt(
part_i=part_id,
part_config=part_config,
part_metadata=part_metadata,
parts=parts,
**kwargs,
)
)
part_metadata["node_map_dtype"] = "int64"
part_metadata["edge_map_dtype"] = "int64"
else:
for part_id, part in parts.items():
part_dir = os.path.join(out_path, "part" + str(part_id))
part_graph_file = os.path.join(part_dir, "graph.dgl")
part_metadata["part-{}".format(part_id)][
"part_graph"
] = os.path.relpath(part_graph_file, out_path)
# save DGLGraph
_save_dgl_graphs(
part_graph_file,
[part],
formats=graph_formats,
)
part_config = os.path.join(out_path, graph_name + ".json")
_dump_part_config(part_config, part_metadata)
num_cuts = sim_g.num_edges() - tot_num_inner_edges
@@ -1460,12 +1523,11 @@ def partition_graph(
)
)
if use_graphbolt:
kwargs["graph_formats"] = graph_formats
dgl_partition_to_graphbolt(
part_config,
**kwargs,
print(
"Save partitions: {:.3f} seconds, peak memory: {:.3f} GB".format(
time.time() - start, get_peak_mem()
)
)
if return_mapping:
return orig_nids, orig_eids
@@ -1513,62 +1575,81 @@ def init_type_per_edge(graph, gpb):
return etype_ids
def gb_convert_single_dgl_partition(
part_id,
graph_formats,
part_config,
store_eids,
def _load_part(part_config, part_id, parts=None):
"""load parts from variable or dist."""
if parts is None:
graph, _, _, _, _, _, _ = load_partition(
part_config, part_id, load_feats=False
)
else:
graph = parts[part_id]
return graph
def _save_graph_gb(part_config, part_id, csc_graph):
csc_graph_save_dir = os.path.join(
os.path.dirname(part_config),
f"part{part_id}",
)
csc_graph_path = os.path.join(
csc_graph_save_dir, "fused_csc_sampling_graph.pt"
)
torch.save(csc_graph, csc_graph_path)
return os.path.relpath(csc_graph_path, os.path.dirname(part_config))
def cast_various_to_minimum_dtype_gb(
graph,
part_meta,
num_parts,
indptr,
indices,
type_per_edge,
etypes,
ntypes,
node_attributes,
edge_attributes,
):
"""Cast various data to minimum dtype."""
# Cast 1: indptr.
indptr = _cast_to_minimum_dtype(graph.num_edges(), indptr)
# Cast 2: indices.
indices = _cast_to_minimum_dtype(graph.num_nodes(), indices)
# Cast 3: type_per_edge.
type_per_edge = _cast_to_minimum_dtype(
len(etypes), type_per_edge, field=ETYPE
)
# Cast 4: node/edge_attributes.
predicates = {
NID: part_meta["num_nodes"],
"part_id": num_parts,
NTYPE: len(ntypes),
EID: part_meta["num_edges"],
ETYPE: len(etypes),
DGL2GB_EID: part_meta["num_edges"],
GB_DST_ID: part_meta["num_nodes"],
}
for attributes in [node_attributes, edge_attributes]:
for key in attributes:
if key not in predicates:
continue
attributes[key] = _cast_to_minimum_dtype(
predicates[key], attributes[key], field=key
)
return indptr, indices, type_per_edge
def _create_attributes_gb(
graph,
gpb,
edge_ids,
is_homo,
store_inner_node,
store_inner_edge,
store_eids,
debug_mode,
):
"""Converts a single DGL partition to GraphBolt.
Parameters
----------
part_id : int
The numerical ID of the partition to convert.
graph_formats : str or list[str], optional
Save partitions in specified formats. It could be any combination of
`coo`, `csc`. As `csc` format is mandatory for `FusedCSCSamplingGraph`,
it is not necessary to specify this argument. It's mainly for
specifying `coo` format to save edge ID mapping and destination node
IDs. If not specified, whether to save `coo` format is determined by
the availability of the format in DGL partitions. Default: None.
store_eids : bool, optional
Whether to store edge IDs in the new graph. Default: True.
store_inner_node : bool, optional
Whether to store inner node mask in the new graph. Default: False.
store_inner_edge : bool, optional
Whether to store inner edge mask in the new graph. Default: False.
"""
debug_mode = "DGL_DIST_DEBUG" in os.environ
if debug_mode:
dgl_warning(
"Running in debug mode which means all attributes of DGL partitions"
" will be saved to the new format."
)
part_meta = _load_part_config(part_config)
num_parts = part_meta["num_parts"]
graph, _, _, gpb, _, _, _ = load_partition(
part_config, part_id, load_feats=False
)
_, _, ntypes, etypes = load_partition_book(part_config, part_id)
is_homo = is_homogeneous(ntypes, etypes)
node_type_to_id = (
None if is_homo else {ntype: ntid for ntid, ntype in enumerate(ntypes)}
)
edge_type_to_id = (
None
if is_homo
else {
gb.etype_tuple_to_str(etype): etid for etype, etid in etypes.items()
}
)
# Obtain CSC indtpr and indices.
indptr, indices, edge_ids = graph.adj_tensors("csc")
# Save node attributes. Detailed attributes are shown below.
# DGL_GB\Attributes dgl.NID("_ID") dgl.NTYPE("_TYPE") "inner_node" "part_id"
# DGL_Homograph ✅ 🚫 ✅ ✅
@@ -1602,6 +1683,80 @@ def gb_convert_single_dgl_partition(
edge_attributes = {
attr: graph.edata[attr][edge_ids] for attr in required_edge_attrs
}
return node_attributes, edge_attributes, type_per_edge
def gb_convert_single_dgl_partition(
ntypes,
etypes,
gpb,
part_meta,
graph,
graph_formats=None,
store_eids=False,
store_inner_node=False,
store_inner_edge=False,
):
"""Converts a single DGL partition to GraphBolt.
Parameters
----------
node types : dict
The node types
edge types : dict
The edge types
gpb : GraphPartitionBook
The global partition information.
part_meta : dict
Contain the meta data of the partition.
graph : DGLGraph
The graph to be converted to graphbolt graph.
graph_formats : str or list[str], optional
Save partitions in specified formats. It could be any combination of
`coo`, `csc`. As `csc` format is mandatory for `FusedCSCSamplingGraph`,
it is not necessary to specify this argument. It's mainly for
specifying `coo` format to save edge ID mapping and destination node
IDs. If not specified, whether to save `coo` format is determined by
the availability of the format in DGL partitions. Default: None.
store_eids : bool, optional
Whether to store edge IDs in the new graph. Default: True.
store_inner_node : bool, optional
Whether to store inner node mask in the new graph. Default: False.
store_inner_edge : bool, optional
Whether to store inner edge mask in the new graph. Default: False.
"""
debug_mode = "DGL_DIST_DEBUG" in os.environ
if debug_mode:
dgl_warning(
"Running in debug mode which means all attributes of DGL partitions"
" will be saved to the new format."
)
num_parts = part_meta["num_parts"]
is_homo = is_homogeneous(ntypes, etypes)
node_type_to_id = (
None if is_homo else {ntype: ntid for ntid, ntype in enumerate(ntypes)}
)
edge_type_to_id = (
None
if is_homo
else {
gb.etype_tuple_to_str(etype): etid for etype, etid in etypes.items()
}
)
# Obtain CSC indtpr and indices.
indptr, indices, edge_ids = graph.adj_tensors("csc")
node_attributes, edge_attributes, type_per_edge = _create_attributes_gb(
graph,
gpb,
edge_ids,
is_homo,
store_inner_node,
store_inner_edge,
store_eids,
debug_mode,
)
# When converting DGLGraph to FusedCSCSamplingGraph, edge IDs are
# re-ordered(actually FusedCSCSamplingGraph does not have edge IDs
# in nature). So we need to save such re-order info for any
@@ -1623,32 +1778,18 @@ def gb_convert_single_dgl_partition(
indptr, dtype=indices.dtype
)
# Cast various data to minimum dtype.
# Cast 1: indptr.
indptr = _cast_to_minimum_dtype(graph.num_edges(), indptr)
# Cast 2: indices.
indices = _cast_to_minimum_dtype(graph.num_nodes(), indices)
# Cast 3: type_per_edge.
type_per_edge = _cast_to_minimum_dtype(
len(etypes), type_per_edge, field=ETYPE
indptr, indices, type_per_edge = cast_various_to_minimum_dtype_gb(
graph,
part_meta,
num_parts,
indptr,
indices,
type_per_edge,
etypes,
ntypes,
node_attributes,
edge_attributes,
)
# Cast 4: node/edge_attributes.
predicates = {
NID: part_meta["num_nodes"],
"part_id": num_parts,
NTYPE: len(ntypes),
EID: part_meta["num_edges"],
ETYPE: len(etypes),
DGL2GB_EID: part_meta["num_edges"],
GB_DST_ID: part_meta["num_nodes"],
}
for attributes in [node_attributes, edge_attributes]:
for key in attributes:
if key not in predicates:
continue
attributes[key] = _cast_to_minimum_dtype(
predicates[key], attributes[key], field=key
)
csc_graph = gb.fused_csc_sampling_graph(
indptr,
@@ -1660,17 +1801,128 @@ def gb_convert_single_dgl_partition(
node_type_to_id=node_type_to_id,
edge_type_to_id=edge_type_to_id,
)
orig_graph_path = os.path.join(
os.path.dirname(part_config),
part_meta[f"part-{part_id}"]["part_graph"],
)
csc_graph_path = os.path.join(
os.path.dirname(orig_graph_path), "fused_csc_sampling_graph.pt"
)
torch.save(csc_graph, csc_graph_path)
return csc_graph
return os.path.relpath(csc_graph_path, os.path.dirname(part_config))
# Update graph path.
def _convert_partition_to_graphbolt(
part_config,
part_id,
graph_formats=None,
store_eids=False,
store_inner_node=False,
store_inner_edge=False,
):
"""
The pipeline converting signle partition to graphbolt.
Parameters
----------
part_config : str
The path of the partition config file.
part_id : int
The partition ID.
graph_formats : str or list[str], optional
Save partitions in specified formats. It could be any combination of
`coo`, `csc`. As `csc` format is mandatory for `FusedCSCSamplingGraph`,
it is not necessary to specify this argument. It's mainly for
specifying `coo` format to save edge ID mapping and destination node
IDs. If not specified, whether to save `coo` format is determined by
the availability of the format in DGL partitions. Default: None.
store_eids : bool, optional
Whether to store edge IDs in the new graph. Default: True.
store_inner_node : bool, optional
Whether to store inner node mask in the new graph. Default: False.
store_inner_edge : bool, optional
Whether to store inner edge mask in the new graph. Default: False.
Returns
-------
str
The path csc_graph to save.
"""
gpb, _, ntypes, etypes = load_partition_book(
part_config=part_config, part_id=part_id
)
part = _load_part(part_config, part_id)
part_meta = copy.deepcopy(_load_part_config(part_config))
csc_graph = gb_convert_single_dgl_partition(
graph=part,
ntypes=ntypes,
etypes=etypes,
gpb=gpb,
part_meta=part_meta,
graph_formats=graph_formats,
store_eids=store_eids,
store_inner_node=store_inner_node,
store_inner_edge=store_inner_edge,
)
rel_path = _save_graph_gb(part_config, part_id, csc_graph)
return rel_path
def _convert_partition_to_graphbolt_wrapper(
graph_formats,
part_config,
store_eids,
store_inner_node,
store_inner_edge,
n_jobs,
num_parts,
):
# [Rui] DGL partitions are always saved as homogeneous graphs even though
# the original graph is heterogeneous. But heterogeneous information like
# node/edge types are saved as node/edge data alongside with partitions.
# What needs more attention is that due to the existence of HALO nodes in
# each partition, the local node IDs are not sorted according to the node
# types. So we fail to assign ``node_type_offset`` as required by GraphBolt.
# But this is not a problem since such information is not used in sampling.
# We can simply pass None to it.
# Iterate over partitions.
convert_with_format = partial(
_convert_partition_to_graphbolt,
part_config=part_config,
graph_formats=graph_formats,
store_eids=store_eids,
store_inner_node=store_inner_node,
store_inner_edge=store_inner_edge,
)
# Need to create entirely new interpreters, because we call C++ downstream
# See https://docs.python.org/3.12/library/multiprocessing.html#contexts-and-start-methods
# and https://pybind11.readthedocs.io/en/stable/advanced/misc.html#global-interpreter-lock-gil
rel_path_results = []
if n_jobs > 1 and num_parts > 1:
mp_ctx = mp.get_context("spawn")
with concurrent.futures.ProcessPoolExecutor( # pylint: disable=unexpected-keyword-arg
max_workers=min(num_parts, n_jobs),
mp_context=mp_ctx,
) as executor:
for part_id in range(num_parts):
rel_path_results.append(
executor.submit(
convert_with_format, part_id=part_id
).result()
)
else:
# If running single-threaded, avoid spawning new interpreter, which is slow
for part_id in range(num_parts):
rel_path = convert_with_format(part_id=part_id)
rel_path_results.append(rel_path)
part_meta = _load_part_config(part_config)
for part_id in range(num_parts):
# Update graph path.
part_meta[f"part-{part_id}"]["part_graph_graphbolt"] = rel_path_results[
part_id
]
# Save dtype info into partition config.
# [TODO][Rui] Always use int64_t for node/edge IDs in GraphBolt. See more
# details in #7175.
part_meta["node_map_dtype"] = "int64"
part_meta["edge_map_dtype"] = "int64"
return part_meta
def dgl_partition_to_graphbolt(
@@ -1719,59 +1971,14 @@ def dgl_partition_to_graphbolt(
" will be saved to the new format."
)
part_meta = _load_part_config(part_config)
new_part_meta = copy.deepcopy(part_meta)
num_parts = part_meta["num_parts"]
# [Rui] DGL partitions are always saved as homogeneous graphs even though
# the original graph is heterogeneous. But heterogeneous information like
# node/edge types are saved as node/edge data alongside with partitions.
# What needs more attention is that due to the existence of HALO nodes in
# each partition, the local node IDs are not sorted according to the node
# types. So we fail to assign ``node_type_offset`` as required by GraphBolt.
# But this is not a problem since such information is not used in sampling.
# We can simply pass None to it.
# Iterate over partitions.
convert_with_format = partial(
gb_convert_single_dgl_partition,
part_meta = _convert_partition_to_graphbolt_wrapper(
graph_formats=graph_formats,
part_config=part_config,
store_eids=store_eids,
store_inner_node=store_inner_node,
store_inner_edge=store_inner_edge,
n_jobs=n_jobs,
num_parts=num_parts,
)
# Need to create entirely new interpreters, because we call C++ downstream
# See https://docs.python.org/3.12/library/multiprocessing.html#contexts-and-start-methods
# and https://pybind11.readthedocs.io/en/stable/advanced/misc.html#global-interpreter-lock-gil
rel_path_results = []
if n_jobs > 1 and num_parts > 1:
mp_ctx = mp.get_context("spawn")
with concurrent.futures.ProcessPoolExecutor( # pylint: disable=unexpected-keyword-arg
max_workers=min(num_parts, n_jobs),
mp_context=mp_ctx,
) as executor:
futures = []
for part_id in range(num_parts):
futures.append(executor.submit(convert_with_format, part_id))
for part_id in range(num_parts):
rel_path_results.append(futures[part_id].result())
else:
# If running single-threaded, avoid spawning new interpreter, which is slow
for part_id in range(num_parts):
rel_path_results.append(convert_with_format(part_id))
for part_id in range(num_parts):
# Update graph path.
new_part_meta[f"part-{part_id}"][
"part_graph_graphbolt"
] = rel_path_results[part_id]
# Save dtype info into partition config.
# [TODO][Rui] Always use int64_t for node/edge IDs in GraphBolt. See more
# details in #7175.
new_part_meta["node_map_dtype"] = "int64"
new_part_meta["edge_map_dtype"] = "int64"
_dump_part_config(part_config, new_part_meta)
print(f"Converted partitions to GraphBolt format into {part_config}")
_dump_part_config(part_config, part_meta)